30

Canal binlog 日志 Dump 流程分析

 3 years ago
source link: https://mp.weixin.qq.com/s/48x0i6djc-9REJosn_VUBw
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

点击上方“中间件兴趣圈”,选择“设为星标”

做积极的人,越努力越幸运!

640?wx_fmt=png

Canal 的 dump 支持串行和并行模式两种模式,本篇重点梳理 dump 的核心流程,以便对 dump 过程有一个充分的了解,更好的理解 Canal 的实现原理与细节,下一篇中将重点关注Canal是如何引入并行模式来提高dump的性能,即并行编程相关的技巧。

从前面的文章我们得知 Canal binlog 日志解析的基本流程如下图所示:

640?wx_fmt=png

在这里插入图片描述
解析来重点梳理一下 dump 命令的发送逻辑,特别是日志的处理流程,一些基本的日志格式。

1、 dump 流程分析


在 Canal 中 dump 方法声明如下:

640?wx_fmt=png

带有参数 MultiStageCoprocessor 为并行处理模式,底层使用了disruptor 高性能并发框架,下文将重点关注学习。我们今天重点来看一下串行dump的实现,其方法声明如下:
640?wx_fmt=png在这里插入图片描述其方法参数说明如下:

  • String binlogfilename
      binlog 文件名称,例如  mysql-bin.000038。

  • Long binlogPosition
    在文件中的偏移量。

  • SinkFunction func
    每解析出一条binlog日志的处理函数。

接下来我们直奔主题,一起来看一下 MysqlConnection 关于 dump 的实现流程。

640?wx_fmt=png

MysqlConnection#dump
Step1:在发送dump之前先设置相关的参数。

  • set wait_timeout=9999999
    连接空闲超时时间,默认为8消息,用于 Canal Slave 的等待超时时间远大于默认值。

  • set net_write_timeout=1800
    网络写请求超时时间,针对正在进行数据读写的连接,该值默认为 60s。

  • set net_read_timeout=1800
    网络读请求超时时间,针对正在进行数据读写的连接,该值默认为 30s。

  • set names 'binary'
    设置服务端返回结果时不做编码转化,直接按照数据库的二进制编码进行发送,由客户端自己根据需求进行编码转化。

  • set @master_binlog_checksum= @@global.binlog_checksum
    设置master_binlog_checksum,因为在mysql5.6之后为binlog引入了checksum机制,例如crc32,canal作为mysql slave,需要与服务端相关参数保持一致。

  • set @slave_uuid=uuid()
    canal相对与mysql数据库服务而言就是一个从服务器,这个指令用于设置server_id,使用uuid,避免server_id重复。

  • SET @master_heartbeat_period=15
    设置客户端与服务端心跳发送间隔,默认为15s。

    640?wx_fmt=png
    MysqlConnection#dump

    Step2:从主库查询binlog checksum,具体向主库发送 select @@global.binlog_checksum 语句。

    640?wx_fmt=png
    MysqlConnection#dump

    Step3:向MySQL Master 注册从节点,告知客户端的host、port、用户名与密码、serverId,具体实现是发送命令CODE为 0x15。

    640?wx_fmt=png
    MysqlConnection#dump

    Step4:向 MySQL Master 发送 dump 请求,MySQL是基于请求与应答模式,发送请求命令后,就会向网络通道中写入响应请求。(在这里大家不妨先大概思考一下如何读取 dump 命令的返回值,这部分虽然涉及到网络相关的知识,我在这边会稍微简单提一下)。

    640?wx_fmt=pngMysqlConnection#dump

    Step5:构建 DirectLogFetcher对象,实现基于 socket 的日志拉取服务,并构建 LogDecoder 对象,用于解析 binlog 日志。

    640?wx_fmt=png
    MysqlConnection#dump

    Step6:使用 while 循环反复拉取消息,通过通过 LogDecoder 对二进制流进行解析,提取一条完整的binlog事件,交给 SinkFunction 去处理,并且如果开启了半同步机制,则需要向master发送ACK。既然是while循环,该方法的退出条件还是值得我们关注的:

  • fetch.fetch()方法返回 false

  • SinkFunction 的 sink 方法 false,SinkFunction的详细处理流程将在下文介绍,这里先告知返回false的情况是 binlog 日志解析线程已停止运行。

上面粗略的介绍了 dump 命令的几个核心关键步骤,要想详细掌握其实现细节,我们必须继续深入探讨如下几个问题:

  • DirectLogFetcher 内部工作机制

  • LogDecoder binlog 日志解析

  • 发送Dump底层网实现思路

2、DirectLogFetcher 内部工作机制


2.1 DirectLogFetcher 类图

640?wx_fmt=png

DirectLogFetcher的类继承体系如上图所示,我们来看一下其关键点:

  • LogBuffer
    日志buffer,主要定义如下属性:

  • byte[] buffer
    缓存区中数据容器。

  • int origin
    当前buffer中的读指针

  • int limit
    当前buffer的最大可读可写指针

  • int position
    当前buffer的写指针。

  • int semival
    是否需要发送ACK(用于半同步)。
    LogBuffer封装了字节相关的操作,不仅定义了上面的属性,也定义了字节读取相关众多API,其截图如下:

    640?wx_fmt=png
    在这里插入图片描述
  • LogFetcher binlog日志抓取抽象类,定义了如下关键属性与抽象方法。

    • int DEFAULT_INITIAL_CAPACITY
      LogBuffer中的初始容量,默认为8K。

    • float DEFAULT_GROWTH_FACTOR
      容量增长因子,默认为 2.0。

    • int   BIN_LOG_HEADER_SIZE
      binlog日志条目 header 的长度,固定为4字节。

    • float  factor
      增长因子。

    • public abstract boolean fetch()
      抓取binlog日志。

    • public abstract void close()
           关闭 Fetch。

  • DirectLogFetcher Canal LogFetcher模式实现类,其核心属性如下:

    • SocketChannel channel
      网络通道,用于发送dump请求的网络通道。

    • boolean issemi = false
      是否开启半同步。

2.2 fetch流程详解

接下来我们重点剖析 DirectLogFetcher 的 fetch 方法,来探究其实现原理。
在研究DirectLogFetcher的fetch方法之前,我们先重点跟踪一下其内部网络读写方法fetch0方法,该方法是具体与网络读写相关的实现。

640?wx_fmt=png

DirectLogFetcher#fetch0
在详细介绍该方法之前先来介绍一下其参数的含义:

  • int off
    从通道中读取到的内容放入到buffer中的起始位置

  • int len
    期望从通道中读取的字节长度。

该方法的实现关键点如下:

  • 首先先确保接收缓存区有足够的剩余空间,如果空间不足,则进行扩容。

  • 然后从通道中读取指定长度的字节。

接下来我们来重点看一下DirectLogFetcher的fetch的实现流程。

640?wx_fmt=png

DirectLogFetcher#fetch
Step1:尝试从网络通道中读取4个字节(即读取协议的头部),如果通道中还没有可读取内容,返回false,造成的效果是一次 dump 请求结束。

640?wx_fmt=png

DirectLogFetcher#fetch
Step2:从上文读到的4个字节分别读出该网络包的总长度以及当前包的序号,从这里可以看成MySQL协议头为4字节,前3个字节为网络包的总长度,第4个字节为包的序列号。再取出数据包的长度后,继续向通道中读取netlen个字节,即读取一个完整的数据包到buffer中。

640?wx_fmt=png

DirectLogFetcher#fetch
Step3:继续从数据包中读取一个字节,判断该包的状态码,是否是一个成功的响应,如果是错误的响应,会向外抛出一次,Canal 会记录dump命令执行错误的次数。

640?wx_fmt=png

DirectLogFetcher#fetch
Step4:如果一个包的长度为允许的最大包长度,则继续读取,这个主要是根据MySQL协议做的处理,即读取到一个数据包,然后返回true,表示拉取到一条日志,然后通过LogDecoder解码,然后传入到sink方法中,进行日志的后续处理。

640?wx_fmt=png

DirectLogFetcher#fetch
Step5:这一步的目的,就是将buffer中的当前指针指向数据的开始位置。这样一次 fetch就结束了。

从上面的流程来看,DirectLogFetcher#fetch 方法结束后,就将进入到LogDecoder中。经过一次DirectLogFetcher#fetch方法后,即取回一条binlog日志,即二进制流,接下来就根据binlog协议对其解析。本文暂不深入该方法,如果大家想深入数据库中间件方面,可以作为一个很好的示例,面向MySQL通信协议进行编程。

3、SinkFunction


通过 LogDecoder从中解析一个事件后,会调用SinkFunction的sink方法,如果该方法返回 false,一次dump请求将介绍,接下来我们看一下其sink方法。

640?wx_fmt=png

AbstractEventParser#start
该方法的实现比较简单,这里不打算继续深入,我们重点来看一下 Canal.Entry 的结构:

640?wx_fmt=png

在这里插入图片描述
这个结构是基于Canal做架构设计,解决顺序消费、数据不丢失一个重要参考依据,没解析一条事务,最终放入到环形缓存区,环形缓存区尽量以一个事务提交到Sink组件,其代码如下:

640?wx_fmt=png

在这里插入图片描述
这里主要有如下几个关键点:

  • 首先需要调用EventSink组件将解析出来的数据传入EventSink。

  • EventSink组件处理成功后,会提交解析位点。

原创不易,如果对你有所帮助请你为本文点个【在看】吧,这将是我写作更多优质文章的最强动力。

欢迎加入我的知识星球,一起交流源码,探讨架构,揭秘亿级订单的架构设计与实践经验,打造高质量的技术交流圈,为广大星友提供高质量问答服务,长按如下二维码加入。

640?wx_fmt=png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK