canal-1.1.5实时同步MySQL数据到Elasticsearch - SportSky
source link: https://www.cnblogs.com/sportsky/p/16536264.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
一、环境准备
1、jdk 8+
2、mysql 5.7+
3、Elasticsearch 7+
4、kibana 7+
5、canal.adapter 1.1.5
一、创建数据库CanalDb和表UserInfo
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for UserInfo -- ---------------------------- DROP TABLE IF EXISTS `UserInfo`; CREATE TABLE `UserInfo` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `phone` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
二、kibana创建索引
PUT canal_product { "mappings": { "properties": { "user_name": { "type": "text" }, "phone": { "type": "text" }, "age": { "type": "integer" } } } }
三、下载安装canal.adapter
github:https://github.com/alibaba/canal/releases/tag/canal-1.1.5
额外需要下载v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)
分别解压缩后,将v1.1.5-alpha-2解压缩文件夹下plugin文件夹中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar,并重命名,再将该jar赋予权限 chmod 777 client-adapter.es7x-1.1.5-jar-with-dependencies.jar
1、解压并修改配置文件 conf/application.yml
只需要修改特定的几处即可,关于各节点说明可参考官方说明:https://help.aliyun.com/document_detail/135297.html
srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/CanalDb?useUnicode=true&characterEncoding=utf-8&useSSL=false username: canal password: canal
- name: es7 hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode ,127.0.0.1:9003 for transport mode properties: mode: rest #transport or rest security.auth: es:22222 # only used for rest mode cluster.name: elasticsearch # es集群节点名称
2、启动服务
# 启动服务 ./bin/startup.sh
3、查看日志是否启动成功
cat logs/adapter/adapter.log
4、实时同步
向数据库中插入一条数据
INSERT INTO `CanalDb`.`UserInfo`( `user_name`, `phone`, `age`) VALUES ('张三', '10086', 99);
kibana查看索引数据
GET canal_product/_search
5、全量同步,修改conf/es7/mytest_user.yml配置文件,或者新建一个yml文件也可
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: example # canal的instance或者MQ的topic groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping: _index: canal_product # es 的索引名称 _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配 sql: "SELECT p.id as _id, p.user_name, p.phone, p.age FROM UserInfo p " # sql映射 etlCondition: "where p.id>={}" #etl的条件参数 commitBatch: 3000 # 提交批大小
curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK