12

从KafkaConsumer看看Kafka(一) - Solinx

 4 years ago
source link: http://www.solinx.cn/archives/2019-10-19-22-37-48?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

  Kafka的消息模型为发布订阅模型,消息生产者将消息发布到主题(topic)中,一个或多个消费者订阅(消费)该主题消息并消费,此模型中发布到topic中的消息会被所有消费者所订阅到,先介绍Kafka消费模型,然后再通过KafkaConsumer原来了解它的业务流程,源码基于kafka 2.4;

Kafka消费模型关键点:

  1、Kafka一个消费组(ConsumerGroup)中存在一个或多个消费者(Consumer),每个消费者也必须属于一个消费者组;
  2、消费者组(ConsumerGroup)中的消费者(Consumer)独占一个或多个分区(Partition);
  3、消费时每个分区(Partition)最多只有一个Consumer再消费;
  4、消费者组(ConsumerGroup)在Broker存在一个协调者(Coordinator)分配管理Consumer与Partition之间的对应关系。当两种中的Consumer或Partition发生变更时将会触发reblance(重新平衡),重新分配Consumer与Partition的对应关系;

下面是Kafka消费者程序的示例:

//配置Consumer
Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "1000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//创建Consumer
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅主题
 consumer.subscribe(Arrays.asList("foo", "bar"));
//消费消息
     while (true) {
     ConsumerRecords<String, String> records = 
consumer.poll(100);
     for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
 }

  在上面我们可以看到Kafka消费消息的整个流程:配置Consumer属性、订阅主题、拉取消费消息,基本流程知道了也就是这几个点,配置ConsumerId、自动提交offset、序列化、Kafka服务端地址,这就是Kafka最最最基础的配置,当然还有很多配置项可以到官网查看;

消费者关键点

  Consumer程序主要分为三个部分:配置、订阅主题、拉取消息;从中也可以看到在消费前需要订阅某个主题、在前面我们提到Consumer实例需要与某个Partition绑定关联然后才能进行消费数据,下面我们透过官方提供的Consumer程序简单看看如何订阅主题、如何关联Consumer与Partition、如何拉取消息消费;

订阅主题
  订阅主题可以说是Kafka消费的基础,下面先看看简化后的订阅方法:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquireAndEnsureOpen();
    try {
       //忽略部分代码
        if (topics.isEmpty()) {
            this.unsubscribe();
        } else {
            if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                metadata.requestUpdateForNewTopics();
        }
    } finally {
        release();
    }
}

  安全检查: Consumer注释中也说了KafkaConsumer为非线程安全的,从上也可看到acquireAndEnsureOpen的作用就是检查当前是否为多线程运行,确保Consumer只在一个线程中执行;
  设置订阅状态: SubscriptionState 对象的subscribe方法主要是设置ConsumerRelance监听器、设置所监听的主题;
  更新元数据: metadata对象维护了Kafka集群元数据子集,存储了Broker节点、Topic、Partition节点信息等;

  跟进metadata.requestUpdateForNewTopics方法发现最终调用了metadata对象的requestUpdate方法;

public synchronized int requestUpdate() {
    this.needUpdate = true;
    return this.updateVersion;
}

  此方法并没有什么实质性的动作,只是更新needUpdate属性为true;由于Kafka拉取数据时必须得到元数据信息否则无法知道broker、topic、Partition信息也就无法知道去哪个节点拉取数据;但此处并没有实质性的更新元数据请求,接下来我们看看拉取方法。

拉取数据
  上一步订阅了主题,这时我们就可以从中拉取数据,跟踪代码最终进入了KafkaConsumer的poll方法;

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    //多线程检查
    acquireAndEnsureOpen();
    try {//省略代码
//超时检查
            if (includeMetadataInTimeout) {
                //请求更新元数据
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {//省略代码
			}
            //拉取数据
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }
                //调用消费者拦截器后返回
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}

此方法几个流程
1、 多线程检查
2、 超时检查
3、 请求更新元数据
4、 拉取数据
  此处我们比较关心的还是更新元数据与拉取数据,这里我们主要看看这两个流程的执行;

请求更新元数据
  在updateAssignmentMetadataIfNeeded方法中调用coordinator对象的poll方法去更新元数据,并且调用updateFetchPositions方法用于刷新Consumer对应Partition对应的offset值;

拉取数据
  数据的拉取在pollForFetches方法中;

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    //省略代码
    //从缓存区数据
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }
	//构造拉取请求发送
    fetcher.sendFetches();

    //省略代码
	//发起拉取数据请求
    Timer pollTimer = time.timer(pollTimeout);
    client.poll(pollTimer, () -> {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());
	//省略代码

    return fetcher.fetchedRecords();

pollForFetches方法执行逻辑:

  1、 从缓存取数据如有可用数据,直接返回;
  2、 构造请求对象fetches,一个节点node对应一个clientRequest对象,将其放入ConsumerNetworkClient对象的unsent属性中;
  3、 调用client对象poll方法,将上一步放入unsent属性的请求对象ClientRequest发送出去;
  4、 返回所拉取到的消息;

Offset提交
  offset提交放在ConsumerCoordinator对象中,offset提交又分为自动提交与手动提交;当设置了enable.auto.commit==true且  autoCommitIntervalMs等于指定间隔时有这么几个时机会触发自动:

  1、 consumer对象close时,调用commitOffsetsSync触发同步的offset提交;
  2、 consumer对象poll时,调用commitOffsetsAsync触发异步的offset提交;
  3、 触发Partition与Topic 分配 assign时触发commitOffsetsAsync异步提交;
  4、 当发生relance或有Consumer加入Group时触发commitOffsetsSync方法同步提交;

参考资料: http://kafka.apache.org


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK