13

Flink消费Kafka到HDFS实现及详解

 4 years ago
source link: http://www.cnblogs.com/smartloli/p/12499142.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1.概述

最近有同学留言咨询,Flink消费Kafka的一些问题,今天笔者将用一个小案例来为大家介绍如何将Kafka中的数据,通过Flink任务来消费并存储到HDFS上。

2.内容

这里举个消费Kafka的数据的场景。比如,电商平台、游戏平台产生的用户数据,入库到Kafka中的Topic进行存储,然后采用Flink去实时消费积累到HDFS上,积累后的数据可以构建数据仓库(如Hive)做数据分析,或是用于数据训练(算法模型)。如下图所示:

666745-20200315171521436-1201776900.png

2.1 环境依赖

整个流程,需要依赖的组件有Kafka、Flink、Hadoop。由于Flink提交需要依赖Hadoop的计算资源和存储资源,所以Hadoop的YARN和HDFS均需要启动。各个组件版本如下:

组件 版本 Kafka 2.4.0 Flink 1.10.0 Hadoop 2.10.0

2.2 代码实现

Flink消费Kafka集群中的数据,需要依赖Flink包,依赖如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.12</artifactId>
    <version>${flink.connector.version}</version>
 </dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>${flink.kafka.version}</version>
 </dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.streaming.version}</version>
 </dependency>

编写消费Topic的Flink代码,这里不对Topic中的数据做逻辑处理,直接消费并存储到HDFS上。代码如下:

/**
 * Flink consumer topic data and store into hdfs.
 * 
 * @author smartloli.
 *
 *         Created by Mar 15, 2020
 */
public class Kafka2Hdfs {

    private static Logger LOG = LoggerFactory.getLogger(Kafka2Hdfs.class);

    public static void main(String[] args) {
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));

        // Storage into hdfs
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);

        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));

        sink.setBatchSize(1024 * 1024 * 1024); // this is 1GB
        sink.setBatchRolloverInterval(1000 * 60 * 60); // one hour producer a file into hdfs
        transction.addSink(sink);

        env.execute("Kafka2Hdfs");
    }

    private static Object configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

}

2.3 注意事项

  • 存储到HDFS时,不用添加其他HDFS依赖,只需要Flink采用yarn-cluster模式提交即可;
  • 采用FSDataOutputStream写入时,会先写入缓冲区,放在内存中;
  • Flink每次做Checkpoint的时候,会Flush缓冲区的数据,以及将Pending(已经完成的文件,但为被Checkpoint记录,可以通过sink.setPendingSuffix("xxx")来设置)结尾的文件记录下来
  • Flink每60秒(可以通过sink.setInactiveBucketCheckInterval(60 * 1000)来进行设置)检测,如果一个文件的FSDataOutputStream在60秒内(可以通过sink.setInactiveBucketThreshold(60 * 1000)来设置),都还没有接收到数据,Flink就会认为该文件是不活跃的Bucket,那么就会被Flush后关闭该文件;
  • 我们再深入一点查看代码,实际上只是在processingTimeService中注册了当前的时间(currentProcessingTime)+ 60秒不写入的时间(inactiveBucketCheckInterval)。接着通过onProcessIngTime方法去不停的判断是否满足60秒不写入,同时也会判断是否到了滚动时间。代码如下:
public void onProcessingTime(long timestamp) throws Exception {
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); 
        closePartFilesByTime(currentProcessingTime);
        processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}        
  • 在Flink内部封装了一个集合Map<String, BucketState<T>> bucketStates = new HashMap<>();用来记录当前正在使用的文件,key是文件的路径,BucketState内部封装了该文件的所有信息,包括创建时间,最后一次写入时间(这里的写入指的是写入缓存区的时间,不是Flush的时间)。当前文件是打开还是关闭,写缓冲区的方法。都在这里。每次Flink要对文件进行操作的时候,都会从这里拿到文件的封装对象;
  • 当程序被取消的时候,当前正在操作的文件,会被Flush,然后关闭。然后将文件的后缀名从in-progress改为pending。这个前后缀都是可以设置,但如果没有什么特殊需求,默认即可。这里拿文件,用的就是上面说的bucketStates这个map。它在close方法中,会去遍历这个map,去做上述的操作;代码如下:
public void close() throws Exception {
        if (state != null) {
            for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
                closeCurrentPartFile(entry.getValue());
            }
        }
}
  • 每次写入的时候,都是会bucketStates这个map中获取对应的对象,如果没有,就会new一个该对象。然后先判断是否需要滚动(通过当前文件大小和滚动时间去判断),然后才将数据写入缓冲区,更新最后写入时间,代码如下:
public void invoke(T value) throws Exception {
        Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
 
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
        BucketState<T> bucketState = state.getBucketState(bucketPath);
        if (bucketState == null) {
            bucketState = new BucketState<>(currentProcessingTime);
            state.addBucketState(bucketPath, bucketState);
        }
 
        if (shouldRoll(bucketState, currentProcessingTime)) {
            openNewPartFile(bucketPath, bucketState);
        }
 
        bucketState.writer.write(value);
        bucketState.lastWrittenToTime = currentProcessingTime;
}
  • 写入和关闭HDFS是通过异步的方式的,异步的超时时间默认是60秒,可以通过 sink.setAsyncTimeout(60 * 1000)去设置

3.总结

Flink消费Kafka数据并写到HDFS的代码实现是比较简短了,没有太多复杂的逻辑。实现的时候,注意Kafka的地址、反序列化需要在属性中配置、以及Flink任务提交的时候,设置yarn-cluster模式、设置好内存和CPU、HDFS存储路径等信息。

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《 Kafka并不难学 》和《 Hadoop大数据挖掘从入门到进阶实战 》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK