

nifi从入门到实战(保姆级教程)——flow - ilyyin
source link: https://www.cnblogs.com/ilyyin/p/16439338.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

nifi从入门到实战(保姆级教程)——flow - ilyyin - 博客园
posts - 7,comments - 16,views -
4991
本文章首发于博客园,转载请标明出处
经过前两篇文章(环境篇,身份验证),我们已经有了nifi可以运行的基础,今天就来实现一个案例吧。
假设我们要从ftp上获取一个zip包,里面有两个csv文件,一个是manufacture.csv,一个是brand.csv.然后要把这两个文件导入到sqlserver数据库中。其中brand是manufacture的下一级,但是brand里没有manufacture的主键,必须要通过一些关键字段的匹配来找出它们。
在实现这个场景之前,我们需要认识一下nifi中的几个重要组件。
Processor : 主要用来处理flowfile,也就是我们的数据。nifi提供了上百个不同功能的processor,一般的需求都能满足。当然它也支持自定义processor,需要用java自行开发。
Processor Group :简单地理解就是把processor的流程组合成一个整体。只有Processor Group有version,所以它对于后续流程的迁移很重要。
Input Port,Output Port : 这两个主要是用于联接group.
有这些了解后就开始吧!
先看看流程的整体吧

- 首先拖拽一个group在画布中,并为这个group命名为Import,如下图双击group进入。再建一个group,命名为getfiles.这个group主要负责从ftp上获取文件,并解压。GetFTP:主要填以下几个属性。Delete Origianl默认为true,会删除ftp上的文件,所以最好设置为false.类似的Processor还有getfile,使用时一定要注意。
因为我们获取的是一个zip包,所以需要解压。这个比较简单,默认就行了。如果压缩文件有密码,设置一下password属性就好了。接下来就有点复杂了。因为我们的manufacture和brand是要进不同的表,所以就要路由了。这里就要用到route的processor,我用的是RouteText,也可以用RouteOnAttribute,只是一些设置不同。后面我也会用到。添加了两个路由属性:fabricantes,modelos.这个名字你可以随便取。如果filename包含manufacture就走fabricantes分支,包含brand就走modelos分支。
后面我做了一个延时,大家可以根据实际情况自由选择。这里我也介绍一下。先用UpdateAttribute添加一个属性delay,值为当前时间加20s.再用RouteOnAttribute来在规定时间内死循环,直到当前时间大于规定时间。
最后用两个output port结束当前group. - 将brand的数据存储到SQL SERVER的一张临时表里。建立一个group,名为tmp_barnd.这个group一开始必须是input port,用于接收上一个group传出的数据。
SplitRecord:这里用到两个controller service: CSVReader,JsonRecordSetWriter.根据实际情况修改一下相应属性。我觉得比较重要的是Value Separator(默认是","但是很自定义的csv可能是";""),Character Set(默认是UTF-8,比如我的文档里有特殊符号,用的是ISO-8859-1)。因为是进数据库,所以为了防止SQL注入,需要先做一些准备工作。
经过上一步,数据已经被拆分成一条条的json,现在就用EvaluateJsonPath提取相应的字段再用UpateAttribute组装成Sql语句需要的参数。关于sql.args.[*].type的值,请参考java.sql.Types最后就是执行SQL语句了。这里有很多选择,可以用PutSQL,ExcuteSQL等。SQL Statement是这样的:
点击查看代码
if not EXISTS (SELECT 1 FROM tmp_modelos
WHERE MODELO=? AND FABRICANTE=? AND DESCRIPCION=? AND TIPO_VEHICULO=?)
INSERT INTO tmp_modelos (MODELO, FABRICANTE, DESCRIPCION, DESCRIPCION_ADDICIONAL, TIPO_VEHICULO) VALUES (?, ?, ?, ?, ?);
?代表参数,有多少?,sql.args属性就相对有几个,否则执行时会报参数不匹配。
DBCPConnectionPool的设置如下:

整个流程上要用的processor和controller service差不多就是上面这些,剩下的就是大家按需求组合了。
我剩下两个group里的流程是这样的。


还有一个很重要的,就是nifi所用的表达式,大家可以参考一下官方文档
好了,至此,我们的流程就已经画完了。接下来就是运行调试了。下篇再见!
Recommend
-
53
套路虽然要有,但一定也要用心。 我曾经当过社区运营,当时手里也有些优质的KOL,一直为我产出不同的内容,组里的同事也问过我:为什么你的KOL这么死忠(原话如此,死忠确实是有点过),达人系列的文章分享一下我在达人方面的经历和理解。 今天主要分享找达人时候...
-
47
README.md
-
23
写在前面 这是NLP保姆级教程的第二篇----基于RNN的文本分类实现(Text RNN) 参考的的论文是来自2016年复旦大学IJCAI上的发表的关于循环神经网络在多任务文本分类上的应用:Recurrent Ne...
-
19
Overview Apache NiFi is a visual data flow based system which performs data routing, transformation and system mediation logic on data between sources or endpoints. NiFi was developed originally by the US National Security Ag...
-
15
Custom NiFi Load Balancing Processor
-
11
Timothy Spann Posted on Nov 17 ...
-
6
nifi从入门到实战(保姆级教程)——身份认证 - ilyyin - 博客园 强少爷的小胖纸
-
10
Dockerfile 是一个用来构建镜像的文本文件,文本内容包含了一条条构建镜像所需的指令和说明。...
-
8
万字保姆级教程!Stable Diffusion完整入门指南
-
12
In an era where environmental awareness is paramount, the need for accurate and timely air quality data is crucial. One key pollutant that demands attention is PM2.5, referring to fine particulate matter with a diameter of 2.5 micrometers or small...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK