

博文干货|Apache InLong 使用 Apache Pulsar 创建数据入库
source link: https://segmentfault.com/a/1190000041461176
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/
文章转自公众号:Apache InLong,原文地址:https://mp.weixin.qq.com/s/Wg...
Apache InLong 增加了通过 Apache Pulsar 接入数据的能力,充分利用了 Pulsar 不同于其它 MQ 的技术优势,为金融、计费等数据质量要求更高的数据接入场景,提供完整的解决方案。在下面的内容中,我们将通过一个完整的示例介绍如何通过 Apache InLong 使用 Apache Pulsar 接入数据。
Apache InLong(incubating)简介
Apache InLong(应龙 https://inlong.apache.org)是腾讯捐献给 Apache 社区的一站式数据流接入服务平台,提供自动、安全、可靠和高性能的数据传输能力,方便业务构建基于流式的数据分析、建模和应用。InLong 项目原名 TubeMQ ,专注于高性能、低成本的消息队列服务。为了进一步释放 TubeMQ 周边的生态能力,我们将项目升级为 InLong,专注打造一站式数据流接入服务平台。Apache InLong 以腾讯内部使用的 TDBank 为原型,依托万亿级别的数据接入和处理能力,整合了数据采集、汇聚、存储、分拣数据处理全流程,拥有简单易用、灵活扩展、稳定可靠等特性。
Apache InLong 服务于数据采集到落地的整个生命周期,按数据的不同阶段提供不同的处理模块,主要包括:
- inlong-agent,数据采集 Agent,支持从指定目录或文件读取常规日志、逐条上报。后续也将扩展 DB 采集、HTTP 上报等能力;
- inlong-dataproxy,一个基于 Flume-ng 的 Proxy 组件,支持数据发送阻塞和落盘重发,拥有将接收到的数据转发到不同 MQ(消息队列)的能力;
- inlong-tubemq,腾讯自研的消息队列服务,专注于大数据场景下海量数据的高性能存储和传输,在海量实践和低成本方面有着良好的核心优势;
- inlong-sort,对从不同的 MQ 消费到的数据进行 ETL 处理,然后汇聚并写入 Hive、ClickHouse、Hbase、Iceberg 等存储系统;
- inlong-manager,提供完整的数据服务管控能力,包括元数据、任务流、权限,OpenAPI 等;
- inlong-website,用于管理数据接入的前端页面,简化整个 InLong 管控平台的使用。
关于 Apache Pulsar
Apache Pulsar 是 Pub/Sub 模型的消息系统,并且从设计上做了存储和计算的分离。Apache Pulsar 计算与存储分离的架构,以及分片存储的设计为 Apache Pulsar 带来了相比于传统基于分区存储 MQ 的一些优势:
- Broker 和 Bookie 相互独立,方便实现独立的扩展以及独立的容错;
- Broker 无状态,便于快速上、下线,更加适合于云原生场景;
- 分区存储不受限于单个节点存储容量;
- 分区数据分布均匀。
- 安装 Apache Pulsar,版本 2.6+
- 安装 Apache Hive,版本 2.3+
安装 InLong
部署 InLong ,可以使用 Docker Compose 实现一键部署,也可以通过二进制文件在普通机器上部署。
区别于 InLong TubeMQ,如果使用 Apache Pulsar,需要在 Manager 组件安装中配置 Pulsar 集群信息,格式如下:
# Pulsar admin URL pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080 # Pulsar broker address pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650 # Default tenant of Pulsar pulsar.defaultTenant=public
创建数据接入
配置数据流 Group 信息
在创建数据接入时,数据流 Group 可选用的消息中间件选择 Pulsar,其它跟 Pulsar 相关的配置项还包括:
- Queue module:队列模型,并行或者顺序,选择并行时可设置 Topic 的分区数,顺序则为一个分区;
- Write quorum:消息写入的副本数;
- Ack quorum:确认写入 Bookies 的数量;
- retention time:已被 consumer 确认的消息被保存的时间;
- ttl:未被确认的消息的过期时间;
- retention size:已被 consumer 确认的消息被保存的大小。
配置数据流
配置消息来源时,文件数据源中的文件路径,可参照 inlong-agent 中 File Agent 的详细指引。
配置数据格式
配置 Hive 集群
保存 Hive 流向,点击“提交审批”。
数据接入审批
进入审批管理页面,点击我的审批,审批上面提交的接入申请,审批结束后会在 Pulsar 集群同步创建数据流需要的 Topic 和订阅。
我们可以在 Pulsar 集群使用命令行工具检查 Topic 是否创建成功:
配置文件 Agent
在配置文件 Agent 时,需要根据数据接入创建时指定的目录下创建文件:
touch /data/test_file.txt;
按照创建数据流时的数据源格式,向文件中写入数据(可以按格式写入更多数据):
echo -e "1|test\n2|test\n" >> /data/test_file.txt
数据落地检查
最后,我们登入 Hive 集群,通过 Hive 的 SQL 命令查看 test_stream
表中是否成功插入了数据。
如果出现数据未正确写入 Hive 集群,可以检查 Dataproxy 和 Sort 相关信息是否同步:
- 检查 Inlong-Dataproxy 的 conf/topics.properties 文件夹中是否正确写入该数据流对应的 Topic 信息
b_test_group/test_stream=persistent://public/b_test_group/test_stream
- 检查 InLong Sort 监听的 ZooKeeper 中是否成功推送了数据流的配置信息:
get /inlong_hive/dataflows/{{sink_id}}
关注公众号「Apache Pulsar」,获取更多技术干货
加入 Apache Pulsar 中文交流群 ??
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK