5

CoProcessFunction实战三部曲之二:状态处理

 3 years ago
source link: http://www.cnblogs.com/bolingcavalry/p/14083884.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概览

  • 本文是《CoProcessFunction实战三部曲》的第二篇,咱们要实战的是双流连接场景下,处理一号流中的数据时,还要结合该key在二号流中的情况;
  • 最简单的例子:aaa在一号流中的value和二号流的value相加,再输出到下游,如下图所示,一号流中的value存入state,在二号流中取出并相加,将结果输出给下游:
    n2quqqJ.png!mobile
  • 本篇的内容就是编码实现上图的功能;

参考文章

理解状态: 《深入了解ProcessFunction的状态操作(Flink-1.10)》

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示( https://github.com/zq2599/blog_demos ):

名称 链接 备注 项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页 git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议 git仓库地址(ssh) [email protected]:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinkstudy文件夹下,如下图红框所示:

V3IjMra.png!mobile

编码

  1. 字符串转Tuple2的Map函数,以及抽象类AbstractCoProcessFunctionExecutor都和上一篇 《CoProcessFunction实战三部曲之一:基本功能》 一模一样;
  2. 新增AbstractCoProcessFunctionExecutor的子类AddTwoSourceValue.java,源码如下,稍后会说明几个关键点:
package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author will
 * @email [email protected]
 * @date 2020-11-11 09:48
 * @description 功能介绍
 */
public class AddTwoSourceValue extends AbstractCoProcessFunctionExecutor {

    private static final Logger logger = LoggerFactory.getLogger(AddTwoSourceValue.class);

    @Override
    protected CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> getCoProcessFunctionInstance() {
        return new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            // 某个key在processElement1中存入的状态
            private ValueState<Integer> state1;

            // 某个key在processElement2中存入的状态
            private ValueState<Integer> state2;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 初始化状态
                state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState1", Integer.class));
                state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState2", Integer.class));
            }

            @Override
            public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                logger.info("处理元素1:{}", value);

                String key = value.f0;

                Integer value2 = state2.value();

                // value2为空,就表示processElement2还没有处理或这个key,
                // 这时候就把value1保存起来
                if(null==value2) {
                    logger.info("2号流还未收到过[{}],把1号流收到的值[{}]保存起来", key, value.f1);
                    state1.update(value.f1);
                } else {
                    logger.info("2号流收到过[{}],值是[{}],现在把两个值相加后输出", key, value2);

                    // 输出一个新的元素到下游节点
                    out.collect(new Tuple2<>(key, value.f1 + value2));

                    // 把2号流的状态清理掉
                    state2.clear();
                }
            }

            @Override
            public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                logger.info("处理元素2:{}", value);

                String key = value.f0;

                Integer value1 = state1.value();

                // value1为空,就表示processElement1还没有处理或这个key,
                // 这时候就把value2保存起来
                if(null==value1) {
                    logger.info("1号流还未收到过[{}],把2号流收到的值[{}]保存起来", key, value.f1);
                    state2.update(value.f1);
                } else {
                    logger.info("1号流收到过[{}],值是[{}],现在把两个值相加后输出", key, value1);

                    // 输出一个新的元素到下游节点
                    out.collect(new Tuple2<>(key, value.f1 + value1));

                    // 把1号流的状态清理掉
                    state1.clear();
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        new AddTwoSourceValue().execute();
    }
}
  1. 关键点之一:对于aaa这个key,无法确定会先出现在一号源还是二号源,如果先出现在一号源,就应该在processElement1中将value保存在state1中,这样等到aaa再次出现在二号源时,processElement2就可以从state1中取出一号源的value,相加后输出到下游;
  2. 关键点之二:如果输出到下游,就表示数据已经处理完毕,此时要把保存的状态清理掉;
  3. 如果您想了解低阶函数中的状态存取的更多细节,请参考 《深入了解ProcessFunction的状态操作(Flink-1.10)》

验证

  1. 分别开启本机的9998和9999端口,我这里是MacBook,执行nc -l 9998和nc -l 9999
  2. 启动Flink应用,如果您和我一样是Mac电脑,直接运行AddTwoSourceValue.main方法即可(如果是windows电脑,我这没试过,不过做成jar在线部署也是可以的);
  3. 在监听9998端口的控制台输入aaa,111,此时flink控制台输出如下,可见processElement1方法中,读取state2为空,表示aaa在二号流还未出现过,此时的aaa是首次出现,应该放入state中保存:
22:35:12,135 INFO  AddTwoSourceValue - 处理元素1:(aaa,111)
22:35:12,136 INFO  AddTwoSourceValue - 2号流还未收到过[aaa],把1号流收到的值[111]保存起来
  1. 在监听9999端口的控制台输入bbb,123,flink日志如下所示,表示bbb也是首次出现,把值保存在state中:
22:35:34,473 INFO  AddTwoSourceValue - 处理元素2:(bbb,123)
22:35:34,473 INFO  AddTwoSourceValue - 1号流还未收到过[bbb],把2号流收到的值[123]保存起来
  1. 在监听9999端口的控制台输入aaa,222,flink日志如下,很明显,之前保存在state中的值被取出来了,因此processElement2方法中,aaa在两个数据源的值111和222会被相加后输出到下游,下游是print,直接打印出来了:
22:35:38,072 INFO  AddTwoSourceValue - 处理元素2:(aaa,222)
22:35:38,072 INFO  AddTwoSourceValue - 1号流收到过[aaa],值是[111],现在把两个值相加后输出
(aaa,333)
  • 至此,双流场景下的状态互通实践咱们已经完成了,接下来的文章,会加上定时器和旁路输出,将双流场景的数据处理考虑得更加全面;

你不孤单,欣宸原创一路相伴


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK