23

深入解析 Flink 的算子链机制

 3 years ago
source link: https://segmentfault.com/a/1190000038146668
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

作者:LittleMagic

“为什么我的 Flink 作业 Web UI 中只显示出了一个框,并且 Records Sent 和Records Received 指标都是 0 ?是我的程序写得有问题吗?”

Flink 算子链简介

笔者在 Flink 社区群里经常能看到类似这样的疑问。这种情况几乎都不是程序有问题,而是因为 Flink 的 operator chain ——即算子链机制导致的,即提交的作业的执行计划中,所有算子的并发实例(即 sub-task )都因为满足特定条件而串成了整体来执行,自然就观察不到算子之间的数据流量了。

当然上述是一种特殊情况。我们更常见到的是只有部分算子得到了算子链机制的优化,如官方文档中出现过多次的下图所示,注意 Source 和 map() 算子。

VjQfIbn.png!mobile

算子链机制的好处是显而易见的:所有 chain 在一起的 sub-task 都会在同一个线程(即 TaskManager 的 slot)中执行,能够减少不必要的数据交换、序列化和上下文切换,从而提高作业的执行效率。

MzyQBbI.png!mobile

铺垫了这么多,接下来就通过源码简单看看算子链产生的条件,以及它是如何在 Flink Runtime 中实现的。

逻辑计划中的算子链

对 Flink Runtime 稍有了解的看官应该知道,Flink 作业的执行计划会用三层图结构来表示,即:

  • StreamGraph —— 原始逻辑执行计划
  • JobGraph —— 优化的逻辑执行计划(Web UI 中看到的就是这个)
  • ExecutionGraph —— 物理执行计划

算子链是在优化逻辑计划时加入的,也就是由 StreamGraph 生成 JobGraph 的过程中。那么我们来到负责生成 JobGraph 的 o.a.f.streaming.api.graph.StreamingJobGraphGenerator 类,查看其核心方法 createJobGraph() 的源码。

private JobGraph createJobGraph() {
    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    setPhysicalEdges();
    // 略......

    return jobGraph;
}

可见,该方法会先计算出 StreamGraph 中各个节点的哈希码作为唯一标识,并创建一个空的 Map 结构保存即将被链在一起的算子的哈希码,然后调用 setChaining() 方法,如下源码所示。

<pre class="cm-s-default" style="color: rgb(55, 61, 65); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);    }}</pre>

可见是逐个遍历 StreamGraph 中的 Source 节点,并调用 createChain() 方法。createChain() 是逻辑计划层创建算子链的核心方法,完整源码如下,有点长。

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    if (!builtVertices.contains(startNodeId)) {
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes =
            chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);
        OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;
    } else {
        return new ArrayList<>();
    }
}

先解释一下方法开头创建的 3 个 List 结构:

  • transitiveOutEdges:当前算子链在 JobGraph 中的出边列表,同时也是 createChain() 方法的最终返回值;
  • chainableOutputs:当前能够链在一起的 StreamGraph 边列表;
  • nonChainableOutputs:当前不能够链在一起的 StreamGraph 边列表。

接下来,从 Source 开始遍历 StreamGraph 中当前节点的所有出边,调用 isChainable() 方法判断是否可以被链在一起(这个判断逻辑稍后会讲到)。可以链接的出边被放入 chainableOutputs 列表,否则放入 nonChainableOutputs 列表。 对于 chainableOutputs 中的边,就会以这些边的直接下游为起点,继续递归调用createChain() 方法延展算子链。对于 nonChainableOutputs 中的边,由于当前算子链的延展已经到头,就会以这些“断点”为起点,继续递归调用 createChain() 方法试图创建新的算子链。也就是说,逻辑计划中整个创建算子链的过程都是递归的,亦即实际返回时,是从 Sink 端开始返回的。

然后要判断当前节点是不是算子链的起始节点。如果是,则调用 createJobVertex()方法为算子链创建一个 JobVertex( 即 JobGraph 中的节点),也就形成了我们在Web UI 中看到的 JobGraph 效果:

2yqAjmA.png!mobile

最后,还需要将各个节点的算子链数据写入各自的 StreamConfig 中,算子链的起始节点要额外保存下 transitiveOutEdges。StreamConfig 在后文的物理执行阶段会再次用到。

形成算子链的条件

来看看 isChainable() 方法的代码。

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}

由此可得,上下游算子能够 chain 在一起的条件还是非常苛刻的(老生常谈了),列举如下:

  • 上下游算子实例处于同一个 SlotSharingGroup 中(之后再提);
  • 下游算子的链接策略(ChainingStrategy)为 ALWAYS ——既可以与上游链接,也可以与下游链接。我们常见的 map()、filter() 等都属此类;
  • 上游算子的链接策略为 HEAD 或 ALWAYS。HEAD 策略表示只能与下游链接,这在正常情况下是 Source 算子的专属;
  • 两个算子间的物理分区逻辑是 ForwardPartitioner ,可参见之前写过的《聊聊Flink DataStream 的八种物理分区逻辑》;
  • 两个算子间的 shuffle 方式不是批处理模式;
  • 上下游算子实例的并行度相同;
  • 没有禁用算子链。

禁用算子链

用户可以在一个算子上调用 startNewChain() 方法强制开始一个新的算子链,或者调用 disableOperatorChaining() 方法指定它不参与算子链。代码位于 SingleOutputStreamOperator 类中,都是通过改变算子的链接策略实现的。

@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
    return setChainingStrategy(ChainingStrategy.NEVER);
}

@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
    return setChainingStrategy(ChainingStrategy.HEAD);
}

如果要在整个运行时环境中禁用算子链,调用 StreamExecutionEnvironment.disableOperatorChaining() 方法即可。

物理计划中的算子链

在 JobGraph 转换成 ExecutionGraph 并交由 TaskManager 执行之后,会生成调度执行的基本任务单元 ——StreamTask,负责执行具体的 StreamOperator 逻辑。在StreamTask.invoke() 方法中,初始化了状态后端、checkpoint 存储和定时器服务之后,可以发现:

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

构造出了一个 OperatorChain 实例,这就是算子链在实际执行时的形态。解释一下OperatorChain 中的几个主要属性。

private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;
  • headOperator:算子链的第一个算子,对应 JobGraph 中的算子链起始节点;
  • allOperators:算子链中的所有算子,倒序排列,即 headOperator 位于该数组的末尾;
  • streamOutputs:算子链的输出,可以有多个;
  • chainEntryPoint:算子链的“入口点”,它的含义将在后文说明。

由上可知,所有 StreamTask 都会创建 OperatorChain。如果一个算子无法进入算子链,也会形成一个只有 headOperator 的单个算子的 OperatorChain。 OperatorChain 构造方法中的核心代码如下。

StreamEdge outEdge = outEdgesInOrder.get(i);
    RecordWriterOutput<?> streamOutput = createStreamOutput(
        recordWriters.get(i),
        outEdge,
        chainedConfigs.get(outEdge.getSourceId()),
        containingTask.getEnvironment());
    this.streamOutputs[i] = streamOutput;
    streamOutputMap.put(outEdge, streamOutput);
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
    containingTask,
    configuration,
    chainedConfigs,
    userCodeClassloader,
    streamOutputMap,
    allOps);

if (operatorFactory != null) {
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
    headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {
    headOperator = null;
}

// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

首先会遍历算子链整体的所有出边,并调用 createStreamOutput() 方法创建对应的下游输出 RecordWriterOutput。然后就会调用 createOutputCollector() 方法创建物理的算子链,并返回 chainEntryPoint,这个方法比较重要,部分代码如下。

StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

    // create collectors for the network outputs
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
        WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
            containingTask,
            chainedOpConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators,
            outputEdge.getOutputTag());
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
    // 以下略......
}

该方法从上一节提到的 StreamConfig 中分别取出出边和链接边的数据,并创建各自的 Output。出边的 Output 就是将数据发往算子链之外下游的 RecordWriterOutput,而链接边的输出要靠 createChainedOperator() 方法。

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators,
        OutputTag<IN> outputTag) {
    // create the output that the operator writes to first. this may recursively create more operators
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
        containingTask,
        operatorConfig,
        chainedConfigs,
        userCodeClassloader,
        streamOutputs,
        allOperators);

    // now create the operator and give it the output collector to write its output to
    StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
    OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(
            containingTask, operatorConfig, chainedOperatorOutput);

    allOperators.add(chainedOperator);

    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
    }
    else {
        TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
    }

    // wrap watermark gauges since registered metrics must be unique
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
    return currentOperatorOutput;
}

我们一眼就可以看到,这个方法递归调用了上述 createOutputCollector() 方法,与逻辑计划阶段类似,通过不断延伸 Output 来产生 chainedOperator(即算子链中除了headOperator 之外的算子),并逆序返回,这也是 allOperators 数组中的算子顺序为倒序的原因。

chainedOperator 产生之后,将它们通过 ChainingOutput 连接起来,形成如下图所示的结构。

rMJ7FfF.png!mobile

图片来自: http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/

最后来看看 ChainingOutput.collect() 方法是如何输出数据流的。

@Override
public void collect(StreamRecord<T> record) {
    if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }
    pushToOperator(record);
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        // we are only responsible for emitting to the side-output specified by our
        // OutputTag.
        return;
    }
    pushToOperator(record);
}

protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        operator.setKeyContextElement1(castRecord);
        operator.processElement(castRecord);
    }
    catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

可见是通过调用链接算子的 processElement() 方法,直接将数据推给下游处理了。也就是说,OperatorChain 完全可以看做一个由 headOperator 和 streamOutputs组成的单个算子,其内部的 chainedOperator 和 ChainingOutput 都像是被黑盒遮蔽,同时没有引入任何 overhead。

打通了算子链在执行层的逻辑,看官应该会明白 chainEntryPoint 的含义了。由于它位于递归返回的终点,所以它就是流入算子链的起始 Output,即上图中指向 headOperator 的 RecordWriterOutput。

文章转载自简书,作者:LittleMagic。

原文链接: https://www.jianshu.com/p/799744e347c7


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK