Change data capture: Install Debezium on K8s

 2 years ago
source link: https://tienbm90.medium.com/change-data-capture-install-debezium-on-k8s-8a98a55a1406

Create a namespace for the resources we’re going to create:

kubectl create namespace kafka

Then install the cluster operator and associated resources:

curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.16.1/strimzi-cluster-operator-0.16.1.yaml \
| sed 's/namespace: .*/namespace: kafka/' \
| kubectl apply -f - -n kafka
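
The sed step in that pipeline rewrites every namespace: field in the downloaded manifest to kafka before it is applied. A minimal sketch of what the substitution does, run against a made-up two-line fragment (the sample lines and the myproject value are assumptions for illustration, not copied from the release file):

```shell
# Hypothetical fragment standing in for part of the operator manifest.
sample='  name: strimzi-cluster-operator
  namespace: myproject'

# Same substitution as in the install command: replace whatever follows
# "namespace: " with "kafka" and leave every other line untouched.
rewritten=$(printf '%s\n' "$sample" | sed 's/namespace: .*/namespace: kafka/')

printf '%s\n' "$rewritten"
```

Only lines containing "namespace: " change, which is why the same expression can be applied blindly to the whole manifest.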

And spin up a Kafka cluster, waiting until it’s ready:

kubectl -n kafka \
apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.16.1/examples/kafka/kafka-persistent-single.yaml \
&& kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka

Build Kafka Connect image

The next step is to create a Strimzi Kafka Connect image which includes the Debezium MySQL connector and its dependencies.

First, download and extract the Debezium MySQL connector archive:

curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.0.0.Final/debezium-connector-mysql-1.0.0.Final-plugin.tar.gz \
| tar xvz

Prepare a Dockerfile which adds those connector files to the Strimzi Kafka Connect image:

cat <<EOF >Dockerfile
FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001
EOF

Then build the image from that Dockerfile and push it to Docker Hub.

# You can use your own dockerhub organization
export DOCKER_ORG=tjbentley
docker build . -t ${DOCKER_ORG}/connect-debezium
docker push ${DOCKER_ORG}/connect-debezium

Deploy MariaDB

We will use the Bitnami Helm chart to deploy MariaDB on the K8s cluster, with the following configuration saved as values.yaml:

## Global Docker image parameters
## Please, note that this will override the image parameters, including dependencies, configured to use the global value
## Current available global Docker image parameters: imageRegistry and imagePullSecrets
##
# global:
#   imageRegistry: myRegistryName
#   imagePullSecrets:
#     - myRegistryKeySecretName
#   storageClass: myStorageClass

## Use an alternate scheduler, e.g. "stork".
## ref: https://kubernetes.io/docs/tasks/administer-cluster/configure-multiple-schedulers/
##
# schedulerName:

## Bitnami MariaDB image
## ref: https://hub.docker.com/r/bitnami/mariadb/tags/
##
image:
  registry: docker.io
  repository: bitnami/mariadb
  tag: 10.3.22-debian-10-r27
  ## Specify a imagePullPolicy
  ## Defaults to 'Always' if image tag is 'latest', else set to 'IfNotPresent'
  ## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images
  ##
  pullPolicy: IfNotPresent
  ## Optionally specify an array of imagePullSecrets.
  ## Secrets must be manually created in the namespace.
  ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
  ##
  # pullSecrets:
  #   - myRegistryKeySecretName

  ## Set to true if you would like to see extra information on logs
  ## It turns BASH and NAMI debugging in minideb
  ## ref: https://github.com/bitnami/minideb-extras/#turn-on-bash-debugging
  debug: false

## String to partially override mariadb.fullname template (will maintain the release name)
##
# nameOverride:

## String to fully override mariadb.fullname template
##
# fullnameOverride:

## Init containers parameters:
## volumePermissions: Change the owner and group of the persistent volume mountpoint to runAsUser:fsGroup values from the securityContext section.
##
volumePermissions:
  enabled: false
  image:
    registry: docker.io
    repository: bitnami/minideb
    tag: buster
    pullPolicy: Always
    ## Optionally specify an array of imagePullSecrets.
    ## Secrets must be manually created in the namespace.
    ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
    ##
    # pullSecrets:
    #   - myRegistryKeySecretName
  resources: {}

service:
  ## Kubernetes service type, ClusterIP and NodePort are supported at present
  type: ClusterIP
  # clusterIp:
  #   master: xx.xx.xx.xx
  #   slave: xx.xx.xx.xx
  port: 3306
  ## Specify the nodePort value for the LoadBalancer and NodePort service types.
  ## ref: https://kubernetes.io/docs/concepts/services-networking/service/#type-nodeport
  ##
  # nodePort:
  #   master: 30001
  #   slave: 30002

## Pods Service Account
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/
serviceAccount:
  ## Specifies whether a ServiceAccount should be created
  ##
  create: false
  ## The name of the ServiceAccount to use.
  ## If not set and create is true, a name is generated using the mariadb.fullname template
  # name:

## Role Based Access
## Ref: https://kubernetes.io/docs/admin/authorization/rbac/
##
rbac:
  create: false

## Pod Security Context
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/
##
securityContext:
  enabled: true
  fsGroup: 1001
  runAsUser: 1001

## Use existing secret (ignores root, db and replication passwords)
##
# existingSecret:

## MariaDB admin credentials
##
rootUser:
  ## MariaDB admin password
  ## ref: https://github.com/bitnami/bitnami-docker-mariadb#setting-the-root-password-on-first-run
  ##
  password: "TestDB@2019"
  ## Option to force users to specify a password. That is required for 'helm upgrade' to work properly.
  ## If it is not force, a random password will be generated.
  ##
  forcePassword: true
  ## Mount admin password as a file instead of using an environment variable
  ##
  injectSecretsAsVolume: true

## Custom user/db credentials
##
db:
  ## MariaDB username and password
  ## ref: https://github.com/bitnami/bitnami-docker-mariadb#creating-a-database-user-on-first-run
  ##
  user: "testUser"
  password: "TestUser@2020"
  ## Database to create
  ## ref: https://github.com/bitnami/bitnami-docker-mariadb#creating-a-database-on-first-run
  ##
  name: test_core
  ## Option to force users to specify a password. That is required for 'helm upgrade' to work properly.
  ## If it is not force, a random password will be generated.
  ##
  forcePassword: true
  ## Mount user password as a file instead of using an environment variable
  ##
  injectSecretsAsVolume: true

## Replication configuration
##
replication:
  ## Enable replication. This enables the creation of replicas of MariaDB. If false, only a
  ## master deployment would be created
  ##
  enabled: false
  ## MariaDB replication user
  ## ref: https://github.com/bitnami/bitnami-docker-mariadb#setting-up-a-replication-cluster
  ##
  user: replicator
  ## MariaDB replication user password
  ## ref: https://github.com/bitnami/bitnami-docker-mariadb#setting-up-a-replication-cluster
  ##
  password: "Replicator@2019"
  ## Option to force users to specify a password. That is required for 'helm upgrade' to work properly.
  ## If it is not force, a random password will be generated.
  ##
  forcePassword: true
  ## Mount replication user password as a file instead of using an environment variable
  ##
  injectSecretsAsVolume: true

## initdb scripts
## Specify dictionary of scripts to be run at first boot
## Alternatively, you can put your scripts under the files/docker-entrypoint-initdb.d directory
##
# initdbScripts:
#   my_init_script.sh: |
#      #!/bin/sh
#      echo "Do something."
#
## ConfigMap with scripts to be run at first boot
## Note: This will override initdbScripts
# initdbScriptsConfigMap:

master:
  ## Mariadb Master additional pod annotations
  ## ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
  # annotations:
  #   key: value
  #   another-key: another-value

  ## MariaDB additional command line flags
  ## Can be used to specify command line flags, for example:
  ##
  ## extraFlags: "--max-connect-errors=1000 --max_connections=155"

  ## Affinity for pod assignment
  ## Ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
  ##
  affinity: {}

  ## Kept for backwards compatibility. You can now disable it by removing it.
  ## if you wish to set it through master.affinity.podAntiAffinity instead.
  ##
  antiAffinity: soft

  ## Node labels for pod assignment
  ## Ref: https://kubernetes.io/docs/user-guide/node-selection/
  ##
  nodeSelector: {}

  ## Tolerations for pod assignment
  ## Ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
  ##
  tolerations: []

  ## updateStrategy for MariaDB Master StatefulSet
  ## ref: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#update-strategies
  updateStrategy:
    type: RollingUpdate

  ## Enable persistence using Persistent Volume Claims
  ## ref: http://kubernetes.io/docs/user-guide/persistent-volumes/
  ##
  persistence:
    ## If true, use a Persistent Volume Claim, If false, use emptyDir
    ##
    enabled: true
    # Enable persistence using an existing PVC
    # existingClaim:
    # Subdirectory of the volume to mount
    # subPath:
    mountPath: /bitnami/mariadb
    ## Persistent Volume Storage Class
    ## If defined, storageClassName: <storageClass>
    ## If set to "-", storageClassName: "", which disables dynamic provisioning
    ## If undefined (the default) or set to null, no storageClassName spec is
    ##   set, choosing the default provisioner.  (gp2 on AWS, standard on
    ##   GKE, AWS & OpenStack)
    ##
    # storageClass: "-"
    ## Persistent Volume Claim annotations
    ##
    annotations: {}
    ## Persistent Volume Access Mode
    ##
    accessModes:
      - ReadWriteOnce
    ## Persistent Volume size
    ##
    size: 8Gi

  extraInitContainers: |
    # - name: do-something
    #   image: busybox
    #   command: ['do', 'something']

  ## An array to add extra environment variables
  ## For example:
  ## extraEnvVars:
  ##  - name: TZ
  ##    value: "Europe/Paris"
  ##
  # extraEnvVars:

  ## Configure MySQL with a custom my.cnf file
  ## ref: https://mysql.com/kb/en/mysql/configuring-mysql-with-mycnf/#example-of-configuration-file
  ##
  config: |-
    [mysqld]
    skip-name-resolve
    explicit_defaults_for_timestamp
    basedir=/opt/bitnami/mariadb
    plugin_dir=/opt/bitnami/mariadb/plugin
    port=3306
    socket=/opt/bitnami/mariadb/tmp/mysql.sock
    tmpdir=/opt/bitnami/mariadb/tmp
    max_allowed_packet=16M
    bind-address=0.0.0.0
    pid-file=/opt/bitnami/mariadb/tmp/mysqld.pid
    log-error=/opt/bitnami/mariadb/logs/mysqld.log
    character-set-server=UTF8
    collation-server=utf8_general_ci
    ## Configure binary log for capturing database change
    binlog_format=ROW
    server-id = 184054
    log_bin = mysql-bin
    expire_logs_days = 10

    [client]
    port=3306
    socket=/opt/bitnami/mariadb/tmp/mysql.sock
    default-character-set=UTF8
    plugin_dir=/opt/bitnami/mariadb/plugin

    [manager]
    port=3306
    socket=/opt/bitnami/mariadb/tmp/mysql.sock
    pid-file=/opt/bitnami/mariadb/tmp/mysqld.pid

  ## Configure master resource requests and limits
  ## ref: http://kubernetes.io/docs/user-guide/compute-resources/
  ##
  resources: {}
  livenessProbe:
    enabled: true
    ##
    ## Initializing the database could take some time
    initialDelaySeconds: 120
    ##
    ## Default Kubernetes values
    periodSeconds: 10
    timeoutSeconds: 1
    successThreshold: 1
    failureThreshold: 3
  readinessProbe:
    enabled: true
    initialDelaySeconds: 30
    ##
    ## Default Kubernetes values
    periodSeconds: 10
    timeoutSeconds: 1
    successThreshold: 1
    failureThreshold: 3

  podDisruptionBudget:
    enabled: false
    minAvailable: 1
    # maxUnavailable: 1

  ## Allow customization of the service resource
  ##
  service:
    ## Add custom annotations to the service
    ##
    annotations: {}

slave:
  replicas: 1

  ## Mariadb Slave additional pod annotations
  ## ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
  # annotations:
  #   key: value
  #   another-key: another-value

  ## MariaDB additional command line flags
  ## Can be used to specify command line flags, for example:
  ##
  ## extraFlags: "--max-connect-errors=1000 --max_connections=155"

  ## Affinity for pod assignment
  ## Ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
  ##
  affinity: {}

  ## Kept for backwards compatibility. You can now disable it by removing it.
  ## if you wish to set it through slave.affinity.podAntiAffinity instead.
  ##
  antiAffinity: soft

  ## Node labels for pod assignment
  ## Ref: https://kubernetes.io/docs/user-guide/node-selection/
  ##
  nodeSelector: {}

  ## Tolerations for pod assignment
  ## Ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
  ##
  tolerations: []

  ## updateStrategy for MariaDB Slave StatefulSet
  ## ref: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#update-strategies
  updateStrategy:
    type: RollingUpdate

  persistence:
    ## If true, use a Persistent Volume Claim, If false, use emptyDir
    ##
    enabled: true
    # storageClass: "-"
    annotations:
    accessModes:
      - ReadWriteOnce
    ## Persistent Volume size
    ##
    size: 8Gi

  extraInitContainers: |
    # - name: do-something
    #   image: busybox
    #   command: ['do', 'something']

  ## An array to add extra environment variables
  ## For example:
  ## extraEnvVars:
  ##  - name: TZ
  ##    value: "Europe/Paris"
  ##
  # extraEnvVars:

  ## Configure MySQL slave with a custom my.cnf file
  ## ref: https://mysql.com/kb/en/mysql/configuring-mysql-with-mycnf/#example-of-configuration-file
  ##
  config: |-
    [mysqld]
    skip-name-resolve
    explicit_defaults_for_timestamp
    basedir=/opt/bitnami/mariadb
    port=3306
    socket=/opt/bitnami/mariadb/tmp/mysql.sock
    tmpdir=/opt/bitnami/mariadb/tmp
    max_allowed_packet=16M
    bind-address=0.0.0.0
    pid-file=/opt/bitnami/mariadb/tmp/mysqld.pid
    log-error=/opt/bitnami/mariadb/logs/mysqld.log
    character-set-server=UTF8
    collation-server=utf8_general_ci

    [client]
    port=3306
    socket=/opt/bitnami/mariadb/tmp/mysql.sock
    default-character-set=UTF8

    [manager]
    port=3306
    socket=/opt/bitnami/mariadb/tmp/mysql.sock
    pid-file=/opt/bitnami/mariadb/tmp/mysqld.pid

  ##
  ## Configure slave resource requests and limits
  ## ref: http://kubernetes.io/docs/user-guide/compute-resources/
  ##
  resources: {}
  livenessProbe:
    enabled: true
    ##
    ## Initializing the database could take some time
    initialDelaySeconds: 120
    ##
    ## Default Kubernetes values
    periodSeconds: 10
    timeoutSeconds: 1
    successThreshold: 1
    failureThreshold: 3
  readinessProbe:
    enabled: true
    initialDelaySeconds: 45
    ##
    ## Default Kubernetes values
    periodSeconds: 10
    timeoutSeconds: 1
    successThreshold: 1
    failureThreshold: 3

  podDisruptionBudget:
    enabled: false
    minAvailable: 1
    # maxUnavailable: 1

  ## Allow customization of the service resource
  ##
  service:
    ## Add custom annotations to the service
    ##
    annotations: {}

metrics:
  enabled: false
  image:
    registry: docker.io
    repository: bitnami/mysqld-exporter
    tag: 0.12.1-debian-10-r27
    pullPolicy: IfNotPresent
    ## Optionally specify an array of imagePullSecrets.
    ## Secrets must be manually created in the namespace.
    ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
    ##
    # pullSecrets:
    #   - myRegistryKeySecretName
  resources: {}
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9104"

  ## Extra args to be passed to mysqld_exporter
  ## ref: https://github.com/prometheus/mysqld_exporter/
  ##
  extraArgs:
    master: []
    slave: []
  # - --collect.auto_increment.columns
  # - --collect.binlog_size
  # - --collect.engine_innodb_status
  # - --collect.engine_tokudb_status
  # - --collect.global_status
  # - --collect.global_variables
  # - --collect.info_schema.clientstats
  # - --collect.info_schema.innodb_metrics
  # - --collect.info_schema.innodb_tablespaces
  # - --collect.info_schema.innodb_cmp
  # - --collect.info_schema.innodb_cmpmem
  # - --collect.info_schema.processlist
  # - --collect.info_schema.processlist.min_time
  # - --collect.info_schema.query_response_time
  # - --collect.info_schema.tables
  # - --collect.info_schema.tables.databases
  # - --collect.info_schema.tablestats
  # - --collect.info_schema.userstats
  # - --collect.perf_schema.eventsstatements
  # - --collect.perf_schema.eventsstatements.digest_text_limit
  # - --collect.perf_schema.eventsstatements.limit
  # - --collect.perf_schema.eventsstatements.timelimit
  # - --collect.perf_schema.eventswaits
  # - --collect.perf_schema.file_events
  # - --collect.perf_schema.file_instances
  # - --collect.perf_schema.indexiowaits
  # - --collect.perf_schema.tableiowaits
  # - --collect.perf_schema.tablelocks
  # - --collect.perf_schema.replication_group_member_stats
  # - --collect.slave_status
  # - --collect.slave_hosts
  # - --collect.heartbeat
  # - --collect.heartbeat.database
  # - --collect.heartbeat.table

  livenessProbe:
    enabled: true
    ##
    ## Initializing the database could take some time
    initialDelaySeconds: 120
    ##
    ## Default Kubernetes values
    periodSeconds: 10
    timeoutSeconds: 1
    successThreshold: 1
    failureThreshold: 3
  readinessProbe:
    enabled: true
    initialDelaySeconds: 30
    ##
    ## Default Kubernetes values
    periodSeconds: 10
    timeoutSeconds: 1
    successThreshold: 1
    failureThreshold: 3

  # Enable this if you're using https://github.com/coreos/prometheus-operator
  serviceMonitor:
    enabled: false
    ## Specify a namespace if needed
    # namespace: monitoring
    # fallback to the prometheus default unless specified
    # interval: 10s
    # scrapeTimeout: 10s
    ## Defaults to what's used if you follow CoreOS [Prometheus Install Instructions](https://github.com/helm/charts/tree/master/stable/prometheus-operator#tldr)
    ## [Prometheus Selector Label](https://github.com/helm/charts/tree/master/stable/prometheus-operator#prometheus-operator-1)
    ## [Kube Prometheus Selector Label](https://github.com/helm/charts/tree/master/stable/prometheus-operator#exporters)
    selector:
      prometheus: kube-prometheus

## Bats Framework (= Bash Automated Testing System) is needed to test if MariaDB is accessible
## See test-runner.yaml and tests.yaml for details.
## To run the tests after the deployment, enter "helm test <release-name>".
tests:
  enabled: true
  # resources: {}
  testFramework:
    image:
      registry: docker.io
      repository: dduportal/bats
      tag: 0.4.0
    # resources: {}

Run the following command to create the MariaDB pods:

helm -n kafka install mariadb bitnami/mariadb -f values.yaml

Create database and grant permissions

To connect to your database:

1. Run a pod that you can use as a client:

kubectl run mariadb-client --rm --tty -i --restart='Never' --image docker.io/bitnami/mariadb:10.3.22-debian-10-r27 --namespace kafka --command -- bash

2. Connect to the master service (read/write):

mysql -h mariadb.kafka.svc.cluster.local -uroot -p

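3. Create the database if it does not already exist (the chart is configured to create test_core on first boot through db.name):

CREATE DATABASE IF NOT EXISTS test_core;

4. Grant permissions to Debezium for connecting to MariaDB: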

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'testUser' IDENTIFIED BY 'TestUser@2020';

Create the database credentials

To make this a bit more realistic, we’re going to use Kafka’s config.providers mechanism to avoid having to pass secret information over the Kafka Connect REST interface (which uses unencrypted HTTP). We’ll use a Kubernetes Secret called my-sql-credentials to store the database credentials. This will be mounted as a secret volume within the Connect pods. We can then configure the connector with the path to this file.

Let’s create the secret:

cat <<EOF > debezium-mysql-credentials.properties
mysql_username: testUser
mysql_password: TestUser@2020
EOF
kubectl -n kafka create secret generic my-sql-credentials \
--from-file=debezium-mysql-credentials.properties

Create the Connect cluster

Now we can create a KafkaConnect cluster in Kubernetes:

cat <<EOF | kubectl -n kafka apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: ${DOCKER_ORG}/connect-debezium
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-config
        secret:
          secretName: my-sql-credentials
EOF

It’s worth pointing out a couple of things about the above resource:

  • In the metadata.annotations the strimzi.io/use-connector-resources: "true" annotation tells the cluster operator that KafkaConnector resources will be used to configure connectors within this Kafka Connect cluster.
  • The spec.image is the image we created with docker.
  • In the config we’re using replication factor 1 because we created a single-broker Kafka cluster.
  • In the externalConfiguration we’re referencing the secret we just created.
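
To make the last point concrete: Strimzi mounts each externalConfiguration volume under /opt/kafka/external-configuration/<volume name>/, and a secret volume exposes one file per key in the secret. The sketch below only composes the resulting path from the names used above; it does not talk to the cluster, and the variable names are chosen here for illustration:

```shell
# Names taken from the KafkaConnect resource and the secret created earlier.
volume_name="connector-config"
secret_file="debezium-mysql-credentials.properties"

# Strimzi mounts externalConfiguration volumes below this directory,
# one sub-directory per volume name.
mount_root="/opt/kafka/external-configuration"

credentials_path="${mount_root}/${volume_name}/${secret_file}"
echo "$credentials_path"
```

Once the Connect pod is running, listing /opt/kafka/external-configuration/connector-config inside it with kubectl exec should show that file; the pod name depends on the deployment, so it is left out here.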

Create the connector

The last piece is to create the KafkaConnector resource configured to connect to our “inventory” database in MySQL.

Here’s what the KafkaConnector resource looks like:

cat <<'EOF' | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
  name: "inventory-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mariadb.kafka.svc.cluster.local
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"

    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true"
EOF

In metadata.labels, strimzi.io/cluster names the KafkaConnect cluster which this connector will be created in.

The spec.class names the Debezium MySQL connector and spec.tasksMax must be 1 because that’s all this connector ever uses.

The spec.config object contains the rest of the connector configuration. The Debezium documentation explains the available properties, but it’s worth calling out some specifically:

  • The database.hostname: mariadb.kafka.svc.cluster.local is the in-cluster DNS name of the MariaDB service deployed above.
  • The database.port: "3306" matches the service port (service.port: 3306) set in the MariaDB values.yaml.
  • The ${file:...} used for the database.user and database.password is a placeholder which gets replaced with the referenced property from the given file in the secret we created.
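  • The database.whitelist: "inventory" tells Debezium to watch only the inventory database.
  • The database.server.id: "184054" is the numeric ID the connector presents when it reads the binlog as a replication client. This walkthrough reuses the server-id value from my.cnf; the Debezium documentation asks for an ID that is unique within the database cluster, so pick a different value if the connector cannot start streaming.
  • The database.history.kafka.topic: "schema-changes.inventory" configures Debezium to use the schema-changes.inventory topic to store the database schema history.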
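
The ${file:...} substitution can be pictured as a two-step lookup: split the placeholder into a file path and a key, then read that key from the properties file. The sketch below imitates this in plain shell. It is not how Kafka's FileConfigProvider is implemented, the resolve_placeholder helper is hypothetical, and a temporary file stands in for the mounted secret:

```shell
# Temporary stand-in for the mounted secret file (assumption for illustration).
props=$(mktemp)
cat > "$props" <<'EOF'
mysql_username: testUser
mysql_password: TestUser@2020
EOF

# Hypothetical helper: resolve a "${file:<path>:<key>}" placeholder by hand.
resolve_placeholder() {
  prefix='${file:'
  suffix='}'
  inner=${1#"$prefix"}        # drop the leading "${file:"
  inner=${inner%"$suffix"}    # drop the trailing "}"
  file_path=${inner%:*}       # everything before the last colon is the path
  prop_key=${inner##*:}       # everything after the last colon is the key
  # Print the value stored under the key ("key: value" or "key=value").
  sed -n "s/^${prop_key}[:=][[:space:]]*//p" "$file_path"
}

resolved_user=$(resolve_placeholder "\${file:${props}:mysql_username}")
echo "$resolved_user"
```

The point of the indirection is that the KafkaConnector resource only ever contains the path and the key, never the credential itself.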

Verify Kafka connector status:

kubectl -n kafka get kctr inventory-connector -o yaml

If everything is fine, the result will look like:

......
status:
  conditions:
  - lastTransitionTime: "2020-06-17T02:26:58.497Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: RUNNING
      worker_id: 10.40.182.6:8083
    name: inventory-connector
    tasks:
    - id: 0
      state: RUNNING
      worker_id: 10.40.182.6:8083
    type: source
  observedGeneration: 1

Check Kafka topics

List the topics, for example with:

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

You should see:

__consumer_offsets
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
schema-changes.inventory

The connect-cluster-* topics are the usual internal Kafka Connect topics. Debezium has created a topic for the server itself (dbserver1), and one for each table within the inventory database.
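
The table topics in that listing follow the pattern <server name>.<database>.<table>, where the server name is the database.server.name value from the connector. A small sketch that derives the expected topic names from the values used above (the topic_for helper is hypothetical):

```shell
server_name="dbserver1"   # database.server.name in the connector config
database="inventory"      # database.whitelist in the connector config

# Hypothetical helper: build the change-event topic name for one table.
topic_for() {
  printf '%s.%s.%s\n' "$server_name" "$database" "$1"
}

for table in customers orders products; do
  topic_for "$table"
done
```

This is handy when scripting consumers, since the topic for a new table can be predicted before any change event for it has been produced.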

Let’s start consuming from one of those change topics:

kubectl -n kafka exec my-cluster-kafka-0 -c kafka -i -t -- \
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic dbserver1.inventory.customers

Insert or update a row in the “customers” table and the corresponding change event will appear in the consumer output.
