63

想要理解 spark RDD 就自己写一个

 6 years ago
source link: http://mp.weixin.qq.com/s/1yNWykDCJifmXcARDaDxbQ
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

想要理解 spark RDD 就自己写一个

Original 孙彪彪 一生数据人 2017-12-27 11:12 Posted on

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载

我们都知道, RDD,全称为Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组丰富的操作来操作这些数据。在这些操作中,诸如map、flatMap、filter等转换操作实现了monad模式,很好地契合了Scala的集合操作。

我举个简单的例子

val textFile = sc.textFile("hdfs://...")
      val counts = textFile.flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      counts.collect()

这个例子从hdfs上读取文件, 然后每行按照空格分割成单词, 然后reduce出每个单词的数目, 然后把结果保存到hdfs, 我们来看下这段代码经过 DAG 划分究竟变成什么样

调用 spark 框架的算子, spark 在后面后给你生成一个个 RDD,  然后根据宽依赖的窄依赖,会划分为不同的stage, 像是用篱笆隔开了一样, 如果中间有宽依赖,就用刀切一刀,我们例子中 reduceByKey 算子对应的ShuffledRDD 的 Dependency 就是 ShuffleDependency, 会被隔开,生成两个 stage,中间经过 shuffle write, shuffle read 过程连接。

对于划分出来的每个 stage 都抽象为一个  TaskSet任务集 交给  TaskScheduler 来进行进一步的调度运行。

我们来看一张图, 来理清里面的概念, 我们用户编程使用的 RDD, 每个 RDD都有一个分区数,  这个分区数目创建 RDD 的时候有一个初始值,运行过程中,根据配置的 parallelism 参数 和 shuffle 过程中显示指定的分区数目 来调整个数。

我们可以看到, 一个 task 对应一个 stage 里面一个分区数据的处理任务,  task 又分为 ShuffleMapTask (中间阶段的任务) 和 ResultTask(最后一个阶段的任务)。

最后 的 collect 操作会把 ResultTask 生成的结果从 executor 拉到 driver 上(准确来讲应该是 executor 上报给 driver)返还给用户数据集

我们上面举的例子中就涉及到一个 RDD 中 compute 和 getPartitions 方法,   getPartitions 这个方法返回一个分区数据的包装对象, 可以根据这个对象定位到当前分区的数据,一个 stage 内的一个分区对应一个 task, 这个 task 调度到 executor 上运行的时候会根据这个对象取到当前task 需要处理的数据

compute 方法就是当前 RDD 的计算逻辑, 比如 textFile.flatMap(line => line.split(" ")) 这个算子会生成一个 MapPartitonRDD,

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  ...

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  ...
}

这个 MapPartitionRDD 的 compute 方法其实就是包装的 你传入的 line => line.split(" ") 函数,

但是要注意了, 每个RDD的一个分区并不对应实际调度运行的task, 对于 map 操作,实际上是组织成一个  计算 pipeline, 假如你定义 RDD.map(f1).map(f2).map(f3), 实际上会生成三个  MapPartitionRDD,如果每个 RDD 的每个分区都生成一个 task去调度运行,这中间的调度和数据传输是不能容忍的, spark 框架会帮你处理成一个 task, 这个 task 的 运算逻辑是f3(f2(f1(Iterator[U])))。

一个 task 对应一个 stage 里面一个分区数据的处理任务, stage 的划分具体可以参考 我之前的文章

对 spark 中 DAGScheduler 阶段划分的一次探索

一般来说,自定义一个RDD最核心涉及的两个需要重载是 compute 和 getPartitions。前者是从RDD中取值的方法,后者是RDD如何存放数据的方法。

接下来我们定义一个数据源的RDD, 这个 RDD 模拟提供一个日志文件中的数据,划分多个分区, 每个分区处理部分行数据, 这个RDD 的计算逻辑就是提供日志数据, 提供一个数据的迭代器

package org.apache.spark.rdd

import org.apache.spark.SparkContext
import  org.apache.spark.Partition
import  org.apache.spark.TaskContext

class linePartition(idx: Int, val eles: Array[String])  extends Partition {
  override def index: Int = idx
}

class StringGenerateRDD (
                                       sc: SparkContext,
                                       str: String ,
                                       numPartitions: Int
                                     ) extends RDD[String](sc, Nil) {

  val stringSource = new StringBuilder()
  val currentOffset = 0L;

  override  def getPartitions : Array[Partition] =
   {
    val splitStr = str.split("\n");
    val  array = new Array[Partition](numPartitions)
    for (i <- 0 until  numPartitions) {
        val start = ((i * splitStr.length) / numPartitions).toInt
        val end = (((i + 1) * splitStr.length) / numPartitions).toInt
        array(i) = new linePartition(i,   splitStr.slice(start, end))
    }

    array
  }

  override  def compute(split: Partition, context: TaskContext): Iterator[String] = {

    val eles = split.asInstanceOf[linePartition].eles
    eles.toIterator
  }
}

下面我们写一个例子来使用这个RDD, 并且统计单词个数

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.StringGenerateRDD

/**
  * Created by sunbiaobiao on 12/20/17.
  */
object StringGenerateRDDTest  extends  App {
  val conf = new SparkConf().setAppName("StringGenerateRDDTest")
  val sc = new SparkContext(conf)

  val  rdd = new StringGenerateRDD(sc,"sun biao biao\n sun biao biao\nsun biao biao\n sun biao biao\nsun biao biao\n sun biao biao\nsun biao biao\n sun biao biao\nsun biao biao\n sun biao biao\n", 3)

  rdd.flatMap(l => l.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).collect().foreach(println)

}

怎么样,是不是很简单,分分钟写一个, 如果你碰到 spark 没有现有的RDD 不能满足你的需求的情况,  就可以轻松扩展一个。

欢迎关注 spark技术分享 

                

Image

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK