
Flink Quick Learning (6): Common Operators

source link: https://niyanchun.com/flink-quick-learning-6-operators.html


2021-03-28 · Big Data · Flink

A Flink streaming job is simply a composition of operators (sources and sinks are really just special operators too). This article introduces the common DataStream operators. I've split them, somewhat unscientifically, into two categories with admittedly loose names:

  • Single-stream operators: generally applied to a single stream;
  • Multi-stream operators: typically operate on multiple streams.

The single-stream operators are mostly simple, so I'll cover them only briefly.

  1. map/flatMap: the most heavily used operators. map takes one element and emits exactly one element; flatMap takes one element and emits zero or more elements.
  2. filter: passes an element downstream if the predicate is true, drops it otherwise.
  3. keyBy: hash-partitions the stream according to a KeySelector, splitting it into multiple partitions; after keyBy the stream becomes a KeyedStream. It is usually combined with other operators (a combined sketch of items 1-3 follows this list).
  4. reduce: used on a KeyedStream; applies a "rolling" operation to the stream's elements, like the rolling sum below:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReduceDemo {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // produce 1,2,3,4,5
        DataStream<Long> source = env.fromSequence(1, 5);

        source.keyBy(value -> 1)    // route all elements to one partition
            .reduce(new ReduceFunction<Long>() {
              @Override
              public Long reduce(Long value1, Long value2) throws Exception {
                // value1 holds the result of the previous iteration
                System.out.printf("value1: %d, value2: %d\n", value1, value2);
                return value1 + value2;
              }
            }).print();

        env.execute();
      }
    }

    Output:

    1
    value1: 1, value2: 2
    3
    value1: 3, value2: 3
    6
    value1: 6, value2: 4
    10
    value1: 10, value2: 5
    15
  5. Aggregation operators: used on a KeyedStream; they include sum, min, minBy, max, and maxBy. These operators are slated for removal from DataStream/DataSet (see FLIP-134). Here I mainly show the difference between min and minBy (max/maxBy behave analogously); let's go straight to the code:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AggregationDemo {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment envMin = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment envMinBy = StreamExecutionEnvironment.getExecutionEnvironment();
        envMin.setParallelism(1);
        envMinBy.setParallelism(1);

        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 2, 4));
        data.add(new Tuple3<>(0, 4, 5));
        data.add(new Tuple3<>(0, 3, 3));
        data.add(new Tuple3<>(0, 1, 2));
        data.add(new Tuple3<>(1, 2, 4));
        data.add(new Tuple3<>(1, 5, 1));
        data.add(new Tuple3<>(1, 1, 0));
        data.add(new Tuple3<>(1, 2, 2));

        System.out.println("Min:");
        DataStream<Tuple3<Integer, Integer, Integer>> sourceMin = envMin.fromCollection(data);
        sourceMin.keyBy(tuple3 -> tuple3.f0).min(1).print();

        envMin.execute();

        System.out.println("\nMinBy:");
        DataStream<Tuple3<Integer, Integer, Integer>> sourceMinBy = envMinBy.fromCollection(data);
        sourceMinBy.keyBy(tuple3 -> tuple3.f0).minBy(1).print();

        envMinBy.execute();
      }
    }

    Output:

    Min:
    (0,2,4)
    (0,2,4)
    (0,2,4)
    (0,1,4)
    (1,2,4)
    (1,2,4)
    (1,1,4)
    (1,1,4)
    
    MinBy:
    (0,2,4)
    (0,2,4)
    (0,2,4)
    (0,1,2)
    (1,2,4)
    (1,2,4)
    (1,1,0)
    (1,1,0)

As you can see, min keeps only the minimum of the field being compared; the other fields still come from the first element. minBy, by contrast, keeps the complete element whose compared field is smallest (a bit of a mouthful...).

  6. Window operators: I won't go into these here; several earlier articles in this series already covered them.
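
To make items 1-3 concrete, here is a minimal sketch (the class name and the data are my own invention) that chains flatMap, map, filter, and keyBy:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SimpleOperatorsDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.fromElements("1 2", "3 4")
        // flatMap: one input line, zero or more output tokens
        .flatMap((String line, Collector<Integer> out) -> {
          for (String token : line.split(" ")) {
            out.collect(Integer.parseInt(token));
          }
        })
        .returns(Types.INT) // lambdas lose generic info, so declare the output type
        .map(n -> n * 2)    // map: exactly one output per input
        .filter(n -> n > 2) // filter: keep only elements matching the predicate
        .keyBy(n -> n % 2)  // keyBy: hash-partition into a KeyedStream
        .print();           // 4, 6, 8

    env.execute();
  }
}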

Now let's look at the slightly more complex multi-stream operators.

Multi-stream operators differ mainly in two respects:

  • how many streams they can process at once;
  • whether they can handle streams of different types.

union

union merges multiple streams of the same type into one new stream. For example, union-ing a stream with itself doubles every element:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Long> source = env.fromSequence(0, 5);
    source.print();

    DataStream<Long> unionStream = source.union(source);
    unionStream.print();

    env.execute();
  }
}
# source 
0
1
2
3
4
5
# unionStream
0
0
1
1
2
2
3
3
4
4
5
5

join

join operates on exactly two KeyedStreams, which may however carry different element types. What it does is equivalent to a database inner join: it inner-joins the same-keyed elements of a data set. Flink DataStream offers two kinds of joins:

  • Window join: obtains the data set from a window

    • tumbling window join
    • sliding window join
    • session window join
  • Interval join: obtains the data set by defining a time interval

A window join is used together with a window and is subdivided into the three variants above by window type. Its syntax is:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

The window type only affects which data set each window produces; the join itself works exactly the same way, so I'll use the simplest and most intuitive variant, the tumbling window join, as the example (the figure below comes from the official documentation):

In the figure, four windows open along the (horizontal) time axis; green marks one stream (greenStream), orange (orangeStream) the other, and below them are the pairs the join of the two streams produces in each window. I wrote code that simulates this example:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());

    orangeStream.join(greenStream)
        .where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
        .apply((JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>)
            (orangeStreamTuple2, greenStreamTuple2) -> orangeStreamTuple2.f1 + "," + greenStreamTuple2.f1)
        .print();

    env.execute();
  }

  static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // window: [0, 1000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);

      ctx.emitWatermark(new Watermark(1000));
      // window: [1000, 2000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);

      ctx.emitWatermark(new Watermark(2000));
      // window: [2000, 3000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);

      ctx.emitWatermark(new Watermark(3000));
      // window: [3000, 4000)
      ctx.collectWithTimestamp(new Tuple2<>("another_key", 4), 3500);
    }

    @Override
    public void cancel() {
    }
  }

  static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      // window: [0, 1000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 0);
      // window: [1000, 2000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 2), 1500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 1500);
      // window: [2000, 3000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 2500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 5), 2500);
      // window: [3000, 4000)
      ctx.collectWithTimestamp(new Tuple2<>("key", 6), 3500);
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 3500);
    }

    @Override
    public void cancel() {
    }
  }
}

Running it prints:

0,0
0,1
1,0
1,1
2,3
3,3
4,4
5,4

An interval join is also simple (say A join B): each element a defines a time interval, and a is joined with every same-keyed element b whose timestamp falls inside it, i.e. b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]. Interval joins currently support event time only. The syntax is:

stream.keyBy(<KeySelector>)
    .intervalJoin(otherStream.keyBy(<KeySelector>))
    .between(<lowerBoundTime>, <upperBoundTime>)
    .process(<ProcessJoinFunction>);

Here is another figure borrowed from the official documentation:

In the figure, orangeStream is interval-joined with greenStream over the interval orangeElem.ts - 2 <= greenElem.ts <= orangeElem.ts + 1, and below the streams are the results produced in each interval. I wrote code that simulates this example:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new OrangeSource());

    orangeStream.keyBy(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .intervalJoin(greenStream.keyBy(greenStreamTuple2 -> greenStreamTuple2.f0))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
          @Override
          public void processElement(Tuple2<String, Integer> orangeTuple2, Tuple2<String, Integer> greenTuple2, Context ctx, Collector<String> out) throws Exception {
            out.collect(orangeTuple2.f1 + "," + greenTuple2.f1);
          }
        }).print();

    env.execute();
  }

  static class GreenSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0); // event time 0
      ctx.collectWithTimestamp(new Tuple2<>("key", 1), 1); // event time 1
      ctx.collectWithTimestamp(new Tuple2<>("key", 6), 6); // event time 6
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7); // event time 7
    }

    @Override
    public void cancel() {
    }
  }

  static class OrangeSource extends RichSourceFunction<Tuple2<String, Integer>> {

    @Override
    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {

      ctx.emitWatermark(new Watermark(0));
      ctx.collectWithTimestamp(new Tuple2<>("key", 0), 0);
      ctx.emitWatermark(new Watermark(2));
      ctx.collectWithTimestamp(new Tuple2<>("key", 2), 2);
      ctx.emitWatermark(new Watermark(3));
      ctx.collectWithTimestamp(new Tuple2<>("key", 3), 3);
      ctx.emitWatermark(new Watermark(4));
      ctx.collectWithTimestamp(new Tuple2<>("key", 4), 4);
      ctx.emitWatermark(new Watermark(5));
      ctx.collectWithTimestamp(new Tuple2<>("key", 5), 5);
      ctx.emitWatermark(new Watermark(7));
      ctx.collectWithTimestamp(new Tuple2<>("key", 7), 7);
    }

    @Override
    public void cancel() {
    }
  }
}

Output:

0,0
0,1
2,0
2,1
5,6
7,6
7,7

All told, join has exactly the semantics of a database inner join, performed on the same-keyed elements of a data set, and that data set can come either from a window or from a defined time interval.

cogroup

As noted above, join can only perform an inner join. What if you need another kind of join? coGroup is what you want. Its syntax is:

dataStream.coGroup(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(new CoGroupFunction() {...});

Compared with join, coGroup is the more general primitive: within the CoGroupFunction it exposes each window's elements from the two streams as two iterables and lets you write whatever logic you want over them, so it can implement not only the other join types but even more general operations. That said, a Flink PMC member mentioned on Stack Overflow that join can be executed with either a sort-based or a hash-based strategy, while coGroup can only be sort-based, so join is generally somewhat more efficient; prefer join whenever it covers your use case. Here is an example:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new WindowJoinDemo.GreenSource());
    DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new WindowJoinDemo.OrangeSource());

    orangeStream.coGroup(greenStream)
        .where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
        .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
        .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
          @Override
          public void coGroup(Iterable<Tuple2<String, Integer>> orange, Iterable<Tuple2<String, Integer>> green, Collector<String> out) throws Exception {
            StringBuffer sb = new StringBuffer();
            sb.append("window:{ orange stream elements: [ ");
            orange.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
            sb.append("], green stream elements: [ ");
            green.forEach(tuple2 -> sb.append(tuple2.f1).append(" "));
            sb.append("]");
            out.collect(sb.toString());
          }
        })
        .print();

    env.execute();
  }
}

Output:

window:{ orange stream elements: [ 0 1 ], green stream elements: [ 0 1 ] }
window:{ orange stream elements: [ 2 3 ], green stream elements: [ 3 ] }
window:{ orange stream elements: [ 4 5 ], green stream elements: [ 4 ] }
window:{ orange stream elements: [ ], green stream elements: [ 4 ] }
window:{ orange stream elements: [ 6 7 ], green stream elements: [ ] }
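
Since the two iterables give you full control, other join flavors fall out naturally. The sketch below (my own illustration, reusing the streams and imports of the demos above) turns the same skeleton into a per-window left outer join:

// Left outer join: every orange element is emitted, whether or not it has a match.
orangeStream.coGroup(greenStream)
    .where(orangeStreamTuple2 -> orangeStreamTuple2.f0)
    .equalTo(greenStreamTuple2 -> greenStreamTuple2.f0)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
    .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
      @Override
      public void coGroup(Iterable<Tuple2<String, Integer>> orange,
                          Iterable<Tuple2<String, Integer>> green,
                          Collector<String> out) {
        for (Tuple2<String, Integer> o : orange) {
          boolean matched = false;
          for (Tuple2<String, Integer> g : green) {
            matched = true;
            out.collect(o.f1 + "," + g.f1);  // the inner-join part
          }
          if (!matched) {
            out.collect(o.f1 + ",null");     // unmatched left elements survive
          }
        }
      }
    })
    .print();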

connect

connect joins two streams, which may be of different types, and then processes their elements with two separate functions. It can be used on its own or together with the broadcast mechanism. The main use case is connecting a main (data) stream with an auxiliary stream (configuration, rules, and so on) so that the auxiliary stream dynamically controls the main stream's behavior. Below are two code samples with identical functionality: the auxiliary stream switches which multiples the main stream lets through. The first uses connect alone, the second combines it with broadcast.

connect on its own:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class ConnectDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // main data stream
    DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (true) {
          ctx.collect(i++);
          Thread.sleep(500);
        }
      }

      @Override
      public void cancel() {
      }
    });

    // rule (auxiliary) stream
    DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("one");
        Thread.sleep(5000);
        ctx.collect("two");
        Thread.sleep(5000);
        ctx.collect("three");

        Thread.sleep(Long.MAX_VALUE);
      }

      @Override
      public void cancel() {
      }
    });

    dataStream.connect(ruleStream)
        .flatMap(new CoFlatMapFunction<Long, String, Object>() {
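          // Shared by flatMap1/flatMap2; a plain field works here only because parallelism is 1.
          // With parallelism > 1 the rule would reach just one subtask, which is what broadcast (next example) solves.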
          String rule;

          @Override
          public void flatMap1(Long value, Collector<Object> out) throws Exception {
            if ("one".equalsIgnoreCase(rule)) {
              out.collect(value);
            } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
              out.collect(value);
            } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
              out.collect(value);
            }
          }

          @Override
          public void flatMap2(String value, Collector<Object> out) throws Exception {
            System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
            rule = value;
          }
        }).print();

    env.execute();
  }
}

Output:

update rule, old rule = null, new rule = one
1
2
3
4
5
6
7
8
9
update rule, old rule = one, new rule = two
10
12
14
16
18
update rule, old rule = two, new rule = three
21
24
27
30
33
36
...

With broadcast:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // main data stream
    DataStream<Long> dataStream = env.addSource(new SourceFunction<Long>() {
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (true) {
          ctx.collect(i++);
          Thread.sleep(500);
        }
      }

      @Override
      public void cancel() {
      }
    });

    // rule (auxiliary) stream
    DataStream<String> ruleStream = env.addSource(new SourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("one");
        Thread.sleep(5000);
        ctx.collect("two");
        Thread.sleep(5000);
        ctx.collect("three");

        Thread.sleep(Long.MAX_VALUE);
      }

      @Override
      public void cancel() {
      }
    });
    MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<String>() {
        }));
    BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

    dataStream.connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<Long, String, Long>() {
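          // Simplified: the rule lives in a plain field. A real job would typically keep it
          // in the broadcast state instead (ctx.getBroadcastState(ruleStateDescriptor)).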
          String rule;

          @Override
          public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
            if ("one".equalsIgnoreCase(rule)) {
              out.collect(value);
            } else if ("two".equalsIgnoreCase(rule) && (value % 2 == 0)) {
              out.collect(value);
            } else if ("three".equalsIgnoreCase(rule) && (value % 3 == 0)) {
              out.collect(value);
            }
          }

          @Override
          public void processBroadcastElement(String value, Context ctx, Collector<Long> out) throws Exception {
            System.out.printf("update rule, old rule = %s, new rule = %s\n", rule, value);
            rule = value;
          }
        }).print();

    env.execute();
  }
}

The output is exactly the same as with plain connect, so I won't repeat it here.

Iterate

Iterate does what its name says: it iterates, a bit like a state machine. With the Iterate operator, elements in a stream can be computed over and over until they meet some condition, which is common in self-learning scenarios. I (not entirely rigorously) classify it as a multi-stream operator because it has to process the "new" stream formed by the elements sent back for another round. The code below does something very simple: it applies a minus-one map (minusOne) to the original stream and then checks whether the value is still greater than 0; if so, the element iterates through minusOne again until the value is no longer positive, at which point it leaves the iteration and continues downstream (here it is simply printed).

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    List<Tuple2<String, Integer>> tuple2s = new ArrayList<>(4);
    tuple2s.add(new Tuple2<>("Jim", 3));
    tuple2s.add(new Tuple2<>("John", 2));
    tuple2s.add(new Tuple2<>("Lily", 1));
    tuple2s.add(new Tuple2<>("Lucy", 4));

    // source
    DataStream<Tuple2<String, Integer>> source = env.fromCollection(tuple2s);
    source.print();
    // obtain an IterativeStream
    IterativeStream<Tuple2<String, Integer>> iteration = source.iterate();
    // the regular computation
    DataStream<Tuple2<String, Integer>> minusOne = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
        System.out.printf("[minusOne] %s: %d\n", value.f0, value.f1);
        return new Tuple2<>(value.f0, value.f1 - 1);
      }
    });
    // elements that still need another round form the feedback stream
    DataStream<Tuple2<String, Integer>> stillGreaterThanZero = minusOne.filter(new FilterFunction<Tuple2<String, Integer>>() {
      @Override
      public boolean filter(Tuple2<String, Integer> value) throws Exception {
        boolean greaterThanZero = value.f1 > 0;
        if (greaterThanZero) {
          System.out.printf("%s is still greater than zero(now: %d), back to minusOne\n", value.f0, value.f1);
        }
        return greaterThanZero;
      }
    });
    // hand the feedback stream to closeWith
    iteration.closeWith(stillGreaterThanZero);

    minusOne.filter(tuple2 -> tuple2.f1 <= 0).print();

    env.execute();
  }
}

Output:

(Jim,3)
(John,2)
(Lily,1)
(Lucy,4)
[minusOne] Jim: 3
Jim is still greater than zero(now: 2), back to minusOne
[minusOne] John: 2
John is still greater than zero(now: 1), back to minusOne
[minusOne] Lily: 1
(Lily,0)
[minusOne] Lucy: 4
Lucy is still greater than zero(now: 3), back to minusOne
[minusOne] Jim: 2
Jim is still greater than zero(now: 1), back to minusOne
[minusOne] John: 1
(John,0)
[minusOne] Lucy: 3
Lucy is still greater than zero(now: 2), back to minusOne
[minusOne] Jim: 1
(Jim,0)
[minusOne] Lucy: 2
Lucy is still greater than zero(now: 1), back to minusOne
[minusOne] Lucy: 1
(Lucy,0)

A typical Flink streaming program is built from the operators introduced above, and as you've seen, none of them is hard to use, so writing a streaming program isn't difficult either. What is hard is writing a correct, robust, and efficient program and tuning it in a targeted way; doing that well requires understanding the runtime principles and the key design decisions behind stream processing.

The next article covers another very important operator: Flink Async I/O.

