Spring Batch remote partitioning with AWS SQS

Source: https://arnoldgalovics.com/spring-batch-remote-partitioning-aws-sqs/

In the last article, I explained how to implement a Spring Batch job with remote partitioning using Kafka. This time I'll discuss how to do Spring Batch remote partitioning with AWS SQS as the messaging layer between the manager and the workers.

You can find the previous article here: Scaling Spring Batch processing with remote partitioning using Kafka

If you’re interested in the concepts behind remote partitioning, check out the article above; here I’ll focus only on the implementation details because the concept is exactly the same.

Project implementation

Before doing anything, let’s create a Gradle project on start.spring.io with the following dependencies:

  • Spring Batch
  • Spring Integration
  • MySQL
  • Liquibase

This is just the initial set of dependencies; we need to add a few more manually. Open build.gradle:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.batch:spring-batch-integration'
    implementation 'org.springframework.integration:spring-integration-aws:2.3.5.RELEASE'
    implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.6.RELEASE'
    implementation 'com.amazonaws:aws-java-sdk-sqs:1.12.186'
    implementation 'org.liquibase:liquibase-core'
    runtimeOnly 'mysql:mysql-connector-java'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

The following three need to be added:

  • org.springframework.integration:spring-integration-aws
  • org.springframework.cloud:spring-cloud-aws-messaging
  • com.amazonaws:aws-java-sdk-sqs

These dependencies provide the AWS SQS support for Spring Integration.
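
One thing worth calling out: the remote partitioning builder factories used below (RemotePartitioningManagerStepBuilderFactory and RemotePartitioningWorkerStepBuilderFactory) are exposed by Spring Batch Integration’s @EnableBatchIntegration annotation, so the application class needs it next to the usual @EnableBatchProcessing. A minimal sketch of the entry point (the class name here is just an example, not necessarily what’s in the repository):

// Illustrative application class; enables the batch and batch-integration
// infrastructure so the builder factories below can be autowired.
@SpringBootApplication
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningSqsApplication {
    public static void main(String[] args) {
        SpringApplication.run(RemotePartitioningSqsApplication.class, args);
    }
}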

Manager

First of all, the manager configuration class:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;
}

This is very similar to the one in the Kafka article, but obviously we don’t need the KafkaTemplate here.

The @Profile annotation ensures that this configuration class is only picked up when the manager profile is active; you’ll see how the profiles are activated in the Testing section.

Next, the channel we’ll send the partitioned requests to:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
}

The partitioner is exactly the same as the one I used in the Kafka article:

public class ExamplePartitioner implements Partitioner {
    public static final String PARTITION_PREFIX = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int partitionCount = 50;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.put("data", new ArrayList<Integer>());
            partitions.put(PARTITION_PREFIX + i, executionContext);
        }
        for (int i = 0; i < 1000; i++) {
            String key = PARTITION_PREFIX + (i % partitionCount);
            ExecutionContext executionContext = partitions.get(key);
            List<Integer> data = (List<Integer>) executionContext.get("data");
            data.add(i + 1);
        }
        return partitions;
    }
}

This partitioner implementation is not doing anything interesting. It creates 50 partitions and, for each partition, puts some numbers into a list that’s accessible under the key data. Since the 1000 values are distributed round-robin across the 50 partitions, each partition ends up with 20 numbers in its list (for example, partition0 gets 1, 51, 101, and so on).

Let’s create a bean from the partitioner and define the manager-side step:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get("partitionerStep")
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }
}

The Constants class looks like the following:

public class Constants {
    public static final String QUEUE_NAME = "work";
    public static final String WORKER_STEP_NAME = "simpleStep";
}

Then the job definition:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "partitionerJob")
    public Job partitionerJob() {
        return jobBuilderFactory.get("partitioningJob")
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }
}

And then hook up the output channel we defined earlier with AWS SQS:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(sqsAsync);
        sqsMessageHandler.setQueue(Constants.QUEUE_NAME);
        return IntegrationFlows.from(outboundRequests())
                .transform(objectToJsonTransformer())
                .log()
                .handle(sqsMessageHandler)
                .get();
    }
}

The SqsMessageHandler is the important Spring Integration class here; we use it with an AmazonSQSAsync bean, which is essentially just an SQS client – we’ll define that in a second. One more thing to call out is the objectToJsonTransformer() call, a bean definition we need because we have to tell Spring Integration how to serialize the messages.

@Configuration
@Profile("manager")
public class ManagerConfiguration {
    // previous content is omitted for simplicity

    @Bean
    public ObjectToJsonTransformer objectToJsonTransformer() {
        return new ObjectToJsonTransformer();
    }
}

Next, let’s create a new class to define the SQS client:

@Configuration
public class SqsConfiguration {
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard().build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}

If you’re running against a real AWS SQS queue, this configuration will do, but since we’re using an emulator, let’s configure the SQS client to point to localhost – i.e. the emulated SQS service. It’s also important to note that the queue has to exist, so the configuration above creates it on startup if it doesn’t.

@Configuration
public class SqsConfiguration {
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        AwsClientBuilder.EndpointConfiguration endpointConfiguration =
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1");

        AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard().withCredentials(
                new AWSStaticCredentialsProvider(new BasicAWSCredentials("", ""))
        ).withEndpointConfiguration(endpointConfiguration).build();
        ListQueuesResult listQueuesResult = sqsAsync.listQueues(Constants.QUEUE_NAME);
        if (listQueuesResult.getQueueUrls().isEmpty()) {
            sqsAsync.createQueueAsync(Constants.QUEUE_NAME);
        }
        return sqsAsync;
    }
}

I’ll use localstack to run the emulated service; you’ll see it in a sec.

That concludes the configuration for the manager.

Worker

The worker is going to be very similar:

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow inboundFlow(AmazonSQSAsync sqsAsync) {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(sqsAsync, Constants.QUEUE_NAME);
        return IntegrationFlows
                .from(adapter)
                .transform(jsonToObjectTransformer())
                .channel(inboundRequests())
                .get();
    }

    @Bean
    public Transformer jsonToObjectTransformer() {
        return new JsonToStepExecutionRequestTransformer();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}

In this case, the SqsMessageDrivenChannelAdapter is used with the SQS client, and we need to deserialize the JSON message into an object, so we’ll use a JsonToStepExecutionRequestTransformer, which looks like the following:

public class JsonToStepExecutionRequestTransformer extends AbstractTransformer {
    @Override
    protected Object doTransform(Message<?> message) {
        try {
            Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
            StepExecutionRequest stepExecutionRequest = new StepExecutionRequest((String) map.get("stepName"), Long.valueOf((Integer) map.get("jobExecutionId")),
                    Long.valueOf((Integer) map.get("stepExecutionId")));
            return this.getMessageBuilderFactory().withPayload(stepExecutionRequest).build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

And the remaining configuration for the worker is the step itself, along with its reader, processor and writer:

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    // previous content is omitted for simplicity

    @Bean(name = "simpleStep")
    public Step simpleStep() {
        return stepBuilderFactory.get(Constants.WORKER_STEP_NAME)
                .inputChannel(inboundRequests())
                .<Integer, Customer>chunk(100)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .beanMapped()
                .dataSource(dataSource)
                .sql("INSERT INTO customers (id) VALUES (:id)")
                .build();
    }

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) {
                return new Customer(item);
            }
        };
    }

    @Bean
    @StepScope
    public ItemReader<Integer> itemReader(@Value("#{stepExecutionContext['data']}") List<Integer> data) {
        List<Integer> remainingData = new ArrayList<>(data);
        return new ItemReader<>() {
            @Override
            public Integer read() {
                if (remainingData.size() > 0) {
                    return remainingData.remove(0);
                }

                return null;
            }
        };
    }
}

The steps are essentially the same as for the Kafka article.

The Customer class:

public class Customer {
    private int id;

    public Customer(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}

That’s it.

A bit more generic config

The Liquibase changelog in src/main/resources/db/changelog/db.changelog-master.xml:

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
    <changeSet id="0001-initial" author="Arnold Galovics">
        <createTable tableName="customers">
            <column name="id" type="number">
            </column>
        </createTable>
        <sqlFile path="classpath:/org/springframework/batch/core/schema-mysql.sql" relativeToChangelogFile="false"/>
    </changeSet>
</databaseChangeLog>

The application.properties:

spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml

That’s all.

Testing

For testing, we need the infrastructure: a MySQL server and a localstack container to provide an emulated SQS service. Let’s create a docker-compose.yml in the root of the project:

version: "3"
services:
  mysql:
    image: mysql
    ports:
      - '3306:3306'
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: mysql
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs
      - DOCKER_HOST=unix:///var/run/docker.sock
      - DEFAULT_REGION=us-east-1
      - HOSTNAME_EXTERNAL=localhost
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"

That’s all. Let’s do a docker-compose up to start it.

Then start 3 worker instances by executing the following command three times:

$ ./gradlew bootRun --args='--spring.profiles.active=worker'

Then let’s start the manager with:

$ ./gradlew bootRun --args='--spring.profiles.active=manager'

The manager instance will distribute the work, wait until the workers have completed all the partitions, and eventually stop successfully with the following message:

2022-04-10 12:20:35.539  INFO 26368 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitioningJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 6s629ms

And if we check the state of the customers table in the database, we’ll find 1000 distinct IDs, which is the expected behavior.
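
If you want to double-check, a simple count query does the trick (assuming the MySQL container from the compose file above and the customers table created by the Liquibase changelog):

SELECT COUNT(DISTINCT id) FROM customers;
-- expected result: 1000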

Summary

That’s how easy it is to integrate Spring Batch with AWS SQS for remote partitioning.

The full code is available on GitHub.

If you liked it, feel free to share it with your friends and colleagues, and make sure to follow me on Twitter and Facebook.
