

Using Debezium With the Apicurio API and Schema Registry
Change events streamed from a database by Debezium are (in developer parlance) strongly typed. This means that event consumers should be aware of the types of data conveyed in the events. The problem of conveying this type information to consumers can be solved in multiple ways:
- the message structure is passed out-of-band to the consumer, which is able to process the data stored in it
- the metadata (the schema) is embedded within the message itself
- the message contains a reference to a registry which holds the associated metadata
An example of the first case is Apache Kafka’s well-known JsonConverter. It can operate in two modes: with and without schemas. When configured to work without schemas, it generates a plain JSON message where the consumer either needs to know the types of each field beforehand, or it needs to apply heuristic rules to "guess" the datatype of each value. While this approach is quite flexible, it can fail for more advanced cases, e.g. temporal or other semantic types encoded as strings. Also, constraints associated with the types are usually lost.
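This mode is selected via the converter options of the Kafka Connect worker. A minimal sketch, assuming a typical worker properties file (schemas.enable is the JsonConverter option in question):
# Kafka Connect worker configuration (sketch): plain JSON, no schemas
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# do not embed the schema; consumers must know or guess the field types
key.converter.schemas.enable=false
value.converter.schemas.enable=false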
Here’s an example of such a message:
{
  "before": null,
  "after": {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "sally.thomas@acme.com"
  },
  "source": {
    "version": "1.1.0.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "ts_ms": 0,
    "snapshot": "true",
    "db": "inventory",
    "table": "customers",
    "server_id": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1586331101491,
  "transaction": null
}
Note how no type information beyond JSON’s basic type system is present. For instance, a consumer cannot tell from the event itself what length the numeric id field has.
An example of the second case is again JsonConverter. By means of its schemas.enable option, the JSON message will consist of two parts: schema and payload. The payload part is exactly the same as in the previous case; the schema part contains a description of the message, its fields, field types, and associated type constraints. This enables the consumer to process the message in a type-safe way. The drawback of this approach is that the message size increases significantly, as the schema is quite a large object. As schemas tend to change rarely (how often do you change the definitions of the columns of your database tables?), adding the schema to each and every event poses a significant overhead.
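Such messages are produced when schema embedding is switched on, as in this worker configuration sketch (schemas.enable defaults to true for JsonConverter, so this is also what you get out of the box):
# Kafka Connect worker configuration (sketch): JSON with embedded schemas
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# embed the full schema in every message (the converter's default)
key.converter.schemas.enable=true
value.converter.schemas.enable=true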
The following example of a message with a schema clearly shows that the schema itself can be significantly larger than the payload and is not very economical to use:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "1.1.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "c",
    "ts_ms": 1586331101491,
    "transaction": null
  }
}
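This is where the third option comes in: the message carries only a small reference to a schema stored in a registry, and the consumer resolves the full schema from the registry on demand. As a rough sketch of how that looks with Apicurio Registry’s converters (the registry URL is a placeholder for an actual deployment, and the exact property names should be checked against the Apicurio converter documentation for the version in use):
# Kafka Connect worker configuration (sketch): Avro via Apicurio Registry
key.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter=io.apicurio.registry.utils.converter.AvroConverter
# placeholder URL of the Apicurio Registry REST API
key.converter.apicurio.registry.url=http://registry:8080/api
value.converter.apicurio.registry.url=http://registry:8080/api
# register new schema versions automatically on first use
key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
With such a setup, each serialized message contains only a short global identifier of its schema instead of the schema itself, keeping messages compact while remaining fully typed.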