
Using Debezium With the Apicurio API and Schema Registry

source link: https://debezium.io/blog/2020/04/09/using-debezium-wit-apicurio-api-schema-registry/

Change events streamed from a database by Debezium are (in developer parlance) strongly typed. This means that event consumers need to be aware of the types of data conveyed in the events. The problem of passing this type information along to consumers can be solved in several ways:

  1. the message structure is passed out-of-band to the consumer, which uses it to process the data in the message

  2. the message contains metadata (the schema) that is embedded within the message

  3. the message contains a reference to a registry entry that holds the associated metadata
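As a rough illustration of the third approach, the sketch below keeps schemas in a registry and puts only a small reference into each message. Everything in it is hypothetical: `ToyRegistry`, `serialize` and `deserialize` are names invented for this example, the in-memory dictionaries stand in for a real registry service such as Apicurio, and the `{"schemaId": ..., "payload": ...}` envelope is an assumed format, not the wire format of any actual converter.

```python
import json


class ToyRegistry:
    """Hypothetical in-memory stand-in for a schema registry (not a real registry API)."""

    def __init__(self):
        self._by_id = {}    # schema ID -> schema
        self._by_key = {}   # canonical JSON text of a schema -> schema ID

    def register(self, schema):
        """Return the ID of `schema`, assigning a new one the first time it is seen."""
        key = json.dumps(schema, sort_keys=True)
        if key not in self._by_key:
            schema_id = len(self._by_id) + 1
            self._by_id[schema_id] = schema
            self._by_key[key] = schema_id
        return self._by_key[key]

    def lookup(self, schema_id):
        return self._by_id[schema_id]


def serialize(registry, schema, payload):
    # Assumed envelope: only the schema ID travels with the data
    return json.dumps({"schemaId": registry.register(schema), "payload": payload})


def deserialize(registry, message):
    # The consumer resolves the ID back to the full schema via the registry
    envelope = json.loads(message)
    return registry.lookup(envelope["schemaId"]), envelope["payload"]


registry = ToyRegistry()
schema = {"type": "struct", "fields": [{"type": "int32", "optional": False, "field": "id"}]}
first = serialize(registry, schema, {"id": 1001})
second = serialize(registry, schema, {"id": 1002})
print(first)  # {"schemaId": 1, "payload": {"id": 1001}}
```

Because an unchanged schema is registered only once, every further message pays just for the ID, however large the schema is. The trade-off is that producers and consumers both depend on the registry being reachable.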

An example of the first case is Apache Kafka’s well-known JsonConverter. It can operate in two modes: with and without schemas. When configured to work without schemas, it generates a plain JSON message, and the consumer either needs to know the type of each field beforehand or has to apply heuristic rules to "guess" and map values to data types. While this approach is quite flexible, it can fail for more advanced cases, e.g. temporal or other semantic types encoded as strings. Also, constraints associated with the types are usually lost.

Here’s an example of such a message:

{
  "before": null,
  "after": {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "sally.thomas@acme.com"
  },
  "source": {
    "version": "1.1.0.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "ts_ms": 0,
    "snapshot": "true",
    "db": "inventory",
    "table": "customers",
    "server_id": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1586331101491,
  "transaction": null
}

Note that no type information beyond JSON’s basic type system is present. For example, a consumer cannot determine from the event itself how wide the numeric id field is (e.g. whether it is a 32-bit or a 64-bit integer).
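The kind of guessing a consumer is left with can be sketched as follows. `guess_type` is a hypothetical heuristic written for this illustration, not part of Kafka or Debezium; it picks the narrowest plausible type for each bare JSON value and is applied here to a few fields of the `source` block from the event above.

```python
import json


def guess_type(value):
    """Hypothetical heuristic: infer a Kafka Connect-style type name from a bare JSON value."""
    if value is None:
        return "unknown"            # null carries no type information at all
    if isinstance(value, bool):     # test bool first: in Python, bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        # JSON has a single number type, so the narrowest fitting integer type is only a guess
        return "int32" if -2**31 <= value < 2**31 else "int64"
    if isinstance(value, float):
        return "float64"
    if isinstance(value, str):
        return "string"             # enums, dates and decimals sent as text all look alike
    if isinstance(value, dict):
        return "struct"
    return "array" if isinstance(value, list) else "unknown"


# Four fields from the "source" block of the schemaless example event
source = json.loads('{"ts_ms": 0, "snapshot": "true", "server_id": 0, "gtid": null}')
guesses = {field: guess_type(v) for field, v in source.items()}
print(guesses)
# {'ts_ms': 'int32', 'snapshot': 'string', 'server_id': 'int32', 'gtid': 'unknown'}
```

In the schema shown further below, `ts_ms` and `server_id` are actually `int64`, `snapshot` is an `io.debezium.data.Enum` restricted to `true`, `last` and `false`, and `gtid` is an optional string, so each of these guesses is wrong or incomplete.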

An example of the second case is, again, JsonConverter. When its schemas.enable option is set to true, the JSON message consists of two parts: schema and payload. The payload part is exactly the same as in the previous case; the schema part contains a description of the message, its fields, their types and the associated type constraints. This enables the consumer to process the message in a type-safe way. The drawback of this approach is that the message size increases significantly, as the schema is quite a large object. As schemas tend to change rarely (how often do you change the column definitions of your database tables?), adding the schema to each and every event imposes a significant overhead.
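To illustrate what type-safe processing can look like, the following sketch checks a payload against the schema part of the same message. `check_struct` is a hypothetical helper written for this example; it understands only a small subset of Kafka Connect schema types (struct, string, boolean and the intN types) and is not a substitute for a real converter library.

```python
import json

# Bit widths of the integer types that may appear in a Kafka Connect schema
INT_BITS = {"int8": 8, "int16": 16, "int32": 32, "int64": 64}


def check_struct(schema, value):
    """Hypothetical validator: list the problems found when checking `value` against a struct schema."""
    problems = []
    for field in schema["fields"]:
        name, ftype = field["field"], field["type"]
        v = value.get(name)
        if v is None:
            if not field.get("optional", False):
                problems.append(f"{name}: required but null or missing")
            continue
        if ftype == "struct":
            # Recurse into nested structs and prefix problems with the parent field name
            problems += [f"{name}.{p}" for p in check_struct(field, v)]
        elif ftype in INT_BITS:
            bits = INT_BITS[ftype]
            if isinstance(v, bool) or not isinstance(v, int) or not -2**(bits - 1) <= v < 2**(bits - 1):
                problems.append(f"{name}: {v!r} is not a valid {ftype}")
        elif ftype == "string" and not isinstance(v, str):
            problems.append(f"{name}: {v!r} is not a valid string")
        elif ftype == "boolean" and not isinstance(v, bool):
            problems.append(f"{name}: {v!r} is not a valid boolean")
    return problems


message = json.loads('''{
  "schema": {"type": "struct", "fields": [
    {"type": "int32", "optional": false, "field": "id"},
    {"type": "string", "optional": false, "field": "email"}]},
  "payload": {"id": 3000000000, "email": null}
}''')
print(check_struct(message["schema"], message["payload"]))
# ['id: 3000000000 is not a valid int32', 'email: required but null or missing']
```

Without the `schema` part, a consumer would have no way of knowing that `3000000000` overflows the `id` column or that `email` must not be null.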

The following example of a message with an embedded schema shows that the schema itself can be much larger than the payload, which makes this approach uneconomical:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "1.1.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "c",
    "ts_ms": 1586331101491,
    "transaction": null
  }
}
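The overhead can be quantified with a short sketch. It serializes a trimmed-down copy of the `after` struct schema shown above (only the four customer columns, not the full envelope) together with the sample row, using compact JSON. The `{"schemaId": ..., "payload": ...}` envelope used for comparison is a hypothetical stand-in for a registry reference, not an actual converter wire format, and `column` and `size` are helper names invented for this example.

```python
import json


def column(name, ctype, optional=False):
    """Build one field entry in the shape used by the schema above."""
    return {"type": ctype, "optional": optional, "field": name}


# Trimmed-down copy of the "after" struct schema from the example message
row_schema = {
    "type": "struct",
    "fields": [column("id", "int32"), column("first_name", "string"),
               column("last_name", "string"), column("email", "string")],
    "optional": True,
    "name": "dbserver1.inventory.customers.Value",
}
row = {"id": 1001, "first_name": "Sally", "last_name": "Thomas", "email": "sally.thomas@acme.com"}


def size(obj):
    # Compact separators so that whitespace does not inflate the numbers
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


plain = size(row)                                         # first approach: payload only
embedded = size({"schema": row_schema, "payload": row})   # second approach: schema in every event
referenced = size({"schemaId": 1, "payload": row})        # third approach: hypothetical ID reference
print(f"payload only: {plain} B, embedded schema: {embedded} B, schema reference: {referenced} B")
```

Even for this four-column row, the embedded schema is several times larger than the data it describes, and that cost is paid again on every single event.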
