
Flink Internals | Flink Timer Registration and Watermark Triggering

 4 years ago
source link: https://www.tuicool.com/articles/YjUNRnq

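In Flink, both WindowOperator and KeyedProcessOperator hold an object that implements InternalTimerService. Through this object, users can register event-time and processing-time timers. Once time passes a timer's timestamp (the watermark for event-time timers, the system clock for processing-time timers), a callback is invoked to perform some action. This article focuses on KeyedProcessOperator (WindowOperator works in much the same way, so it is not covered in detail).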

When a StreamTask is scheduled for execution, its lifecycle is as follows:
```text
-- invoke()
      |
      +----> Create basic utils (config, etc) and load the chain of operators
      +----> operators.setup()
      +----> task specific init()
      +----> initialize-operator-states()
      +----> open-operators()
      +----> run()
      +----> close-operators()
      +----> dispose-operators()
      +----> common cleanup
      +----> task specific cleanup()
```
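What matters most in this outline is the ordering: open() (where timer services get created) runs before run() starts delivering records and watermarks. The plain-Java sketch below replays that order for a single operator. MiniOperator, LoggingOperator and LifecycleDemo are hypothetical names, and the interface is far smaller than Flink's real StreamOperator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily reduced operator interface (not Flink's StreamOperator).
interface MiniOperator {
    void setup(List<String> log);
    void initializeState();
    void open();
    void close();
    void dispose();
}

// Records each lifecycle call so the order can be inspected.
class LoggingOperator implements MiniOperator {
    private List<String> log;

    public void setup(List<String> log) { this.log = log; log.add("setup"); }
    public void initializeState() { log.add("initializeState"); }
    public void open() { log.add("open"); }      // timer services would be created here
    public void close() { log.add("close"); }
    public void dispose() { log.add("dispose"); }
}

class LifecycleDemo {
    // Drives one operator through the same order as the invoke() outline.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniOperator op = new LoggingOperator();
        op.setup(log);              // operators.setup()
        log.add("task init");       // task specific init()
        op.initializeState();       // initialize-operator-states()
        op.open();                  // open-operators()
        log.add("run");             // run(): records and watermarks flow from here on
        op.close();                 // close-operators()
        op.dispose();               // dispose-operators()
        log.add("cleanup");         // common cleanup + task specific cleanup()
        return log;
    }
}
```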

KeyedProcessOperator's open() method is called during the StreamTask's open-operators() phase:

```java
@Override
public void open() throws Exception {
    super.open();
    collector = new TimestampedCollector<>(output);
    // Build and start an InternalTimerService for this operator; time and timers are accessed through it.
    // The last argument (this) makes the operator itself the callback target for its timers.
    InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

    TimerService timerService = new SimpleTimerService(internalTimerService);

    context = new ContextImpl(userFunction, timerService);
    onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
```
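SimpleTimerService acts essentially as an adapter: KeyedProcessOperator needs only one namespace, so the namespace-aware InternalTimerService is wrapped in a user-facing TimerService that always supplies VoidNamespace. A minimal sketch of that adapter, using hypothetical stand-in types (VoidNs, MiniInternalTimerService, MiniTimerService, MiniSimpleTimerService, AdapterDemo) rather than Flink's own classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for VoidNamespace: a namespace with exactly one value.
enum VoidNs { INSTANCE }

// Hypothetical namespace-aware service (the InternalTimerService role).
interface MiniInternalTimerService<N> {
    long currentWatermark();
    void registerEventTimeTimer(N namespace, long time);
}

// Hypothetical user-facing service without namespaces (the TimerService role).
interface MiniTimerService {
    long currentWatermark();
    void registerEventTimeTimer(long time);
}

// Adapter: always passes VoidNs.INSTANCE, so user code never sees namespaces.
class MiniSimpleTimerService implements MiniTimerService {
    private final MiniInternalTimerService<VoidNs> internal;

    MiniSimpleTimerService(MiniInternalTimerService<VoidNs> internal) { this.internal = internal; }

    @Override
    public long currentWatermark() { return internal.currentWatermark(); }

    @Override
    public void registerEventTimeTimer(long time) {
        internal.registerEventTimeTimer(VoidNs.INSTANCE, time);
    }
}

class AdapterDemo {
    static List<String> run() {
        List<String> registered = new ArrayList<>();
        // Recording fake of the internal service, used only to observe what the adapter passes on.
        MiniInternalTimerService<VoidNs> internal = new MiniInternalTimerService<VoidNs>() {
            @Override public long currentWatermark() { return Long.MIN_VALUE; }
            @Override public void registerEventTimeTimer(VoidNs namespace, long time) {
                registered.add(namespace + "@" + time);
            }
        };
        MiniTimerService userFacing = new MiniSimpleTimerService(internal);
        userFacing.registerEventTimeTimer(100L);
        userFacing.registerEventTimeTimer(200L);
        return registered;
    }
}
```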

The StreamTask then calls run() to start processing:

```java
@Override
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    // The inputProcessor reads messages from the input gate; a message may be
    // a regular record or a watermark.
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }

    // the input is finished, notify non-head operators
    if (running) {
        synchronized (getCheckpointLock()) {
            OneInputStreamOperator<IN, OUT> headOperator = getHeadOperator();
            for (StreamOperator<?> operator : operatorChain.getAllOperatorsTopologySorted()) {
                if (operator.getOperatorID().equals(headOperator.getOperatorID())) {
                    continue;
                }

                Preconditions.checkState(operator instanceof OneInputStreamOperator);
                ((OneInputStreamOperator<?, ?>) operator).endInput();
            }
        }
    }
}
```
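Stripped of Flink specifics, run() is a loop that keeps calling processInput() until it reports end of input, followed by an endInput() notification to every non-head operator in the chain. The sketch below models only that control flow. MiniInputProcessor and RunLoopDemo are hypothetical; the chain is a plain list whose first entry is assumed to be the head operator, and the checkpoint lock is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical input processor (not Flink's StreamInputProcessor): each call to
// processInput() consumes one element and returns false once the input is exhausted.
class MiniInputProcessor {
    private final Deque<String> input;
    private final List<String> processed = new ArrayList<>();

    MiniInputProcessor(List<String> elements) { this.input = new ArrayDeque<>(elements); }

    boolean processInput() {
        String next = input.poll();
        if (next == null) {
            return false;                 // end of input: leave the run loop
        }
        processed.add("processed:" + next);
        return true;
    }

    List<String> processed() { return processed; }
}

class RunLoopDemo {
    private volatile boolean running = true;  // a real task clears this on cancel

    List<String> run(List<String> elements, List<String> operatorChain) {
        MiniInputProcessor inputProcessor = new MiniInputProcessor(elements);
        while (running && inputProcessor.processInput()) {
            // all the work happens in processInput()
        }
        List<String> log = new ArrayList<>(inputProcessor.processed());
        if (running) {
            // Input finished: notify every chained operator except the head (index 0).
            for (String op : operatorChain.subList(1, operatorChain.size())) {
                log.add("endInput:" + op);
            }
        }
        return log;
    }
}
```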

In StreamInputProcessor's processInput() method (excerpt):

```java
// ... (excerpt: the enclosing loop and the preceding branches are omitted)
    } else {
        // now we can do the actual processing
        StreamRecord<IN> record = recordOrMark.asRecord();
        synchronized (lock) {
            numRecordsIn.inc();
            streamOperator.setKeyContextElement1(record);
            // Regular record: this eventually calls the user function's processElement;
            // for KeyedProcessOperator that is the user-defined KeyedProcessFunction#processElement
            streamOperator.processElement(record);
        }
    }

    return true;
} else if (recordOrMark.isWatermark()) {
    // handle watermark: pass it to the StatusWatermarkValve together with its input channel
    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
    continue;
} else if (recordOrMark.isStreamStatus()) {
    // ... (excerpt continues)
```
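In this excerpt, records go straight to the operator while watermarks are handed to the StatusWatermarkValve, which forwards a watermark only once the minimum across all input channels advances. The following sketch shows that routing with a simplified valve. Element, RecordElement, WatermarkElement, MiniWatermarkValve and DispatchDemo are hypothetical, and the real valve's handling of idle channels (StreamStatus) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for StreamElement / StreamRecord / Watermark.
abstract class Element {}
class RecordElement extends Element {
    final String value;
    RecordElement(String value) { this.value = value; }
}
class WatermarkElement extends Element {
    final long timestamp;
    WatermarkElement(long timestamp) { this.timestamp = timestamp; }
}

// Simplified valve: forwards a watermark only when the minimum over all input channels advances.
// Idle-channel (StreamStatus) handling of the real StatusWatermarkValve is omitted.
class MiniWatermarkValve {
    private final long[] channelWatermarks;
    private long lastEmitted = Long.MIN_VALUE;
    private final List<String> out;

    MiniWatermarkValve(int numChannels, List<String> out) {
        this.channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        this.out = out;
    }

    void inputWatermark(long timestamp, int channel) {
        if (timestamp <= channelWatermarks[channel]) {
            return;                                   // no advance on this channel
        }
        channelWatermarks[channel] = timestamp;
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            out.add("processWatermark(" + min + ")"); // would reach operator.processWatermark
        }
    }
}

class DispatchDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        MiniWatermarkValve valve = new MiniWatermarkValve(2, out);
        Object lock = new Object();
        Element[] elements = {
            new RecordElement("a"), new WatermarkElement(10), new RecordElement("b"),
            new WatermarkElement(5), new WatermarkElement(12)
        };
        int[] channels = { 0, 0, 1, 1, 1 };           // input channel of each element
        for (int i = 0; i < elements.length; i++) {
            Element element = elements[i];
            if (element instanceof WatermarkElement) {
                valve.inputWatermark(((WatermarkElement) element).timestamp, channels[i]);
            } else {
                synchronized (lock) {                // records are processed under the lock
                    out.add("processElement(" + ((RecordElement) element).value + ")");
                }
            }
        }
        return out;
    }
}
```

Watermark 10 on channel 0 is held back because channel 1 has not reported yet; only when channel 1 reaches 5, and later 12, does the minimum advance to 5 and then 10.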

Now let's follow how a watermark is handled. It eventually reaches AbstractStreamOperator's processWatermark method:

```java
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        // Step 1: let the timer services process the watermark
        timeServiceManager.advanceWatermark(mark);
    }
    // Step 2: forward the watermark downstream
    output.emitWatermark(mark);
}
```
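The order of the two steps matters: due timers fire (and may emit results) before the watermark itself is forwarded, so downstream operators receive the timer output ahead of the watermark that triggered it. A minimal sketch of that ordering, using a hypothetical OrderDemo operator that keeps pending event-time timers in a sorted set:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical operator that keeps pending event-time timers in a sorted set.
class OrderDemo {
    private final TreeSet<Long> timers = new TreeSet<>();
    final List<String> downstream = new ArrayList<>();

    void registerEventTimeTimer(long time) { timers.add(time); }

    // Same two steps as processWatermark.
    void processWatermark(long watermark) {
        // Step 1: fire all timers <= watermark; each callback emits a result downstream.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            downstream.add("timer-output@" + t);
        }
        // Step 2: only then forward the watermark itself.
        downstream.add("watermark@" + watermark);
    }

    static List<String> run() {
        OrderDemo op = new OrderDemo();
        op.registerEventTimeTimer(7);
        op.registerEventTimeTimer(3);
        op.registerEventTimeTimer(20);
        op.processWatermark(10);
        return op.downstream;
    }
}
```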

So how is the watermark actually processed? Let's look at InternalTimeServiceManager's advanceWatermark method:

```java
public void advanceWatermark(Watermark watermark) throws Exception {
    // Every InternalTimerService created earlier via getInternalTimerService must process this watermark
    for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}
```
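The manager keeps one timer service per name (for example "user-timers", as requested in open() above) and fans each watermark out to all of them. The sketch below models that with a map of named services. MiniService, MiniTimeServiceManager and ManagerDemo are hypothetical, "window-timers" is an illustrative second name, and returning the existing instance for a repeated name mirrors (as far as we can tell) how the real manager caches services.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical per-name timer service holding only event-time timestamps.
class MiniService {
    private final String name;
    private final List<String> fired;
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

    MiniService(String name, List<String> fired) {
        this.name = name;
        this.fired = fired;
    }

    void registerEventTimeTimer(long time) { eventTimeTimers.add(time); }

    void advanceWatermark(long time) {
        // Fire every timer whose timestamp is <= the new watermark, earliest first.
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= time) {
            fired.add(name + ":" + eventTimeTimers.poll());
        }
    }
}

// Hypothetical manager: one service per name; every watermark goes to all of them.
class MiniTimeServiceManager {
    private final Map<String, MiniService> timerServices = new LinkedHashMap<>();
    private final List<String> fired = new ArrayList<>();

    MiniService getInternalTimerService(String name) {
        // The same name returns the same service instance.
        return timerServices.computeIfAbsent(name, n -> new MiniService(n, fired));
    }

    void advanceWatermark(long watermark) {
        for (MiniService service : timerServices.values()) {
            service.advanceWatermark(watermark);
        }
    }

    List<String> fired() { return fired; }
}

class ManagerDemo {
    static List<String> run() {
        MiniTimeServiceManager manager = new MiniTimeServiceManager();
        manager.getInternalTimerService("user-timers").registerEventTimeTimer(5);
        manager.getInternalTimerService("user-timers").registerEventTimeTimer(15);
        manager.getInternalTimerService("window-timers").registerEventTimeTimer(8);
        manager.advanceWatermark(10);   // 5 and 8 fire; 15 stays pending
        return manager.fired();
    }
}
```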

Looking at HeapInternalTimerService, we can see that every timer whose timestamp is less than or equal to the watermark has its callback fired:

```java
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time; // update the current watermark

    InternalTimer<K, N> timer;
    // Take out every timer whose timestamp is <= the watermark and fire its callback
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

        Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
        timerSet.remove(timer);
        eventTimeTimersQueue.remove();

        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer); // triggerTarget is the concrete operator object
    }
}
```

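Here triggerTarget is the concrete operator instance: in open(), the operator passes itself (this) to getInternalTimerService, and it reaches HeapInternalTimerService through InternalTimeServiceManager's getInternalTimerService method.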
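Putting these pieces together, a heap-based timer service needs a priority queue ordered by timestamp (to find the earliest timer cheaply), a set for deduplication, and a callback target, which is the operator itself. The sketch below is a simplified model under those assumptions, not Flink's HeapInternalTimerService: MiniTimer, MiniTriggerable, MiniHeapTimerService and HeapTimerDemo are hypothetical, and a plain field stands in for keyContext.setCurrentKey.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical timer; equality over (key, namespace, timestamp) is what enables deduplication.
final class MiniTimer<K, N> {
    final K key;
    final N namespace;
    final long timestamp;

    MiniTimer(K key, N namespace, long timestamp) {
        this.key = key;
        this.namespace = namespace;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MiniTimer)) return false;
        MiniTimer<?, ?> other = (MiniTimer<?, ?>) o;
        return timestamp == other.timestamp && key.equals(other.key) && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() { return Objects.hash(key, namespace, timestamp); }
}

// Plays the Triggerable role: the operator registers itself as the callback target.
interface MiniTriggerable<K, N> {
    void onEventTime(MiniTimer<K, N> timer);
}

class MiniHeapTimerService<K, N> {
    private final Set<MiniTimer<K, N>> timerSet = new HashSet<>();
    private final PriorityQueue<MiniTimer<K, N>> eventTimeTimersQueue =
            new PriorityQueue<>(Comparator.comparingLong((MiniTimer<K, N> t) -> t.timestamp));
    private final MiniTriggerable<K, N> triggerTarget;
    private long currentWatermark = Long.MIN_VALUE;
    private K currentKey;  // stands in for keyContext.setCurrentKey(...)

    MiniHeapTimerService(MiniTriggerable<K, N> triggerTarget) { this.triggerTarget = triggerTarget; }

    void registerEventTimeTimer(K key, N namespace, long time) {
        MiniTimer<K, N> timer = new MiniTimer<>(key, namespace, time);
        if (timerSet.add(timer)) {          // add() returns false for an identical timer: ignored
            eventTimeTimersQueue.add(timer);
        }
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        MiniTimer<K, N> timer;
        // Pop every timer with timestamp <= watermark, earliest first.
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
            timerSet.remove(timer);
            eventTimeTimersQueue.remove();
            currentKey = timer.key;          // the callback runs in the timer's key context
            triggerTarget.onEventTime(timer);
        }
    }

    K currentKey() { return currentKey; }
    long currentWatermark() { return currentWatermark; }
}

class HeapTimerDemo implements MiniTriggerable<String, String> {
    final List<String> fired = new ArrayList<>();
    final MiniHeapTimerService<String, String> timers = new MiniHeapTimerService<>(this);

    @Override
    public void onEventTime(MiniTimer<String, String> timer) {
        fired.add(timers.currentKey() + "@" + timer.timestamp);
    }

    static List<String> run() {
        HeapTimerDemo op = new HeapTimerDemo();
        op.timers.registerEventTimeTimer("a", "ns", 10);
        op.timers.registerEventTimeTimer("a", "ns", 10);  // duplicate: deduplicated
        op.timers.registerEventTimeTimer("a", "ns", 30);  // same key, new timestamp: kept
        op.timers.registerEventTimeTimer("b", "ns", 20);
        op.timers.advanceWatermark(20);  // fires a@10 and b@20
        op.timers.advanceWatermark(25);  // nothing pending <= 25
        op.timers.advanceWatermark(30);  // fires a@30
        return op.fired;
    }
}
```

Because the comparison is <=, a timer registered for exactly the watermark's timestamp (b@20) fires when that watermark arrives, and the duplicate registration for key "a" at 10 fires only once.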


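Next, KeyedProcessOperator's onEventTime simply calls the onTimer method of the user-implemented KeyedProcessFunction to do the actual work. WindowOperator likewise implements onEventTime and onProcessingTime: it takes the data held in state for the timer's key and window, passes it to the WindowFunction for computation, and sends the result downstream: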

```java
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    collector.setAbsoluteTimestamp(timer.getTimestamp());
    invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}

private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
    onTimerContext.timeDomain = timeDomain;
    onTimerContext.timer = timer;
    // Calls the onTimer method the user implemented in the KeyedProcessFunction
    userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
    onTimerContext.timeDomain = null;
    onTimerContext.timer = null;
}
```
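The context object is created once in open() and reused: invokeUserFunction fills in the time domain and timer before calling onTimer and clears them afterwards, presumably to avoid allocating a new context per timer. A small sketch of that set/call/reset pattern, with hypothetical types (Domain, MiniOnTimerContext, MiniUserFunction, InvokeDemo):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TimeDomain.
enum Domain { EVENT_TIME, PROCESSING_TIME }

// Hypothetical reusable context, analogous to OnTimerContextImpl.
class MiniOnTimerContext {
    Domain timeDomain;
    Long timerTimestamp;
}

// Hypothetical user callback, analogous to KeyedProcessFunction#onTimer.
interface MiniUserFunction {
    void onTimer(long timestamp, MiniOnTimerContext ctx, List<String> out);
}

class InvokeDemo {
    private final MiniOnTimerContext onTimerContext = new MiniOnTimerContext(); // created once, reused
    private final MiniUserFunction userFunction;
    final List<String> out = new ArrayList<>();

    InvokeDemo(MiniUserFunction userFunction) { this.userFunction = userFunction; }

    void onEventTime(long timerTimestamp) {
        invokeUserFunction(Domain.EVENT_TIME, timerTimestamp);
    }

    private void invokeUserFunction(Domain timeDomain, long timerTimestamp) {
        onTimerContext.timeDomain = timeDomain;          // fill in the context for this callback
        onTimerContext.timerTimestamp = timerTimestamp;
        userFunction.onTimer(timerTimestamp, onTimerContext, out);
        onTimerContext.timeDomain = null;                // reset so nothing stale stays visible
        onTimerContext.timerTimestamp = null;
    }

    boolean contextCleared() {
        return onTimerContext.timeDomain == null && onTimerContext.timerTimestamp == null;
    }

    static InvokeDemo run() {
        InvokeDemo op = new InvokeDemo((ts, ctx, collected) -> collected.add(ctx.timeDomain + "@" + ts));
        op.onEventTime(42);
        return op;
    }
}
```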

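So far we have seen how watermarks trigger timers, but another question remains: how are timers registered? WindowOperator and KeyedProcessOperator hold a timerService directly or indirectly, and timers are registered through that object: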

```java
/**
 * Interface for working with time and timers.
 */
@PublicEvolving
public interface TimerService {

    /** Returns the current processing time. */
    long currentProcessingTime();

    /** Returns the current event-time watermark. */
    long currentWatermark();

    /**
     * Registers a timer to be fired when processing time passes the given time.
     *
     * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
     * in a keyed context, such as in an operation on
     * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
     * will also be active when you receive the timer notification.
     */
    void registerProcessingTimeTimer(long time);

    /**
     * Registers a timer to be fired when the event time watermark passes the given time.
     *
     * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
     * in a keyed context, such as in an operation on
     * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
     * will also be active when you receive the timer notification.
     */
    void registerEventTimeTimer(long time);
}
```

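KeyedProcessOperator hands the timerService indirectly to the KeyedProcessFunction (through the Context built in open()), so users can register and query timers directly in their function. Two points deserve attention: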

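1. Within the same namespace, a key has at most one timer per timestamp: registering the same timestamp twice for the same key yields a single timer, while timers for different timestamps coexist.

2. For processing-time timers (for example when the TimeCharacteristic is ProcessingTime), registering a timer whose time is earlier than the current system processing time causes its callback to fire (almost) immediately.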
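Point 2 can be modeled with a scheduled executor: if the requested time has already passed, the computed delay is clamped to zero and the callback runs as soon as the timer thread is free. This is a hypothetical sketch (ProcessingTimeDemo is our own name) that only approximates Flink's processing-time service; the single timer thread and the max(time - now, 0) delay are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical processing-time timer service backed by a single timer thread.
class ProcessingTimeDemo {
    private final ScheduledExecutorService timerThread = Executors.newSingleThreadScheduledExecutor();

    ScheduledFuture<?> registerProcessingTimeTimer(long time, Runnable onTimer) {
        // A time that has already passed yields a delay of 0: the callback runs as soon as possible.
        long delay = Math.max(time - System.currentTimeMillis(), 0);
        return timerThread.schedule(onTimer, delay, TimeUnit.MILLISECONDS);
    }

    void shutdown() { timerThread.shutdownNow(); }

    // Returns {pastTimerFired, futureTimerFired}.
    static boolean[] run() {
        ProcessingTimeDemo service = new ProcessingTimeDemo();
        CountDownLatch past = new CountDownLatch(1);
        CountDownLatch future = new CountDownLatch(1);
        long now = System.currentTimeMillis();
        service.registerProcessingTimeTimer(now - 1_000, past::countDown);    // already in the past
        service.registerProcessingTimeTimer(now + 60_000, future::countDown); // one minute ahead
        try {
            boolean pastFired = past.await(1, TimeUnit.SECONDS);
            return new boolean[] { pastFired, future.getCount() == 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] { false, false };
        } finally {
            service.shutdown();
        }
    }
}
```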


