
StreamSets: Real-Time Ingestion of MySQL Data into HBase

source link: http://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650229096&%3Bidx=1&%3Bsn=99253fe3eab7674406505abfbe3bb65a


Local HBase environment

$ jps
4082 Jps
3556 NameNode
3813 QuorumPeerMain
3911 HMaster
3642 DataNode
3739 SecondaryNameNode
3999 HRegionServer

Local demo environment

MySQL environment

$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About a minute (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

MySQL version: 8.0.12

HBase environment versions:

Apache Hadoop: hadoop-3.1.1

Apache HBase: hbase-2.1.0

Apache Phoenix: apache-phoenix-5.0.0-HBase-2.0-bin

Local SDC environment

$ docker run --restart on-failure -p 18630:18630 -d --name streamsets-dc streamsets/datacollector
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
cd2d89509457 streamsets/datacollector "/docker-entrypoint.…" 35 minutes ago Up 35 minutes 0.0.0.0:18630->18630/tcp streamsets-dc
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About an hour (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

Although Docker is convenient, connecting to HBase requires the container to reach the host's network. Because the local HBase registers itself as localhost, the containerized SDC cannot reach it properly, so the Docker version of SDC was dropped in favor of a local tarball install. The /hbase/master znode below shows the master address stored in ZooKeeper as localhost:

[zk: localhost:2181(CONNECTED) 4] get /hbase/master
�master:16000 }��PBUF


localhost�}�����.�}
cZxid = 0x1a725
ctime = Tue Jul 21 11:02:46 CST 2020
mZxid = 0x1a725
mtime = Tue Jul 21 11:02:46 CST 2020
pZxid = 0x1a725
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1000069b14e0000
dataLength = 57
numChildren = 0


SDC local tarball version:

$ ./bin/streamsets dc
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Bypass activation because SDC contains only basic stage libraries.
Logging initialized @2901ms to org.eclipse.jetty.util.log.Slf4jLog
Running on URI : 'http://192.168.31.29:18630'

Documentation:

https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Getting_Started/GettingStarted_Title.html#concept_htw_ghg_jq

The HBase versions supported by SDC can be seen in the test-environment demo below.

Integrating Phoenix with the HBase service:

Simply copy the two jars from the unpacked Phoenix distribution, phoenix-5.0.0-HBase-2.0-server.jar and phoenix-core-5.0.0-HBase-2.0.jar, into HBase's lib directory, add the following properties to hbase-site.xml, and restart the HBase cluster.

<property>
  <name>phoenix.schema.isNamespaceMappingEnabled</name>
  <value>true</value>
</property>
<property>
  <name>phoenix.schema.mapSystemTablesToNamespace</name>
  <value>true</value>
</property>

To make it easy to access Phoenix through sqlline.py, also copy hbase-site.xml into the Phoenix bin directory.
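
Putting it together, the integration looks roughly like the sketch below. This is only a sketch: HBASE_HOME is an assumed install path, while PHOENIX_HOME matches the path visible in the sqlline output further down.

$ export PHOENIX_HOME=/Users/shaozhipeng/Development/apache-phoenix-5.0.0-HBase-2.0-bin
$ export HBASE_HOME=/Users/shaozhipeng/Development/hbase-2.1.0
$ cp $PHOENIX_HOME/phoenix-5.0.0-HBase-2.0-server.jar $HBASE_HOME/lib/
$ cp $PHOENIX_HOME/phoenix-core-5.0.0-HBase-2.0.jar $HBASE_HOME/lib/
# add the two phoenix.schema.* properties shown above to $HBASE_HOME/conf/hbase-site.xml, then restart HBase
$ $HBASE_HOME/bin/stop-hbase.sh && $HBASE_HOME/bin/start-hbase.sh
# copy hbase-site.xml next to sqlline.py so the client uses the same settings
$ cp $HBASE_HOME/conf/hbase-site.xml $PHOENIX_HOME/bin/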

$ ./bin/sqlline.py 
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix: none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
20/07/20 16:28:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 5.0)
Driver: PhoenixEmbeddedDriver (version 5.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BUCKETS | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 | |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
0: jdbc:phoenix:>

Create the user table in MySQL

CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '用户名',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Create the namespace and table in HBase

create_namespace 'ZLXX'
create 'ZLXX:USER', 'INFO'


hbase(main):001:0> create_namespace 'ZLXX'
Took 0.9431 seconds
hbase(main):002:0> create 'ZLXX:USER', 'INFO'
Created table ZLXX:USER
Took 1.4457 seconds
=> Hbase::Table - ZLXX:USER

Create the schema and table mapping in Phoenix

create schema ZLXX;
create table ZLXX.USER (
id varchar primary key,
info.id varchar,
info.user_name varchar,
info.update_time varchar
) column_encoded_bytes=0;


0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BU |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null |
| | ZLXX | USER | TABLE | | | | | | false | null |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+

Create the pipeline in SDC


The JDBC and CDH stage libraries need to be installed first (this can also be done from the command line, as sketched below).
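
On a tarball install, the same stage libraries can also be installed from the command line instead of the Package Manager UI. The library IDs below are assumptions based on SDC's usual naming, so verify them against the -list output first:

$ ./bin/streamsets stagelibs -list | grep -Ei 'jdbc|cdh_6_3'
$ ./bin/streamsets stagelibs -install="streamsets-datacollector-jdbc-lib,streamsets-datacollector-cdh_6_3-lib"
# restart SDC afterwards so the new stages show up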


Select the Origin: JDBC Query Consumer, and the Destination: HBase.


The simple, complete pipeline is shown below:

[Screenshot: pipeline with JDBC Query Consumer feeding HBase]

In the pipeline above, the MySQL JDBC driver is uploaded directly through the SDC management UI; restart SDC when prompted.

Once uploaded successfully, the driver shows up in the list.


Since the MySQL version is 8.0.12, pay attention to the connector jar version:

https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar
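
If the jar is not already on hand, it can be fetched from that URL and then uploaded through the UI as described above:

$ curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar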

Main origin and destination configuration

JDBC-MySQL:

[Screenshot: JDBC Query Consumer configuration]

HBase-CDH6.3.0:

[Screenshot: HBase destination configuration (CDH 6.3.0 stage library)]

After Validate succeeds, just click Start. The UI once the pipeline is running:

[Screenshot: pipeline running in the SDC UI]

As a quick demo, insert and update data in the MySQL test.user table:

INSERT INTO `test`.`user`(`id`, `user_name`, `update_time`) VALUES (2, 'ZLXX_INSERT', SYSDATE());


Record Count (since last startup) shows a number greater than 1 because the query SQL compares update_time with greater-than-or-equal-to the OFFSET and the query runs every 10 seconds, so the boundary row keeps being picked up again. The SQL also restricts results to the last five minutes, so the number of repeats stays small. If the comparison were strictly greater than the OFFSET, rows sharing the same update_time as the offset could be lost.
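
The original screenshot of the query is not preserved, but based on the description it looks roughly like the statement below, previewed here with the mysql client. The literal date stands in for the ${OFFSET} value that SDC substitutes at runtime, the DATE_FORMAT is an assumption chosen to match the yyyyMMddHHmmss values stored in HBase, and the host and credentials are placeholders:

$ mysql -h 127.0.0.1 -P 3306 -u root -p test -e "
  SELECT id, user_name, DATE_FORMAT(update_time, '%Y%m%d%H%i%s') AS update_time
  FROM user
  WHERE update_time >= '2020-07-22 11:00:00'
    AND update_time >= NOW() - INTERVAL 5 MINUTE
  ORDER BY update_time"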


Data on the HBase side

hbase(main):004:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388092263, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388092263, value=20200722111640
2 column=INFO:USER_NAME, timestamp=1595388092263, value=ZLXX_INSERT
2 row(s)
Took 0.0598 seconds

Querying on the Phoenix side

0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+--------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+--------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_INSERT | 20200722111640 |
+-----+-----+--------------+-----------------+
2 rows selected (0.134 seconds)

Update operation on MySQL test.user

UPDATE `test`.`user` SET `user_name` = 'ZLXX_IN_UPDATE', `update_time` = SYSDATE() WHERE `id` = 2;

Querying again from Phoenix and HBase

0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+-----------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+-----------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_IN_UPDATE | 20200722113033 |
+-----+-----+-----------------+-----------------+
2 rows selected (0.128 seconds)
hbase(main):005:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388662927, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388662927, value=20200722113033
2 column=INFO:USER_NAME, timestamp=1595388662927, value=ZLXX_IN_UPDATE
2 row(s)
Took 0.0761 seconds

Note the two ID columns in the Phoenix mapping table above: the first ID is the primary key (the HBase row key) and the second is the INFO:ID column written by SDC, so choose the names carefully.
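
If it is ever unclear which ID is which, sqlline's !describe command lists the columns of the mapped table together with their metadata; in Phoenix the primary-key ID carries no column family (it is the HBase row key), while the other ID belongs to the INFO family. Output omitted here:

0: jdbc:phoenix:> !describe ZLXX.USER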

[Local environment: SDC real-time capture of MySQL data into HBase, queried through a mapped Phoenix table. END]

Appendix: HBase versions in CDH:

The HBase version that ships with each CDH release (important):

[Screenshots: HBase version shipped with each CDH release]


