41

Flink实践|Flink TopN 实践

 4 years ago
source link: https://www.tuicool.com/articles/EVRj6vi
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

TopN无论是在离线还是在实时计算中都是比较常见的功能,不同于离线计算中的TopN,实时数据是持续不断的,这样就给TopN的计算带来很大的困难,因为要持续在内存中维持一个TopN的数据结构,当有新数据来的时候,更新这个数据结构。参考网上提出的思路,基于Flink进行了一次TopN实践。

基于Flink实现TopN关键知识点:

  • TopN 采用小根堆数据结构,新的数据来了,只需要和根节点比较,小,则丢弃,大,插入小根堆,此次采用TreeMap来替代小根堆数据结构;

  • 当Watermark越过Window边界的时候,Window的数据触发计算先发到下游,接着Watermark才发送到下游;

  • 用MapState保存TopN数据结构,key为Window;

  • Namespace相同的情况下,每一个key只有1个Timer。

FN3qMv2.jpg!web

假如有依次有A,B,C,A流入WindowOperator, 那么每一个数据来了,首先Window assign,接着进行agg获取新的acc更新到State里面,最后注册Timer。当Watermark流入到Window Operator的时候,Timer小于Watermark都将触发毁掉,触发计算,并发送数据到下游。

6rEJVfM.jpg!web

接着按Window进行keyBy,接受到一条消息之后的处理流程为:

@Override
public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
 try {
  long timstamp = (Long) value.getField(timeIndex);
  TreeMap<CntKey, String> topNMap = null;
  if (treeMapMapState != null) {
   if (treeMapMapState.contains(timstamp)) { //获取该window对应的treeMap
    topNMap = treeMapMapState.get(timstamp);
   }
  }
  if (topNMap == null) {
   topNMap = Maps.newTreeMap();
  }

  add(value, topNMap);//判断该值是否要插入treeMap

  treeMapMapState.put(timstamp, topNMap);//更新treeMapd到state里面去

  ctx.timerService().registerEventTimeTimer(timstamp + 1);//注册timer,相key的timer只会有一个
 } catch (Exception e) {
  LOG.error("process {} failed", value, e);
 }
}

Watermark在WindowOperator所有低于该Watermark的Window触发完成以后被发送到下游,当KeyedProcessOperator接受到Watermark之后,说明之前Window的所有数据已经流入KeyedProcessOperator,接着在KeyedProcessOperatorr中注册的低于Watermark的Timer都会被触发,最后会调用onTimer方法。

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
 long key = ctx.getCurrentKey();
 try {
  if (treeMapMapState.contains(key)) {
   TreeMap treeMap = treeMapMapState.get(key);//获取该window对应的treeMap,代表一个widnow下的topN数据
   LOG.error("*********************** \n");
   LOG.info("top 10 {}", treeMap);
   LOG.error("*********************** \n");
   String result = (String) treeMap.values().stream().collect(Collectors.joining(","));
   treeMapMapState.remove(key);
   Row row = new Row(2);
   row.setField(0, redisKey);
   row.setField(1, result);
   out.collect(row);
  }
 } catch (Exception e) {
  LOG.error("onTimer failed for windowEnd {}", key, e);
 }
}

这里需要注意的一点是,如果使用的是ProcessTime,那么KeyedProcessOperator注册Timer的时候,会和处理时间进行比较,如果低于系统处理时间就会立即触发onTimer方法,那么触发的时候其实是没有收集完整上一个窗口的数据的。

大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛 http://hbase.group ,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号: hbasegroup ),非常欢迎大家积极投稿。

3INbInB.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK