13

通过例子讲解Spring Batch入门,优秀的批处理框架

 3 years ago
source link: https://segmentfault.com/a/1190000024439130
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1 前言

欢迎访问 南瓜慢说 www.pkslow.com 获取更多精彩文章!

Spring相关文章: Springboot-Cloud相关

Spring Batch 是一个轻量级的、完善的批处理框架,作为 Spring 体系中的一员,它拥有灵活、方便、生产可用的特点。在应对高效处理大量信息、定时处理大量数据等场景十分简便。

结合调度框架能更大地发挥 Spring Batch 的作用。

2 Spring Batch的概念知识

2.1 分层架构

Spring Batch 的分层架构图如下:

zARrMr.png!mobile

可以看到它分为三层,分别是:

  • Application 应用层:包含了所有任务 batch jobs 和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
  • Batch Core 核心层:包含启动和管理任务的运行环境类,如 JobLauncher 等。
  • Batch Infrastructure 基础层:上面两层是建立在基础层之上的,包含基础的 读入reader写出writer 、重试框架等。

2.2 关键概念

理解下图所涉及的概念至关重要,不然很难进行后续开发和问题分析。

RZF7neb.png!mobile

2.2.1 JobRepository

专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以 Spring Batch 是需要依赖数据库来管理的。

2.2.2 任务启动器JobLauncher

负责启动任务 Job

2.2.3 任务Job

Job 是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个 Job 所定义的内容。

zqmYnaM.png!mobile

上图介绍了 Job 的一些相关概念:

  • Job :封装处理实体,定义过程逻辑。
  • JobInstanceJob 的运行实例,不同的实例,参数不同,所以定义好一个 Job 后可以通过不同参数运行多次。
  • JobParameters :与 JobInstance 相关联的参数。
  • JobExecution :代表 Job 的一次实际执行,可能成功、可能失败。

所以,开发人员要做的事情,就是定义 Job

2.2.4 步骤Step

Step 是对 Job 某个过程的封装,一个 Job 可以包含一个或多个 Step ,一步步的 Step 按特定逻辑执行,才代表 Job 执行完成。

euUzQ3a.png!mobile

通过定义 Step 来组装 Job 可以更灵活地实现复杂的业务逻辑。

2.2.5 输入——处理——输出

所以,定义一个 Job 关键是定义好一个或多个 Step ,然后把它们组装好即可。而定义 Step 有多种方法,但有一种常用的模型就是 输入——处理——输出 ,即 Item ReaderItem ProcessorItem Writer 。比如通过 Item Reader 从文件输入数据,然后通过 Item Processor 进行业务处理和数据转换,最后通过 Item Writer 写到数据库中去。

Spring Batch 为我们提供了许多开箱即用的 ReaderWriter ,非常方便。

3 代码实例

理解了基本概念后,就直接通过代码来感受一下吧。整个项目的功能是从多个 csv 文件中读数据,处理后输出到一个 csv 文件。

3.1 基本框架

添加依赖:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

需要添加 Spring Batch 的依赖,同时使用 H2 作为内存数据库比较方便,实际生产肯定是要使用外部的数据库,如 OraclePostgreSQL

入口主类:

@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {
    public static void main(String[] args) {
        SpringApplication.run(PkslowBatchJobMain.class, args);
    }
}

也很简单,只是在 Springboot 的基础上添加注解 @EnableBatchProcessing

领域实体类 Employee

package com.pkslow.batch.entity;
public class Employee {
    String id;
    String firstName;
    String lastName;
}

对应的 csv 文件内容如下:

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller

3.2 输入——处理——输出

3.2.1 读取ItemReader

因为有多个输入文件,所以定义如下:

@Value("input/inputData*.csv")
private Resource[] inputResources;

@Bean
public MultiResourceItemReader<Employee> multiResourceItemReader()
{
  MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
  resourceItemReader.setResources(inputResources);
  resourceItemReader.setDelegate(reader());
  return resourceItemReader;
}

@Bean
public FlatFileItemReader<Employee> reader()
{
  FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
  //跳过csv文件第一行,为表头
  reader.setLinesToSkip(1);
  reader.setLineMapper(new DefaultLineMapper() {
    {
      setLineTokenizer(new DelimitedLineTokenizer() {
        {
          //字段名
          setNames(new String[] { "id", "firstName", "lastName" });
        }
      });
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
        {
          //转换化后的目标类
          setTargetType(Employee.class);
        }
      });
    }
  });
  return reader;
}

这里使用了 FlatFileItemReader ,方便我们从文件读取数据。

3.2.2 处理ItemProcessor

为了简单演示,处理很简单,就是把最后一列转为大写:

public ItemProcessor<Employee, Employee> itemProcessor() {
  return employee -> {
    employee.setLastName(employee.getLastName().toUpperCase());
    return employee;
  };
}

3.2.3 输出ItremWriter

比较简单,代码及注释如下:

private Resource outputResource = new FileSystemResource("output/outputData.csv");

@Bean
public FlatFileItemWriter<Employee> writer()
{
  FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
  writer.setResource(outputResource);
  //是否为追加模式
  writer.setAppendAllowed(true);
  writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
    {
      //设置分割符
      setDelimiter(",");
      setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
        {
          //设置字段
          setNames(new String[] { "id", "firstName", "lastName" });
        }
      });
    }
  });
  return writer;
}

3.3 Step

有了 Reader-Processor-Writer 后,就可以定义 Step 了:

@Bean
public Step csvStep() {
  return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
    .reader(multiResourceItemReader())
    .processor(itemProcessor())
    .writer(writer())
    .build();
}

这里有一个 chunk 的设置,值为 5 ,意思是5条记录后再提交输出,可以根据自己需求定义。

3.4 Job

完成了 Step 的编码,定义 Job 就容易了:

@Bean
public Job pkslowCsvJob() {
  return jobBuilderFactory
    .get("pkslowCsvJob")
    .incrementer(new RunIdIncrementer())
    .start(csvStep())
    .build();
}

3.5 运行

完成以上编码后,执行程序,结果如下:

AJvAre.png!mobile

成功读取数据,并将最后字段转为大写,并输出到 outputData.csv 文件。

4 监听Listener

可以通过 Listener 接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。

我们分别对 ReadProcessWrite 事件进行监听,对应分别要实现 ItemReadListener 接口、 ItemProcessListener 接口和 ItemWriteListener 接口。因为代码比较简单,就是打印一下日志,这里只贴出 ItemWriteListener 的实现代码:

public class PkslowWriteListener implements ItemWriteListener<Employee> {
    private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);
    @Override
    public void beforeWrite(List<? extends Employee> list) {
        logger.info("beforeWrite: " + list);
    }

    @Override
    public void afterWrite(List<? extends Employee> list) {
        logger.info("afterWrite: " + list);
    }

    @Override
    public void onWriteError(Exception e, List<? extends Employee> list) {
        logger.info("onWriteError: " + list);
    }
}

把实现的监听器 listener 整合到 Step 中去:

@Bean
public Step csvStep() {
  return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
    .reader(multiResourceItemReader())
    .listener(new PkslowReadListener())
    .processor(itemProcessor())
    .listener(new PkslowProcessListener())
    .writer(writer())
    .listener(new PkslowWriteListener())
    .build();
}

执行后看一下日志:

ziiQ3yv.png!mobile

这里就能明显看到之前设置的 chunk 的作用了。 Writer 每次是处理5条记录,如果一条输出一次,会对 IO 造成压力。

5 总结

Spring Batch 还有许多优秀的特性,如面对大量数据时的并行处理。本文主要入门介绍为主,不一一介绍,后续会专门讲解。

欢迎关注微信公众号< 南瓜慢说 >,将持续为你更新...

yIbiMf2.png!mobile

多读书,多分享;多写作,多整理。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK