
Flink Source Code Reading Notes (14) - The Implementation of Async I/O - JR's Blog

 4 years ago
source link: https://blog.jrwang.me/2019/flink-source-code-async-io/?

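When processing real-time data streams with Flink, you often need to interact with external systems. For example, when building a real-time data warehouse, messages usually have to be joined with external dimension tables to obtain additional dimension data. The response time and network latency of an external system can be high, so with synchronous calls that latency directly limits the system's throughput and the external call becomes the bottleneck. In such cases we need asynchronous calls. Compared with synchronous calls, asynchronous calls let the waiting times of different requests overlap, which raises throughput.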

Using Async I/O

To use Async I/O in Flink you need a client that supports asynchronous requests. Take the (Scala) example from the official documentation:

import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Flink's direct executor; its package location differs between Flink versions
import org.apache.flink.runtime.concurrent.Executors
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 * DatabaseClient, host, port and credentials are placeholders for your own async client.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete;
        // the callback forwards the result (or the failure) to the result future
        resultFutureRequested.onComplete {
            case Success(result) => resultFuture.complete(Iterable((str, result)))
            case Failure(t)      => resultFuture.completeExceptionally(t)
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation: output mode (unordered here), timeout,
// and the maximum number of in-flight async requests
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

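AsyncDataStream provides two methods, orderedWait and unorderedWait, which correspond to the ordered and unordered output modes. Two modes are needed because the completion time of an async request is nondeterministic: a request issued earlier may finish later than one issued after it. In ordered mode, results are emitted in exactly the order in which the messages arrived. In unordered mode, results are emitted in the order in which the requests complete, so the result of whichever request finishes first is emitted first. Note that with event time, unordered mode still handles watermarks correctly: results for messages between two watermarks may be emitted out of order relative to each other, but results for messages after a watermark are never emitted before the results for messages that preceded that watermark.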
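The difference between the two modes can be seen in a small simulation. This is a hypothetical sketch in plain Java, not Flink code: requests are numbered by arrival, `completionOrder` lists those numbers in the order their async calls finish, and the two methods (illustrative names) return the order in which results would be emitted, ignoring watermarks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (not Flink code). Requests are numbered 0..n-1 by
// arrival; completionOrder lists those numbers in the order the calls finish.
final class OutputModes {

    // Ordered mode: a result is emitted only after all earlier arrivals have
    // been emitted, so finished results wait behind an unfinished head.
    static List<Integer> orderedEmission(int[] completionOrder) {
        List<Integer> emitted = new ArrayList<>();
        boolean[] done = new boolean[completionOrder.length];
        int head = 0; // oldest request whose result has not been emitted yet
        for (int finished : completionOrder) {
            done[finished] = true;
            while (head < done.length && done[head]) {
                emitted.add(head++);
            }
        }
        return emitted;
    }

    // Unordered mode without watermarks: each result is emitted as soon as
    // its request completes.
    static List<Integer> unorderedEmission(int[] completionOrder) {
        List<Integer> emitted = new ArrayList<>();
        for (int finished : completionOrder) {
            emitted.add(finished);
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] completionOrder = {2, 0, 3, 1};
        System.out.println("ordered:   " + orderedEmission(completionOrder));   // [0, 1, 2, 3]
        System.out.println("unordered: " + unorderedEmission(completionOrder)); // [2, 0, 3, 1]
    }
}
```

With the completion order 2, 0, 3, 1, the ordered mode emits 0 as soon as it finishes, then holds 2 and 3 back until 1 completes; the unordered mode simply follows the completion order.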

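Because the completion time of an async request is unpredictable, you need to set a timeout for the requests and configure the maximum number of async requests that may be in flight at the same time.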
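Both settings can be pictured with plain `java.util.concurrent` types. The following is a minimal sketch, not Flink's API: the class `BoundedAsyncCaller` and its methods are hypothetical, a `Semaphore` stands in for the in-flight limit (the `100` in the `unorderedWait` call above), and `CompletableFuture.orTimeout` (Java 9+) stands in for the 1000 ms timeout. Inside Flink these roles are played by the bounded `StreamElementQueue` and a processing-time timer, as the next section shows.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not part of Flink): caps the number of in-flight async
// requests and fails any request that is still pending after the timeout.
final class BoundedAsyncCaller {
    private final Semaphore slots;
    private final long timeoutMillis;

    BoundedAsyncCaller(int capacity, long timeoutMillis) {
        this.slots = new Semaphore(capacity);
        this.timeoutMillis = timeoutMillis;
    }

    <T> CompletableFuture<T> call(Supplier<CompletableFuture<T>> request) {
        // Back pressure: block while `capacity` requests are still in flight.
        slots.acquireUninterruptibly();
        CompletableFuture<T> pending;
        try {
            pending = request.get();
        } catch (RuntimeException e) {
            slots.release(); // the request never started, give the slot back
            throw e;
        }
        // orTimeout fails the future with a TimeoutException if it is still
        // pending after timeoutMillis; the slot is freed on every outcome.
        return pending
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .whenComplete((value, error) -> slots.release());
    }

    int availableSlots() {
        return slots.availablePermits();
    }

    public static void main(String[] args) {
        BoundedAsyncCaller caller = new BoundedAsyncCaller(2, 100);
        CompletableFuture<String> fast = caller.call(() -> CompletableFuture.completedFuture("ok"));
        CompletableFuture<String> stuck = caller.call(() -> new CompletableFuture<String>()); // never answers
        System.out.println(fast.join());                               // ok
        System.out.println(stuck.handle((v, e) -> e != null).join());  // true (timed out)
        System.out.println(caller.availableSlots());                   // 2
    }
}
```

A caller that hits the limit blocks in `call()` until a slot frees up, which is the same back-pressure effect the real operator gets from a full queue. When Flink's timer fires it calls `AsyncFunction.timeout()`, whose default implementation fails the result with a `TimeoutException`; `orTimeout` mimics that default.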

The Implementation of Async I/O

At runtime, the transformation created by AsyncDataStream is turned into an AsyncWaitOperator, a subclass of AbstractUdfStreamOperator. Let's look at how AsyncWaitOperator works.

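What sets AsyncWaitOperator apart from other operators is that its input and output are not synchronous. Internally it therefore uses a producer-consumer model, with a queue decoupling the asynchronous computation from the emission of results. StreamElementQueue provides the queue abstraction: a consumer thread, the Emitter, takes completed results from the queue and emits them to the downstream operator, while the async requests act as the producer. The basic processing logic is shown in the figure below.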

[Figure: async-io]
public class AsyncWaitOperator<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT>, OperatorActions {
	/** Queue to store the currently in-flight stream elements into. */
	private transient StreamElementQueue queue;

	/** Pending stream element which could not yet be added to the queue. */
	private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

	private transient ExecutorService executor;

	/** Emitter for the completed stream element queue entries. */
	private transient Emitter<OUT> emitter;

	/** Thread running the emitter. */
	private transient Thread emitterThread;

	// ... other fields (timeout, checkpointingLock, ...) and methods omitted

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		// The queue entry doubles as the ResultFuture handed to the user function
		final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
		// Register a processing-time timer that calls the user function's timeout() when the timeout expires
		if (timeout > 0L) {
			// register a timeout for this AsyncStreamRecordBufferEntry
			long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
			final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
				timeoutTimestamp,
				new ProcessingTimeCallback() {
					@Override
					public void onProcessingTime(long timestamp) throws Exception {
						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
					}
				});
			// Cancel the timer once we've completed the stream record buffer entry. This will remove
			// the registered trigger task
			streamRecordBufferEntry.onComplete(
				(StreamElementQueueEntry<Collection<OUT>> value) -> {
					timerFuture.cancel(true);
				},
				executor);
		}
		// Add the entry to the queue
		addAsyncBufferEntry(streamRecordBufferEntry);
		// Issue the async request
		userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
	}

	// Try to add the entry to the queue. If the queue is full (the limit on in-flight
	// async requests has been reached), wait on the checkpoint lock until the emitter
	// notifies that space is available again.
	private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		assert(Thread.holdsLock(checkpointingLock));
		pendingStreamElementQueueEntry = streamElementQueueEntry;
		while (!queue.tryPut(streamElementQueueEntry)) {
			// we wait for the emitter to notify us if the queue has space left again
			checkpointingLock.wait();
		}
		pendingStreamElementQueueEntry = null;
	}

}

public class Emitter<OUT> implements Runnable {
	// ... fields omitted

	@Override
	public void run() {
		try {
			while (running) {
				// Block until the queue holds a completed entry that may be emitted
				AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
				// Under the checkpoint lock: emit the result, poll the entry off the queue
				// and notifyAll() the task thread waiting in addAsyncBufferEntry()
				output(streamElementEntry);
			}
		} catch (InterruptedException e) {
			if (running) {
				operatorActions.failOperator(e);
			}
			// otherwise the interrupt just means the emitter is shutting down
		} catch (Throwable t) {
			operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
				"unexpected throwable.", t));
		}
	}
}
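The threading model can be reproduced in miniature with `java.util.concurrent`. The sketch below is hypothetical (`MiniAsyncOperator` is not a Flink class): `processElement` plays the task thread that blocks on a full `ArrayBlockingQueue`, a thread pool plays the async client, and a dedicated emitter thread takes entries in arrival order and waits for each one, which yields ordered-mode output. Unlike Flink's `peekBlockingly()`/`poll()` pair, `take()` frees the queue slot slightly before the result is emitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical miniature of the AsyncWaitOperator threading model (not Flink code).
final class MiniAsyncOperator {
    // Sentinel entry that tells the emitter no more input will arrive.
    private static final CompletableFuture<String> END_OF_INPUT = new CompletableFuture<>();

    private final BlockingQueue<CompletableFuture<String>> queue;
    private final ExecutorService asyncClient = Executors.newFixedThreadPool(4);
    private final List<String> emitted = Collections.synchronizedList(new ArrayList<>());
    private final Thread emitter;

    MiniAsyncOperator(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
        // Consumer: take entries in arrival order and wait for each result.
        emitter = new Thread(() -> {
            try {
                while (true) {
                    CompletableFuture<String> head = queue.take();
                    if (head == END_OF_INPUT) {
                        return;
                    }
                    emitted.add(head.join()); // "emit" the result downstream
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "emitter");
        emitter.start();
    }

    // Producer side: called on the task thread for every input element.
    void processElement(String value, long latencyMillis) throws InterruptedException {
        CompletableFuture<String> entry = new CompletableFuture<>();
        queue.put(entry); // blocks while `capacity` entries are queued
        asyncClient.execute(() -> {
            try {
                Thread.sleep(latencyMillis); // simulated external call
                entry.complete(value.toUpperCase());
            } catch (InterruptedException e) {
                entry.completeExceptionally(e);
            }
        });
    }

    // Signals end of input, waits until the emitter has drained the queue, and shuts down.
    List<String> finish() throws InterruptedException {
        queue.put(END_OF_INPUT);
        emitter.join();
        asyncClient.shutdown();
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) throws InterruptedException {
        MiniAsyncOperator op = new MiniAsyncOperator(2);
        String[] input = {"a", "b", "c", "d"};
        long[] latencyMillis = {30, 10, 20, 0};
        for (int i = 0; i < input.length; i++) {
            op.processElement(input[i], latencyMillis[i]);
        }
        System.out.println(op.finish()); // [A, B, C, D]
    }
}
```

Regardless of the simulated latencies, `main` prints `[A, B, C, D]`, because the emitter always waits for the oldest entry first; the fourth `processElement` call blocks until the emitter has taken an entry off the full queue.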

AsyncWaitOperator can work in two modes, ORDERED and UNORDERED. Flink implements the two modes through different implementations of StreamElementQueue.

Ordered mode

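In ordered mode, the results of all async requests must be emitted downstream in the order in which the messages arrived. In this mode the concrete StreamElementQueue implementation is OrderedStreamElementQueue. Underneath, OrderedStreamElementQueue is a bounded queue: entries are appended in arrival order, and a result can be taken from the queue only after the async request at the head of the queue has completed.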

[Figure: OrderedStreamElementQueue]
public class OrderedStreamElementQueue implements StreamElementQueue {
	/** Capacity of this queue. */
	private final int capacity;

	/** Queue for the inserted StreamElementQueueEntries. */
	private final ArrayDeque<StreamElementQueueEntry<?>> queue;

	/** Lock and conditions for the blocking queue. */
	private final ReentrantLock lock;
	private final Condition notFull;
	private final Condition headIsCompleted;

	// ... constructor, addEntry() and the completion callback omitted. The callback
	// registered for each entry signals headIsCompleted once the head entry is done.

	@Override
	public AsyncResult peekBlockingly() throws InterruptedException {
		lock.lockInterruptibly();
		try {
			// Stay blocked until the entry at the head of the queue has completed
			while (queue.isEmpty() || !queue.peek().isDone()) {
				headIsCompleted.await();
			}
			return queue.peek();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public AsyncResult poll() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (queue.isEmpty() || !queue.peek().isDone()) {
				headIsCompleted.await();
			}
			notFull.signalAll();
			return queue.poll();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			if (queue.size() < capacity) { // capacity not yet reached
				addEntry(streamElementQueueEntry);
				return true;
			} else {
				return false;
			}
		} finally {
			lock.unlock();
		}
	}
}
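The locking pattern can be tried out in isolation. The following is a simplified, hypothetical re-creation (`MiniOrderedQueue` is not the Flink class): `CompletableFuture`s stand in for queue entries, a blocking `put()` waits on `notFull` instead of the operator's `tryPut()`-and-wait loop, and each entry's completion callback signals `headIsCompleted` only when the head entry is done, roughly what the omitted callback does in the Flink class.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified re-creation of the ordered queue (not the Flink class).
final class MiniOrderedQueue<T> {
    private final int capacity;
    private final ArrayDeque<CompletableFuture<T>> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition headIsCompleted = lock.newCondition();

    MiniOrderedQueue(int capacity) {
        this.capacity = capacity;
    }

    // Blocking insert in arrival order; waits while the queue is at capacity.
    void put(CompletableFuture<T> entry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }
            queue.addLast(entry);
        } finally {
            lock.unlock();
        }
        // Whenever any entry completes, wake the consumer if the head is now done.
        entry.whenComplete((value, error) -> signalIfHeadDone());
    }

    private void signalIfHeadDone() {
        lock.lock();
        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the head entry has completed, then removes and returns it.
    CompletableFuture<T> poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            notFull.signalAll(); // a slot is about to become free
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniOrderedQueue<String> q = new MiniOrderedQueue<>(2);
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        q.put(a);
        q.put(b);
        b.complete("b"); // finishes first, but is not the head
        Thread consumer = new Thread(() -> {
            try {
                System.out.println(q.poll().join()); // waits for "a"
                System.out.println(q.poll().join());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread.sleep(50);
        a.complete("a"); // the head is done, so the consumer wakes up
        consumer.join();
    }
}
```

`main` prints `a` and then `b`: `b` completes first, but the consumer stays blocked in `poll()` until the head entry `a` completes.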

Unordered mode

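In unordered mode, the order in which async results are emitted is determined not by the arrival order of the messages but by the order in which the async requests complete. With event time, of course, watermark semantics must still be preserved. With processing time there are no watermarks, so it can be treated as a special case. UnorderedStreamElementQueue handles both cases neatly.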

[Figure: UnorderedStreamElementQueue]

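As the figure above shows, UnorderedStreamElementQueue uses two queues internally: ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue holds the entries of async requests that have not completed yet, and completedQueue holds the entries that have completed. Note that the elements of uncompletedQueue are hash sets of entries, and new records are always added to the youngest set, lastSet. As the figure also shows, watermarkSet is a special set containing a single element, the watermark, which acts as the boundary between the sets before and after it. This guarantees that results of async requests issued after a watermark are never emitted before that watermark. Completed entries in firstSet (the oldest set) are moved to completedQueue, and all entries within firstSet may be emitted in any order.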

Without event time no watermarks are produced, so all async requests go into firstSet and all results are emitted out of order, purely in completion order.

The concrete implementation is shown below; read alongside the diagram above, it should not be hard to follow.

public class UnorderedStreamElementQueue implements StreamElementQueue {
	/** Capacity of this queue. */
	private final int capacity;

	/** Executor to run the onCompletion callbacks. */
	private final Executor executor;

	/** OperatorActions to signal the owning operator a failure. */
	private final OperatorActions operatorActions;

	/** Queue of uncompleted stream element queue entries segmented by watermarks. */
	private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

	/** Queue of completed stream element queue entries. */
	private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

	/** First (chronologically oldest) uncompleted set of stream element queue entries. */
	private Set<StreamElementQueueEntry<?>> firstSet;

	// Last (chronologically youngest) uncompleted set of stream element queue entries. New
	// stream element queue entries are inserted into this set.
	private Set<StreamElementQueueEntry<?>> lastSet;

	/** Number of entries in this queue. */
	private int numberEntries;

	/** Lock and conditions for the blocking queue. */
	private final ReentrantLock lock;
	private final Condition notFull;
	private final Condition hasCompletedEntries;

	// ... constructor, peekBlockingly() and other methods omitted

	@Override
	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();
		try {
			if (numberEntries < capacity) {
				addEntry(streamElementQueueEntry);
				return true;
			} else {
				return false;
			}
		} finally {
			lock.unlock();
		}
	}

	@Override
	public AsyncResult poll() throws InterruptedException {
		lock.lockInterruptibly();
		try {
			// Wait until completedQueue holds an entry
			while (completedQueue.isEmpty()) {
				hasCompletedEntries.await();
			}
			numberEntries--;
			notFull.signalAll();
			return completedQueue.poll();
		} finally {
			lock.unlock();
		}
	}

	// Callback invoked when an async request completes
	public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();
		try {
			// Only an entry that belongs to firstSet may move to completedQueue
			if (firstSet.remove(streamElementQueueEntry)) {
				completedQueue.offer(streamElementQueueEntry);
				while (firstSet.isEmpty() && firstSet != lastSet) {
					// Every entry of firstSet has completed: take the next set from
					// uncompletedQueue as the new firstSet and move its completed entries
					firstSet = uncompletedQueue.poll();
					Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
					while (it.hasNext()) {
						StreamElementQueueEntry<?> bufferEntry = it.next();

						if (bufferEntry.isDone()) {
							completedQueue.offer(bufferEntry);
							it.remove();
						}
					}
				}
				hasCompletedEntries.signalAll();
			}
		} finally {
			lock.unlock();
		}
	}

	private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
		assert(lock.isHeldByCurrentThread());

		if (streamElementQueueEntry.isWatermark()) {
			// Watermark: start a new lastSet for the records that follow. If firstSet is empty,
			// the watermark goes straight into firstSet; otherwise a set containing only this
			// watermark is appended to uncompletedQueue
			lastSet = new HashSet<>(capacity);
			if (firstSet.isEmpty()) {
				firstSet.add(streamElementQueueEntry);
			} else {
				Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
				watermarkSet.add(streamElementQueueEntry);
				uncompletedQueue.offer(watermarkSet);
			}
			uncompletedQueue.offer(lastSet);
		} else {
			// Ordinary record: add it to lastSet
			lastSet.add(streamElementQueueEntry);
		}

		// Register the callback that runs once the async request completes
		streamElementQueueEntry.onComplete(
			(StreamElementQueueEntry<T> value) -> {
				try {
					onCompleteHandler(value);
				} catch (InterruptedException e) {
					// interrupted: the executor is shutting down, nothing left to do
				} catch (Throwable t) {
					operatorActions.failOperator(new Exception("Could not complete the " +
						"stream element queue entry: " + value + '.', t));
				}
			},
			executor);

		numberEntries++;
	}
}
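The watermark segmentation is easiest to see without threads. This hypothetical, single-threaded model (no locks, futures or capacity limit; `MiniUnorderedQueue` and its methods are illustrative names) keeps the same `firstSet`, `lastSet`, `uncompletedQueue` and `completedQueue` structure as the code above. It uses `LinkedHashSet` instead of `HashSet` only so that the output is reproducible.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-threaded model of UnorderedStreamElementQueue's
// watermark segmentation (not Flink code).
final class MiniUnorderedQueue {
    static final class Entry {
        final String name;
        boolean done;

        Entry(String name, boolean watermark) {
            this.name = name;
            this.done = watermark; // a watermark has no request to wait for
        }
    }

    private final ArrayDeque<Set<Entry>> uncompletedQueue = new ArrayDeque<>();
    private final ArrayDeque<Entry> completedQueue = new ArrayDeque<>();
    private Set<Entry> firstSet = new LinkedHashSet<>();
    private Set<Entry> lastSet = firstSet;

    Entry add(String name, boolean watermark) {
        Entry entry = new Entry(name, watermark);
        if (watermark) {
            // Close the current segment: later records go into a fresh lastSet.
            lastSet = new LinkedHashSet<>();
            if (firstSet.isEmpty()) {
                firstSet.add(entry);
            } else {
                Set<Entry> watermarkSet = new LinkedHashSet<>();
                watermarkSet.add(entry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
            onComplete(entry); // a watermark is complete as soon as it is added
        } else {
            lastSet.add(entry);
        }
        return entry;
    }

    // Called when the async request behind a record finishes.
    void complete(Entry entry) {
        entry.done = true;
        onComplete(entry);
    }

    private void onComplete(Entry entry) {
        // Only entries of the oldest segment may move to completedQueue.
        if (firstSet.remove(entry)) {
            completedQueue.offer(entry);
            // Oldest segment drained: advance and move already-finished entries.
            while (firstSet.isEmpty() && firstSet != lastSet) {
                firstSet = uncompletedQueue.poll();
                Iterator<Entry> it = firstSet.iterator();
                while (it.hasNext()) {
                    Entry next = it.next();
                    if (next.done) {
                        completedQueue.offer(next);
                        it.remove();
                    }
                }
            }
        }
    }

    // What the emitter would take next, in emission order.
    List<String> drainCompleted() {
        List<String> out = new ArrayList<>();
        while (!completedQueue.isEmpty()) {
            out.add(completedQueue.poll().name);
        }
        return out;
    }

    public static void main(String[] args) {
        MiniUnorderedQueue q = new MiniUnorderedQueue();
        Entry r1 = q.add("r1", false);
        Entry r2 = q.add("r2", false);
        q.add("W1", true);
        Entry r3 = q.add("r3", false);
        q.complete(r3);
        System.out.println(q.drainCompleted()); // []
        q.complete(r2);
        System.out.println(q.drainCompleted()); // [r2]
        q.complete(r1);
        System.out.println(q.drainCompleted()); // [r1, W1, r3]
    }
}
```

`r3` finishes first but sits behind `W1`, so nothing is emitted; `r2` is emitted as soon as it completes because it shares the oldest segment with `r1`; once `r1` completes, `W1` and then `r3` follow. Without watermarks every record stays in `firstSet` and is emitted in pure completion order.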

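With asynchronous calls, many requests may be in flight at the same time. So when a snapshot is taken, the messages whose async calls have not completed yet, as well as those whose results have not yet been emitted downstream, must be added to the state. On recovery these messages are read back from the state and processed again. For exactly-once semantics, messages whose async calls have completed and whose results the emitter has already sent downstream are not stored in the snapshot; otherwise their results would be emitted a second time after recovery.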

public class AsyncWaitOperator<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT>, OperatorActions {
	/** Recovered input stream elements. */
	private transient ListState<StreamElement> recoveredStreamElements;

	// ... other fields (STATE_NAME, inStreamElementSerializer, queue, ...) omitted

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);
		recoveredStreamElements = context
			.getOperatorStateStore()
			.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

	}

	@Override
	public void open() throws Exception {
		super.open();

		// ... (creating and starting the emitter thread omitted)

		// On recovery, take every unfinished element out of the state and process it again
		if (recoveredStreamElements != null) {
			for (StreamElement element : recoveredStreamElements.get()) {
				if (element.isRecord()) {
					processElement(element.<IN>asRecord());
				}
				else if (element.isWatermark()) {
					processWatermark(element.asWatermark());
				}
				else if (element.isLatencyMarker()) {
					processLatencyMarker(element.asLatencyMarker());
				}
				else {
					throw new IllegalStateException("Unknown record type " + element.getClass() +
						" encountered while opening the operator.");
				}
			}
			recoveredStreamElements = null;
		}
	}


	@Override
	public void snapshotState(StateSnapshotContext context) throws Exception {
		super.snapshotState(context);

		// Clear the previous state first
		ListState<StreamElement> partitionableState =
			getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
		partitionableState.clear();

		// Add the element behind every unfinished queue entry to the state
		Collection<StreamElementQueueEntry<?>> values = queue.values();
		try {
			for (StreamElementQueueEntry<?> value : values) {
				partitionableState.add(value.getStreamElement());
			}

			// add the pending stream element queue entry if the stream element queue is currently full
			if (pendingStreamElementQueueEntry != null) {
				partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
			}
		} catch (Exception e) {
			partitionableState.clear();
			throw new Exception("Could not add stream element queue entries to operator state " +
				"backend of operator " + getOperatorName() + '.', e);
		}
	}

}
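The bookkeeping can be modeled with plain collections. This is a hypothetical sketch (`MiniSnapshot` is not Flink's state API): `inFlight` holds the input behind every queue entry whose result has not been emitted yet, and `pending` holds the element the task thread is still trying to add. `pending` matters because `checkpointingLock.wait()` in `addAsyncBufferEntry` releases the lock, so a checkpoint can run while one element is still outside the full queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical model of what AsyncWaitOperator checkpoints (not Flink's state API).
final class MiniSnapshot {
    // Inputs whose entries are still queued (request running or result not yet emitted).
    private final ArrayDeque<String> inFlight = new ArrayDeque<>();
    // Input the task thread is still waiting to add because the queue is full.
    private String pending;

    void blockedOnFullQueue(String input) {
        pending = input;
    }

    void enqueued(String input) {
        inFlight.addLast(input);
        if (Objects.equals(input, pending)) {
            pending = null; // the waiting element made it into the queue
        }
    }

    // The emitter sent the result downstream, so the input no longer needs storing.
    void emitted(String input) {
        inFlight.remove(input);
    }

    // Mirrors snapshotState(): every queued input, then the pending one.
    List<String> snapshot() {
        List<String> state = new ArrayList<>(inFlight);
        if (pending != null) {
            state.add(pending);
        }
        return state;
    }

    // Mirrors open() after recovery: every stored input is processed again.
    static void restore(List<String> state, Consumer<String> processElement) {
        for (String input : state) {
            processElement.accept(input);
        }
    }

    public static void main(String[] args) {
        MiniSnapshot op = new MiniSnapshot();
        op.enqueued("a");
        op.enqueued("b");
        op.blockedOnFullQueue("c"); // queue full: "c" waits outside
        op.emitted("a");            // "a" is done and already sent downstream
        List<String> state = op.snapshot();
        System.out.println(state);  // [b, c]
        restore(state, input -> System.out.println("re-process " + input));
    }
}
```

Replaying re-issues the async requests, so after a failure the external system may receive the same request twice; the exactly-once guarantee covers the operator's output and state, not side effects of the requests themselves.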

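When a job has to interact with external systems, Flink's Async I/O mechanism can effectively reduce the impact of external-call latency and improve throughput. This article has introduced the basic principles behind its implementation.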

-EOF-

