Real-Time Ingestion of MySQL Data into HBase with StreamSets
source link: http://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650229096&%3Bidx=1&%3Bsn=99253fe3eab7674406505abfbe3bb65a
Local HBase environment
$ jps
4082 Jps
3556 NameNode
3813 QuorumPeerMain
3911 HMaster
3642 DataNode
3739 SecondaryNameNode
3999 HRegionServer
Local environment demo
MySQL environment
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About a minute (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
MySQL version: 8.0.12
HBase environment versions:
Apache Hadoop: hadoop-3.1.1
Apache HBase: hbase-2.1.0
Apache Phoenix: apache-phoenix-5.0.0-HBase-2.0-bin
Local SDC environment
$ docker run --restart on-failure -p 18630:18630 -d --name streamsets-dc streamsets/datacollector
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
cd2d89509457 streamsets/datacollector "/docker-entrypoint.…" 35 minutes ago Up 35 minutes 0.0.0.0:18630->18630/tcp streamsets-dc
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About an hour (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
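Docker is convenient, but connecting to HBase requires the container to reach the host network. The local HBase registers itself as localhost, which the containerized SDC cannot reach, so I dropped the Docker version of SDC and switched to a locally extracted install. The ZooKeeper node below shows the master registered under localhost: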
[zk: localhost:2181(CONNECTED) 4] get /hbase/master
�master:16000 }��PBUF
localhost�}�����.�}
cZxid = 0x1a725
ctime = Tue Jul 21 11:02:46 CST 2020
mZxid = 0x1a725
mtime = Tue Jul 21 11:02:46 CST 2020
pZxid = 0x1a725
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1000069b14e0000
dataLength = 57
numChildren = 0
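The connectivity problem described above can be checked with a small TCP probe. This is a minimal sketch, not part of the original setup: `can_connect` is a hypothetical helper, and the ports (2181 for ZooKeeper, 16000 for HMaster) are taken from the output above. Run inside a container, `localhost` resolves to the container itself, so the probe would be expected to fail there even though it succeeds on the host.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False
```

For example, `can_connect("localhost", 2181)` and `can_connect("localhost", 16000)` can be called on the host and again from inside the container to compare the results.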
SDC, locally extracted version:
$ ./bin/streamsets dc
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Bypass activation because SDC contains only basic stage libraries.
Logging initialized @2901ms to org.eclipse.jetty.util.log.Slf4jLog
Running on URI : 'http://192.168.31.29:18630'
Documentation:
https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Getting_Started/GettingStarted_Title.html#concept_htw_ghg_jq
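The HBase versions that SDC supports can be seen in the demo below.
Integrating Phoenix with the HBase service:
Copy the two jars phoenix-5.0.0-HBase-2.0-server.jar and phoenix-core-5.0.0-HBase-2.0.jar from the extracted Phoenix package into the HBase lib directory, add the following properties to hbase-site.xml, and restart the HBase cluster.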
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
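The hbase-site.xml change above can also be applied programmatically. The following is a minimal sketch using only the Python standard library; `set_properties` is a hypothetical helper, and it assumes the usual Hadoop layout of `<property>` elements directly under a `<configuration>` root. Note that ElementTree drops XML comments when it rewrites the file, so keep a backup of the original.

```python
import xml.etree.ElementTree as ET

# The two properties shown above.
PHOENIX_PROPS = {
    "phoenix.schema.isNamespaceMappingEnabled": "true",
    "phoenix.schema.mapSystemTablesToNamespace": "true",
}


def set_properties(xml_text: str, props: dict) -> str:
    """Add or update <property> entries and return the new XML text."""
    root = ET.fromstring(xml_text)
    existing = {p.findtext("name"): p for p in root.findall("property")}
    for name, value in props.items():
        prop = existing.get(name)
        if prop is None:
            # Property not present yet: append a new <property> element.
            prop = ET.SubElement(root, "property")
            ET.SubElement(prop, "name").text = name
            ET.SubElement(prop, "value").text = value
        else:
            # Property already present: overwrite its value in place.
            value_el = prop.find("value")
            if value_el is None:
                value_el = ET.SubElement(prop, "value")
            value_el.text = value
    return ET.tostring(root, encoding="unicode")
```

Because existing entries are updated rather than duplicated, running the function twice leaves the file with a single copy of each property.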
To make Phoenix easy to access through sqlline.py, copy hbase-site.xml into the Phoenix bin directory.
$ ./bin/sqlline.py
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix: none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
20/07/20 16:28:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 5.0)
Driver: PhoenixEmbeddedDriver (version 5.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BUCKETS | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 | |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
0: jdbc:phoenix:>
Create the user table in MySQL
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT 'user name',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Create the namespace and table in HBase
create_namespace 'ZLXX'
create 'ZLXX:USER', 'INFO'
hbase(main):001:0> create_namespace 'ZLXX'
Took 0.9431 seconds
hbase(main):002:0> create 'ZLXX:USER', 'INFO'
Created table ZLXX:USER
Took 1.4457 seconds
=> Hbase::Table - ZLXX:USER
Create the schema and table mapping in Phoenix
create schema ZLXX;
create table ZLXX.USER (
id varchar primary key,
info.id varchar,
info.user_name varchar,
info.update_time varchar
) column_encoded_bytes=0;
0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BU |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null |
| | ZLXX | USER | TABLE | | | | | | false | null |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
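Create the pipeline in SDC
The JDBC and CDH stage libraries must be installed first.
Select the origin JDBC Query Consumer and the destination HBase.
The simple, complete pipeline is shown in the figure.
In the figure above, the MySQL JDBC driver is uploaded to SDC directly from the management UI; restart SDC when prompted.
After a successful upload, the driver appears in the list.
MySQL is version 8.0.12, so make sure the driver jar version matches: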
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar
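Main origin and destination configuration
JDBC-MySQL:
HBase-CDH6.3.0:
Once Validate succeeds, click Start. The running pipeline looks like this:
As a quick demonstration, insert and update rows in the MySQL test.user table.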
INSERT INTO `test`.`user`(`id`, `user_name`, `update_time`) VALUES (2, 'ZLXX_INSERT', SYSDATE());
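The Record Count (since last startup) shown above is greater than 1 because the query selects rows whose update time is greater than or equal to OFFSET and runs every 10 seconds, so the same rows are scanned repeatedly. The SQL also limits results to the last five minutes, so the number of repeats stays small. Using strictly greater than OFFSET instead could lose data, for example a row written in the same second as the current offset but after the last scan.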
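The trade-off between the two comparisons can be illustrated with a small simulation. This is a sketch under stated assumptions, not the pipeline's actual query: the in-memory `table`, the `poll` and `run` helpers, and the integer timestamps are all hypothetical stand-ins for the MySQL table, the periodic JDBC query, and second-resolution `update_time` values.

```python
from typing import List, Tuple

Row = Tuple[int, int]  # (id, update_time in whole seconds)


def poll(table: List[Row], offset: int, inclusive: bool) -> Tuple[List[Row], int]:
    """Return rows at or after (inclusive) or strictly after the offset."""
    if inclusive:
        batch = [r for r in table if r[1] >= offset]
    else:
        batch = [r for r in table if r[1] > offset]
    batch.sort(key=lambda r: r[1])
    # The new offset is the largest update_time seen, or unchanged if empty.
    new_offset = batch[-1][1] if batch else offset
    return batch, new_offset


def run(inclusive: bool) -> List[int]:
    """Poll twice; row 2 arrives between polls with the same timestamp as row 1."""
    table: List[Row] = [(1, 100)]
    seen: List[int] = []
    offset = 0
    batch, offset = poll(table, offset, inclusive)
    seen += [r[0] for r in batch]
    table.append((2, 100))  # written after the first poll, in the same second
    batch, offset = poll(table, offset, inclusive)
    seen += [r[0] for r in batch]
    return seen


print(run(inclusive=True))   # [1, 1, 2]: row 1 is re-read, row 2 is captured
print(run(inclusive=False))  # [1]: row 2 is never read
```

With the inclusive comparison, row 1 is delivered twice but nothing is missed. Since the HBase row key is the id, a re-delivered row simply overwrites the same cells, which is why the duplicates are harmless in this setup.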
Data on the HBase side
hbase(main):004:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388092263, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388092263, value=20200722111640
2 column=INFO:USER_NAME, timestamp=1595388092263, value=ZLXX_INSERT
2 row(s)
Took 0.0598 seconds
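The USER_NAME value of row 1 in the scan above is printed as `\xNN` escapes because the HBase shell shows non-ASCII bytes in hexadecimal. A minimal sketch for turning such a printed value back into text follows; `decode_hbase_shell_value` is a hypothetical helper, and it assumes the stored bytes are UTF-8.

```python
import re

_HEX_ESCAPE = re.compile(r"\\x([0-9A-Fa-f]{2})")


def decode_hbase_shell_value(text: str) -> str:
    """Convert \\xNN escapes printed by the HBase shell into a UTF-8 string."""
    out = bytearray()
    pos = 0
    for match in _HEX_ESCAPE.finditer(text):
        # Copy any plain characters before this escape unchanged.
        out += text[pos:match.start()].encode("utf-8")
        # Replace the escape with the single byte it represents.
        out.append(int(match.group(1), 16))
        pos = match.end()
    out += text[pos:].encode("utf-8")
    return out.decode("utf-8")


print(decode_hbase_shell_value(r"\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F"))  # 邵志鹏
```

The decoded value matches the USER_NAME that Phoenix displays for the same row.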
Query on the Phoenix side
0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+--------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+--------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_INSERT | 20200722111640 |
+-----+-----+--------------+-----------------+
2 rows selected (0.134 seconds)
Update in MySQL test.user
UPDATE `test`.`user` SET `user_name` = 'ZLXX_IN_UPDATE', `update_time` = SYSDATE() WHERE `id` = 2;
Query in Phoenix and HBase
0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+-----------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+-----------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_IN_UPDATE | 20200722113033 |
+-----+-----+-----------------+-----------------+
2 rows selected (0.128 seconds)
hbase(main):005:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388662927, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388662927, value=20200722113033
2 column=INFO:USER_NAME, timestamp=1595388662927, value=ZLXX_IN_UPDATE
2 row(s)
Took 0.0761 seconds
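Note that the Phoenix mapping table above contains two ID columns: the primary key ID, which maps to the HBase row key, and INFO.ID, which maps to a cell in the column family. Take care with the naming.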
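The relationship between a MySQL row, the HBase row key, and the INFO cells can be sketched as follows. This is an illustration inferred from the scan output above, not the SDC configuration itself (which is not shown in the text): `row_to_cells` and `COLUMN_FAMILY` are hypothetical names, and the `yyyyMMddHHmmss` timestamp format is assumed from the stored values such as 20200722111640. Qualifiers are upper-cased because Phoenix treats unquoted identifiers as upper case.

```python
from datetime import datetime
from typing import Dict, List, Tuple

COLUMN_FAMILY = "INFO"


def row_to_cells(row: Dict[str, object]) -> Tuple[str, List[Tuple[str, str]]]:
    """Map one MySQL row to an HBase row key plus (column, value) cells."""
    # The id is used as the row key and is also stored again as INFO:ID,
    # which is why the Phoenix table ends up with two ID columns.
    row_key = str(row["id"])
    cells: List[Tuple[str, str]] = []
    for column, value in row.items():
        if isinstance(value, datetime):
            value = value.strftime("%Y%m%d%H%M%S")
        cells.append((f"{COLUMN_FAMILY}:{column.upper()}", str(value)))
    return row_key, cells


key, cells = row_to_cells(
    {"id": 2, "user_name": "ZLXX_INSERT", "update_time": datetime(2020, 7, 22, 11, 16, 40)}
)
print(key, cells)
```

The printed cells correspond to the INFO:ID, INFO:USER_NAME, and INFO:UPDATE_TIME entries for row 2 in the HBase scan.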
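[Local environment: real-time ingestion of MySQL data into HBase with SDC, queried through a mapped Phoenix table. END]
Appendix: HBase versions in CDH:
HBase versions corresponding to different CDH releases (important)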