44

深入理解 Kafka Connect:转换器和序列化

 5 years ago
source link: https://www.infoq.cn/articles/LOf0ukcu*dMtNaJdTxd5?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

AI 前线导读:Kafka Connect 是一个简单但功能强大的工具,可用于 Kafka 和其他系统之间的集成。人们对 Kafka Connect 最常见的误解之一是它的转换器。这篇文章将告诉我们如何正确地使用消息的序列化格式,以及如何在 Kafka Connect 连接器中对其进行标准化。

Kafka Connect 是 Apache Kafka 的一部分,为其他数据存储和 Kafka 提供流式集成。对于数据工程师来说,他们只需要配置一下 JSON 文件就可以了。Kafka 提供了一些可用于常见数据存储的连接器,如 JDBC、Elasticsearch、IBM MQ、S3 和 BigQuery,等等。

对于开发人员来说,Kafka Connect 提供了丰富的 API,如果有必要还可以开发其他连接器。除此之外,它还提供了用于配置和管理连接器的 REST API。

Kafka Connect 是一种模块化组件,提供了一种非常强大的集成方法。一些关键组件包括:

  • 连接器——定义如何与数据存储集成的 JAR 文件;
  • 转换器——处理数据的序列化和反序列化;
  • 变换——可选的运行时消息操作。

人们对 Kafka Connect 最常见的误解与数据的序列化有关。Kafka Connect 使用转换器处理数据序列化。接下来让我们看看它们是如何工作的,并说明如何解决一些常见问题。

Kafka 消息都是字节

Kafka 消息被保存在主题中,每条消息就是一个键值对。当它们存储在 Kafka 中时,键和值都只是字节。Kafka 因此可以适用于各种场景,但这也意味着开发人员需要决定如何序列化数据。

在配置 Kafka Connect 时,序列化格式是最关键的配置选项之一。你需要确保从主题读取数据时使用的序列化格式与写入主题的序列化格式相同,否则就会出现混乱和错误!

ZzURJvv.png!web

序列化格式有很多种,常见的包括:

  • JSON;
  • Avro;
  • Protobuf;
  • 字符串分隔(如 CSV)。

选择序列化格式

选择序列化格式的一些指导原则:

  • schema。很多时候,你的数据都有对应的 schema。你可能不喜欢,但作为开发人员,你有责任保留和传播 schema。schema 为服务之间提供了一种契约。某些消息格式(例如 Avro 和 Protobuf)具有强大的 schema 支持,而其他消息格式支持较少(JSON)或根本没有(CVS)。
  • 生态系统兼容性。Avro 是 Confluent 平台的一等公民,拥有来自 Confluent Schema Registry、Kafka Connect、KSQL 的原生支持。另一方面,Protobuf 依赖社区为部分功能提供支持。
  • 消息大小。JSON 是纯文本的,并且依赖了 Kafka 本身的压缩机制,Avro 和 Protobuf 都是二进制格式,序列化的消息体积更小。
  • 语言支持。Avro 在 Java 领域得到了强大的支持,但如果你的公司不是基于 Java 的,那么可能会觉得它不太好用。

如果目标系统使用 JSON,Kafka 主题也必须使用 JSON 吗?

完全不需要这样。从数据源读取数据或将数据写入外部数据存储的格式不需要与 Kafka 消息的序列化格式一样。

Kafka Connect 中的连接器负责从源数据存储(例如数据库)获取数据,并以数据内部表示将数据传给转换器。然后,Kafka Connect 的转换器将这些源数据对象序列化到主题上。

在使用 Kafka Connect 作为接收器时刚好相反——转换器将来自主题的数据反序列化为内部表示,传给连接器,以便能够使用特定于目标的适当方法将数据写入目标数据存储。

也就是说,主题数据可以是 Avro 格式,当你将数据写入 HDFS 时,指定接收器的连接器使用 HDFS 支持的格式即可。

配置转换器

Kafka Connect 默认使用了 worker 级别的转换器配置,连接器可以对其进行覆盖。由于在整个管道中使用相同的序列化格式通常会更好,所以一般只需要在 worker 级别设置转换器,而不需要在连接器中指定。但你可能需要从别人的主题拉取数据,而他们使了用不同的序列化格式——对于这种情况,你需要在连接器配置中设置转换器。即使你在连接器的配置中进行了覆盖,它仍然是执行实际任务的转换器。

好的连接器一般不会序列化或反序列化存储在 Kafka 中的消息,它会让转换器完成这项工作。

bUNRZbR.png!web

请记住,Kafka 消息是键值对字节,你需要使用 key.converter 和 value.converter 为键和值指定转换器。在某些情况下,你可以为键和值使用不同的转换器。

z6FJV36.png!web

这是使用 String 转换器的一个示例。

复制代码

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

有些转换器有一些额外的配置。对于 Avro,你需要指定 Schema Registry。对于 JSON,你需要指定是否希望 Kafka Connect 将 schema 嵌入到 JSON 消息中。在指定特定于转换器的配置时,请始终使用 key.converter. 或 value.converter. 前缀。例如,要将 Avro 用于消息载荷,你需要指定以下内容:

复制代码

"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",

常见的转换器包括:

  • Avro——来自 Confluent 的开源项目

复制代码

io.confluent.connect.avro.AvroConverter
  • String——Apache Kafka 的一部分

复制代码

org.apache.kafka.connect.storage.StringConverter
  • JSON——Apache Kafka 的一部分

复制代码

org.apache.kafka.connect.json.JsonConverter
  • ByteArray——Apache Kafka 的一部分

复制代码

org.apache.kafka.connect.converters.ByteArrayConverter
  • Protobuf——来自社区的开源项目

复制代码

com.blueapron.connect.protobuf.ProtobufConverter

JSON 和 schema

虽然 JSON 默认不支持嵌入 schema,但 Kafka Connect 提供了一种可以将 schema 嵌入到消息中的特定 JSON 格式。由于 schema 被包含在消息中,因此生成的消息大小可能会变大。

如果你正在设置 Kafka Connect 源,并希望 Kafka Connect 在写入 Kafka 消息包含 schema,可以这样:

复制代码

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

生成的 Kafka 消息看起来像下面这样,其中包含 schema 和 payload 节点元素:

复制代码

{
 "schema": {
   "type":"struct",
   "fields": [
      {
       "type":"int64",
       "optional":false,
       "field":"registertime"
      },
      {
       "type":"string",
       "optional":false,
       "field":"userid"
      },
      {
       "type":"string",
       "optional":false,
       "field":"regionid"
      },
      {
       "type":"string",
       "optional":false,
       "field":"gender"
      }
    ],
   "optional":false,
   "name":"ksql.users"
  },
 "payload": {
   "registertime":1493819497170,
   "userid":"User_1",
   "regionid":"Region_5",
   "gender":"MALE"
  }
}

请注意消息的大小,消息由 playload 和 schema 组成。每条消息中都会重复这些数据,这也就是为什么说 Avro 这样的格式会更好,因为它的 schema 是单独存储的,消息中只包含载荷(并进行了压缩)。

如果你正在使用 Kafka Connect 消费 Kafka 主题中的 JSON 数据,那么就需要知道数据是否包含了 schema。如果包含了,并且它的格式与上述的格式相同,那么你可以这样设置:

复制代码

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

不过,如果你正在消费的 JSON 数据如果没有 schema 加 payload 这样的结构,例如:

复制代码

{
 "registertime":1489869013625,
 "userid":"User_1",
 "regionid":"Region_2",
 "gender":"OTHER"
}

那么你必须通过设置 schemas.enable = false 告诉 Kafka Connect 不要查找 schema:

复制代码

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

和之前一样,转换器配置选项(这里是 schemas.enable)需要使用前缀 key.converter 或 value.converter。

常见错误

如果你错误地配置了转换器,将会遇到以下的一些常见错误。这些消息将显示在你为 Kafka Connect 配置的接收器中,因为你试图在接收器中反序列化 Kafka 消息。这些错误会导致连接器失败,主要错误消息如下所示:

复制代码

ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceededinerror handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator. execAndHandleError(RetryWithToleranceOperator.java:178)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)

在错误消息的后面,你将看到堆栈信息,描述了发生错误的原因。请注意,对于连接器中的任何致命错误,都会抛出上述异常,因此你可能会看到与序列化无关的错误。要快速查看错误配置可能会导致的错误,请参考下表:

N3uEVzj.png!web

问题:使用 JsonConverter 读取非 JSON 数据

如果你的源主题上有非 JSON 数据,但尝试使用 JsonConverter 读取它,你将看到:

复制代码

org.apache.kafka.connect.errors.DataException:Convertingbyte[]toKafkaConnectdatafailedduetoserializationerror:
…
org.apache.kafka.common.errors.SerializationException:java.io.CharConversionException:InvalidUTF-32character0x1cfa7e2(above0x0010ffff)atchar#1,byte#7)

这有可能是因为源主题使用了 Avro 或其他格式。

解决方案:如果数据是 Avro 格式的,那么将 Kafka Connect 接收器的配置改为:

复制代码

"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",

或者,如果主题数据是通过 Kafka Connect 填充的,那么你也可以这么做,让上游源也发送 JSON 数据:

复制代码

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

问题:使用 AvroConverter 读取非 Avro 数据

这可能是我在 Confluent Community 邮件组和 Slack 组等地方经常看到的错误。当你尝试使用 Avro 转换器从非 Avro 主题读取数据时,就会发生这种情况。这包括使用 Avro 序列化器而不是 Confluent Schema Registry 的 Avro 序列化器(它有自己的格式)写入的数据。

复制代码

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro messageforid -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

解决方案:检查源主题的序列化格式,修改 Kafka Connect 接收器连接器,让它使用正确的转换器,或将上游格式切换为 Avro。如果上游主题是通过 Kafka Connect 填充的,则可以按如下方式配置源连接器的转换器:

复制代码

"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",

问题:没有使用预期的 schema/payload 结构读取 JSON 消息

如前所述,Kafka Connect 支持包含载荷和 schema 的 JSON 消息。如果你尝试读取不包含这种结构的 JSON 数据,你将收到这个错误:

复制代码

org.apache.kafka.connect.errors.DataException: JsonConverterwithschemas.enablerequires "schema"and"payload" fieldsandmaynotcontain additional fields.Ifyou are tryingtodeserialize plainJSONdata,setschemas.enable=falseinyour converterconfiguration.

需要说明的是,当 schemas.enable=true 时,唯一有效的 JSON 结构需要包含 schema 和 payload 这两个顶级元素(如上所示)。

如果你只有简单的 JSON 数据,则应将连接器的配置改为:

复制代码

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

如果要在数据中包含 schema,可以使用 Avro(推荐),也可以修改上游的 Kafka Connect 配置,让它在消息中包含 schema:

复制代码

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",

故障排除技巧

查看 Kafka Connect 日志

要在 Kafka Connect 中查找错误日志,你需要找到 Kafka Connect 工作程序的输出。这个位置取决于你是如何启动 Kafka Connect 的。有几种方法可用于安装 Kafka Connect,包括 Docker、Confluent CLI、systemd 和手动下载压缩包。你可以这样查找日志的位置:

  • Docker:docker logs container_name;
  • Confluent CLI:confluent log connect;
  • systemd:日志文件在 /var/log/confluent/kafka-connect;
  • 其他:默认情况下,Kafka Connect 将其输出发送到 stdout,因此你可以在启动 Kafka Connect 的终端中找到它们。

查看 Kafka Connect 配置文件

  • Docker——设置环境变量,例如在 Docker Compose 中:

复制代码

CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL:'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL:'http://schema-registry:8081'
  • Confluent CLI——使用配置文件 etc/schema-registry/connect-avro-distributed.properties;
  • systemd(deb/rpm)——使用配置文件 /etc/kafka/connect-distributed.properties;
  • 其他——在启动 Kafka Connect 时指定工作程序的属性文件,例如:

复制代码

$cdconfluent-5.0.0
$./bin/connect-distributed./etc/kafka/connect-distributed.properties

检查 Kafka 主题

假设我们遇到了上述当中的一个错误,并且想要解决 Kafka Connect 接收器无法从主题读取数据的问题。

我们需要检查正在被读取的数据,并确保它使用了正确的序列化格式。另外,所有消息都必须使用这种格式,所以不要假设你现在正在以正确的格式向主题发送消息就不会出问题。Kafka Connect 和其他消费者也会从主题上读取已有的消息。

下面,我将使用命令行进行故障排除,当然也可以使用其他的一些工具:

  • Confluent Control Center 提供了可视化检查主题内容的功能;
  • KSQL 的 PRINT 命令将主题的内容打印到控制台;
  • Confluent CLI 工具提供了 consume 命令,可用于读取字符串和 Avro 数据。

如果你的数据是字符串或 JSON 格式

你可以使用控制台工具,包括 kafkacat 和 kafka-console-consumer。我个人的偏好是使用 kafkacat:

复制代码

$ kafkacat -blocalhost:9092-t users-json-noschema -C -c1
{"registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}

你也可以使用 jq 验证和格式化 JSON:

复制代码

$kafkacat -blocalhost:9092-t users-json-noschema -C -c1|jq'.'
{
 "registertime":1493356576434,
 "userid":"User_8",
 "regionid":"Region_2",
 "gender":"MALE"
}

如果你得到一些“奇怪的”字符,你查看的很可能是二进制数据,这些数据是通过 Avro 或 Protobuf 写入的:

复制代码

$ kafkacat -blocalhost:9092-t users-avro -C -c1
ڝ���VUser_9Region_MALE

如果你的数据是 Avro 格式

你应该使用专为读取和反序列化 Avro 数据而设计的控制台工具。我使用的是 kafka-avro-console-consumer。确保指定了正确的 Schema Registry URL:

复制代码

$ kafka-avro-console-consumer--bootstrap-serverlocalhost:9092\
                             --propertyschema.registry.url=http://localhost:8081\
                             --topicusers-avro \
                             --from-beginning--max-messages1
{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}

和之前一样,如果要格式化,可以使用 jq:

复制代码

$kafka-avro-console-consumer--bootstrap-serverlocalhost:9092\
                             --propertyschema.registry.url=http://localhost:8081\
                             --topicusers-avro\
                             --from-beginning--max-messages1 | \
                             jq'.'
{
 "registertime":1505213905022,
 "userid":"User_5",
 "regionid":"Region_4",
 "gender":"FEMALE"
}

内部转换器

在分布式模式下运行时,Kafka Connect 使用 Kafka 来存储有关其操作的元数据,包括连接器配置、偏移量等。

可以通过 internal.key.converter/internal.value.converter 让这些 Kafka 使用不同的转换器。不过这些设置只在内部使用,实际上从 Apache Kafka 2.0 开始就已被弃用。你不应该更改这些配置,从 Apache Kafka 2.0 版开始,如果你这么做了将会收到警告。

将 schema 应用于没有 schema 的消息

很多时候,Kafka Connect 会从已经存在 schema 的地方引入数据,并使用合适的序列化格式(例如 Avro)来保留这些 schema。然后,这些数据的所有下游用户都可以使用这些 schema。但如果没有提供显式的 schema 该怎么办?

或许你正在使用 FileSourceConnector 从普通文件中读取数据(不建议用于生产环境中,但可用于 PoC),或者正在使用 REST 连接器从 REST 端点提取数据。由于它们都没有提供 schema,因此你需要声明它。

有时候你只想传递你从源读取的字节,并将它们保存在一个主题上。但大多数情况下,你需要 schema 来使用这些数据。在摄取时应用一次 schema,而不是将问题推到每个消费者,这才是一种更好的处理方式。

你可以编写自己的 Kafka Streams 应用程序,将 schema 应用于 Kafka 主题中的数据上,当然你也可以使用 KSQL。下面让我们来看一下将 schema 应用于某些 CSV 数据的简单示例。

假设我们有一个 Kafka 主题 testdata-csv,保存着一些 CSV 数据,看起来像这样:

复制代码

$ kafkacat -b localhost:9092-t testdata-csv -C
1,Rick Astley,Never Gonna Give You Up
2,Johnny Cash,Ring of Fire

我们可以猜测它有三个字段,可能是:

  • ID
  • Artist
  • Song

如果我们将数据保留在这样的主题中,那么任何想要使用这些数据的应用程序——无论是 Kafka Connect 接收器还是自定义的 Kafka 应用程序——每次都需要都猜测它们的 schema 是什么。或者,每个消费应用程序的开发人员都需要向提供数据的团队确认 schema 是否发生变更。正如 Kafka 可以解耦系统一样,这种 schema 依赖让团队之间也有了硬性耦合,这并不是一件好事。

因此,我们要做的是使用 KSQL 将 schema 应用于数据上,并使用一个新的派生主题来保存 schema。这样你就可以通过 KSQL 检查主题数据:

复制代码

ksql> PRINT'testdata-csv'FROM BEGINNING;
Format:STRING
11/6/182:41:23PM UTC , NULL ,1,Rick Astley,Never Gonna Give You Up
11/6/182:41:23PM UTC , NULL ,2,Johnny Cash,Ring of Fire

前两个字段(11/6/18 2:41:23 PM UTC 和 NULL)分别是 Kafka 消息的时间戳和键。其余字段来自 CSV 文件。现在让我们用 KSQL 注册这个主题并声明 schema:

复制代码

ksql> CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \
WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');

Message
----------------
Stream created
----------------

可以看到,KSQL 现在有一个数据流 schema:

复制代码

ksql> DESCRIBE TESTDATA_CSV;

Name                 : TESTDATA_CSV
 Field   |Type
-------------------------------------
 ROWTIME |BIGINT(system)
 ROWKEY  |VARCHAR(STRING) (system)
 ID      |INTEGER
 ARTIST  |VARCHAR(STRING)
 SONG    |VARCHAR(STRING)
-------------------------------------
Forruntimestatisticsandquery details run: DESCRIBE EXTENDED <Stream,Table>;

可以通过查询 KSQL 流来检查数据是否符合预期。请注意,这个时候我们只是作为现有 Kafka 主题的消费者——并没有更改或复制任何数据。

复制代码

ksql>SET'auto.offset.reset'='earliest';
Successfully changed localproperty'auto.offset.reset'from'null'to'earliest'
ksql>SELECTID, ARTIST, SONGFROMTESTDATA_CSV;
1| Rick Astley | Never Gonna Give You Up
2| Johnny Cash | RingofFire

最后,创建一个新的 Kafka 主题,使用带有 schema 的数据进行填充。KSQL 查询是持续的,因此除了将现有的数据从源主题发送到目标主题之外,KSQL 还将向目标主题发送未来将生成的数据。

复制代码

ksql> CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TESTDATA_CSV;

Message
----------------------------
Stream created and running
----------------------------

使用 Avro 控制台消费者验证数据:

复制代码

$ kafka-avro-console-consumer --bootstrap-server localhost:9092\
                                --propertyschema.registry.url=http://localhost:8081\
                                --topic TESTDATA \
                                --from-beginning | \
                                jq'.'
{
 "ID": {
   "int":1
},
 "ARTIST": {
   "string":"Rick Astley"
},
 "SONG": {
   "string":"Never Gonna Give You Up"
  }
}
[…]

你甚至可以在 Schema Registry 中查看已注册的 schema:

复制代码

$ curl -shttp://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'
{
 "type":"record",
 "name":"KsqlDataSourceSchema",
 "namespace":"io.confluent.ksql.avro_schemas",
 "fields": [
    {
     "name":"ID",
     "type": [
       "null",
       "int"
      ],
     "default":null
    },
    {
     "name":"ARTIST",
     "type": [
       "null",
       "string"
      ],
     "default":null
    },
    {
     "name":"SONG",
     "type": [
       "null",
       "string"
      ],
     "default":null
    }
  ]
}

写入原始主题(testdata-csv)的任何新消息都由 KSQL 自动处理,并以 Avro 格式写入新的 TESTDATA 主题。现在,任何想要使用这些数据的应用程序或团队都可以使用 TESTDATA 主题。你还可以更改主题的分区数、分区键和复制系数。

英文原文: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK