3

大数据工作流组件oozie简介

 2 years ago
source link: http://rivenzoo.github.io/2021/01/31/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%BB%84%E4%BB%B6oozie%E7%AE%80%E4%BB%8B/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

大数据工作流组件oozie简介

2021-01-31

字数统计: 1.3k字

  |   阅读时长≈ 5分钟

Oozie是一个管理 Apache Hadoop 作业的工作流调度系统。

Oozie的 workflow jobs 是由 actions 组成的 有向无环图(DAG)。

Oozie的 coordinator jobs 是由时间 (频率)和数据可用性触发的重复的 workflow jobs 。

Oozie与Hadoop生态圈的其他部分集成在一起,支持多种类型的Hadoop作业(如Java map-reduce、流式map-reduce、Pig、Hive、Sqoop和Distcp)以及特定于系统的工作(如Java程序和shell脚本),不同作业对应不同的workflow action。

Workflow:工作流,由我们需要处理的每个工作组成。

Coordinator:协调器,根据条件触发工作流执行,支持周期触发和检测数据是否准备好。

Bundle:将一堆的coordinator进行汇总处理。

16120697407602.jpg

job的组成

  • job.properties 记录了job的属性
  • workflow.xml 使用hPDL 定义任务的流程和分支
  • lib目录 用来执行具体的任务
  • coordinator.xml 定义调度策略

job.properties

KEY 含义 nameNode HDFS地址 jobTracker jobTracker(ResourceManager)地址 queueName Oozie队列(默认填写default) oozie.usr.system.libpath 是否加载用户lib目录(true/false) oozie.libpath 用户lib库所在的位置 oozie.wf.application.path Oozie流程所在hdfs地址(workflow.xml所在的地址) user.name 当前用户 oozie.coord.application.path Coordinator.xml地址(没有可以不写) oozie.bundle.application.path Bundle.xml地址(没有可以不写)

workflow.xml

16120697594642.jpg

包含控制流节点(control flow nodes)和动作节点(action nodes)

  • [控制流节点]:主要包括start、end、fork、join等,其中fork、join成对出现,在fork展开。分支,最后在join结点汇聚
    * start
    * end
  • [动作节点]:包括Hadoop任务、SSH、HTTP、EMAIL、OOZIE子任务
    * ok    --> end
    * error --> kill
    * 定义具体需要执行的job任务
    * MapReduce、shell、hive

actions 在远程系统(如Hadoop、Pig)中启动工作。在action完成时,远程系统回调Oozie通知action完成,此时Oozie将继续在workflow 中进行下一步操作。

<workflow-app xmlns="uri:oozie:workflow:0.2" name="no-op-wf">
<start to="end"/>
<end name="end"/>
</workflow-app>

coordinator.xml

16120697726699.jpg

workflow 作业是基于常规的时间间隔(time intervals)和数据可用性(data availability)运行的。

包括controls、datasets、input-events、output-events、action节点

  • controls
元素名称 含义说明 timeout 超时时间,单位为分钟。当一个Coordinator Job启动的时候,会初始化多个Coordinator动作,timeout用来限制这个初始化过程。默认值为-1,表示永远不超时,如果为0 则总是超时。 concurrency 并发数,指多个Coordinator Job并发执行,默认值为1。 execution 配置多个Coordinator Job并发执行的策略:默认是FIFO。另外还有两种:LIFO(最新的先执行)、LAST_ONLY(只执行最新的Coordinator Job,其它的全部丢弃)。 throttle 一个Coordinator Job初始化时,允许Coordinator动作处于WAITING状态的最大数量。
  • datasets

Coordinator Job中有一个Dataset的概念,它可以为实际计算提供计算的数据,主要是指HDFS上的数据目录或文件,能够配置数据集生成的频率(Frequency)、URI模板、时间等信息

<datasets>
<include>[SHARED_DATASETS]</include>
...
<dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]">
<uri-template>[URI TEMPLATE]</uri-template>
</dataset>
...
</datasets>
  • input-events和output-events元素

一个Coordinator应用的输入事件指定了要执行一个Coordinator动作必须满足的输入条件,在Oozie当前版本,只支持使用dataset实例。

  • action节点定义需要运行的workflow

Coordinator 动作的状态变迁

转移前状态 转以后状态集合 WAITING READY 、 TIMEDOUT 、 KILLED READY SUBMITTED 、 KILLED SUBMITTED RUNNING 、 KILLED 、 FAILED RUNNING SUCCEEDED 、 KILLED 、 FAILED 常量表示形式 含义说明 ${coord:minutes(int n)} 返回日期时间:从一开始,周期执行n分钟 ${coord:hours(int n)} 返回日期时间:从一开始,周期执行n * 60分钟 ${coord:days(int n)} 返回日期时间:从一开始,周期执行n * 24 * 60分钟 ${coord:current(int n)} 返回日期时间:从一个Coordinator动作(Action)创建时开始计算,第n个dataset实例执行时间 ${coord:dataIn(String name)} 在输入事件(input-events)中,解析dataset实例包含的所有的URI ${coord:dataOut(String name)} 在输出事件(output-events)中,解析dataset实例包含的所有的URI ${coord:offset(int n, String timeUnit)} 表示时间偏移,如果一个Coordinator动作创建时间为T,n为正数表示向时刻T之后偏移,n为负数向向时刻T之前偏移,timeUnit表示时间单位(选项有MINUTE、HOUR、DAY、MONTH、YEAR) ${coord:nominalTime()} nominal时间等于Coordinator Job启动时间,加上多个Coordinator Job的频率所得到的日期时间。例如:start=”2009-01-01T24:00Z”,end=”2009-12-31T24:00Z”,frequency=”${coord:days(1)}”,frequency=”${coord:days(1)},则nominal时间为:2009-01-02T00:00Z、2009-01-03T00:00Z、2009-01-04T00:00Z、…、2010-01-01T00:00Z ${coord:actualTime()} Coordinator动作的实际创建时间。例如:start=”2011-05-01T24:00Z”,end=”2011-12-31T24:00Z”,frequency=”${coord:days(1)}”,则实际时间为:2011-05-01,2011-05-02,2011-05-03,…,2011-12-31 ${coord:user()} 启动当前Coordinator Job的用户名称 ${coord:dateOffset(String baseDate, int instance, String timeUnit)} 计算新的日期时间的公式:newDate = baseDate + instance * timeUnit,如:baseDate=’2009-01-01T00:00Z’,instance=’2′,timeUnit=’MONTH’,则计算得到的新的日期时间为’2009-03-01T00:00Z’。 ${coord:formatTime(String timeStamp, String format)} 格式化时间字符串,format指定模式

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK