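By default, a Spring Batch Step processes data in batches synchronously. Through configuration, reading can stay synchronous while processing and writing are switched to asynchronous execution.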
1. Synchronous Step
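A Spring Batch Step usually consists of an ItemReader, an ItemProcessor and an ItemWriter, where the ItemProcessor is optional. The design is chunk-oriented:

- The ItemReader reads one item at a time and adds it to a list of inputs.
- Once chunkSize items have been collected, the ItemProcessor processes them.
- The ItemWriter then writes the whole chunk.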
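The chunk loop just described can be modeled in plain Java. This is a simplified, single-threaded sketch with no transactions, retry or skip. The class name ChunkLoopSketch is hypothetical, and three JDK types stand in for the Spring Batch interfaces:

- a Supplier stands in for ItemReader (returning null at end of input);
- a Function stands in for ItemProcessor;
- a Consumer stands in for ItemWriter.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified single-threaded model of a chunk-oriented step (no transactions, retry or skip).
class ChunkLoopSketch {

    // reader: one item per call, null at end of input (like ItemReader.read())
    // processor: converts one item; null means "filter this item out" (like ItemProcessor)
    // writer: receives the whole chunk at once (like ItemWriter.write())
    static <I, O> int run(Supplier<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        while (true) {
            // 1. Read item by item until chunkSize inputs are collected or input ends
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < chunkSize && (item = reader.get()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return chunks; // nothing left to read: the step is finished
            }
            // 2. Process every buffered item
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                O output = processor.apply(input);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // 3. Write the whole chunk in one call
            writer.accept(outputs);
            chunks++;
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7).iterator();
        Supplier<Integer> reader = () -> source.hasNext() ? source.next() : null;
        Function<Integer, String> processor = i -> "item-" + i;
        List<List<String>> written = new ArrayList<>();
        int chunks = run(reader, processor, written::add, 3);
        // 3 chunks: [[item-1, item-2, item-3], [item-4, item-5, item-6], [item-7]]
        System.out.println(chunks + " chunks: " + written);
    }
}
```

With 7 items and a chunk size of 3, the writer is called three times, and the last chunk is a partial one.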
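This chunk process is entirely synchronous; no part of it runs asynchronously. In real workloads the data usually comes from a database, and querying it one row at a time is inefficient. Throughput improves if the reader fetches rows in batches but still returns them one at a time. For example, DataReader fetches rows in batches using a cursor (the rowId of the last row read):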
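```java
import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class DataReader implements ItemReader<InputType> {

    // Cursor: rowId of the last row read in the previous batch
    private long lastRowId = 0;
    // Number of rows fetched from the database per query
    private int batchSize;
    // Iterator over the cached rows of the current batch
    private Iterator<InputType> cacheIterator;

    public DataReader(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public InputType read() throws Exception {
        if (cacheIterator == null || !cacheIterator.hasNext()) {
            // Query the next batch from the database using the cursor (query elided)
            List<InputType> batchList = ....
            if (batchList == null || batchList.isEmpty()) {
                return null; // no more data: reading is finished
            }
            // Move lastRowId to the rowId of the last row in this batch
            lastRowId = batchList.get(batchList.size() - 1).getRowId();
            cacheIterator = batchList.iterator();
        }
        // Return one item from the cached batch
        return cacheIterator.next();
    }
}
```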
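The "query in batches, return one item per read()" behavior can be checked without a database. This is a JDK-only sketch under a few assumptions:

- An in-memory list stands in for the table.
- queryBatch() is a hypothetical simulation of a cursor-style query: rows with rowId greater than lastRowId, ordered by rowId, at most batchSize rows.
- The class name KeysetBatchReaderSketch is also hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// JDK-only sketch of "query in batches, return one item per read()".
// The in-memory table and queryBatch() are hypothetical stand-ins for the real database query.
class KeysetBatchReaderSketch {

    record Row(long rowId, String payload) {}

    private final List<Row> table;     // assumed sorted by rowId, like ORDER BY row_id
    private final int batchSize;       // rows fetched per simulated query
    private long lastRowId = 0;        // cursor: rowId of the last row fetched so far
    private Iterator<Row> cache = Collections.emptyIterator();
    int queryCount = 0;                // number of simulated database round trips

    KeysetBatchReaderSketch(List<Row> table, int batchSize) {
        this.table = table;
        this.batchSize = batchSize;
    }

    // Simulates: SELECT ... WHERE row_id > :lastRowId ORDER BY row_id LIMIT :batchSize
    private List<Row> queryBatch() {
        queryCount++;
        List<Row> batch = new ArrayList<>();
        for (Row row : table) {
            if (row.rowId() > lastRowId) {
                batch.add(row);
                if (batch.size() == batchSize) {
                    break;
                }
            }
        }
        return batch;
    }

    // Same contract as ItemReader.read(): one row per call, null once the data is exhausted
    Row read() {
        if (!cache.hasNext()) {
            List<Row> batch = queryBatch();
            if (batch.isEmpty()) {
                return null;
            }
            lastRowId = batch.get(batch.size() - 1).rowId(); // advance the cursor
            cache = batch.iterator();
        }
        return cache.next();
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        for (long id = 1; id <= 25; id++) {
            table.add(new Row(id, "row-" + id));
        }
        KeysetBatchReaderSketch reader = new KeysetBatchReaderSketch(table, 10);
        int items = 0;
        while (reader.read() != null) {
            items++;
        }
        System.out.println(items + " items, " + reader.queryCount + " queries"); // 25 items, 4 queries
    }
}
```

Reading 25 rows with a batch size of 10 takes 4 queries. The first three return 10, 10 and 5 rows. The fourth comes back empty and makes read() return null.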
DataProcessor applies the business logic to each item read and converts it into an OutputType, which is passed on to the ItemWriter:
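```java
import org.springframework.batch.item.ItemProcessor;

public class DataProcessor implements ItemProcessor<InputType, OutputType> {

    @Override
    public OutputType process(InputType item) throws Exception {
        // Business logic: convert item into an OutputType (elided);
        // returning null filters the item out of the chunk
        OutputType output = ....
        return output;
    }
}
```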
DataWriter writes the OutputType items to the database:
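```java
import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class DataWriter implements ItemWriter<OutputType> {

    // Spring Batch 4.x signature; in 5.x, write() takes a Chunk<? extends OutputType> instead
    @Override
    public void write(List<? extends OutputType> items) throws Exception {
        // Write the items to the database...
    }
}
```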
Configuring the synchronous Step:
```java
return stepBuilderFactory.get("step1")
        .<InputType, OutputType>chunk(100)  // every 100 items form one chunk for the processor and writer
        .reader(new DataReader(1000))       // each database query fetches 1000 rows
        .processor(new DataProcessor())
        .writer(new DataWriter())
        .build();
```
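To see how the two sizes in step1 interact, here is a rough count of round trips. It assumes two things:

- DataReader behaves as shown above, so one final empty query signals the end of input.
- No empty final chunk is written.

The class Step1Counts and its methods are hypothetical.

```java
// Rough counts for step1: chunk(100) combined with DataReader(1000).
class Step1Counts {

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // One query per full or partial batch, plus one final query that comes back empty
    static long readerQueries(long rows, int readBatchSize) {
        return ceilDiv(rows, readBatchSize) + 1;
    }

    // One processor/writer pass (and one transaction commit) per chunk
    static long chunks(long rows, int chunkSize) {
        return ceilDiv(rows, chunkSize);
    }

    public static void main(String[] args) {
        long rows = 1_000_000;
        // prints "1001 queries, 10000 chunks"
        System.out.println(readerQueries(rows, 1000) + " queries, " + chunks(rows, 100) + " chunks");
    }
}
```

The reader's batch size sets the number of database round trips. The chunk size sets how often the processor and writer run and a transaction commits. With these settings, one query feeds ten chunks.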
2. Asynchronous Step
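In a batch system that handles large data volumes, you usually want as much throughput as possible. The ItemProcessor and ItemWriter stages can then be optimized with asynchronous multithreading. To do this, wrap them in AsyncItemProcessor and AsyncItemWriter respectively. Both classes ship in the spring-batch-integration module, package org.springframework.batch.integration.async. The following methods perform the wrapping: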
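```java
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.core.task.TaskExecutor;

// Wraps a processor so that each item is processed on the given TaskExecutor
private <I, O> AsyncItemProcessor<I, O> wrapAsyncProcessor(ItemProcessor<I, O> processor,
                                                           TaskExecutor taskExecutor) {
    AsyncItemProcessor<I, O> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(processor);
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    return asyncItemProcessor;
}

// Wraps a writer so that it unwraps the Futures produced by AsyncItemProcessor
private <O> AsyncItemWriter<O> wrapAsyncWriter(ItemWriter<O> writer) {
    AsyncItemWriter<O> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(writer);
    return asyncItemWriter;
}
```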
Configuring the asynchronous Step:
```java
private Step step2() {
    // DataProcessor maps InputType -> OutputType, so the async wrappers use the same types
    AsyncItemProcessor<InputType, OutputType> asyncItemProcessor =
            wrapAsyncProcessor(new DataProcessor(), getAsyncExecutor("TestJobPool"));
    AsyncItemWriter<OutputType> asyncItemWriter = wrapAsyncWriter(new DataWriter());
    return stepBuilderFactory.get("step2")
            // the processor now emits Future<OutputType>, so that is the chunk's output type
            .<InputType, Future<OutputType>>chunk(500)
            .reader(new DataReader(1000))
            .processor(asyncItemProcessor)
            .writer(asyncItemWriter)
            .build();
}

private TaskExecutor getAsyncExecutor(String threadPoolName) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(200);
    executor.setKeepAliveSeconds(60);
    executor.setThreadNamePrefix(threadPoolName + "-");
    // When all threads are busy and the queue is full, the submitting (step) thread runs the task itself
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setAllowCoreThreadTimeOut(true);
    executor.initialize();
    return executor;
}
```
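ThreadPoolTaskExecutor is a Spring wrapper around the JDK's ThreadPoolExecutor. With a positive queue capacity, it uses a bounded LinkedBlockingQueue. The sketch below makes two points, and both PoolSketch and its helper methods are hypothetical:

- It builds a roughly equivalent pool directly on the JDK.
- It saturates a deliberately tiny pool to show CallerRunsPolicy in action.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSketch {

    // Roughly what getAsyncExecutor(...) configures, built directly on the JDK class
    static ThreadPoolExecutor newPool(String name, int core, int max, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory = r -> new Thread(r, name + "-" + counter.getAndIncrement());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max,
                60, TimeUnit.SECONDS,                        // keepAliveSeconds = 60
                new LinkedBlockingQueue<>(queueCapacity),    // bounded task queue
                factory,                                     // threads named "<name>-1", "<name>-2", ...
                new ThreadPoolExecutor.CallerRunsPolicy());  // saturated: the submitting thread runs the task
        pool.allowCoreThreadTimeOut(true);                   // idle core threads may also time out
        return pool;
    }

    // Saturates a pool with 1 thread and a queue of 1, then reports which thread ran the 3rd task
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = newPool("TestJobPool", 1, 1, 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);   // occupies the only worker thread
        pool.execute(blocker);   // fills the only queue slot
        String[] ranOn = new String[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // rejected: runs in this thread
        release.countDown();
        pool.shutdown();
        return ranOn[0];
    }

    public static void main(String[] args) {
        // Prints the caller's own thread name, not "TestJobPool-1"
        System.out.println("rejected task ran on: " + threadOfRejectedTask());
    }
}
```

A ThreadPoolExecutor only creates threads beyond corePoolSize once its queue is full. Suppose items are submitted faster than they complete in the chunk(500) step:

- It runs on 4 pool threads until 200 item tasks are queued.
- Only then does the pool grow to 8 threads.
- After that, CallerRunsPolicy makes the step thread process items itself, which throttles further submission.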
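AsyncItemProcessor uses the proxy (delegate) pattern. It forwards each item to a wrapped ItemProcessor that does the actual processing, and it runs that call on the taskExecutor thread pool. If no TaskExecutor is set, it falls back to SyncTaskExecutor and runs on the calling thread. The core of its source: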
```java
public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean {

    // The wrapped ItemProcessor
    private ItemProcessor<I, O> delegate;

    private TaskExecutor taskExecutor = new SyncTaskExecutor();

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "The delegate must be set.");
    }

    @Nullable
    public Future<O> process(final I item) throws Exception {
        final StepExecution stepExecution = getStepExecution();
        FutureTask<O> task = new FutureTask<>(new Callable<O>() {
            public O call() throws Exception {
                if (stepExecution != null) {
                    StepSynchronizationManager.register(stepExecution);
                }
                try {
                    // The wrapped processor does the actual work
                    return delegate.process(item);
                }
                finally {
                    if (stepExecution != null) {
                        StepSynchronizationManager.close();
                    }
                }
            }
        });
        // Submit the task for asynchronous execution
        taskExecutor.execute(task);
        return task;
    }

    // setDelegate(), setTaskExecutor() and the getStepExecution() helper omitted
}
```
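The processing works as follows:
- Create a FutureTask whose call() invokes delegate.process(item) on a pool thread. For the duration of that call, the current StepExecution is registered with StepSynchronizationManager on the pool thread.
- Execute the FutureTask asynchronously through the TaskExecutor.
- Return the Future, which the writer later uses to obtain the asynchronous result.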
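These three steps can be reproduced with the JDK alone. The sketch below mirrors AsyncItemProcessor.process() with a few simplifications:

- It omits the StepSynchronizationManager registration.
- A plain Function and Executor stand in for ItemProcessor and TaskExecutor.
- The class AsyncProcessorSketch and its squaresAsync demo are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

class AsyncProcessorSketch<I, O> {
    private final Function<I, O> delegate;  // stands in for the wrapped ItemProcessor
    private final Executor executor;        // stands in for the TaskExecutor

    AsyncProcessorSketch(Function<I, O> delegate, Executor executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    // Wrap the delegate call in a FutureTask, run it on the executor,
    // and return the Future at once so the chunk loop can move on to the next item
    Future<O> process(I item) {
        FutureTask<O> task = new FutureTask<>(() -> delegate.apply(item));
        executor.execute(task);
        return task;
    }

    // Demo: submit every item first, then collect the results in submission order
    static List<Integer> squaresAsync(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            AsyncProcessorSketch<Integer, Integer> processor = new AsyncProcessorSketch<>(i -> i * i, pool);
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer item : items) {
                futures.add(processor.process(item)); // does not wait for the result
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // blocks until that item is done
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squaresAsync(List.of(1, 2, 3, 4, 5))); // [1, 4, 9, 16, 25]
    }
}
```

Because process() returns immediately, the chunk loop can submit every item of a chunk before any of them finishes. The results are only awaited later, on the writer side.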
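AsyncItemWriter likewise uses the proxy (delegate) pattern, forwarding to the ItemWriter that actually writes the data. It works in two steps:
1. Wait for the asynchronous results produced in the processor stage. Null results (items filtered out by the processor) are dropped. If processing failed, the processor's original exception is rethrown.
2. Pass the collected results to the real ItemWriter, which writes them.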
```java
public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean {

    // The wrapped ItemWriter
    private ItemWriter<T> delegate;

    public void write(List<? extends Future<T>> items) throws Exception {
        // Holds the unwrapped results
        List<T> list = new ArrayList<>();
        // Wait for each asynchronous result
        for (Future<T> future : items) {
            try {
                T item = future.get();
                if(item != null) {
                    list.add(future.get());
                }
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if(cause != null && cause instanceof Exception) {
                    logger.debug("An exception was thrown while processing an item", e);
                    throw (Exception) cause;
                }
                else {
                    throw e;
                }
            }
        }
        // Delegate to the real ItemWriter
        delegate.write(list);
    }

    // logger field, setDelegate(), afterPropertiesSet() and open()/update()/close() omitted
}
```
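The writer side can be modeled the same way. In this JDK-only sketch, a Consumer stands in for the wrapped ItemWriter, and logging and the ItemStream callbacks are left out. The class name AsyncWriterSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// JDK-only model of AsyncItemWriter.write(): unwrap each Future, drop null (filtered) results,
// rethrow the processor's original exception, then hand the plain list to the real writer
class AsyncWriterSketch<T> {
    private final Consumer<List<T>> delegate; // stands in for the wrapped ItemWriter

    AsyncWriterSketch(Consumer<List<T>> delegate) {
        this.delegate = delegate;
    }

    void write(List<? extends Future<T>> items) throws Exception {
        List<T> list = new ArrayList<>();
        for (Future<T> future : items) {
            try {
                T item = future.get();          // blocks until the processor task finishes
                if (item != null) {             // null means the processor filtered the item
                    list.add(item);
                }
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;    // surface the processor's own exception
                }
                throw e;
            }
        }
        delegate.accept(list);                  // one write call for the whole chunk
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> written = new ArrayList<>();
        AsyncWriterSketch<String> writer = new AsyncWriterSketch<>(written::add);
        writer.write(List.of(
                CompletableFuture.completedFuture("a"),
                CompletableFuture.completedFuture((String) null),
                CompletableFuture.completedFuture("b")));
        System.out.println(written); // [[a, b]]
    }
}
```

Every Future is awaited before the delegate is called. The real writer therefore still receives one ordinary list per chunk, in read order; only the processing inside the chunk ran in parallel.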