45

Spark JDBC 写 clickhouse 操作总结 - 简书

 5 years ago
source link: https://www.jianshu.com/p/43f78c8a025b?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Spark JDBC 写 clickhouse 操作总结

0.2162019.08.15 11:58:35字数 743阅读 5,773

在中小业务数据规模上通过clickhouse进行数据分析很适合,维护简单操作方便,更主要的是快;接下来给大家分享下易企秀在from hive to clickhouse过程中的经验

clickhouse对hadoop生态并不友好,官方也没有提供spark connector直接用于读写操作,好在双方都支持jdbc; clickhouse支持两种jdbc驱动实现,一种是官方自带的8123端口的,另一种是来自第三方实现的驱动,9000端口基于tcp协议

jdbc:8123端口

这种方式是http协议实现的,整体性能差了很多 经常会出现超时的问题,且对数据压缩支持不好,因压缩速度跟不上写入速度,数据写入的过程中数据目录会快速膨胀 导致磁盘空间打满。

jdbc:9000端口

这种方式支持高性能写入,数据按列组织并压缩。

因spark jdbc的方式不支持在clickhouse中自动创建表结构,这里在插入前需要提前创建表
考虑到clickhouse中的数据维度会经常新增和缩减,表结构维护仍需自动化,我们用了一种取巧的方式,借助mysql进行桥接,因为spark jdbc方式支持在mysql自动创建表,同时clickhouse也支持create table from mysql 。

# clickhouse jdbc驱动使用1.7的版本
/data/work/spark-2/bin/spark-shell --name "to_ck_scene_model"    --master yarn --packages com.github.housepower:clickhouse-native-jdbc:1.7-stable --jars /data/work/spark-2/mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar  

#读取hive中数据并转化为 dataframe
var df=spark.sql("select * from "+tableName )

//在mysql中创建表
val prop = new java.util.Properties
prop.setProperty("user", mysqlUser)
prop.setProperty("password", mysqlPwd)
prop.setProperty("driver",mysqlDriver)
df.where("1=0").write.mode("Overwrite").jdbc(mysqlUrl,tableName, prop)

//通过mysql桥接 在clickhouse中创建表 操作后两边数据结构会一致
val connection = DriverManager.getConnection(ckUrl,"default","")
var pst=connection.createStatement()
pst.execute("drop table if exists "+ckTableN)
pst.execute("create table "+ckTableN+" ENGINE = MergeTree partition by ifNull(toYYYYMM("+partitionField+"),1970-01) order by "+orderFieldAndDefauV+" as  SELECT * FROM mysql('#:3306', 'bigdata', "+tableName+", '"+mysqlUser+"', '"+mysqlPwd+"')")

ckDriver = "com.github.housepower.jdbc.ClickHouseDriver"
var pro = new java.util.Properties
pro.put("driver",ckDriver)
#默认写入批次是2w,可以调大至5w
df.write.mode("append").option("batchsize", "50000").option("isolationLevel", "NONE").option("numPartitions", "1").jdbc(ckUrl,ckTableN,pro)

1、在mysql 中创建表时需注意,如果hive中存在一个以上的timestamp类型的字段时会创建失败,并报 Invalid default value for ‘update_time’ ,需要将字段先转成string类型写入mysql ,然后通过 alter table modify column 将string类型转成datetime就ok了

2、同时加载clickhouse与mysql的jdbc驱动可能会出现jar冲突的问题,出现 “Accept the id of response that is not recongnized by Server”的错误时,需先将clickhouse的驱动移除

val dv = DriverManager.getDriver(ckUrl)
DriverManager.deregisterDriver(dv)

当MySQL相关操作执行完毕后 ,再将clickhouse驱动重新注册一下

DriverManager.registerDriver(dv)

3、clickhouse不支持事务操作,需关闭事务 option("isolationLevel", "NONE") ,否则个别clickhouse的jdbc版本可能会报错

4、插入记录数偏大问题: 使用com.github.housepower:clickhouse-native-jdbc:1.6-stable版本的同学需要注意,这个版本对spark支持有问题,当单次插入数据小于默认batchsize时数据正常插入,当插入数据量超过一个batch时会出现数据不一致的问题,看了源码发现1.6版本执行完当前batch操作后未清除batch对象,导致后面数据一直在此基础上累加

5、如果觉得写入速度不够快,那么还可以通过调大num-executors或者增加batchsize;我们目前1亿数据写入用时不到20分钟


Recommend

  • 98
    • www.jianshu.com 7 years ago
    • Cache

    LevelDB:读操作 - 简书

    前面写了两篇文章介绍 LevelDB 的整体架构和接口使用。这篇文章,我们从代码的角度看看 LevelDB 的设计与实现,先从读操作开始。 LevelDB 的版本更新不是很频繁,整体变化不大。本文的源代码参考和索引的版本是 LevelDB v1.20。 LevelDB 的目录结构很简单,就不用...

  • 52
    • www.jianshu.com 5 years ago
    • Cache

    ClickHouse 高阶函数 - 简书

    ClickHouse 高阶函数0.6262019.09.25 14:48:47字数 692阅读 6,212 先来一个完整的例子,该示例根据行为日志计算用户访问的top路径 select data, count(1) cn from ( w...

  • 9
    • segmentfault.com 3 years ago
    • Cache

    Spark+ES+ClickHouse 构建DMP用户画像

    download:Spark+ES+ClickHouse 构建DMP用户画像include<iostream.h>typedef int Status;typedef char Cstack;defi...

  • 5

    安装clickhouse与使用 docker pull yandex/clickhouse-server 如果pull出现 docker pull yandex/clickhouse-server Using default tag: latest Error response from daemon: Get

  • 9

    微服务治理实战:Hadoop和Spark,都不及Clickhouse香 方勇 2022-01-28 10:28:16

  • 9

    目前来说,网上有很多相关的资料证明ClickHouse数据库查询响应速度比MySQL快上一百到几百倍。实际上,ClickHouse和MySQL具有不同的应用场景和局限性,最近在研究这个ClickHouse打算应用于大...

  • 7
    • owenyk.github.io 2 years ago
    • Cache

    Debian11用Python3和JDBC操作ACCESS

    Debian11用Python3和JDBC操作ACCESS | K's Life

  • 4

    Distributed database access with Spark and JDBC 10 Feb 2022 by dzlab

  • 11

    JDBC 操作 SQL Server 时如何传入列表参数 2023-08-09 | 阅读(4) 本文是作为将要对 PostgreSQL 的 in, any() 操作的一个铺垫,也是对先前用 JDBC 操作 SQL Server 的温习。以此记录一下用 JDBC 查询 S...

  • 9
    • blog.51cto.com 1 year ago
    • Cache

    JDBC 批量操作 in 的使用

    JDBC 批量操作 in 的使用 精选 原创  我们经常会有这种业务需求,根据一个条件集合去查询一张表的数据,比如:

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK