2

Canal-监听数据库表的变化 - 张铁牛

 1 year ago
source link: https://www.cnblogs.com/ludangxin/p/16376932.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Canal-监听数据库表的变化 - 张铁牛 - 博客园

Canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费功能。

工作原理

Mysql主备复制原理

1759273-20220615001310323-1683454139.jpg
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

mysql的binlog

它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。

binlog有三种模式:STATEMENTROWMIXED

  1. STATEMENT 记录的是执行的sql语句
  2. ROW 记录的是真实的行数据记录
  3. MIXED 记录的是1+2,优先按照1的模式记录
update user set age = 33

对应STATEMENT模式只是记录了当前执行的sql,而对应ROW模式则有可能有成千上万条记录(当然这取决于你user表的记录数)

2. 可以干什么

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

原生安装请参考:https://github.com/alibaba/canal/wiki/QuickStart

3.1 docker-compose安装

3.1.1 创建同步用户

如果想查看mysql server的相关配置可以参考 https://www.cnblogs.com/ludangxin/p/16358928.html 中的master配置

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3.1.2 修改配置文件

首先启动一个零时的容器用于 将容器中的配置文件信息copy到宿主机

# run 零时容器
docker run --name canal-temp -d --rm canal/canal-server:v1.1.6
# 执行copy操作  copy配置文件到当前目录中
docker cp canal-temp:/home/admin/canal-server/conf ./canal-server/conf

canal-server/conf配置文件目录结构如下

canal-server/conf
├── canal.properties # canal server 的配置文件参数信息 例如:服务的端口/集群参数/server 模式(# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ)等
├── canal_local.properties
├── example # canal 实例相关配置信息,如果想要监听多个mysql server 可以copy此文件进行配置,当然也要在canal.properties的 canal.destinations 中添加对应的文件夹名称
│   ├── h2.mv.db
│   ├── instance.properties
│   └── meta.dat
├── logback.xml
...

修改配置文件信息

canal.properties我们使用默认的配置信息 即:canal.serverMode = tcp

example/instance.properties中配置mysql server连接信息 如下:

# position info
# mysql url 我这里直接使用的是 mysql容器name
canal.instance.master.address=my_mysql:3306
# 监听的binlog 文件名称 例:mysql-bin.000007
canal.instance.master.journal.name=
# 日志文件的Offset
canal.instance.master.position=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# table regex
# 默认配置是同步所有的库和表
#canal.instance.filter.regex=.*\\..*
# 配置只监听test库的user表,如果需要读取多个表可以使用正则表达式或者用逗号隔开
canal.instance.filter.regex=test.user

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠()
常见例子:
所有表:.* or .\…
canal schema下所有表: canal\…*
canal下的以canal打头的表:canal.canal.*
canal schema下的一张表:canal.test1
多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解
析sql,所以无法准确提取tableName进行过滤)

3.1.3 启动canal

docker-compose.yaml 如下

因为canal需要读取mysql server的bin-log所以需要设置加入到mysql server的网络中去

version: '3'   
services:
    canal:
        image: canal/canal-server:v1.1.6
        hostname: canal
        container_name: canal
        restart: "no"
        ports:
            - "11111:11111"
        volumes:
            - "./canal-server/conf:/home/admin/canal-server/conf"
            - "./canal-server/logs:/home/admin/canal-server/logs"
        networks:
            - mysql_mysql 
networks:
  mysql_mysql:
    external: true

4. springboot 测试

tips:可参考 https://github.com/NormanGyllenhaal/canal-client

4.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

4.2 添加配置

canal:
  server: localhost:11111
  destination: example

logging:
  level:
    top.javatool.canal.client.client: OFF

4.3 监听canal数据

package com.ldx.canaldemo.handler;

import com.ldx.canaldemo.domain.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@Slf4j
@Component
// 监听user表
@CanalTable(value = "user")
public class UserHandler implements EntryHandler<User> {

    @Override
    public void insert(User user) {
        log.info("insert info {}", user);
    }

    @Override
    public void update(User before, User after) {
        log.info("update before {} ", before);
        log.info("update after {}", after);
    }

    @Override
    public void delete(User user) {
        log.info("delete {}", user);
    }
}
package com.ldx.canaldemo.domain;

import lombok.Data;

import javax.persistence.Column;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;

@Data
@Table(name = "user")
public class User implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id")
    private Integer id;

    @Column(name = "username")
    private String username;

    @Column(name = "password")
    private String password;

    @Column(name = "sex")
    private Integer sex;
}

4.4 测试

user表信息如下

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `password` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `age` int(1) DEFAULT NULL,
  `sex` int(1) COLLATE utf8mb4_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
INSERT INTO `test`.`user`(`id`, `username`, `password`, `age`, `sex`) VALUES (5, 'zhangtieniu', '123456', 28, 1);

查看控制台输出如下

2022-06-14 13:50:38.144  INFO 71053 --- [xecute-thread-1] com.ldx.canaldemo.handler.UserHandler    : insert info User(id=5, username=zhangtieniu, password=123456, age=28, sex=1)
UPDATE `test`.`user` SET `username` = 'zhangsan', age = 23 WHERE id = 5;

查看控制台输出如下

2022-06-14 13:54:55.997  INFO 71053 --- [xecute-thread-2] com.ldx.canaldemo.handler.UserHandler    : update before User(id=null, username=zhangtieniu, password=null, age=28, sex=null) 
2022-06-14 13:54:55.997  INFO 71053 --- [xecute-thread-2] com.ldx.canaldemo.handler.UserHandler    : update after User(id=5, username=zhangsan, password=123456, age=23, sex=1)
DELETE FROM `test`.`user` WHERE id = 5;

查看控制台输出如下

2022-06-14 13:56:46.359  INFO 71053 --- [xecute-thread-3] com.ldx.canaldemo.handler.UserHandler    : delete User(id=5, username=zhangsan, password=123456, age=23, sex=1)

5. rabbit mq 测试

5.1 修改canal配置

canal.properties 修改如下

# 将serverMode 修改成rabbitMQ
canal.serverMode = rabbitMQ

# 添加rabbitmq 配置信息
rabbitmq.host = rabbitmq:5672
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin123

example/instance.properties 添加路由规则

canal.mq.topic=canal_routing_key

5.2 springboot consumer

5.2.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.2.2 添加配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默认的虚拟主机
    virtual-host: /
    # rabbit 用户名密码
    username: admin
    password: admin123
    listener:
      simple:
        # manual 手动确认
        acknowledge-mode: manual

5.2.3 添加 consumer

package com.ldx.canaldemo.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author ludangxin
 * @date 2022/6/14
 */
@Slf4j
@Component
public class RabbitMQConsumer {
   @RabbitListener(bindings =
         {@QueueBinding(
               value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"),
               exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE),
               key = MqConstant.CANAL_ROUTING_KEY)
         })
   public void helloRabbitMq(Message message, Channel channel) throws IOException {
      MessageProperties messageProperties = message.getMessageProperties();
      try {
         log.info(message.toString());
         log.info(new String(message.getBody()));
         channel.basicAck(messageProperties.getDeliveryTag(), false);
      } catch(Exception e) {
         // 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
         if(messageProperties.getRedelivered()) {
            log.info("消息已重复处理失败,拒绝再次接收...");
            // 拒绝消息Ò
            channel.basicReject(messageProperties.getDeliveryTag(), false);
         }
         else {
            log.info("消息即将再次返回队列处理...");
            channel.basicNack(messageProperties.getDeliveryTag(), false, true);
         }
      }
   }
}
package com.ldx.canaldemo.rabbitmq;

/**
 * @author ludangxin
 * @date 2022/6/14
 */
public interface MqConstant {
    String CANAL_EXCHANGE = "canal.exchange";
    String CANAL_QUEUE = "canal_queue";
    String CANAL_ROUTING_KEY = "canal_routing_key";
}

5.3 启动测试

先启动项目让程序自动建立所需mq中的交换机和队列

1759273-20220615001328969-1025827877.png
INSERT INTO `test`.`user`(`id`, `username`, `password`, `age`, `sex`) VALUES (8, 'zhangtieniu', '123456', 28, 1);

查看控制台输出如下

2022-06-14 14:42:04.818  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : (Body:'[B@189a76a(byte[414])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=4, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:42:04.818  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : {"data":[{"id":"8","username":"zhangtieniu","password":"123456","age":"28","sex":"1"}],"database":"test","es":1655188924000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655188924822,"type":"INSERT"}
UPDATE `test`.`user` SET `username` = 'zhangsan', age = 23 WHERE id = 8;

查看控制台输出如下

2022-06-14 14:56:23.471  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : (Body:'[B@6a3a1f0(byte[446])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=5, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:56:23.471  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : {"data":[{"id":"8","username":"zhangsan","password":"123456","age":"23","sex":"1"}],"database":"test","es":1655189783000,"id":7,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":[{"username":"zhangtieniu","age":"28"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655189783493,"type":"UPDATE"}
DELETE FROM `test`.`user` WHERE id = 8;

查看控制台输出如下

2022-06-14 14:57:06.407  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : (Body:'[B@628caa50(byte[411])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=6, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:57:06.408  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : {"data":[{"id":"8","username":"zhangsan","password":"123456","age":"23","sex":"1"}],"database":"test","es":1655189826000,"id":8,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655189826419,"type":"DELETE"}

6. canal admin管理canal

详情查看:https://gitee.com/zhengqingya/docker-compose

使用手册:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide

原生安装:https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart

  1. 创建canal admin 数据库 canal_manager

  2. 运行初始化sqlLiunx/canal/canal_admin/canal_manager.sql

    文件内容如下:

    CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
    
    USE `canal_manager`;
    
    SET NAMES utf8;
    SET FOREIGN_KEY_CHECKS = 0;
    
    -- ----------------------------
    -- Table structure for canal_adapter_config
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_adapter_config`;
    CREATE TABLE `canal_adapter_config` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `category` varchar(45) NOT NULL,
      `name` varchar(45) NOT NULL,
      `status` varchar(45) DEFAULT NULL,
      `content` text NOT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Table structure for canal_cluster
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_cluster`;
    CREATE TABLE `canal_cluster` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(63) NOT NULL,
      `zk_hosts` varchar(255) NOT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Table structure for canal_config
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_config`;
    CREATE TABLE `canal_config` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `cluster_id` bigint(20) DEFAULT NULL,
      `server_id` bigint(20) DEFAULT NULL,
      `name` varchar(45) NOT NULL,
      `status` varchar(45) DEFAULT NULL,
      `content` text NOT NULL,
      `content_md5` varchar(128) NOT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `sid_UNIQUE` (`server_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Table structure for canal_instance_config
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_instance_config`;
    CREATE TABLE `canal_instance_config` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `cluster_id` bigint(20) DEFAULT NULL,
      `server_id` bigint(20) DEFAULT NULL,
      `name` varchar(45) NOT NULL,
      `status` varchar(45) DEFAULT NULL,
      `content` text NOT NULL,
      `content_md5` varchar(128) DEFAULT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `name_UNIQUE` (`name`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Table structure for canal_node_server
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_node_server`;
    CREATE TABLE `canal_node_server` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `cluster_id` bigint(20) DEFAULT NULL,
      `name` varchar(63) NOT NULL,
      `ip` varchar(63) NOT NULL,
      `admin_port` int(11) DEFAULT NULL,
      `tcp_port` int(11) DEFAULT NULL,
      `metric_port` int(11) DEFAULT NULL,
      `status` varchar(45) DEFAULT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Table structure for canal_user
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_user`;
    CREATE TABLE `canal_user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `username` varchar(31) NOT NULL,
      `password` varchar(128) NOT NULL,
      `name` varchar(31) NOT NULL,
      `roles` varchar(31) NOT NULL,
      `introduction` varchar(255) DEFAULT NULL,
      `avatar` varchar(255) DEFAULT NULL,
      `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    SET FOREIGN_KEY_CHECKS = 1;
    
    -- ----------------------------
    -- Records of canal_user
    -- ----------------------------
    BEGIN;
    INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
    COMMIT;
    
    SET FOREIGN_KEY_CHECKS = 1;
    
  3. docker-compose.yaml如下

    version: '3'
    services:
      canal_admin:
        image: canal/canal-admin:v1.1.6
        container_name: canal_admin             
        restart: unless-stopped                 
        volumes: 
          - "./canal/canal-admin/logs:/home/admin/canal-admin/logs"
        environment:
          TZ: Asia/Shanghai
          LANG: en_US.UTF-8
          canal.adminUser: admin
          canal.adminPasswd: 123456
          spring.datasource.address: my_mysql:3306
          spring.datasource.database: canal_manager
          spring.datasource.username: root
          spring.datasource.password: 123456
        ports:
          - "8089:8089"
        networks:
          - canal
          - mysql_mysql
      canal_server:
        image: canal/canal-server:v1.1.6
        container_name: canal_server              
        restart: unless-stopped                   
        volumes:                                  
          - "./canal/canal-server/logs:/home/admin/canal-server/logs"
        environment:                              
          TZ: Asia/Shanghai
          LANG: en_US.UTF-8
          canal.admin.manager: canal_admin:8089
          canal.admin.port: 11110
          canal.admin.user: admin
          canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
        ports:
          - "11110:11110"
          - "11111:11111"
          - "11112:11112"
        depends_on:
          - canal_admin
        links:
          - canal_admin
        networks:
          - canal
          - mysql_mysql
    networks:
      canal:
      mysql_mysql:
        external: true
    
  4. 启动服务/访问http://localhost:8089/#/canalServer/nodeServers

    登陆 用户名/密码:admin/123456

    1759273-20220615001350713-33575165.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK