57

Flink的sink实战之四:自定义

 3 years ago
source link: https://segmentfault.com/a/1190000037795411
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概览

Flink官方提供的sink服务可能满足不了我们的需要,此时可以开发自定义的sink,文本就来一起实战;

全系列链接

  1. 《Flink的sink实战之一:初探》
  2. 《Flink的sink实战之二:kafka》
  3. 《Flink的sink实战之三:cassandra3》
  4. 《Flink的sink实战之四:自定义》

继承关系

  1. 在正式编码前,要先弄清楚对sink能力是如何实现的,前面我们实战过的print、kafka、cassandra等sink操作,核心类的继承关系如下图所示:

iUjqMfV.png!mobile

  1. 可见实现sink能力的关键,是实现<font color="blue">RichFunction和SinkFunction</font>接口,前者用于资源控制(如open、close等操作),后者负责sink的具体操作,来看看最简单的PrintSinkFunction类是如何实现SinkFunction接口的invoke方法:
@Override
public void invoke(IN record) {
  writer.write(record);
}
  1. 现在对sink的基本逻辑已经清楚了,可以开始编码实战了;

内容和版本

本次实战很简单:自定义sink,用于将数据写入MySQL,涉及的版本信息如下:

  1. jdk:1.8.0_191
  2. flink:1.9.2
  3. maven:3.6.0
  4. flink所在操作系统:CentOS Linux release 7.7.1908
  5. MySQL:5.7.29
  6. IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示( https://github.com/zq2599/blo...

名称 链接 备注 项目主页 https://github.com/zq2599/blo... 该项目在GitHub上的主页 git仓库地址(https) https://github.com/zq2599/blo... 该项目源码的仓库地址,https协议 git仓库地址(ssh) [email protected]:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在<font color="blue">flinksinkdemo</font>文件夹下,如下图红框所示:

rayyeaf.png!mobile

数据库准备

请您将MySQL准备好,并执行以下sql,用于创建数据库flinkdemo和表student:

create database if not exists flinkdemo;
USE flinkdemo;
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

编码

  1. 使用 《Flink的sink实战之二:kafka》 中创建的flinksinkdemo工程;
  2. 在pom.xml中增加mysql的依赖:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.11</version>
</dependency>
  1. 创建和数据库的student表对应的实体类Student.java:
package com.bolingcavalry.customize;

public class Student {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
  1. 创建自定义sink类MySQLSinkFunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:
package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction<Student> {

    PreparedStatement preparedStatement;

    private Connection connection;

    private ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        //准备数据库相关实例
        buildPreparedStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();

        try{
            if(null!=preparedStatement) {
                preparedStatement.close();
                preparedStatement = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }

        try{
            if(null!=connection) {
                connection.close();
                connection = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        preparedStatement.setString(1, value.getName());
        preparedStatement.setInt(2, value.getAge());
        preparedStatement.executeUpdate();
    }

    /**
     * 准备好connection和preparedStatement
     * 获取mysql连接实例,考虑多线程同步,
     * 不用synchronize是因为获取数据库连接是远程操作,耗时不确定
     * @return
     */
    private void buildPreparedStatement() {
        if(null==connection) {
            boolean hasLock = false;
            try {
                hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);

                if(hasLock) {
                    Class.forName("com.mysql.cj.jdbc.Driver");
                    connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
                }

                if(null!=connection) {
                    preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
                }
            } catch (Exception e) {
                //生产环境慎用
                e.printStackTrace();
            } finally {
                if(hasLock) {
                    reentrantLock.unlock();
                }
            }
        }
    }
}
  1. 上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本mysql驱动对应的driver和uri的写法与以前5.x版本的区别;
  2. 创建任务类StudentSink.java,用来创建一个flink任务,里面通过ArrayList创建了一个数据集,然后直接addSink,为了看清DAG,调用disableChaining方法取消了operator chain:
package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class StudentSink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //并行度为1
        env.setParallelism(1);

        List<Student> list = new ArrayList<>();
        list.add(new Student("aaa", 11));
        list.add(new Student("bbb", 12));
        list.add(new Student("ccc", 13));
        list.add(new Student("ddd", 14));
        list.add(new Student("eee", 15));
        list.add(new Student("fff", 16));

        env.fromCollection(list)
            .addSink(new MySQLSinkFunction())
            .disableChaining();

        env.execute("sink demo : customize mysql obj");
    }
}
  1. 在flink web页面提交任务,并设置任务类:

EnauQfN.png!mobile

  1. 任务完成后,DAG图显示任务和记录数都符合预期:

n6RRbyQ.png!mobile

  1. 去检查数据库,发现数据已写入:

jmqEner.png!mobile

至此,自定义sink的实战已经完成,希望本文能给您一些参考;

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...

https://github.com/zq2599/blog_demos

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK