For the basic concepts of Spring Batch, see the earlier post: SpringBatch使用介绍
Any technology is easier to understand in depth after you have actually used it; digging into the internals then pays off twice over. Below I describe how I integrated Spring Batch into a Spring Boot project, with a small example showing how to use it for simple batch processing.
1. Dependencies and configuration
POM dependency
Add the following dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
I am using Spring Boot 2.7.18 here. When adding spring-boot-starter-batch there is no need to specify a version number; Spring Boot's dependency management resolves the matching version and pulls in the Maven dependency automatically.
Basic configuration
Create a configuration class, SpringBatchConfig, that defines the core beans: JobRepository, JobLauncher, JobBuilderFactory, and StepBuilderFactory. The official documentation says these can be auto-configured, but defining them explicitly gives you more control in a real project.
@Configuration
@EnableBatchProcessing // enable Spring Batch
public class SpringBatchConfig {

    /**
     * Create the JobRepository, which stores the metadata of Spring Batch job runs.
     *
     * @param dataSource         the data source (here the MySQL data source is injected)
     * @param transactionManager the transaction manager
     * @return JobRepository
     * @throws Exception on initialization failure
     */
    @Bean
    public JobRepository createJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        // Multiple database types are supported, e.g. MySQL, H2, DB2, Oracle
        jobRepositoryFactoryBean.setDatabaseType("MySQL");
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * Create the JobLauncher used to start jobs.
     *
     * @return JobLauncher
     */
    @Bean
    public SimpleJobLauncher createJobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }

    /**
     * Create the JobBuilderFactory used to build jobs.
     *
     * @param jobRepository JobRepository
     * @return JobBuilderFactory
     */
    @Bean
    @Primary
    public JobBuilderFactory createJobBuilderFactory(JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository) {
            @Override
            public JobBuilder get(String name) {
                // attach the timing listener to every job created through this factory
                return super.get(name).listener(createTimeWatchJobListener());
            }
        };
    }

    /**
     * Create the StepBuilderFactory used to build the steps of a job.
     *
     * @param jobRepository      JobRepository
     * @param transactionManager the transaction manager
     * @return StepBuilderFactory
     */
    @Bean
    @Primary
    public StepBuilderFactory createStepBuilderFactory(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilderFactory(jobRepository, transactionManager);
    }

    /**
     * Register the job listener that tracks job execution time.
     *
     * @return TimeWatchJobListener
     */
    @Bean
    public TimeWatchJobListener createTimeWatchJobListener() {
        return new TimeWatchJobListener();
    }
}
TimeWatchJobListener is a custom job execution listener. Here it only measures how long each job runs and logs the start and end:
@Slf4j
public class TimeWatchJobListener implements JobExecutionListener {

    private static final Map<Long, StopWatch> STOP_WATCH_MAP = new ConcurrentHashMap<>();

    @Override
    public void beforeJob(JobExecution jobExecution) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        STOP_WATCH_MAP.put(jobExecution.getJobId(), stopWatch);
        log.info("job start, jobId={}", jobExecution.getJobId());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        StopWatch stopWatch = STOP_WATCH_MAP.get(jobExecution.getJobId());
        if (Objects.nonNull(stopWatch)) {
            stopWatch.stop();
            double seconds = stopWatch.getTotalTimeSeconds();
            log.info("job end, time cost={}s for jobId={}", seconds, jobExecution.getJobId());
            STOP_WATCH_MAP.remove(jobExecution.getJobId());
        }
        log.info("job end, jobId={}", jobExecution.getJobId());
    }
}
The application configuration mainly enables Spring Batch and lets it create its framework tables automatically:
spring:
  application:
    name: springbatch-learning
  # data source
  datasource:
    name: local-ds
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=GMT%2B8&useSSL=false
    username: xxxx
    password: xxxx
    druid: # Druid settings
      filters: stat # filters for monitoring statistics
      initial-size: 1 # initial pool size; min/max below
      min-idle: 1
      max-active: 20
      max-wait: 60000 # timeout (ms) when waiting for a connection
      time-between-eviction-runs-millis: 60000 # interval (ms) between checks for idle connections to close
      min-evictable-idle-time-millis: 300000 # minimum time (ms) a connection stays in the pool
      validation-query: SELECT 'x'
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      pool-prepared-statements: false # enable PSCache and size it per connection; true for Oracle, false for MySQL and for heavily sharded setups
      max-pool-prepared-statement-per-connection-size: 20
  batch:
    jdbc:
      initialize-schema: always # create the Spring Batch schema automatically
    job:
      enabled: false # do not run jobs automatically on startup
The configuration above persists the JobRepository metadata to MySQL and sets spring.batch.jdbc.initialize-schema=always, so on first startup the application creates the Spring Batch framework tables: BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_JOB_EXECUTION_PARAMS, BATCH_JOB_EXECUTION_CONTEXT, BATCH_STEP_EXECUTION, BATCH_STEP_EXECUTION_CONTEXT, plus the three sequence tables (BATCH_JOB_SEQ, BATCH_JOB_EXECUTION_SEQ, BATCH_STEP_EXECUTION_SEQ).
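Once a few jobs have run, the persisted metadata can also be read back in code through Spring Batch's JobExplorer (a bean registered by @EnableBatchProcessing). This is not part of the setup above, just a minimal sketch; the class name is hypothetical and the job name is whatever was passed to jobBuilderFactory.get(...):

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class JobMetadataInspector { // hypothetical helper, not part of the original setup

    @Autowired
    private JobExplorer jobExplorer; // provided by @EnableBatchProcessing

    /**
     * Log the status of the most recent executions of a job,
     * read back from the BATCH_* tables via the JobExplorer.
     */
    public void printRecentExecutions(String jobName) {
        List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 10);
        for (JobInstance instance : instances) {
            for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
                log.info("job={}, executionId={}, status={}, start={}, end={}",
                        jobName, execution.getId(), execution.getStatus(),
                        execution.getStartTime(), execution.getEndTime());
            }
        }
    }
}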
2. A getting-started example
Business background
After users place orders, a scheduled job runs every day to fetch the orders pending shipment from the order table, generate delivery records for them in batch, and update the order status. The daily volume is around 500,000 rows.
Implementation
Define the data reader, DataReader:
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;

import java.util.Iterator;
import java.util.List;

@Slf4j
public class DataReader implements ItemReader<OrderPo> {

    // cursor: the row_id of the last record read in the previous batch
    private long lastRowId = 0;
    // number of rows fetched from the database per query
    private final int batchSize;
    // iterator over the cached batch
    private Iterator<OrderPo> cacheIterator;

    public DataReader(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public OrderPo read() throws Exception {
        if (cacheIterator == null || !cacheIterator.hasNext()) {
            // cursor-based batch query against the database
            OrderMapper orderMapper = AppContextUtil.getBean(OrderMapper.class);
            QueryWrapper<OrderPo> cond = new QueryWrapper<>();
            cond.eq("status", 3);
            cond.gt("row_id", lastRowId);
            cond.last("limit " + batchSize);
            List<OrderPo> batchList = orderMapper.selectList(cond);
            if (batchList == null || batchList.isEmpty()) {
                return null; // nothing left to read
            }
            // advance the cursor to the last row_id of the current batch
            lastRowId = Long.parseLong(batchList.get(batchList.size() - 1).getRowId());
            log.info("lastRowId={}, read data size={}", lastRowId, batchList.size());
            cacheIterator = batchList.iterator();
        }
        // hand back one item from the iterator
        return cacheIterator.next();
    }
}
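The reader (and the writer below) fetches mappers through AppContextUtil.getBean(...), a helper class not shown in this post. A minimal sketch of such a holder, assuming it is simply an ApplicationContextAware component, could look like this:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Hypothetical static holder for the Spring ApplicationContext,
 * matching how AppContextUtil.getBean(...) is used in this post.
 */
@Component
public class AppContextUtil implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static <T> T getBean(Class<T> clazz) {
        return context.getBean(clazz);
    }
}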
Define the data processor, which builds the delivery data:
import org.springframework.batch.item.ItemProcessor;

public class DataProcessor implements ItemProcessor<OrderPo, OrderDelivery> {

    @Override
    public OrderDelivery process(OrderPo item) throws Exception {
        // mark the order as shipped (status 4) and build the matching delivery record
        item.setStatus(4);
        DeliveryPo deliveryPo = new DeliveryPo();
        deliveryPo.setDeliveryId(SnowflakeIdGenerator.generatedIdStr());
        deliveryPo.setStatus(1);
        deliveryPo.setReceiveInfo("收件人:张三");
        deliveryPo.setSendInfo("发件人:李四");
        deliveryPo.fillCreateInfo();
        item.setDeliveryId(deliveryPo.getDeliveryId());
        return OrderDelivery.builder().orderPo(item).deliveryPo(deliveryPo).build();
    }
}
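OrderDelivery is also not shown above; judging from the builder-style usage it is just a small wrapper that pairs an order with its delivery record. A possible definition:

import lombok.Builder;
import lombok.Data;

/**
 * Hypothetical wrapper pairing an order with the delivery record generated for it,
 * matching the usage OrderDelivery.builder().orderPo(...).deliveryPo(...).build().
 */
@Data
@Builder
public class OrderDelivery {
    private OrderPo orderPo;
    private DeliveryPo deliveryPo;
}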
Define the data writer:
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class DataWriter implements ItemWriter<OrderDelivery> {

    @Override
    public void write(List<? extends OrderDelivery> items) throws Exception {
        OrderMapper orderMapper = AppContextUtil.getBean(OrderMapper.class);
        DeliveryMapper deliveryMapper = AppContextUtil.getBean(DeliveryMapper.class);
        // split the chunk back into orders and deliveries
        List<OrderPo> orderPoList = items.stream().map(OrderDelivery::getOrderPo).collect(Collectors.toList());
        List<DeliveryPo> deliveryPoList = items.stream().map(OrderDelivery::getDeliveryPo).collect(Collectors.toList());
        // insert the delivery records in one batch, then update the orders one by one
        deliveryMapper.insertBatchSomeColumn(deliveryPoList);
        orderPoList.forEach(orderMapper::updateById);
        log.info("write deliveryPoList.size={}, orderPoList.size={}", deliveryPoList.size(), orderPoList.size());
    }
}
Create the job class OrderJob, which builds the Job and its Step; the Step wires together the reader, processor, and writer defined above. Note that chunk(2000) sets the commit interval (up to 2,000 items are accumulated and handed to the writer per transaction), while the reader's batchSize of 500 only controls how many rows each database query fetches.
@Component
public class OrderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;   // used to build the Job
    @Autowired
    private StepBuilderFactory stepBuilderFactory; // used to build the Step

    public Job createJob() {
        return jobBuilderFactory.get("order-delivery-" + SnowflakeIdGenerator.generatedIdStr()) // job name, unique per run
                .start(orderDeliveryStep()) // first step of the job
                .build();
    }

    public Step orderDeliveryStep() {
        return stepBuilderFactory.get("orderDeliveryStep")
                .<OrderPo, OrderDelivery>chunk(2000)
                .reader(new DataReader(500))
                .processor(new DataProcessor())
                .writer(new DataWriter())
                .build();
    }
}
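For a 500,000-row daily run it can be worth making the step fault tolerant, so that a handful of bad records or a transient database error does not fail the whole job. This is optional and not part of the example above; a sketch using Spring Batch's fault-tolerant step builder (the exception types chosen here are only placeholders) might look like:

// An alternative version of orderDeliveryStep() with skip/retry enabled.
// Requires: import org.springframework.dao.DeadlockLoserDataAccessException;
public Step orderDeliveryStepFaultTolerant() {
    return stepBuilderFactory.get("orderDeliveryStep")
            .<OrderPo, OrderDelivery>chunk(2000)
            .reader(new DataReader(500))
            .processor(new DataProcessor())
            .writer(new DataWriter())
            .faultTolerant()                               // enable skip/retry handling
            .retry(DeadlockLoserDataAccessException.class) // retry transient DB lock conflicts
            .retryLimit(3)
            .skip(NumberFormatException.class)             // e.g. a malformed row_id in the reader
            .skipLimit(100)                                // fail the step once more than 100 items are skipped
            .build();
}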
Build a test case:
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
@Slf4j
class JobCreatorTest {

    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private OrderJob orderJob;

    @Test
    public void runJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
            JobParametersInvalidException, JobRestartException, InterruptedException {
        Job job = orderJob.createJob();
        jobLauncher.run(job, new JobParameters());
        Thread.sleep(30 * 1000);
    }
}
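The business requirement is a daily scheduled run rather than a manual test. One possible way to wire that up (not shown in the example above) is a simple @Scheduled launcher; the cron expression and the timestamp parameter below are assumptions, and @EnableScheduling must be present on a configuration class:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class OrderJobScheduler { // hypothetical scheduler, not part of the original example

    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private OrderJob orderJob;

    // run every day at 02:00 (assumed schedule)
    @Scheduled(cron = "0 0 2 * * ?")
    public void launchDaily() throws Exception {
        Job job = orderJob.createJob();
        // a timestamp parameter keeps each run distinct even if the job name were reused
        JobParameters params = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}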
Supporting MyBatis-Plus classes
Create the tables:
drop table if exists order_t;
CREATE TABLE `order_t`
(
    `row_id`           varchar(32)    NOT NULL COMMENT 'row_id primary key',
    `order_no`         varchar(64)    DEFAULT NULL COMMENT 'order number',
    `trade_date`       date           NOT NULL COMMENT 'trade date',
    `pay_money`        decimal(16, 3) NOT NULL COMMENT 'paid amount',
    `status`           tinyint(4)     DEFAULT '1' COMMENT 'status: 1 submitted, 2 paid, 3 pending shipment, 4 shipped, 5 received, 6 completed',
    `delivery_id`      varchar(32)    DEFAULT NULL COMMENT 'delivery ID',
    `create_by`        varchar(64)    DEFAULT NULL COMMENT 'created by',
    `creation_date`    datetime       DEFAULT NULL COMMENT 'creation time',
    `last_update_by`   varchar(64)    DEFAULT NULL COMMENT 'last updated by',
    `last_update_date` datetime       DEFAULT NULL COMMENT 'last update time',
    PRIMARY KEY (`row_id`),
    KEY `idx_orderDate` (`trade_date`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8 COMMENT = 'order table';

drop table if exists delivery_t;
CREATE TABLE `delivery_t`
(
    `delivery_id`      varchar(32)  NOT NULL COMMENT 'delivery_id primary key',
    `receive_info`     varchar(512) DEFAULT NULL COMMENT 'recipient info',
    `send_info`        varchar(512) DEFAULT NULL COMMENT 'sender info',
    `status`           tinyint(4)   DEFAULT '1' COMMENT 'status: 1 awaiting pickup, 2 shipped, 3 in transit, 4 received, 5 rejected',
    `create_by`        varchar(64)  DEFAULT NULL COMMENT 'created by',
    `creation_date`    datetime     DEFAULT NULL COMMENT 'creation time',
    `last_update_by`   varchar(64)  DEFAULT NULL COMMENT 'last updated by',
    `last_update_date` datetime     DEFAULT NULL COMMENT 'last update time',
    PRIMARY KEY (`delivery_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8 COMMENT = 'delivery table';
Order PO
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("order_t")
public class OrderPo extends BasePo {

    private static final long serialVersionUID = 6895153403831336415L;

    @TableId("row_id")
    private String rowId;

    @TableField("order_no")
    private String orderNo;

    @TableField("trade_date")
    private Date tradeDate;

    @TableField("pay_money")
    private BigDecimal payMoney;

    /**
     * status: 1 submitted, 2 paid, 3 pending shipment, 4 shipped, 5 received, 6 completed
     */
    @TableField("status")
    private int status;

    @TableField("delivery_id")
    private String deliveryId;
}
Delivery PO
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("delivery_t")
public class DeliveryPo extends BasePo {

    private static final long serialVersionUID = -3711161041800162057L;

    @TableId("delivery_id")
    private String deliveryId;

    @TableField("receive_info")
    private String receiveInfo;

    @TableField("send_info")
    private String sendInfo;

    /**
     * status: 1 awaiting pickup, 2 shipped, 3 in transit, 4 received, 5 rejected
     */
    @TableField("status")
    private int status;
}
BasePo
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BasePo implements Serializable {

    private static final long serialVersionUID = -1914144713260446591L;

    @TableField("create_by")
    private String createBy;

    @TableField("creation_date")
    private Date creationDate;

    @TableField("last_update_by")
    private String lastUpdateBy;

    @TableField("last_update_date")
    private Date lastUpdateDate;

    public void fillCreateInfo() {
        this.createBy = "system";
        this.creationDate = new Date();
        this.lastUpdateBy = "system";
        this.lastUpdateDate = new Date();
    }

    public void fillUpdateInfo() {
        this.lastUpdateBy = "system";
        this.lastUpdateDate = new Date();
    }
}
Mappers
public interface OrderMapper extends CommonMapper<OrderPo> {
}

public interface DeliveryMapper extends CommonMapper<DeliveryPo> {
}

public interface CommonMapper<T> extends BaseMapper<T> {

    /**
     * Batch insert, provided by the custom SQL injector configured below.
     */
    Integer insertBatchSomeColumn(List<T> list);
}
MyBatis-Plus configuration for batch insert:
@Configuration
public class MyBatisPlusConfig {

    @Bean
    public DefaultSqlInjector insertBatchSqlInject() {
        return new DefaultSqlInjector() {
            @Override
            public List<AbstractMethod> getMethodList(org.apache.ibatis.session.Configuration configuration,
                                                      Class<?> mapperClass, TableInfo tableInfo) {
                // keep the default injected methods and add the batch-insert method
                List<AbstractMethod> methodList = super.getMethodList(configuration, mapperClass, tableInfo);
                methodList.add(new InsertBatchSomeColumn());
                return methodList;
            }
        };
    }
}