0

nifi从入门到实战(保姆级教程)——flow - ilyyin

 2 months ago
source link: https://www.cnblogs.com/ilyyin/p/16439338.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

nifi从入门到实战(保姆级教程)——flow - ilyyin - 博客园

posts - 7,comments - 16,views -

4991

本文章首发于博客园,转载请标明出处

经过前两篇文章(环境篇身份验证),我们已经有了nifi可以运行的基础,今天就来实现一个案例吧。
假设我们要从ftp上获取一个zip包,里面有两个csv文件,一个是manufacture.csv,一个是brand.csv.然后要把这两个文件导入到sqlserver数据库中。其中brand是manufacture的下一级,但是brand里没有manufacture的主键,必须要通过一些关键字段的匹配来找出它们。
在实现这个场景之前,我们需要认识一下nifi中的几个重要组件。
Processor : 主要用来处理flowfile,也就是我们的数据。nifi提供了上百个不同功能的processor,一般的需求都能满足。当然它也支持自定义processor,需要用java自行开发。
Processor Group :简单地理解就是把processor的流程组合成一个整体。只有Processor Group有version,所以它对于后续流程的迁移很重要。
Input Port,Output Port : 这两个主要是用于联接group.
有这些了解后就开始吧!
先看看流程的整体吧

image
  1. 首先拖拽一个group在画布中,并为这个group命名为Import,如下图
    image
    双击group进入。再建一个group,命名为getfiles.这个group主要负责从ftp上获取文件,并解压。
    image
    GetFTP:主要填以下几个属性。
    image
    image
    Delete Origianl默认为true,会删除ftp上的文件,所以最好设置为false.类似的Processor还有getfile,使用时一定要注意。
    因为我们获取的是一个zip包,所以需要解压。这个比较简单,默认就行了。如果压缩文件有密码,设置一下password属性就好了。
    image
    接下来就有点复杂了。因为我们的manufacture和brand是要进不同的表,所以就要路由了。这里就要用到route的processor,我用的是RouteText,也可以用RouteOnAttribute,只是一些设置不同。后面我也会用到。
    image
    添加了两个路由属性:fabricantes,modelos.这个名字你可以随便取。如果filename包含manufacture就走fabricantes分支,包含brand就走modelos分支。
    后面我做了一个延时,大家可以根据实际情况自由选择。这里我也介绍一下。
    image
    先用UpdateAttribute添加一个属性delay,值为当前时间加20s.
    image
    再用RouteOnAttribute来在规定时间内死循环,直到当前时间大于规定时间。
    最后用两个output port结束当前group.
  2. 将brand的数据存储到SQL SERVER的一张临时表里。
    601923-20220703104820912-1581318418.png
    建立一个group,名为tmp_barnd.这个group一开始必须是input port,用于接收上一个group传出的数据。
    SplitRecord:
    image
    这里用到两个controller service: CSVReader,JsonRecordSetWriter.
    image
    根据实际情况修改一下相应属性。我觉得比较重要的是Value Separator(默认是","但是很自定义的csv可能是";""),Character Set(默认是UTF-8,比如我的文档里有特殊符号,用的是ISO-8859-1)。
    image
    因为是进数据库,所以为了防止SQL注入,需要先做一些准备工作。
    经过上一步,数据已经被拆分成一条条的json,现在就用EvaluateJsonPath提取相应的字段
    image
    再用UpateAttribute组装成Sql语句需要的参数。关于sql.args.[*].type的值,请参考java.sql.Types
    image
    最后就是执行SQL语句了。这里有很多选择,可以用PutSQL,ExcuteSQL等。
    image
    SQL Statement是这样的:

点击查看代码

if not EXISTS (SELECT 1 FROM tmp_modelos
WHERE MODELO=? AND FABRICANTE=? AND DESCRIPCION=? AND TIPO_VEHICULO=?)
INSERT INTO tmp_modelos (MODELO, FABRICANTE, DESCRIPCION, DESCRIPCION_ADDICIONAL, TIPO_VEHICULO) VALUES (?, ?, ?, ?, ?);

?代表参数,有多少?,sql.args属性就相对有几个,否则执行时会报参数不匹配。
DBCPConnectionPool的设置如下:

image

整个流程上要用的processor和controller service差不多就是上面这些,剩下的就是大家按需求组合了。
我剩下两个group里的流程是这样的。

601923-20220703150729027-744712958.png
601923-20220703150921123-1100506837.png

还有一个很重要的,就是nifi所用的表达式,大家可以参考一下官方文档

好了,至此,我们的流程就已经画完了。接下来就是运行调试了。下篇再见!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK