3

canal-1.1.5实时同步MySQL数据到Elasticsearch - SportSky

 1 year ago
source link: https://www.cnblogs.com/sportsky/p/16536264.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、环境准备

1、jdk 8+

2、mysql 5.7+

3、Elasticsearch 7+

4、kibana 7+

5、canal.adapter 1.1.5 

一、创建数据库CanalDb和表UserInfo

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for UserInfo
-- ----------------------------
DROP TABLE IF EXISTS `UserInfo`;
CREATE TABLE `UserInfo`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `phone` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
520237-20220730234344884-54382551.png

二、kibana创建索引

PUT canal_product 
{
  "mappings": {
    "properties": {
      "user_name": {
        "type": "text"
      },
      "phone": {
        "type": "text"
      },
      "age": {
        "type": "integer"
      }
    }
  }
}
520237-20220730234815675-1461531000.png

三、下载安装canal.adapter

github:https://github.com/alibaba/canal/releases/tag/canal-1.1.5

额外需要下载v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)

分别解压缩后,将v1.1.5-alpha-2解压缩文件夹下plugin文件夹中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar,并重命名,再将该jar赋予权限 chmod 777 client-adapter.es7x-1.1.5-jar-with-dependencies.jar

520237-20220730235123781-510628456.png

1、解压并修改配置文件 conf/application.yml

只需要修改特定的几处即可,关于各节点说明可参考官方说明:https://help.aliyun.com/document_detail/135297.html

srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/CanalDb?useUnicode=true&characterEncoding=utf-8&useSSL=false
      username: canal
      password: canal
- name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode ,127.0.0.1:9003 for transport mode 
        properties:
          mode: rest #transport  or rest 
          security.auth: es:22222 #  only used for rest mode
          cluster.name: elasticsearch  # es集群节点名称
520237-20220731000423608-1313018258.png

 2、启动服务

# 启动服务
./bin/startup.sh

3、查看日志是否启动成功

cat logs/adapter/adapter.log
520237-20220731001909262-1568770330.png

 4、实时同步

向数据库中插入一条数据

INSERT INTO `CanalDb`.`UserInfo`( `user_name`, `phone`, `age`) VALUES ('张三', '10086', 99);
520237-20220731002124256-349839232.png

kibana查看索引数据

GET canal_product/_search
520237-20220731002431411-1344734140.png

5、全量同步,修改conf/es7/mytest_user.yml配置文件,或者新建一个yml文件也可

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: canal_product # es 的索引名称
  _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT
         p.id as _id,
         p.user_name,
         p.phone,
         p.age
        FROM
         UserInfo p "        # sql映射
  etlCondition: "where p.id>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小
520237-20220731002651069-674428221.png
curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml

520237-20220731002928117-2133709605.png

520237-20220731003134747-2134433605.png

 学习链接:https://help.aliyun.com/document_detail/135297.html

                   https://blog.csdn.net/zh1998wx/article/details/123101442?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-123101442-blog-125808233.pc_relevant_aa&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-123101442-blog-125808233.pc_relevant_aa&utm_relevant_index=1


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK