学习 Druid(三):Kafka 数据摄入

 4 years ago
source link: https://www.tuicool.com/articles/YnUry2N
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Druid 支持两种 Kafka 数据摄入方式:

  • Push 通过 Tranquility;
  • Pull 通过 Kafka Indexing Service。



Kafka Indexing Service

1. 编辑配置文件

编辑 Overload 和 MiddleManager 的 conf/druid/cluster/_common/common.runtime.properties 配置文件。

加载 Kafka Indexing Service 扩展:


2. 编写 Supervisor 说明文件


  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [
    "metricsSpec": [
        "name": "count",
        "type": "count"
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  "ioConfig": {
    "topic": "metrics",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"

Supervisor 配置:

  • type kafka ,必填;
  • dataSchema 指定摄入数据的 schema,必填;
  • ioConfig Supervisor 和 Indexer 任务配置,必填;
  • tuningConfig Supervisor 和 Indexer 任务调优配置,选填。

2.1 DataSchema

Druid 的 Datasource 可以作为关系型数据库中的 Table。

Druid 的 Schema 符合时序库的数据结构,包括:

  • 时间戳(Timestamp) 描述事件发生的时间;
  • 维度(Dimension) 描述事件的属性;
  • 指标(Metric) 描述事件的值;


数据在 Datasource 中按时间戳进行分区,每一个分区被称为 Chunk,Chunk 中每一个文件被称为 Segment。


2.2 IOConfig

配置项 topic,Kafka Topic,必填;

配置项 consumerProperties,Kafka Consumer 配置项,必填;

配置项 replicas,副本集数;

配置项 taskCount,每个副本集任务数。

2.3 TuningConfig


3. 提交 Supervisor

Druid 提供两种方法提交 Supervisor:

  1. UI ,在 Tasks 菜单,点击 Submit Supervisor 按钮;
  2. REST APIcurl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor

Trouble Shooting

索引 Task 报错 Unrecognized VM option 'ExitOnOutOfMemoryError'


Unrecognized VM option 'ExitOnOutOfMemoryError'  
Did you mean 'OnOutOfMemoryError=<value>'?  
Error: Could not create the Java Virtual Machine.  
Error: A fatal exception has occurred. Program will exit.


导致该问题的原因是主机 JVM 不支持 ExitOnOutOfMemoryError 参数。

第一种方法,升级至 JDK 1.8 8u92 以上版本(推荐!!!)。

第二种方法,编辑 conf/druid/cluster/data/middleManager/runtime.properties 文件,找到 druid.indexer.runner.javaOpts 配置项:

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

移除其中的 -XX:+ExitOnOutOfMemoryError JVM 配置项。


About Joyk

Aggregate valuable and interesting links.
Joyk means Joy of geeK