
A look at the Camel Kafka Consumer

source link: https://sergiuoltean.com/2020/08/27/a-look-at-the-camel-kafka-consumer/

In this post let’s take a look at the Camel Kafka component. Apache Camel is an implementation of the enterprise integration patterns. The class we want to examine is Camel’s KafkaConsumer (not to be confused with the Kafka client class of the same name).
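Before digging into its internals, here is a minimal sketch of how such a consumer is typically wired into a route; the topic name, broker address and group id below are made up for illustration:

import org.apache.camel.builder.RouteBuilder;

public class OrdersRoute extends RouteBuilder {
    @Override
    public void configure() {
        // consume from a hypothetical "orders" topic with three consumer tasks
        from("kafka:orders?brokers=localhost:9092&groupId=order-service&consumersCount=3")
            .log("Received: ${body}");
    }
}

With that context, let’s start with doStart():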

@Override
protected void doStart() throws Exception {
    //...
    executor = endpoint.createExecutor();
    //...
    for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
        KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
        // pre-initialize task during startup so if there is any error we
        // have it thrown asap
        task.preInit();
        executor.submit(task);
        tasks.add(task);
    }
}

The first thing to look at is the executor created via endpoint.createExecutor(). We see this method:

public ExecutorService createExecutor() {
    return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
            "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
}

This is a little confusing, since the consumerStreams property is actually the thread pool size. If we look at the docs we see this:

consumerStreams (consumer): Number of concurrent consumers on the consumer. Default: 10.

Anyway, by default we start with a thread pool size of 10. Then we look at consumersCount:

consumersCount (consumer): The number of consumers that connect to kafka server. Default: 1.

and we create the respective threads

for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
    KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
    // pre-initialize task during startup so if there is any error we
    // have it thrown asap
    task.preInit();
    executor.submit(task);
    tasks.add(task);
}
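So consumerStreams sizes the shared thread pool, while consumersCount controls how many KafkaFetchRecords tasks, and therefore Kafka consumer instances, are created. A sketch of setting both on the endpoint URI, with purely illustrative values:

// three consumer instances, each running on its own thread from the pool
from("kafka:orders?brokers=localhost:9092&groupId=order-service"
        + "&consumersCount=3&consumerStreams=3")
    .log("Received: ${body}");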

It’s important to remember that one thread per consumer instance is a rule we must never break (KafkaConsumer is NOT thread-safe). We can see that this rule is respected in the task.preInit() method:

protected void doInit() {
    // create consumer
    ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // Kafka uses reflection for loading authentication settings,
        // use its classloader
        Thread.currentThread()
                .setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
        // this may throw an exception if something is wrong with kafka
        // consumer
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
    } finally {
        Thread.currentThread().setContextClassLoader(threadClassLoader);
    }
}
The task class itself is declared as

class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {

Being a plain Runnable submitted to the executor, it is fire and forget: we don’t wait for a response, which greatly improves throughput. The ConsumerRebalanceListener part also tells us it takes some action when a rebalance is triggered.

Lines 254-304 take care of subscribing to the topic and initializing an offset. Then the polling starts

LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);

We go through all partitions and start pulling records

Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();

and create Exchange objects as we go

Exchange exchange = endpoint.createKafkaExchange(record);
propagateHeaders(record, exchange, endpoint.getConfiguration());
// if not auto commit then we have additional
// information on the exchange
if (!isAutoCommitEnabled()) {
    exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext());
}
if (endpoint.getConfiguration().isAllowManualCommit()) {
    // allow Camel users to access the Kafka
    // consumer API to be able to do for example
    // manual commits
    KafkaManualCommit manual = endpoint.getComponent().getKafkaManualCommitFactory()
            .newInstance(exchange, consumer, topicName, threadId,
                    offsetRepository, partition, record.offset());
    exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
}
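If allowManualCommit is enabled, a route can pick that header up and commit explicitly after processing. A sketch of what that might look like inside a RouteBuilder; the endpoint is made up, and newer Camel versions expose commit() instead of commitSync():

from("kafka:orders?brokers=localhost:9092&groupId=order-service"
        + "&allowManualCommit=true&autoCommitEnable=false")
    .process(exchange -> {
        // ... process the record first ...
        // then commit the offset through the object the consumer put on the exchange
        KafkaManualCommit manual =
                exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        manual.commitSync();
    });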

Another interesting bit is what should happen when we encounter an error

if (endpoint.getConfiguration().isBreakOnFirstError()) {
    // we are failing and we should break out
    LOG.warn(
            "Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.",
            exchange, topicName, partitionLastOffset);
    // force commit so we resume on next poll where we failed
    commitOffset(offsetRepository, partition, partitionLastOffset, true);
    // continue to next partition
    breakOnErrorHit = true;
}

The interesting property here is

breakOnFirstError (consumer): This option controls what happens when a consumer is processing an exchange and it fails. If the option is false, the consumer continues to the next message and processes it. If the option is true, the consumer breaks out, seeks back to the offset of the message that caused the failure, and re-attempts to process it. However, this can lead to endless processing of the same message if it is bound to fail every time, e.g. a poison message; therefore it is recommended to deal with that, for example by using Camel’s error handler. Default: false.
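Since a poison message would otherwise be retried forever, pairing the option with an error handler makes sense. A sketch, with a purely illustrative dead letter endpoint:

// move a message to a dead letter topic after a few failed attempts,
// so breakOnFirstError does not loop on it forever
errorHandler(deadLetterChannel("kafka:orders-dlq?brokers=localhost:9092").maximumRedeliveries(3));

from("kafka:orders?brokers=localhost:9092&groupId=order-service&breakOnFirstError=true")
    .process(exchange -> {
        // business logic that might throw
    });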

Before the consumer unsubscribes we make sure we commit the offset

if (!reConnect) {
    if (isAutoCommitEnabled()) {
        if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
            LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
            consumer.commitAsync();
        } else if ("sync".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
            LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
            consumer.commitSync();
        }
    }
}

autoCommitOnStop (consumer): Whether to perform an explicit auto commit when the consumer stops, to ensure the broker has a commit from the last consumed message. This requires the option autoCommitEnable to be turned on. The possible values are sync, async or none. Default: sync.
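So by default the consumer does one last synchronous commit on shutdown; switching to an asynchronous one is just a matter of setting the option on the (illustrative) endpoint:

from("kafka:orders?brokers=localhost:9092&groupId=order-service&autoCommitOnStop=async")
    .log("Received: ${body}");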

The only special thing happening in the rebalance listener comes from the fact that Camel offers the possibility to store offsets in different types of repositories, overriding the default (the __consumer_offsets topic). This triggers only if we have defined a custom one.

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName);
    StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
    for (TopicPartition partition : partitions) {
        String offsetKey = serializeOffsetKey(partition);
        Long offset = lastProcessedOffset.get(offsetKey);
        if (offset == null) {
            offset = -1L;
        }
        LOG.debug("Saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset);
        commitOffset(offsetRepository, partition, offset, true);
        lastProcessedOffset.remove(offsetKey);
    }
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, topicName);
    StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
    if (offsetRepository != null) {
        for (TopicPartition partition : partitions) {
            String offsetState = offsetRepository.getState(serializeOffsetKey(partition));
            if (offsetState != null && !offsetState.isEmpty()) {
                // The state contains the last read offset so you need
                // to seek from the next one
                long offset = deserializeOffsetValue(offsetState) + 1;
                LOG.debug("Resuming partition {} from offset {} from state", partition.partition(), offset);
                consumer.seek(partition, offset);
            }
        }
    }
}

offsetRepository (consumer): The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit.
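To use it, register a StateRepository<String, String> bean (Camel ships file based and memory based implementations) and reference it from the endpoint. A sketch, assuming a bean named myOffsetRepo is already bound in the registry:

// offsets are read from and written to the custom repository
// instead of the __consumer_offsets topic
from("kafka:orders?brokers=localhost:9092&groupId=order-service&offsetRepository=#myOffsetRepo")
    .log("Received: ${body}");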

