9

FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜

 2 years ago
source link: https://blog.csdn.net/lifetragedy/article/details/127141005
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

每一个商品被卖出去一条就以以下格式通过kafka发送过来,只对status=101的productId进行统计:

假设每过60s有上述内容被发送过来,那么flink应该会形成以下这样的一个排行榜在Redis内并且随着kafka传送过来的数据变化面实时变化着这个排行榜:

为什么a1001是2而不是3?因为只统计statusCode为101的商品

进入开发前需要解决的问题

时间窗口的问题;

每60秒一统计,这就是标准的流式计算了。我们假设每隔60秒进入的数据都是不同的,第一次60秒可能进入10条数据,那么这10条数据的“排行榜”情况和下一个60秒内进入的数据会有差别,因此每个“时间窗口”进入的数据都不一样,我们的统计就是根据这个“时间窗口”内的数据进行计算的,因此才叫实时处理。

它和传统的做法的区别在于如下根本区别。

传统的做法:

假设目前mysql内有100万条记录,取出最近60s数据虽然可能取出的结果只有10条但是数据量的底有100万,即你表面看到的是取出10条背后是100万条数据参与了计算。那么如果我们说这个时间窗口假设缩短到了每5秒一变化。试想一下前端是一个小程序,有1,000个用户并行打开这个界面,每个连接以每5s时间去生产假设生产内是1亿条记录这么刷一次,每5秒刷一次,你的生产直接会被搞爆。

流式的做法:

我才不管你这个底是1000万还是1个亿,我只取60秒内的数据,可能有10条也可能是1万条,也可能是10万条,对于flink来说,这点数据量so what?一点不care。

所以我们需要先解决这个时间窗口的问题。



newCodeMoreWhite.png

排名的问题;

这边我们可以取巧,我们使用了TreeMap里的内置compare,我们覆写这个方法,下面上代码



newCodeMoreWhite.png

然后把以上两块内容合并,写入Redis就搞定这件事了,下面给出完整代码

ProductAggregate.java



newCodeMoreWhite.png

SumSplitter.java



newCodeMoreWhite.png

SumReport2RedisMapper.java



newCodeMoreWhite.png

5bf052d2b215454bb5bbf6eab47a94a3.png

然后我们在kafka处输入如下内容

44b3c3bf46754b2f8ccadafa44638454.png

 我们去Redis内可以看到有一个这样的Key生成了,内容如下:

89d71fe1b49343b28258c9a5c204a18a.png

然后页面做个实时查询的。。。不管什么页面也好、做个什么APP也好、做个什么小程序也好,连着这个Redis数据结果,随便刷。。。啊我。。。”洗刷刷洗刷刷,洗刷刷“


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK