7

Spark SQL 字段血缘在 vivo 互联网的实践

 2 years ago
source link: https://www.51cto.com/article/707062.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

作者:vivo互联网服务器团队-Hao Guangshi

字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?

有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。

Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。

平台计划将 Hive 任务迁移到 Spark SQL 上,同时也需要实现字段血缘的功能。

二、前期调研

开发前我们做了很多相关调研,从中得知 Spark 是支持扩展的:允许用户对 Spark SQL 的 SQL 解析、逻辑计划的分析和检查、逻辑计划的优化、物理计划的形成等进行扩展。

该方案可行,且对 Spark 的源码没有改动,代价也比较小,确定使用该方案。

三、Spark SQL 扩展

3.1 Spark 可扩展的内容

SparkSessionExtensions是比较重要的一个类,其中定义了注入规则的方法,现在支持以下内容:

  • 【Analyzer Rules】逻辑计划分析规则
  • 【Check Analysis Rules】逻辑计划检查规则
  • 【Optimizer Rules.】 逻辑计划优化规则
  • 【Planning Strategies】形成物理计划的策略
  • 【Customized Parser】自定义的sql解析器
  • 【(External) Catalog listeners catalog】监听器

在以上六种可以用户自定义的地方,我们选择了【Check Analysis Rules】。因为该检查规则在方法调用的时候是不需要有返回值的,也就意味着不需要对当前遍历的逻辑计划树进行修改,这正是我们需要的。

而【Analyzer Rules】、【Optimizer Rules】则需要对当前的逻辑计划进行修改,使得我们难以迭代整个树,难以得到我们想要的结果。

3.2 实现自己的扩展

class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {
  override def apply(spark: SparkSessionExtensions): Unit = {

    //字段血缘
    spark.injectCheckRule(FieldLineageCheckRuleV3)

    //sql解析器
    spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }

  }
}

上面按照这种方式实现扩展,并在 apply 方法中把自己需要的规则注入到 SparkSessionExtensions 即可,除了以上四种可以注入的以外还有其他的规则。要让 ExtralSparkExtension 起到作用的话我们需要在spark-default.conf下配置

spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension

在启动 Spark 任务的时候即可生效。

注意到我们也实现了一个自定义的SQL解析器,其实该解析器并没有做太多的事情。只是在判断如果该语句包含insert的时候就将 SQLText(SQL语句)设置到一个为FIELD_LINE_AGE_SQL,之所以将SQLText放到FIELD_LINE_AGE_SQL里面。因为在 DheckRule 里面是拿不到SparkPlan的我们需要对SQL再次解析拿到 SprkPlan,而FieldLineageCheckRuleV3的实现也特别简单,重要的在另一个线程实现里面。

这里我们只关注了insert语句,因为插入语句里面有从某些个表里面输入然后写入到某个表。

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{

  override def parsePlan(sqlText: String): LogicalPlan = {
    val lineAgeEnabled = SparkSession.getActiveSession
      .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
    logDebug(s"SqlText: $sqlText")
    if(sqlText.toLowerCase().contains("insert")){
      if(lineAgeEnabled){
        if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
          //线程本地变量在这里
          FIELD_LINE_AGE_SQL.set(sqlText)
        }
        FIELD_LINE_AGE_SQL_COULD_SET.remove()
      }
    }
    delegate.parsePlan(sqlText)
  }
  //调用原始的sqlparser
  override def parseExpression(sqlText: String): Expression = {
    delegate.parseExpression(sqlText)
  }
  //调用原始的sqlparser
  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    delegate.parseTableIdentifier(sqlText)
  }
  //调用原始的sqlparser
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    delegate.parseFunctionIdentifier(sqlText)
  }
  //调用原始的sqlparser
  override def parseTableSchema(sqlText: String): StructType = {
    delegate.parseTableSchema(sqlText)
  }
  //调用原始的sqlparser
  override def parseDataType(sqlText: String): DataType = {
    delegate.parseDataType(sqlText)
  }
}

3.3 扩展的规则类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {

  val executor: ThreadPoolExecutor =
    ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)

  override def apply(plan: LogicalPlan): Unit = {
    val sql = FIELD_LINE_AGE_SQL.get
    FIELD_LINE_AGE_SQL.remove()
    if(sql != null){
      //这里我们拿到sql然后启动一个线程做剩余的解析任务
      val task = new FieldLineageRunnableV3(sparkSession,sql)
      executor.execute(task)
    }

  }
}

很简单,我们只是拿到了 SQL 然后便启动了一个线程去得到 SparkPlan,实际逻辑在

FieldLineageRunnableV3。

3.4 具体的实现方法

3.4.1 得到 SparkPlan

我们在 run 方法中得到 SparkPlan:


override def run(): Unit = {
  val parser = sparkSession.sessionState.sqlParser
  val analyzer = sparkSession.sessionState.analyzer
  val optimizer = sparkSession.sessionState.optimizer
  val planner = sparkSession.sessionState.planner
      ............
  val newPlan = parser.parsePlan(sql)
  PASS_TABLE_AUTH.set(true)
  val analyzedPlan = analyzer.executeAndCheck(newPlan)

  val optimizerPlan = optimizer.execute(analyzedPlan)
  //得到sparkPlan
  val sparkPlan = planner.plan(optimizerPlan).next()
  ...............
if(targetTable != null){
  val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
  val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
  //projection
  projectionLineAge(levelProject, sparkPlan.child)
  //predication
  predicationLineAge(predicates, sparkPlan.child)
  ...............

为什么要使用 SparkPlan 呢?当初我们考虑的时候,物理计划拿取字段关系的时候是比较准的,且链路比较短也更直接。

在这里补充一下 Spark SQL 解析的过程如下:

994b14663aa7efd4c84215c7d9efa71b9cd5fa.png

经过SqlParser后会得到逻辑计划,此时表名、函数等都没有解析,还不能执行;经过Analyzer会分析一些绑定信息,例如表验证、字段信息、函数信息;经过Optimizer 后逻辑计划会根据既定规则被优化,这里的规则是RBO,当然 Spark 还支持CBO的优化;经过SparkPlanner后就成了可执行的物理计划。

我们看一个逻辑计划与物理计划对比的例子:

一个 SQL 语句:

select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

逻辑计划是这样的:

61e74931494ba47bc1e7922401b38a5b437b84.jpg

物理计划是这样的:

49cae1e871fa13fc2366472309574d82120fa6.jpg

显然简化了很多。

得到 SparkPlan 后,我们就可以根据不同的SparkPlan节点做迭代处理。

我们将字段血缘分为两种类型:projection(select查询字段)、predication(wehre查询条件)。

这两种是一种点对点的关系,即从原始表的字段生成目标表的字段的对应关系。

想象一个查询是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:

e9a9de5819d51ca6259615516b9d83a5cf0417.jpg

那么我们迭代查询的结果应该为

id ->tab1.id ,
name->tab1.name,tabb2.name,
age→tabb2.age。

注意到有该变量

val levelProject = new ArrayBuffer
[ArrayBuffer[NameExpressionHolder]](),通过projecti-onLineAge 迭代后 levelProject 
存储了顶层id,name,age对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是简单的递归迭代,还需要考虑特殊情况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需要特殊考虑。

例子及效果:

SQL:


with A as (select id,name,age from tab1 where id > 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
   select C.id,concat(C.name,B.name) as name, B.age from
     B,C where C.id = B.id

{
  "edges": [
    {
      "sources": [
        3
      ],
      "targets": [
        0
      ],
      "expression": "id",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        4,
        7
      ],
      "targets": [
        1
      ],
      "expression": "name",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        5
      ],
      "targets": [
        2
      ],
      "expression": "age",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        6,
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "INNER",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [
        6,
        5
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
      "edgeType": "PREDICATE"
    }
  ],
  "vertices": [
    {
      "id": 0,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.id"
    },
    {
      "id": 1,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.name"
    },
    {
      "id": 2,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.age"
    },
    {
      "id": 3,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.id"
    },
    {
      "id": 4,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.name"
    },
    {
      "id": 5,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.age"
    },
    {
      "id": 6,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.id"
    },
    {
      "id": 7,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.name"
    }
  ]
}

在 Spark SQL 的字段血缘实现中,我们通过其自扩展,首先拿到了 insert 语句,在我们自己的检查规则中拿到

SQL 语句,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终得到了物理计划。

我们通过迭代物理计划,根据不同执行计划做对应的转换,然后就得到了字段之间的对应关系。当前的实现是比较简单的,字段之间是直线的对应关系,中间过程被忽略,如果想实现字段的转换的整个过程也是没有问题的。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK