2

dolphinscheduler简单任务定义及复杂的跨节点传参 - funnyZpC

 1 year ago
source link: https://www.cnblogs.com/funnyzpc/p/16395094.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

dolphinscheduler简单任务定义及跨节点传参

转载请注明出处 https://www.cnblogs.com/funnyzpc/p/16395094.html

dolphinscheduler是一款非常不错的调度工具,本文我就简称ds啦,可单机可集群可容器,可调度sql存储过程http大数据,也可使用shellpythonjavaflink等语言及工具,功能强大类型丰富,适合各类调度型任务,社区及项目也十分活跃,现在github中已有8.2k的star👍
所以,本篇博文开始会逐步讲一些ds相关的东西,也期待各位同行能接触到此并能实际解决一些生产上的问题~😁

一.准备工作

阅读本博文前建议您先阅读下官方的文档[https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(虽然也会碰到一些坑😂)这里,先准备下sql表资源,以下为`postgresql`的`sql`脚本:

(表结构)

CREATE TABLE dolphinscheduler.tmp ( id int4 NOT NULL, "name" varchar(50) NULL, "label" varchar(50) NULL, update_time timestamp NULL, score int4 NULL, CONSTRAINT tmp_pkey PRIMARY KEY (id));

(表数据)

INSERT INTO tmp (id,"name","label",update_time,score) VALUES (3,'二狗子','','2022-07-06 21:49:26.872',NULL), (2,'马云云','',NULL,NULL), (1,'李思','','2022-07-05 19:54:31.880',85);

因为个人使用的postgresql的数据库,如果您是mysql或者其他数据的用户,请自行更改以上表和数据并添加到库中即可
表及数据入库,请将tmp所属的库配置到 ds后台->数据源中心->创建数据源 ,以下是我的配置,记住,这里面的所有数据库配置均遵守所属数据库类型的jdbcdriver的配置参数,配置完成也会在ds的数据库生成一条jdbc的连接地址,这点要明白~

1161789-20220707215521638-1299391700.png

二.简单的项目创建及说明

因为`ds`的任务是配置在项目下面,所以第一步得新建一个项目,这样:`ds后台`->`项目管理`->`创建项目`,这是我创建的,请看:
1161789-20220707220358155-1354523703.png

准备完项目之后,鼠标点进去,并进入到 工作流定义菜单 页面,如下图:

1161789-20220707220558630-395503237.png

先简单到解释下ds的一点儿基本结构,首先,ds一般部署在linux服务器下,创建任务的用户需要在admin账户下创建,重要的是创建的每个工作账户需要与操作系统用户一一对应,比如你创建了一个 test 的ds账户,那ds所在的服务器也必须有一个test的账户才可行,这是ds的规则,我没法解释为什么。
每个用户下(除了admin外)所能创建的调度任务均在各自创建的项目下,每个项目又分为多个任务(工作流定义),一个任务下又可分为多个任务节点,下图为任务定义:

1161789-20220707221126031-808334809.png

ok,如果已经准备好以上步骤,下面开始定义一个简单的调度任务,继续哈~

三.简单的参数传递

先看表:

1161789-20220707215732456-1084095369.png

我们先做个简单的,比如图中,如果二狗子的本名叫:李思,需要我们取id=1name放到id=3label中,并且更新update_time

  • 1.这里第一步 在工作流定义列表,点击 创建工作流 就进入一个具体的任务(工作流)的定义,同时我们使用的是sql任务,所以就需要从左侧拖动一个sql任务到画布中(右侧空白处):

    1161789-20220707222141038-1783829749.png
    因为拖动sql任务到画布会自动弹出节点定义,上图为当前节点的一个定义,重点是:数据源sql类型sql语句,如官方所说,如果将name传递到下游,则需要在自定义参数重定义这个nameout方向 类型varchar
  • 2.因为传递到参数需要写入到表,这里我们再定义一个节点,这个节点负责接收上游传递到name,执行update时使用这个name,以下是我的定义:

    1161789-20220707225105519-1768344082.png
    看到没,这里不仅仅要注意sql类型(sql类型sql语句一一对应的,类型不能错) ,还有就是前置任务一定要选中(上面定义的)node1节点。
    另外,需要注意的是当前任务是上下游传参,所以在node2中是直接使用node1中定义的name这个参数哈
  • 3.定义完成当前任务就需要保存:点右上角保存,填写并保存后点关闭以退出定义:

    1161789-20220707223821227-1869298353.png
  • 4.因为定义的任务需要上线了才可执行,所以,在工作流定义列表先点该任务的黄色按钮(任务上线),然后才是点绿色按钮(执行任务):

    1161789-20220707224054859-137107161.png
  • 5.任务执行成功与否,具体得看任务实例,这是执行node2节点的日志:

    1161789-20220707225213545-1617828046.png

    顺带再看看数据库表是否真实成功:

    1161789-20220707225249129-438164926.png

四.复杂的跨节点传参

首先看表:

1161789-20220707225419495-498573059.png

思考一个问题:可以看到李思score85,根据score应该被评为 B(>=90的为A)并写入到label字段,该怎么办呢,如果这个分数是90分又该怎么办呢,如果根本没有score(分值) 这个任务是不是就不需要更新李思的label(评分)呢?
对于上面问题可以有一些偏门的解决方法,比如在sql中塞一个异常值,这样看似不错,不过作为调度工具建议还是在condition节点或者switch节点处理是最好的,不过就目前我用的2.0.5版本的ds对于这两类任务节点是没法接收参数的,这是一个遗憾;遂~个人觉得较好的方式是在写入节点之前增加一个判断节点,将错误抛出(没有score的)最好~,对于此,我使用了一个shell的中间节点
下面是我定义的三个节点:

  • node1节点定义:

    1161789-20220707230606125-199028968.png
  • node2节点定义:

    1161789-20220707230707540-1857391333.png

(脚本内容)

#!/bin/bashecho "=====>input param start<====="echo "id=${id}"echo "score=${score}"echo "=====>input param end<=====" id=${id}echo '${setValue(id2='$id')}' if [ "${score}" -ge "90" ];then echo '${setValue(label2=level A)}' echo "level A"elif [ "${score}" -ge "80" ];then echo '${setValue(label2=level B)}' echo "level B"elif [ "${score}" -ge "60" ];then echo '${setValue(label2=level C)}' echo "level C"elif [ "${score}" -ge "0" ];then echo '${setValue(label2=F!)}' echo "F!"else echo "NO score ,please check!" exit 1fi
  • node3节点定义:

    1161789-20220707230836378-1614084387.png
  • 看一眼结果🤓:

    1161789-20220707231029842-1430668027.png

五.中间的坑

对于复杂节点传参数也碰到一些坑,这些坑大概有这些:

  • 1.对于shell脚本不熟悉的,判断节点其实还是有一些难度的,这是很重要的一点
  • 2.node2(判断节点)不能有重复的参数,不管局部的还是node1(上一级)传递过来的,均不能重复
  • 3.因为在node2(判断节点)需要将id以及label继续往下传(to node3),这时候就需要给id以及label定义一个映射的out变量(id2label2)
  • 4.node2重新设置参数麻烦,需要在shell中重新定义变量(id2label2),同时需要在shell任务内使用拼接的方式赋值(如:echo '${setValue(id2='$id')}')
  • 5.sql类型以及不同节点下不同参数时常搞错,不是任何节点都可以接收上级节点参数,以及局部变量传递变量以及全局变量优先级区别及可能造成冲突
  • 6.ds列表传参(2.0是不可以的)很鸡肋,对于列表传参又不能在下一级节点做循环赋值,这点对于ds是有改进的空间的
  • 7.等等...

对于ds还有很多可扩展的地方(因为实际需要),所以我就做了一些二次开发😂,后面会聊...大家期待哟😚


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK