Testing Spring Embedded Kafka consumer and producer - Knoldus Blogs

source link: https://blog.knoldus.com/testing-spring-embedded-kafka-consumer-and-producer/

Reading Time: 2 minutes

In this blog I’m talking about testing Kafka without a physical installation of the Kafka services or a Docker container.
For testing, I’m going to use another Spring library called spring-kafka-test. It provides plenty of functionality to ease our job in the testing process and to verify that a Kafka consumer or producer works as expected.

Maven Test Dependencies

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>

application.yml props file

This is the minimum configuration for the Kafka producer and consumer APIs.

spring:
  profiles:
    active: test
  kafka:
    topic:
       name: test-topic
    consumer:
      auto-offset-reset: earliest
      group-id: test-group
    listener:
      ack-mode: manual
      client-id: test-client-id
      concurrency: 10
      type: batch
    admin:
      client-id: admin-test-client-id
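
Note that the configuration above does not set any serializers. Since the producer sends a BankModel POJO rather than a plain string, you would typically also configure Spring Kafka’s JSON (de)serializers. A hedged sketch of the extra properties (the serializer classes come from Kafka and spring-kafka; the trusted package name is an assumption for this example):

```yaml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Package containing BankModel (assumed name for this example)
        spring.json.trusted.packages: "com.example.model"
```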

Kafka Producer

The Producer API allows applications to send streams of data to topics in the Kafka cluster.

@Service
public class KafkaProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private final KafkaTemplate<String, BankModel> kafkaTemplate;

    @Value("${spring.kafka.topic.name}")
    private String topic;

    public KafkaProducer(KafkaTemplate<String, BankModel> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(BankModel model) {
        kafkaTemplate.send(topic, model.getAccountNumber(), model);
    }
}
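
The producer assumes a KafkaTemplate&lt;String, BankModel&gt; bean exists in the context. With Spring Boot this is usually auto-configured from application.yml, but if you wire it manually it would look roughly like this (a sketch, not the article’s code; the JsonSerializer choice matches the JSON payload assumption above):

```java
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, BankModel> producerFactory(KafkaProperties properties) {
        // Start from the Spring Boot properties, then force JSON value serialization
        Map<String, Object> config = properties.buildProducerProperties();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, BankModel> kafkaTemplate(ProducerFactory<String, BankModel> pf) {
        return new KafkaTemplate<>(pf);
    }
}
```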

Kafka Consumer

The Consumer API allows applications to read streams of data from topics in the Kafka cluster.

@Service
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    private final ObjectMapper mapper = new ObjectMapper();

    @KafkaListener(topics = "#{'${spring.kafka.topic.name}'.split(',')}")
    public void listen(List<BankModel> recordBatch, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("------------------------------------------");
        log.info("Payload size = " + recordBatch.size());
        recordBatch.forEach(record -> {
            try {
                // Serialize each record individually, not the whole batch
                log.info("Bank model Json: " + mapper.writeValueAsString(record));
            } catch (JsonProcessingException e) {
                log.error("Exception occurred during message processing", e);
            }
        });
        log.info("------------------------------------------");
    }
}
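
Both sides exchange a BankModel payload. The article never shows this class, but from the constructor call and the getters used in the test it is a plain POJO along these lines (field names and their order are assumptions inferred from the test code):

```java
public class BankModel {
    // Field order inferred from the test's constructor call:
    // (transactionId, accountNumber, securityCode, identificationNumber,
    //  dateOfBirth, firstName, lastName, balance)
    private String transactionId;
    private String accountNumber;
    private String securityCode;
    private int identificationNumber;
    private String dateOfBirth;
    private String firstName;
    private String lastName;
    private Double balance;

    // A no-arg constructor (needed by Jackson for JSON deserialization)
    // and setters are omitted here for brevity.
    public BankModel(String transactionId, String accountNumber, String securityCode,
                     int identificationNumber, String dateOfBirth, String firstName,
                     String lastName, Double balance) {
        this.transactionId = transactionId;
        this.accountNumber = accountNumber;
        this.securityCode = securityCode;
        this.identificationNumber = identificationNumber;
        this.dateOfBirth = dateOfBirth;
        this.firstName = firstName;
        this.lastName = lastName;
        this.balance = balance;
    }

    public String getTransactionId() { return transactionId; }
    public String getAccountNumber() { return accountNumber; }
    public String getSecurityCode() { return securityCode; }
    public int getIdentificationNumber() { return identificationNumber; }
    public String getDateOfBirth() { return dateOfBirth; }
    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
    public Double getBalance() { return balance; }
}
```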

Test Class

Now we test the Kafka consumer, which takes care of reading messages from Kafka. But before that, we need some configuration for EmbeddedKafka.

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class EmbeddedKafkaIntegrationTest {
    private BankModel event = new BankModel(UUID.randomUUID().toString(), UUID.randomUUID().toString(),
            "7703", 111, "12/05/2021", "Abid", "Khan", 10000d);

    @SpyBean
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Captor
    ArgumentCaptor<List<BankModel>> bankModelArgumentCaptor;

    @Captor
    ArgumentCaptor<String> topicArgumentCaptor;

    @Value("${spring.kafka.topic.name}")
    private String TOPIC_NAME;

    @Test
    public void embeddedKafka_whenSendingToSimpleProducer_thenMessageReceived() {

        // Producer
        producer.send(event);

        // Consumer
        verify(consumer, timeout(1000).times(1)).listen(bankModelArgumentCaptor.capture(),
                topicArgumentCaptor.capture());
        List<BankModel> batchPayload = bankModelArgumentCaptor.getValue();
        assertNotNull(batchPayload);
        assertThat(batchPayload.size(), equalTo(1));
        assertTrue(TOPIC_NAME.contains(topicArgumentCaptor.getValue()));
        testEvents(batchPayload);
    }

    private void testEvents(List<BankModel> eventsPayload) {
        eventsPayload.forEach(record -> {
            assertNotNull(record);
            assertEquals(event.getAccountNumber(), record.getAccountNumber());
            assertEquals(event.getTransactionId(), record.getTransactionId());
            assertEquals(event.getIdentificationNumber(), record.getIdentificationNumber());
            assertEquals(event.getSecurityCode(), record.getSecurityCode());
            assertEquals(event.getDateOfBirth(), record.getDateOfBirth());
            assertEquals(event.getFirstName(), record.getFirstName());
            assertEquals(event.getLastName(), record.getLastName());
            assertEquals(event.getBalance(), record.getBalance());
        });
    }
}

The test class has three annotations:

  • @EmbeddedKafka – an annotation that can be specified on a test class that runs Spring for Apache Kafka based tests; it starts an embedded broker for the test.
  • @SpringBootTest – tells Spring Boot to look for a main configuration class (one with @SpringBootApplication, for instance) and use it to start a Spring application context, so the test exercises the interactions between Spring and your code.
  • @DirtiesContext – indicates that the ApplicationContext associated with the test is dirty and should therefore be closed and removed from the context cache after the test.

Conclusion

It’s very easy to test a Kafka integration: @EmbeddedKafka provides a handy way to get started without running a real broker. Similar tests can also be written with the plain JUnit 5 approach.
