41

专为实时而构建:使用Apache Kafka进行大数据消息传递,第1部分

 5 years ago
source link: http://www.apexyun.com/zhuan-wei-shi-shi-er-gou-jian-shi-yong-apache-kafkajin-xing-da-shu-ju-xiao-xi-chuan-di-di-1bu-fen/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

当大数据运动开始时,它主要集中在批处理上。分布式数据存储和查询工具(如MapReduce,Hive和Pig)都旨在分批处理数据而不是连续处理数据。企业每晚都会运行多个作业,从数据库中提取数据,然后分析,转换并最终存储数据。最近,企业发现了分析和处理数据和事件的能力,而不是每隔几个小时 就会发生 一次。然而,大多数传统的消息传递系统不能扩展以实时处理大数据。所以LinkedIn的工程师 构建并开源Apache Kafka :一种分布式消息传递框架,通过扩展商用硬件来满足大数据的需求。

在过去几年中,Apache Kafka已经出现,以解决各种情况。在最简单的情况下,它可以是用于存储应用程序日志的简单缓冲区。结合Spark Streaming等技术,它可用于 跟踪数据更改并对数据执行操作 ,然后将其保存到最终目标。Kafka的预测模式使其成为检测欺诈的有力工具,例如在信用卡交易发生时检查信用卡交易的有效性,而不是等待数小时后的批处理。

这个由两部分组成的教程介绍了Kafka,从如何在开发环境中安装和运行它开始。您将了解Kafka的架构,然后介绍如何开发开箱即用的Apache Kafka消息传递系统。最后,您将构建一个自定义生产者/消费者应用程序,通过Kafka服务器发送和使用消息。在本教程的后半部分,您将学习如何对消息进行分区和分组,以及如何控制Kafka消费者将使用哪些消息。

什么是Apache Kafka?

Apache Kafka 是为大数据扩展而构建的消息传递系统。与 Apache ActiveMQRabbitMq 类似,Kafka使构建在不同平台上的应用程序能够通过异步消息传递进行通信。但Kafka与这些更传统的消息传递系统的关键方式不同:

  • 它旨在通过添加更多服务器来横向扩展。
  • 它为生产者和消费者流程提供了更高的吞吐量。
  • 它可用于支持批处理和实时用例。
  • 它不支持Java的面向消息的中间件API JMS

Apache Kafka的架构

在我们探索Kafka的架构之前,您应该了解它的基本术语:

  • producer 是将消息发布到主题的一个过程。
  • consumer 是订阅一个或多个主题并且消费发布到主题的消息的过程。
  • topic 是消息发布的主题的名称。
  • broker 是在一台机器上运行的进程。
  • cluster是 一起工作的一组 broker
raMbamR.png!web

Apache Kafka的架构非常简单,可以在某些系统中实现更好的性能和吞吐量。Kafka中的每个topic都像一个简单的日志文件。当生产者发布消息时,Kafka服务器会将其附加到其给定topic的日志文件的末尾。服务器还分配一个 偏移量 ,该 偏移量 是用于永久识别每条消息的数字。随着消息数量的增加,每个偏移量的值增加; 例如,如果生产者发布三条消息,第一条消息可能获得偏移量1,第二条消息偏移量为2,第三条偏移量为3。

当Kafka消费者首次启动时,它将向服务器发送拉取请求,要求检索偏移值大于0的特定topic的任何消息。服务器将检查该topic的日志文件并返回三个新消息。消费者将处理消息,然后发送偏移 量大 于3的消息请求,依此类推。

在Kafka中,客户端负责记住偏移计数和检索消息.Kafka服务器不跟踪或管理消息消耗。默认情况下,Kafka服务器将保留七天的消息。服务器中的后台线程检查并删除七天或更早的消息。只要消息在服务器上,消费者就可以访问消息。它可以多次读取消息,甚至可以按收到的相反顺序读取消息。但是,如果消费者在七天之前未能检索到消息,那么它将错过该消息。

LinkedIn和其他企业的生产使用表明,通过适当的配置,Apache Kafka每天能够处理数百GB的数据。2011年,三位LinkedIn工程师使用 基准测试 来证明Kafka可以实现比ActiveMQ和RabbitMQ更高的吞吐量。

Apache Kafka快速设置和演示

我们将在本教程中构建一个自定义应用程序,但让我们首先安装和测试一个开箱即用的生产者和消费者的Kafka实例。

  1. 访问 Kafka下载页面 以安装最新版本(撰写本文时为0.9)。
  2. 将二进制文件解压缩到一个 software/kafka 文件夹中。对于当前版本,它是 software/kafka_2.11-0.9.0.0
  3. 将当前目录更改为指向新文件夹。
  4. 通过执行以下命令启动Zookeeper服务器: bin/zookeeper-server-start.sh config/zookeeper.properties
  5. 执行以下命令启动Kafka服务器: bin/kafka-server-start.sh config/server.properties
  6. 创建一个可用于测试的测试topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld
  7. 启动一个简单的控制台使用者,它可以使用发布到给定topic的消息,例如 javaworldbin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning
  8. 启动一个简单的生产者控制台,可以将消息发布到测试topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld
  9. 尝试在生产者控制台中输入一条或两条消息。您的消息应显示在使用者控制台中。

Apache Kafka的示例应用程序

您已经了解了Apache Kafka如何开箱即用。接下来,让我们开发一个自定义生产者/消费者应用程序。生产者将从控制台检索用户输入,并将每个新行作为消息发送到Kafka服务器。消费者将检索给定topic的消息并将其打印到控制台。在这种情况下,生产者和消费者组件是您自己的 kafka-console-producer.shkafka-console-consumer.sh

让我们从创建一个 Producer.java 类开始。此客户端类包含从控制台读取用户输入并将该输入作为消息发送到Kafka服务器的逻辑。

我们通过从 java.util.Properties 类创建对象并设置其属性来配置生产者。该 ProducerConfig 类定义了所有不同的属性可用,但Kafka的默认值足以满足大多数用途。对于默认配置,我们只需要设置三个必需属性:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) 设置主机:端口对的列表,用于以 host1:port1,host2:port2,... 格式建立与Kakfa集群的初始连接。即使我们的Kafka集群中有多个代理,我们也只需要指定第一个代理的值 host:port 。Kafka客户端将使用此值在代理上进行发现调用,该代理将返回集群中所有代理的列表。最好在 BOOTSTRAP_SERVERS_CONFIG 中指定多个代理,这样如果第一个代理停止运行,客户端将能够尝试其他代理。

Kafka服务器需要 byte[] key, byte[] value 格式化的消息。Kafka的客户端库不是转换每个键和值,而是允许我们使用更友好的类型 Stringint 发送消息。库将这些转换为适当的类型。例如,示例应用程序没有特定于消息的key,因此我们将使用 null 作为key。对于值,我们将使用 String ,即用户在控制台上输入的数据。

要配置 消息key ,我们用 org.apache.kafka.common.serialization.ByteArraySerializer 设定 KEY_SERIALIZER_CLASS_CONFIG 的值。这是有效的,因为 null 不需要转换为 byte[] 。对于 消息值 ,我们为 VALUE_SERIALIZER_CLASS_CONFIG 设置了 org.apache.kafka.common.serialization.StringSerializer ,因为该类知道如何将 String 转换为 byte[]

Kafka 生产者

Properties 使用必要的配置属性填充类之后,我们可以使用它来创建对象 KafkaProducer 。每当我们要发送的消息后,该Kafka服务器,我们将创建一个对象 ProducerRecord ,并调用 KafkaProducersend() 方法发送消息。 ProducerRecord 有两个参数:应该发布消息的topic的名称,以及实际的消息。使用生产者时,不要忘记调用该方法: Producer.close()

清单1. KafkaProducer

public class Producer {
          private static Scanner in;
          public static void main(String[] argv)throws Exception {
              if (argv.length != 1) {
                  System.err.println("Please specify 1 parameters ");
                  System.exit(-1);
              }
              String topicName = argv[0];
              in = new Scanner(System.in);
              System.out.println("Enter message(type exit to quit)");

              //Configure the Producer
              Properties configProperties = new Properties();
              configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
              configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
              configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

              org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<String, String>(configProperties);
              String line = in.nextLine();
              while(!line.equals("exit")) {
                  ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
                  producer.send(rec);
                  line = in.nextLine();
              }
              in.close();
              producer.close();
          }
        }

配置消息使用者

接下来,我们将创建一个订阅topic的简单消费者。每当向topic发布新消息时,它将读取该消息并将其打印到控制台。消费者代码与生产者代码非常相似。我们首先创建一个对象 java.util.Properties ,设置其特定于消费者的属性,然后使用它来创建一个新对象 KafkaConsumerConsumerConfig 类定义了我们可以设置的所有属性。只有四个强制属性:

BOOTSTRAP_SERVERS_CONFIG(bootstrap.servers)KEY_DESERIALIZER_CLASS_CONFIG(key.deserializer)VALUE_DESERIALIZER_CLASS_CONFIG(value.deserializer)GROUP_ID_CONFIG(bootstrap.servers)

正如我们为生产者类所做的那样,我们将使用 BOOTSTRAP_SERVERS_CONFIG 为消费者类配置主机/端口对。此配置允许我们以 host1:port1,host2:port2,... 格式建立与Kakfa集群的初始连接。正如我之前提到的,Kafka服务器需要 byte[] 键和 byte[] 值格式的消息,并且有自己的实现来序列化不同的类型 byte[] 。正如我们对生产者所做的那样,在消费者方面,我们将不得不使用自定义反序列化器转换 byte[] 回适当的类型。在示例应用程序的情况下,我们知道生产者正在使用`ByteArraySerializer`

key和 StringSerializer 值。因此,在客户端,我们需要使用 org.apache.kafka.common.serialization.ByteArrayDeserializer 序列化key和 org.apache.kafka.common.serialization.StringDeserializer 序列化值。将这些类为赋值 KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG 将使消费者反序列化由生产者发送的byte[]类型的数据。最后,我们需要设置值 GROUP_ID_CONFIG 。这应该是字符串格式的组名。我会在一分钟内详细解释这个配置。现在,只需查看具有四个强制属性集的Kafka消费者:

清单2. KafkaConsumer

public class Consumer {
      private static Scanner in;
      private static boolean stop = false;

      public static void main(String[] argv)throws Exception{
          if (argv.length != 2) {
              System.err.printf("Usage: %s <topicName> <groupId>\n",
                      Consumer.class.getSimpleName());
              System.exit(-1);
          }
          in = new Scanner(System.in);
          String topicName = argv[0];
          String groupId = argv[1];

          ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
          consumerRunnable.start();
          String line = "";
          while (!line.equals("exit")) {
              line = in.next();
          }
          consumerRunnable.getKafkaConsumer().wakeup();
          System.out.println("Stopping consumer .....");
          consumerRunnable.join();
      }

      private static class ConsumerThread extends Thread{
          private String topicName;
          private String groupId;
          private KafkaConsumer<String,String> kafkaConsumer;

          public ConsumerThread(String topicName, String groupId){
              this.topicName = topicName;
              this.groupId = groupId;
          }
          public void run() {
              Properties configProperties = new Properties();
              configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

              //Figure out where to start processing messages from
              kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
              kafkaConsumer.subscribe(Arrays.asList(topicName));
              //Start processing messages
              try {
                  while (true) {
                      ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                      for (ConsumerRecord<String, String> record : records)
                          System.out.println(record.value());
                  }
              }catch(WakeupException ex){
                  System.out.println("Exception caught " + ex.getMessage());
              }finally{
                  kafkaConsumer.close();
                  System.out.println("After closing KafkaConsumer");
              }
          }
          public KafkaConsumer<String,String> getKafkaConsumer(){
             return this.kafkaConsumer;
          }
      }
  }

消费者和消费者线程

将清单2中的消费者代码分为两部分来确保 Consumer 在退出之前关闭对象。我将依次描述每个类。首先, ConsumerThread 是一个内部类,它将topic名称和组名称作为其参数。在该类的 run() 方法中,它创建一个具有适当属性的 KafkaConsumer 对象。它通过调用 kafkaConsumer.subscribe() 方法订阅topic,然后每100毫秒轮询Kafka服务器以检查topic中是否有任何新消息。它将遍历任何新消息的列表并将其打印到控制台。

Consumer 类中,我们创建一个新对象,并在另一个 ConsumerThread 线程中启动它。在 ConsumerThead 开始一个无限循环,并保持轮询新消息的topic。同时在 Consumer 类中,主线程等待用户进入 exit 控制台。一旦用户进入退出,它就会调用该 KafkaConsumer.wakeup() 方法,导致 KafkaConsumer 停止轮询新消息并抛出一个 WakeupException 。然后,我们可以通过调用 kafkaConsumerclose() 方法关闭 KafkaConsumer

运行该应用程序

要测试此应用程序,您可以从IDE运行清单1和清单2中的代码,也可以按照以下步骤操作:

  1. 通过执行以下命令下载示例代码 KafkaAPIClient : git clone https://github.com/sdpatil/KafkaAPIClient.git .
  2. 编译代码并使用以下命令创建胖JAR : mvn clean compile assembly:single .
  3. 启动消费者: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1
  4. 启动生产者: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test
  5. 在生产者控制台中输入消息,然后检查该消息是否出现在使用者中。试试几条消息。
  6. 键入 exit 消费者和生产者控制台以关闭它们。
nyimi2Q.png!web

第1部分的结论

在本教程的前半部分,您已经了解了使用Apache Kafka进行大数据消息传递的基础知识,包括Kafka的概念性概述,设置说明以及如何使用Kafka配置生产者/消费者消息传递系统。

正如您所见,Kafka的架构既简单又高效,专为性能和吞吐量而设计。在第2部分中,我将介绍一些使用Kafka进行分布式消息传递的更高级技术,从使用分区细分主题开始。我还将演示如何管理消息偏移以支持不同的用例。

英文原文: https://www.javaworld.com/article/3060078/big-data-messaging-with-kafka-part-1.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK