41

Spark JDBC 写 clickhouse 操作总结 - 简书

 4 years ago
source link: https://www.jianshu.com/p/43f78c8a025b?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Spark JDBC 写 clickhouse 操作总结

0.2162019.08.15 11:58:35字数 743阅读 5,773

在中小业务数据规模上通过clickhouse进行数据分析很适合,维护简单操作方便,更主要的是快;接下来给大家分享下易企秀在from hive to clickhouse过程中的经验

clickhouse对hadoop生态并不友好,官方也没有提供spark connector直接用于读写操作,好在双方都支持jdbc; clickhouse支持两种jdbc驱动实现,一种是官方自带的8123端口的,另一种是来自第三方实现的驱动,9000端口基于tcp协议

jdbc:8123端口

这种方式是http协议实现的,整体性能差了很多 经常会出现超时的问题,且对数据压缩支持不好,因压缩速度跟不上写入速度,数据写入的过程中数据目录会快速膨胀 导致磁盘空间打满。

jdbc:9000端口

这种方式支持高性能写入,数据按列组织并压缩。

因spark jdbc的方式不支持在clickhouse中自动创建表结构,这里在插入前需要提前创建表
考虑到clickhouse中的数据维度会经常新增和缩减,表结构维护仍需自动化,我们用了一种取巧的方式,借助mysql进行桥接,因为spark jdbc方式支持在mysql自动创建表,同时clickhouse也支持create table from mysql 。

# clickhouse jdbc驱动使用1.7的版本
/data/work/spark-2/bin/spark-shell --name "to_ck_scene_model"    --master yarn --packages com.github.housepower:clickhouse-native-jdbc:1.7-stable --jars /data/work/spark-2/mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar  

#读取hive中数据并转化为 dataframe
var df=spark.sql("select * from "+tableName )

//在mysql中创建表
val prop = new java.util.Properties
prop.setProperty("user", mysqlUser)
prop.setProperty("password", mysqlPwd)
prop.setProperty("driver",mysqlDriver)
df.where("1=0").write.mode("Overwrite").jdbc(mysqlUrl,tableName, prop)

//通过mysql桥接 在clickhouse中创建表 操作后两边数据结构会一致
val connection = DriverManager.getConnection(ckUrl,"default","")
var pst=connection.createStatement()
pst.execute("drop table if exists "+ckTableN)
pst.execute("create table "+ckTableN+" ENGINE = MergeTree partition by ifNull(toYYYYMM("+partitionField+"),1970-01) order by "+orderFieldAndDefauV+" as  SELECT * FROM mysql('#:3306', 'bigdata', "+tableName+", '"+mysqlUser+"', '"+mysqlPwd+"')")

ckDriver = "com.github.housepower.jdbc.ClickHouseDriver"
var pro = new java.util.Properties
pro.put("driver",ckDriver)
#默认写入批次是2w,可以调大至5w
df.write.mode("append").option("batchsize", "50000").option("isolationLevel", "NONE").option("numPartitions", "1").jdbc(ckUrl,ckTableN,pro)

1、在mysql 中创建表时需注意,如果hive中存在一个以上的timestamp类型的字段时会创建失败,并报 Invalid default value for ‘update_time’ ,需要将字段先转成string类型写入mysql ,然后通过 alter table modify column 将string类型转成datetime就ok了

2、同时加载clickhouse与mysql的jdbc驱动可能会出现jar冲突的问题,出现 “Accept the id of response that is not recongnized by Server”的错误时,需先将clickhouse的驱动移除

val dv = DriverManager.getDriver(ckUrl)
DriverManager.deregisterDriver(dv)

当MySQL相关操作执行完毕后 ,再将clickhouse驱动重新注册一下

DriverManager.registerDriver(dv)

3、clickhouse不支持事务操作,需关闭事务 option("isolationLevel", "NONE") ,否则个别clickhouse的jdbc版本可能会报错

4、插入记录数偏大问题: 使用com.github.housepower:clickhouse-native-jdbc:1.6-stable版本的同学需要注意,这个版本对spark支持有问题,当单次插入数据小于默认batchsize时数据正常插入,当插入数据量超过一个batch时会出现数据不一致的问题,看了源码发现1.6版本执行完当前batch操作后未清除batch对象,导致后面数据一直在此基础上累加

5、如果觉得写入速度不够快,那么还可以通过调大num-executors或者增加batchsize;我们目前1亿数据写入用时不到20分钟


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK