1

自定义StreamOperator

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA%3D%3D&%3Bmid=2247484057&%3Bidx=1&%3Bsn=3312590b9fd114704a660e6cd32ea139
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

点击上方蓝

字关注~

在上一篇 StreamOperator源码简析 从源码角度分析了StreamOperator以及其实现类,此篇幅主要分析一下如何自定义一个StreamOperator。

StreamOperator接口提供了其生命周期的抽象方法,例如初始化方法setup、open、initializeState,checkpoint相关方法prepareSnapshotPreBarrier、snapshotState,但是我们没有必要去自己一一实现这些方法,可以继承其抽象类AbstractStreamOperator,覆盖一些我们需要重写的方法。 在上一篇分析中提到对于source端不需要接受上游数据,也就不需要实现OneInputStreamOperator或者TwoInputStreamOperator接口,如果我们需要接收上游数据就必须实现这两个接口中的一个,主要看一个输入还是两个输入来选择。

案例: 假设我们现在需要实现一个通用的定时、定量的输出的StreamOperator。

实现步骤:

  1. 继承AbstractStreamOperator抽象类,实现OneInputStreamOperator接口

  2. 重写open方法,调用flink 提供的定时接口,并且注册定时器

  3. 重写initializeState/snapshotState方法,由于批量写需要做缓存,那么需要保证数据的一致性,将缓存数据存在状态中

  4. 重写processElement方法,将数据存在缓存中,达到一定大小然后输出

  5. 由于需要做定时调用,那么需要有一个定时调用的回调方法,那么定义的类需要实现ProcessingTimeCallback接口,并且实现其onProcessingTime方法(关于flink定时可以参考定时系列文章)

代码:

publicabstractclassCommonSinkOperator<T extendsSerializable>extendsAbstractStreamOperator<Object>
implementsProcessingTimeCallback,OneInputStreamOperator<T,Object>{
 
privateList<T> list;
 
privateListState<T> listState;
 
privateint batchSize;
 
privatelong interval;
 
privateProcessingTimeService processingTimeService;
 
publicCommonSinkOperator(){
}
 
publicCommonSinkOperator(int batchSize,long interval){
this.chainingStrategy =ChainingStrategy.ALWAYS;
this.batchSize = batchSize;
this.interval = interval;
}
 
@Overridepublicvoid open()throwsException{
super.open();
if(interval >0&& batchSize >1){
//获取AbstractStreamOperator里面的ProcessingTimeService, 该对象用来做定时调用
//注册定时器将当前对象作为回调对象,需要实现ProcessingTimeCallback接口
processingTimeService = getProcessingTimeService();
long now = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now + interval,this);
}
}
//状态恢复
@Overridepublicvoid initializeState(StateInitializationContext context)throwsException{
super.initializeState(context);
this.list =newArrayList<T>();
listState = context.getOperatorStateStore().getSerializableListState("batch-interval-sink");
if(context.isRestored()){
listState.get().forEach(x ->{
list.add(x);
});
}
 
}
 
@Overridepublicvoid processElement(StreamRecord<T> element)throwsException{
list.add(element.getValue());
if(list.size()>= batchSize){
saveRecords(list);
}
 
}
//checkpoint
@Overridepublicvoid snapshotState(StateSnapshotContext context)throwsException{
super.snapshotState(context);
if(list.size()>0){
listState.clear();
listState.addAll(list);
}
}
//定时回调
@Overridepublicvoid onProcessingTime(long timestamp)throwsException{
if(list.size()>0){
saveRecords(list);
list.clear();
}
long now = processingTimeService.getCurrentProcessingTime();
processingTimeService.registerTimer(now + interval,this);//再次注册
}
 
publicabstractvoid saveRecords(List<T> datas);
}

如何调用? 直接使用dataStream.transform方式即可。

整体来说这个demo相对来说是比较简单的,但是这里面涉及的定时、状态管理也是值得研究,比喻说在这里定时我们直接选择ProcessingTimeService,而没有选择InternalTimerService来完成定时注册,主要是由于InternalTimerService会做定时调用状态保存,在窗口操作中需要任务失败重启仍然可以触发定时,但是在我们案例中不需要,直接下次启动重新注册即可,因此选择了ProcessingTimeService。

推荐阅读

1.  Flink中延时调用设计与实现

2. F link维表关联系列之Hbase维表关联:LRU策略

3.  你应该了解的Watermark

4. Flink exactly-once系列之事务性输出实现

5. F link时间系统系列之实例讲解:如何做定时输出

6.  Flink实战:全局TopN分析与实现

7.  Flink per-Job模式InfluxdbReporter上报JobName

8.  Flink SQL自定义聚合函数

关注回复 Flink 获取更多信息~

aQnAZf3.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK