通过例子讲解Spring Batch入门,优秀的批处理框架
source link: https://segmentfault.com/a/1190000024439130
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
1 前言
欢迎访问 南瓜慢说 www.pkslow.com 获取更多精彩文章!
Spring相关文章:
Springboot-Cloud相关
Spring Batch
是一个轻量级的、完善的批处理框架,作为 Spring
体系中的一员,它拥有灵活、方便、生产可用的特点。在应对高效处理大量信息、定时处理大量数据等场景十分简便。
结合调度框架能更大地发挥 Spring Batch
的作用。
2 Spring Batch的概念知识
2.1 分层架构
Spring Batch
的分层架构图如下:
可以看到它分为三层,分别是:
-
Application
应用层:包含了所有任务batch jobs
和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。 -
Batch Core
核心层:包含启动和管理任务的运行环境类,如JobLauncher
等。 -
Batch Infrastructure
基础层:上面两层是建立在基础层之上的,包含基础的读入reader
和写出writer
、重试框架等。
2.2 关键概念
理解下图所涉及的概念至关重要,不然很难进行后续开发和问题分析。
2.2.1 JobRepository
专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以 Spring Batch
是需要依赖数据库来管理的。
2.2.2 任务启动器JobLauncher
负责启动任务 Job
。
2.2.3 任务Job
Job
是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个 Job
所定义的内容。
上图介绍了 Job
的一些相关概念:
-
Job
:封装处理实体,定义过程逻辑。 -
JobInstance
:Job
的运行实例,不同的实例,参数不同,所以定义好一个Job
后可以通过不同参数运行多次。 -
JobParameters
:与JobInstance
相关联的参数。 -
JobExecution
:代表Job
的一次实际执行,可能成功、可能失败。
所以,开发人员要做的事情,就是定义 Job
。
2.2.4 步骤Step
Step
是对 Job
某个过程的封装,一个 Job
可以包含一个或多个 Step
,一步步的 Step
按特定逻辑执行,才代表 Job
执行完成。
通过定义 Step
来组装 Job
可以更灵活地实现复杂的业务逻辑。
2.2.5 输入——处理——输出
所以,定义一个 Job
关键是定义好一个或多个 Step
,然后把它们组装好即可。而定义 Step
有多种方法,但有一种常用的模型就是 输入——处理——输出
,即 Item Reader
、 Item Processor
和 Item Writer
。比如通过 Item Reader
从文件输入数据,然后通过 Item Processor
进行业务处理和数据转换,最后通过 Item Writer
写到数据库中去。
Spring Batch
为我们提供了许多开箱即用的 Reader
和 Writer
,非常方便。
3 代码实例
理解了基本概念后,就直接通过代码来感受一下吧。整个项目的功能是从多个 csv
文件中读数据,处理后输出到一个 csv
文件。
3.1 基本框架
添加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
需要添加 Spring Batch
的依赖,同时使用 H2
作为内存数据库比较方便,实际生产肯定是要使用外部的数据库,如 Oracle
、 PostgreSQL
。
入口主类:
@SpringBootApplication @EnableBatchProcessing public class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); } }
也很简单,只是在 Springboot
的基础上添加注解 @EnableBatchProcessing
。
领域实体类 Employee
:
package com.pkslow.batch.entity; public class Employee { String id; String firstName; String lastName; }
对应的 csv
文件内容如下:
id,firstName,lastName 1,Lokesh,Gupta 2,Amit,Mishra 3,Pankaj,Kumar 4,David,Miller
3.2 输入——处理——输出
3.2.1 读取ItemReader
因为有多个输入文件,所以定义如下:
@Value("input/inputData*.csv") private Resource[] inputResources; @Bean public MultiResourceItemReader<Employee> multiResourceItemReader() { MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader; } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳过csv文件第一行,为表头 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //转换化后的目标类 setTargetType(Employee.class); } }); } }); return reader; }
这里使用了 FlatFileItemReader
,方便我们从文件读取数据。
3.2.2 处理ItemProcessor
为了简单演示,处理很简单,就是把最后一列转为大写:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; }; }
3.2.3 输出ItremWriter
比较简单,代码及注释如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv"); @Bean public FlatFileItemWriter<Employee> writer() { FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否为追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //设置分割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //设置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer; }
3.3 Step
有了 Reader-Processor-Writer
后,就可以定义 Step
了:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build(); }
这里有一个 chunk
的设置,值为 5
,意思是5条记录后再提交输出,可以根据自己需求定义。
3.4 Job
完成了 Step
的编码,定义 Job
就容易了:
@Bean public Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build(); }
3.5 运行
完成以上编码后,执行程序,结果如下:
成功读取数据,并将最后字段转为大写,并输出到 outputData.csv
文件。
4 监听Listener
可以通过 Listener
接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。
我们分别对 Read
、 Process
和 Write
事件进行监听,对应分别要实现 ItemReadListener
接口、 ItemProcessListener
接口和 ItemWriteListener
接口。因为代码比较简单,就是打印一下日志,这里只贴出 ItemWriteListener
的实现代码:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); } }
把实现的监听器 listener
整合到 Step
中去:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build(); }
执行后看一下日志:
这里就能明显看到之前设置的 chunk
的作用了。 Writer
每次是处理5条记录,如果一条输出一次,会对 IO
造成压力。
5 总结
Spring Batch
还有许多优秀的特性,如面对大量数据时的并行处理。本文主要入门介绍为主,不一一介绍,后续会专门讲解。
欢迎关注微信公众号< 南瓜慢说 >,将持续为你更新...
多读书,多分享;多写作,多整理。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK