33

「从零单排canal 02」canal集群版 + admin控制台 最新搭建姿势(基于1.1.4版本)

 3 years ago
source link: http://www.cnblogs.com/awan-note/p/13089193.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据 订阅 和 消费。应该是阿里云DTS(Data Transfer Service)的开源版本,开源地址:

https://github.com/alibaba/canal。

canal从1.1.4版本开始引入了admin控制台,有了很多不一样的配置方式。在搭建过程中如果仅仅按照wiki的用户手册,还是容易踩很多坑的。因此,将笔者在搭建过程中的步骤记录下来,作为官方wiki的 补充,希望能有所帮助。

根据本文内容与搭建顺序 ,并搭配对应的官网文档链接,应该就能快速搭建完成了,enjoy~

1. 部署canal-admin

1)部署服务

官方文档地址:

https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart

主要配置application.yml文件

server:
  port: 8089

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: xxxx
  assword: xxxxx
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

这里需要注意,canal的adminPasswd并不是登陆admin的密码,登陆admin的密码是设置在对应的数据库中的,默认为123456。

另外,因为 Canal Admin 是一个管理系统,需要使用数据库存放配置信息,只用在 MySQL 中执行 Canal Admin 提供的数据库初始化文件即可,该文件在“conf/canal_manager.sql”路径下面。

2)登陆浏览器访问

上面的 Canal Admin 配置好了之后直接根据“/bin/startup.sh”启动 Canal Admin 即可,在浏览器上面输入 hostip:8089 即可进入到管理页面,如果使用的默认的配置信息,用户名入”admin”,密码输入”123456”即可访问首页。

进入到首页点击集群的菜单栏,然后选择新建集群。

在里面输入集群的名称以及 Zookeeper即可,这里的集群目前还没有任务节点,后续通过配置 Canal Server 的自动注册功能,便可以查看该集群下面拥有的节点。

创建集群后,需要先配置集群 主配置,载入模板即可。

如果没有载入这个模版,那么在canal-server执行 sh bin/startup.sh local 命令,读取canal_local.propeties配置启动时,会报错

1 Caused by: com.alibaba.otter.canal.common.CanalException: requestGet for canal config error: canal.properties config is empty

注意,在主配置中,还是需要记得加入zk的地址配置,跟上面配置集群名字的时候输入的zk无关 (那个zk地址不知道有啥用):

canal.zkServers = xxx.xx.xx.xx:2181,xxx.xx.xx.xx:2181,xxx.xx.xx.xx:2181

如果希望使用canal的集群模式(推荐使用),记得更改配置使用default-instance.xml,不要使用file-instance.xml配置,如下:

#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

2. 部署canal-server

注意,建议先部署好admin,再来部署canal-server,省事不少。

官方文档地址:

https://github.com/alibaba/canal/wiki/QuickStart

注意,不同于单机版的properties配置,使用admin后,各种配置通过admin全局管理,所以只需要配置canal_local.properties即可。canal_local.properties内容如下:

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = xxxxxxxxxxxxxx

# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

对各个参数说明一下:

  • canal.register.ip:用来指定当前 Canal Server 的 IP 信息,如果主机是多网卡,可以避免 IP 信息错乱的问题。
  • canal.admin.passwd:这里的密码就是之前配置 Canal Admin 里面配置的adminPasswd,只不过这里并不是明文展示,使用 MySQL 的”select password("admin")”语句查询处理过的密码,注意查询结果前面的”*”要去掉。
  • canal.admin.register.auto:这里是自动注册的意思,如果没有配置,Canal Server 启动后需要自行在 Canal Admin 上面添加。
  • canal.admin.register.cluster:这个配置如果不写代表当前的 Canal Server 是一个单机节点,如果添加的名字在 Canal Admin 上面没有提前注册,Canal Server 启动时会报错。

启动server,切记切记带上参数local,这样才会读取canal_local.properties的配置

sh bin/startup.sh local
启动完成后,可以在admin界面看到server的连接信息

i6JRjm6.jpg!web

同时,登陆zk,查看是否已经注册成功。

YVnyaaM.jpg!web

3. 配置canal-server

采用admin的集群模式后,集群内的canal-server通过zk做HA,因此,canal-server的配置也只能通过集群做全局配置。

就是前面的集群配置-主配置进入。

eyiMJvE.jpg!web

另外,canal作为一个增量数据抓取模块,抓到变更信息后需要投递。

通过canal.sererMode配置

这里也限制了,一个集群内的canal只能支持一种投递模式。

我们目前暂时以投递RocketMQ为例进行配置。

官方文档地址:

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

最终配置文件的配置如下:

#################################################
#########         common argument        #############
#################################################

# tcp bind ip
canal.ip =

# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112

# canal instance user/passwd
canal.user = xxxx
canal.passwd = xxxxxxxxxxxxxxxxxxx

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = xxxx
canal.admin.passwd = xxxxxxxxxxxxxx
canal.zkServers = xx.xx.xx.xx:xxxx,xx.xx.xx.xxx:xxxx,xx.xx.xx.xx:xxxx

# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false

# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ

# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000

## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384

## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024

## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false

#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024

# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
# 这里可以对订阅的消息做过滤
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true

## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal

# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24

# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########         destinations        #############
#################################################
canal.destinations =

# conf root dir
canal.conf.dir = ../conf

# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}

#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml

#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########              MQ              #############
##################################################
canal.mq.servers = xx.xx.xx.xx:xxxx,xx.xx.xx.xxx:xxxx,xx.xx.xx.xx:xxxx
canal.mq.retries = 0
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

#canal.mq.properties. =
canal.mq.producerGroup = test_canal_cluster

# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

# aliyun mq namespace

#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

4. 配置canal instance(以投递MQ为例)

Canal Admin 提供了 Canal Instance 的管理功能。

我们尝试通过 UI 界面添加需要监听的数据库,让该 Instance 消费 binlog 并将事件发送到 MQ。

  • 点击“新建 Instance”按钮创建 Instance
  • 点击“载入模板”,进行配置修改。

主要修改以下配置:

  • ”canal.instance.mysql.slaveId”:目前1.1.4版本已经不需要配置,系统自动生成
  • ”canal.instance.master.address”:配置你的数据库地址
  • canal.instance.dbUsername:数据库用户名
  • canal.instance.dbPassword:数据库密码
  • canal.mq.topic:mq的topic
  • canal.instance.filter.regex=订阅的库表名单(例如:dbvtest\\..*),具体参考https://github.com/alibaba/canal/wiki/AdminGuide

保存,启动,观察日志,没有报错即可。

同时可以从 canal-server的 操作-详情 进入,查看可以看到正在运行的instance

这里需要注意,如果没有正确注册到zk,那么每个canal-server都会有一个正在运行的instance,会导致变更投递多次。

如果正确注册了zk,同一个集群下,每个instance应该只在某一个cannal-server中运行。

可以查看zk路径

/otter/canal/destinations/{xxxx}/running
xxxx就是instance的 名称,running节点表示它运行在哪个server上。

另外,目前对binlog的订阅支持gtid模式和position模式,通过以下参数设置

# enable gtid use true/false
canal.instance.gtidon=false

在demo过程中,发现高可用模式下,gtid无法正常更新到zk,而position模式使用正常。

gtid模式在canal高可用模式下可能存在bug(参考issue:

https://github.com/alibaba/canal/issues/2616),暂时不建议使用。

5. 演练订阅

在rds执行以下sql

UPDATE `album` SET `attribute`='5' WHERE `albumid`='1';

在MQ中的消息格式如下:

{

    "data":[

        {
            "albumid":"1",
            "picid":"1234",
            "attribute":"5",
            "lastmodified":"2020-05-15 18:13:35",
            "created":"2019-09-04 18:18:51"
        }
    ],
    "database":"dbvtest",
    "es":1589537615000,
    "id":75,
    "isDdl":false,
    "mysqlType":{
        "albumid":"int(10) unsigned",
        "picid":"int(10) unsigned",
        "attribute":"varchar(200)",
        "lastmodified":"timestamp",
        "created":"timestamp"
    },
    "old":[
        {
            "attribute":"2",
            "lastmodified":"2020-05-15 17:42:57"
        }
    ],
    "pkNames":[
        "albumid"
    ],
    "sql":"",
    "sqlType":{
        "albumid":4,
        "picid":4,
        "attribute":12,
        "lastmodified":93,
        "created":93
    },
    "table":"album",
    "ts":1589537615877,
    "type":"UPDATE"
}

6. 监控告警

官方文档地址:

https://github.com/alibaba/canal/wiki/Prometheus-QuickStart

如果你已经有一套prometheus的监控体系,我们只需要导入模板(

canal/conf/metrics/Canal_instances_tmpl.json)即可,完美~

都看到最后了,原创不易,点个关注,点个赞吧~

知识碎片重新梳理,构建Java知识图谱: github.com/saigu/JavaK… (历史文章查阅非常方便)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK