

FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis
source link: https://blog.csdn.net/lifetragedy/article/details/127139476
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis
在kafka发送plainText消息,以逗号分割。逗号前的作为key,逗号后的作为value。
然后把kafka发过来的东西以Redis的HashMap结构存入flink这个主Key中去。
为了解决这个问题,我们需要在前两个的范围内解决掉以下三个问题:
- flink如何接入kafka
- flink如何不作统计(前两课我们用的是烂网上的wordcount例子)只接入流和折分
- flink如何sink到Redis
flink如何接入kafka
pom.xml

kafka在flink内核心API的用法
以上代码相当的简单。
有一处需要注意,如果我把以上代码改成了如下那么它的效果就是每次这个flink应用重启,都会把kafka从test这个topic发过来的第一条消息全部重新读一遍,区别就在于这个“OffsetsInitializer.earliest()”。我们取的是最近一条kafka消息,因此我们才用了:OffsetsInitializer.latest()。
flink如何不作统计(前两几篇我们用的是烂网上的wordcount例子)只接入流和折分数据
DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
我们接着来看LineSplitter这个类
LineSplitter.java

非常简单,只读入流核心起作用的就是这个collector.collect,看,它按照逗号对读入的流进行折分。
flink如何sink到Redis
我们来看这边的SinkRedisMapper这个类
SinkRedisMapper.java

它的作用就是使用Redis HashMap结构,把读入的流Sink到Redis里以flink这个key开头的内容中去。
所以整个SimpleKafka内容如下
完整SimpleKafka.java

第一步:把zk运行起来
第二步:把kafka运行起来
第三步:在kafka上创建一条command窗口的producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
第四步:把SimpleKafka运行起来

第五步:在kafka的producer内输入点东西如下:

然后在eclipse工程中我们看到了这样的内容
我们来看我们的Redis里

看,sink成功。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK