3

Flink同步Kafka数据到ClickHouse分布式表

 1 year ago
source link: https://blog.51cto.com/ikeguang/5915948
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Flink同步Kafka数据到ClickHouse分布式表

精选 原创

大数据技术派 2022-12-06 14:50:44 博主文章分类:大数据 ©著作权

文章标签 建表 数据 bc 文章分类 Hadoop 大数据 技术人为什么要写博客? 阅读数205

我的gitee地址: https://gitee.com/ddxygq/bigdata-technical-pai

业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定ClickHouse

什么是ClickHouse?

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。

列式数据库更适合于OLAP场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于ClickHouse中文官方文档。

Flink同步Kafka数据到ClickHouse分布式表_建表
Flink同步Kafka数据到ClickHouse分布式表_数据_02

我们使用Flink编写程序,消费kafka里面的主题数据,清洗、归一,写入到clickhouse里面去。

这里的关键点,由于第一次使用,无法分清应该建立什么格式的clickhouse表,出现了一些问题,最大的问题就是程序将数据写入了,查询发现数据不完整,只有一部分。我也在网上查了一些原因,总结下来。

为什么有时看不到已经创建好的表并且查询结果一直抖动时多时少?

  • 常见原因1:

建表流程存在问题。ClickHouse的分布式集群搭建并没有原生的分布式DDL语义。如果您在自建ClickHouse集群时使用create table创建表,查询虽然返回了成功,但实际这个表只在当前连接的Server上创建了。下次连接重置换一个Server,您就看不到这个表了。

解决方案:
建表时,请使用create table <table_name> on cluster default语句,on cluster default声明会把这条语句广播给default集群的所有节点进行执行。示例代码如下。
Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple();
在test表上再创建一个分布式表引擎,建表语句如下。
Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));

  • 常见原因2:

ReplicatedMergeTree存储表配置有问题。ReplicatedMergeTree表引擎是对应MergeTree表引擎的主备同步增强版,在单副本实例上限定只能创建MergeTree表引擎,在双副本实例上只能创建ReplicatedMergeTree表引擎。

解决方案:
在双副本实例上建表时,请使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)为固定配置,无需修改。

这里引出了复制表的概念,这里介绍一下,只有 MergeTree 系列里的表可支持副本:

  • ReplicatedMergeTree
  • ReplicatedSummingMergeTree
  • ReplicatedReplacingMergeTree
  • ReplicatedAggregatingMergeTree
    ReplicatedCollapsingMergeTree
  • ReplicatedVersionedCollapsingMergeTree
  • ReplicatedGraphiteMergeTree

副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表。副本不依赖分片。每个分片有它自己的独立副本。

创建复制表

先做好准备工作,该建表的建表,然后编写程序。在表引擎名称上加上 Replicated 前缀。例如:ReplicatedMergeTree。

  1. 首先创建一个分布式数据库
create database test on cluster default_cluster;
  1. 创建本地表

由于clickhouse是分布式的,创建本地表本来应该在每个节点上创建的,但是指定on cluster关键字可以直接完成,建表语句如下:

CREATE TABLE test.test_data_shade on cluster default_cluster
(
    `data` Map(String, String),
    `uid` String,
    `remote_addr` String,
    `time` Datetime64,
    `status` Int32,
    ...其它字段省略
    `dt` String
)
ENGINE = ReplicatedMergeTree()
partition by dt
order by (dt, sipHash64(uid));

这里表引擎为ReplicatedMergeTree,即有副本的表,根据dt按天分区,提升查询效率,sipHash64是一个hash函数,根据uid散列使得相同uid数据在同一个分片上面,如果有去重需求,速度更快,因为可以计算每个分片去重,再汇总一下即可。

  1. 创建分布式表
CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));

在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表写入或读取数据,Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

通过jdbc写入

这个我是看的官方文档,里面有2种选择,感兴趣的同学可以都去尝试一下。

Flink同步Kafka数据到ClickHouse分布式表_建表_03

这里贴一下我的Pom依赖

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
    <classifier>shaded</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Flink主程序,消费kafka,做清洗,然后写入clickhouse,这都是常规操作,这里贴一下关键代码吧。

Flink同步Kafka数据到ClickHouse分布式表_数据_04

连接clickhouse有2种方式,8123端口的http方式,和基于9000端口的tcp方式。

这里官方推荐的是连接驱动是0.3.2:

<dependency>
    <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch11</version>
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.

Flink同步Kafka数据到ClickHouse分布式表_建表_05

官方推荐升级到0.3.2,上面表格给出了升级方法,文档地址:

 https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK