Spark Core Basic Interview Questions (Part 1)

Source: http://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650230000&%3Bidx=1&%3Bsn=3c01001ccc64ae0fac605e9350299e52



1. Spark deployment modes and their characteristics

SparkSubmit#prepareSubmitEnvironment

// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
  1. Local mode

Spark does not have to run on a Hadoop cluster (mainly YARN); it can also run locally by starting multiple threads. Running a Spark application directly in the local JVM with multiple threads is mostly used for easy debugging. Local mode comes in three flavors (a small runnable sketch follows the list below):

  • local: starts only one executor

  • local[k]: starts k executors

  • local[*]: starts as many executors as there are CPU cores
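
A minimal runnable sketch of local mode (the app name and the computation are arbitrary examples, not from the original article):

import org.apache.spark.sql.SparkSession

object LocalModeDemo {
  def main(args: Array[String]): Unit = {
    // "local[*]" uses one worker thread per CPU core; "local[4]" uses 4; "local" uses 1.
    val spark = SparkSession.builder()
      .appName("local-mode-demo")
      .master("local[*]")
      .getOrCreate()

    val sum = spark.sparkContext.parallelize(1 to 100).sum()
    println(s"sum = $sum")

    spark.stop()
  }
}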

  2. Standalone mode

A distributed cluster deployment that ships with a complete set of services. Resource management and task monitoring are handled by Spark itself. This mode is also the foundation of the other modes.

  3. Spark on YARN

A distributed cluster deployment in which resource management and task monitoring are delegated to YARN. The Spark client connects directly to YARN, so there is no need to build a separate Spark cluster. There are two modes, yarn-client and yarn-cluster; the main difference is where the Driver runs.

  • cluster is suited for production: the Driver runs on a node inside the cluster and has fault tolerance

  • client is suited for debugging: the Driver runs on the client node

  4. Spark on K8s

Latest documentation:

http://spark.apache.org/docs/latest/running-on-kubernetes.html

  • Standalone mode

The first viable way to run Spark on a Kubernetes cluster was to run Spark in Standalone mode on top of it, but the community soon proposed a mode that uses the Kubernetes-native scheduler, the so-called Native mode.

  • Kubernetes Native mode

In short, Native mode runs the Driver and the Executors as Pods. Users submit Spark jobs to the Kubernetes apiserver in much the same way they used to submit them to YARN. The submit command looks like this:

$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar

Here master is the address of the Kubernetes apiserver. After submission the job runs as follows: the Driver is started first as a Pod, and the Driver then launches the Executor Pods.

  • Spark Operator

Google Cloud Platform (GCP) has open-sourced a Spark Operator on GitHub; the repo address is:

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

Detailed usage documentation:

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md

  5. There is also Mesos (omitted)

2. What does the Driver program do?

A running Spark job includes one Driver process, which is the main process of the job. It contains the main function, holds an instance of SparkContext, and is the entry point of the program.

Responsibilities: requesting resources from the cluster, registering with the Master, scheduling the job, parsing the job, generating Stages, and dispatching Tasks to Executors. It contains the DAGScheduler and the TaskScheduler.
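
A minimal Driver sketch illustrating these responsibilities (a hypothetical word count whose input and output paths are taken from args; not from the original article):

import org.apache.spark.{SparkConf, SparkContext}

object WordCountDriver {
  def main(args: Array[String]): Unit = {
    // The Driver holds the SparkContext and builds the RDD lineage (the DAG).
    val conf = new SparkConf().setAppName("word-count")
    val sc   = new SparkContext(conf)

    val counts = sc.textFile(args(0))
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    // The Action below makes the DAGScheduler cut the DAG into Stages and the
    // TaskScheduler ship the resulting Tasks to the Executors.
    counts.saveAsTextFile(args(1))

    sc.stop()
  }
}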

3. Hadoop MapReduce and Spark are both parallel computing frameworks. What do they have in common and how do they differ?

Both use the MapReduce model for parallel computation. In Hadoop, a unit of work is called a Job (on YARN it is also an Application). A Job is split into MapTasks and ReduceTasks, each Task runs in its own process, and when a Task finishes its process exits as well.

In Spark, a user-submitted program is called an Application. One Application corresponds to one SparkContext, and an Application contains multiple Jobs: every time an Action is triggered, a Job is produced. These Jobs can run in parallel or serially. Each Job consists of multiple Stages, which the DAGScheduler carves out of the Job along shuffle boundaries based on the dependencies between RDDs. Each Stage contains multiple Tasks, which form a TaskSet that the TaskScheduler distributes to the Executors. An Executor lives as long as the Application does, even when no Job is running, so Tasks can start quickly and compute on in-memory data. Spark's iterative computation happens in memory, the API offers a rich set of RDD operations such as join and group by, and the DAG enables good fault tolerance.

A Hadoop job only has map and reduce operations, so its expressiveness is limited; during the MapReduce process it repeatedly reads from and writes to HDFS, causing heavy IO, and dependencies between multiple Jobs must be managed by the user.
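
A tiny sketch of "one Action, one Job" (assuming a spark-shell session where sc is the SparkContext):

val doubled = sc.parallelize(1 to 100).map(_ * 2)   // transformation only: lazy, no job yet
doubled.count()     // first Action  => Job 0 in the Spark UI
doubled.collect()   // second Action => Job 1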

4. The RDD in Spark

RDD stands for Resilient Distributed Dataset, the most basic data abstraction in Spark. It represents an immutable, partitioned collection of elements that can be computed on in parallel.

/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
* for more details on RDD internals.
*/

abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging
{...}

The five main properties of an RDD (a quick way to inspect them follows the list):

  • A list of partitions: the data of an RDD lives in a list of partitions

  • A function for computing each split: a function applied to every partition

  • A list of dependencies on other RDDs: an RDD depends on one or more other RDDs (the basis of RDD fault tolerance)

  • Optionally, a Partitioner for key-value RDDs: only for key-value RDDs

  • Optionally, a list of preferred locations to compute each split on: data locality, computing as close to the data as possible
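
Most of these properties can be inspected directly from the RDD API (a minimal sketch, assuming a spark-shell session where sc is the SparkContext):

val pairs = sc.parallelize(1 to 8, 4).map(i => (i % 2, i))

pairs.partitions.length                        // property 1: the list of partitions (4 here)
pairs.dependencies                             // property 3: dependencies on the parent RDD
pairs.partitioner                              // property 4: None until a shuffle assigns one
pairs.reduceByKey(_ + _).partitioner           // Some(HashPartitioner) after the shuffle
pairs.preferredLocations(pairs.partitions(0))  // property 5: preferred locations (empty for parallelize)

Property 2, the function for computing each split, is the compute method that concrete RDD subclasses implement internally.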

5. Wide and narrow dependencies, and which kind groupByKey, reduceByKey, map, filter, and union create

  1. Narrow dependency

    Each partition of the parent RDD is used by at most one partition of the child RDD. This shows up either as one parent partition mapping to one child partition, or as partitions of two parent RDDs mapping to one child partition. map, filter, and union belong to the first kind; a join over co-partitioned inputs belongs to the second kind.

  2. Wide dependency

    A partition of the child RDD depends on all partitions of the parent RDD; this is what shuffle-type operations produce.

The essence of a shuffle is a group by: bringing data of the same type, or data matching the same rule, together (classifying it through disk or network IO).

  3. Which dependency each operator creates

    Transformations such as map, filter, and union on an RDD generally create narrow dependencies.

    Wide dependencies generally come from operations such as groupByKey and reduceByKey, which repartition (shuffle) the data held in the RDD's partitions.

    join can be either wide or narrow: if the input RDDs have already been partitioned with the same partitioner, the join is a narrow dependency; otherwise it is a wide dependency. The sketch below shows how to inspect the dependency type directly.
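
A minimal sketch for checking dependency types (assuming a spark-shell session where sc is the SparkContext):

val nums  = sc.parallelize(1 to 10, 2)
val pairs = nums.map(i => (i % 3, i))

pairs.dependencies                                    // OneToOneDependency -> narrow (map)
pairs.filter(_._2 > 3).dependencies                   // OneToOneDependency -> narrow (filter)
nums.union(sc.parallelize(11 to 20, 2)).dependencies  // RangeDependency    -> narrow (union)
pairs.reduceByKey(_ + _).dependencies                 // ShuffleDependency  -> wide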

6. How does Spark prevent out-of-memory (OOM) errors?

  1. OOM on the Driver side

Increase the Driver memory parameter:

# Default: 1G
spark.driver.memory

This parameter sets the memory of the Driver. In a Spark program, the SparkContext and the DAGScheduler run on the Driver, and splitting RDD lineages into Stages also happens on the Driver. If the user program has too many steps and produces too many Stages, that bookkeeping consumes Driver memory, and the Driver memory has to be increased.

  2. OOM caused by creating huge numbers of objects inside map

This kind of OOM happens when a single map creates a very large number of objects, for example rdd.map(x => for(i <- 1 to 10000) yield i.toString): every element produces 10000 objects, which easily blows up memory. Without adding memory, the fix is to shrink each Task so that even with many objects per element the Executor memory still suffices. Concretely, call repartition before the object-heavy map so that smaller partitions are fed into it, for example:

rdd.repartition(10000).map(x => for(i <-1 to 10000) yield i.toString)

Note: rdd.coalesce cannot be used here, because it can only decrease the number of partitions, not increase it, since it does not perform a shuffle.

  3. OOM caused by data skew

    Besides possibly causing OOM, skewed data can also cause performance problems. The fix is similar to the one above: call repartition to redistribute the data.

  4. OOM after a shuffle

    Shuffle-related OOM is almost always a post-shuffle problem caused by a single (partition) file being too large. In Spark, operations such as join and reduceByKey involve a shuffle, and a shuffle needs a Partitioner. For most shuffle operations the default Partitioner is a HashPartitioner, and its default partition count is the largest partition count among the parent RDDs, controlled by the following parameters:

# Only effective for the default HashPartitioner
spark.default.parallelism
# Spark SQL uses this parameter instead
spark.sql.shuffle.partitions

If the shuffle OOM is caused by some other Partitioner, the number of partitions has to be increased in that Partitioner's code.
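
For the common HashPartitioner case, the partition count can also be raised per operation instead of globally (a minimal sketch; pairs stands for a hypothetical key-value RDD):

import org.apache.spark.HashPartitioner

val counts  = pairs.reduceByKey(new HashPartitioner(400), _ + _)  // explicit partitioner
val counts2 = pairs.reduceByKey(_ + _, 400)                       // or just a larger numPartitions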

  5. Use rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) instead of rdd.cache()

When memory runs out, rdd.cache() drops data, which then has to be recomputed on the next use. With StorageLevel.MEMORY_AND_DISK_SER, data that does not fit in memory is spilled to disk, avoiding recomputation at the cost of a little IO.
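
A minimal sketch (rdd stands for any existing RDD that is reused several times):

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // blocks that do not fit in memory are spilled to disk, serialized
// rdd.cache() is equivalent to rdd.persist(StorageLevel.MEMORY_ONLY): blocks that do not fit are dropped and recomputed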

7. Job, Stage, and Task: differences and how they are divided

  • Job: a parallel computation made up of multiple tasks; a Job is generated whenever an Action is executed on an RDD

  • Stage: each Job is split into smaller groups of Tasks called Stages. Stages depend on one another and are executed in order (pipelined)

  • Task: a unit of work sent to an Executor; it is the execution unit of a Stage. In general an RDD with N partitions produces N Tasks, because each Task processes the data of exactly one partition (see the sketch below).
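
How the three map onto a concrete program (a minimal sketch; the input path is hypothetical and sc is the SparkContext of a spark-shell session):

val words  = sc.textFile("hdfs:///tmp/input", 4)   // at least 4 partitions
val counts = words.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)

counts.collect()
// 1 Action              => 1 Job
// the reduceByKey shuffle splits the Job into 2 Stages (a ShuffleMapStage and a ResultStage)
// each Stage runs one Task per partition of the RDD it computes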

Stage

private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val firstJobId: Int,
val callSite: CallSite,
val resourceProfileId: Int
)

extends Logging {

val numPartitions = rdd.partitions.length

/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0

val name: String = callSite.shortForm
val details: String = callSite.longForm

/**
* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
*/

private var _latestInfo: StageInfo =
StageInfo.fromStage(this, nextAttemptId, resourceProfileId = resourceProfileId)

/**
* Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
* endless retries if a stage keeps failing.
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/

val failedAttemptIds = new HashSet[Int]

private[scheduler] def clearFailures() : Unit = {
failedAttemptIds.clear()
}

/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
resourceProfileId = resourceProfileId)
nextAttemptId += 1
}

/** Returns the StageInfo for the most recent attempt for this stage. */
def latestInfo: StageInfo = _latestInfo

override final def hashCode(): Int = id

override final def equals(other: Any): Boolean = other match {
case stage: Stage => stage != null && stage.id == id
case _ => false
}

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
def findMissingPartitions(): Seq[Int]

def isIndeterminate: Boolean = {
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}
}

Task

private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
@transient var localProperties: Properties = new Properties,
// The default value is only used in tests.
serializedTaskMetrics: Array[Byte] =
SparkEnv.get.closureSerializer.newInstance(
).serialize(TaskMetrics.registered).array(),

val jobId: Option[Int] = None,
val appId: Option[String] = None,
val appAttemptId: Option[String] = None,
val isBarrier: Boolean = false) extends Serializable {

@transient lazy val metrics: TaskMetrics =
SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))

/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
*
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @param resources other host resources (like gpus) that this task attempt can access
* @return the result of the task along with updates of Accumulators.
*/

final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem,
resources: Map[String, ResourceInformation]): T = {
SparkEnv.get.blockManager.registerTask(taskAttemptId)
...
}
...
}

TaskSet

/**
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/

private[spark] class TaskSet(
val tasks: Array[Task[_]],
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
val properties: Properties,
val resourceProfileId: Int
)
{
val id: String = stageId + "." + stageAttemptId

override def toString: String = "TaskSet " + id
}

8. Spark job submission parameters

The source is in spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Several important parameters when submitting a job:

  • executor-cores: the number of cores each executor uses, default 1; 2 to 5 is recommended, and 4 is a common choice

Spark standalone, YARN and Kubernetes only:
--executor-cores NUM Number of cores used by each executor. (Default: 1 in YARN and K8S modes, or all available cores on the worker in standalone mode).

There is a test suite in the source code:

spark/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

"--executor-cores", "5",
  • num-executors: the number of executors to launch, default 2

Spark on YARN and Kubernetes only:
--num-executors NUM Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of executors will be at least NUM.
  • executor-memory: the memory per executor, default 1G

--executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G).
  • driver-cores: the number of cores used by the Driver, default 1

Cluster deploy mode only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).
  • driver-memory: the memory used by the Driver, default 512MB

--driver-memory MEM  Memory for driver (e.g. 1000M, 2G) (Default: ${mem_mb}M).

For example, submitting a job on YARN:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-cores 2 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 2 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10

9. reduceByKey vs groupByKey in Spark: differences and usage

reduceByKey merges the multiple values of each key. Crucially, it performs the merge locally (map-side) before sending anything over the network, and the merge function is user-defined.

JavaPairRDD#reduceByKey:

/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/

def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.reduceByKey(partitioner, func))

groupByKey also operates on the multiple values of each key, but it only gathers them into a Sequence. It cannot take a user-defined function itself; that requires an extra map(func).

/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*
* @note If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
* will provide much better performance.
*/

def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
*
* @note If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
* will provide much better performance.
*/

def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

On large datasets reduceByKey(func) works much better than groupByKey(), because reduceByKey combines data before the shuffle, so far less data is sent over the network than with groupByKey.
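
A minimal sketch of the two on the same data (assuming a spark-shell session where sc is the SparkContext):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

pairs.reduceByKey(_ + _).collect()            // Array((a,2), (b,1)); values are combined map-side before the shuffle
pairs.groupByKey().mapValues(_.sum).collect() // same result, but every (key, value) pair crosses the network first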

10. Difference between foreach and map

Looking at the source first, foreach is the first method in the Actions section of RDD:


// Actions (launch a job to return a value to the user program)

/**
* Applies a function f to all elements of this RDD.
*/

def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

map is the first method in the Transformations section of RDD:


// Transformations (return a new RDD)

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

What the two methods have in common: both traverse a collection and apply the given function to every element.

Their differences:

  • foreach has no return value (strictly speaking it returns Unit), while map returns a new collection. foreach is used to traverse a collection; map is used to map (transform) one collection into another.

  • The processing logic in foreach is executed serially, while the processing logic in map is executed in parallel.

  • map is a Transformation operator, foreach is an Action operator (see the sketch below).
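
A minimal sketch of the difference (assuming a spark-shell session where sc is the SparkContext):

val nums = sc.parallelize(1 to 5)

val doubled = nums.map(_ * 2)      // Transformation: returns a new RDD, nothing is executed yet
doubled.foreach(x => println(x))   // Action: returns Unit and triggers a job; the println runs on the
                                   // executors, so in cluster mode the output shows up in executor logs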

11. Difference between map and mapPartitions

mapPartitions:

/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}

In common: map and mapPartitions are both Transformation operators

Differences:

  1. Essence

  • map operates on each individual element of the RDD

  • mapPartitions operates on the iterator of each partition of the RDD

  2. When each partition of the RDD holds a modest amount of data

  • map performs poorly. If a partition has 10,000 records, the function is invoked and evaluated 10,000 times while processing that partition.

  • mapPartitions performs well. With mapPartitions, a Task invokes the function only once, and that single call receives the whole partition's data, so one invocation per partition is enough.

  3. When each partition of the RDD holds a huge amount of data, say one million records per partition

  • map still finishes normally.

  • mapPartitions hands an entire partition to one function call, which may not fit in memory and can cause an OOM (see the connection-reuse sketch below for the typical trade-off).
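
The typical trade-off is reusing one expensive resource per partition (a minimal sketch; records, createConnection, and lookup are hypothetical placeholders):

// map: the connection is created once per element
val out1 = records.map { r =>
  val conn = createConnection()
  val res  = lookup(conn, r)
  conn.close()
  res
}

// mapPartitions: the connection is created once per partition
val out2 = records.mapPartitions { iter =>
  val conn = createConnection()
  val res  = iter.map(r => lookup(conn, r)).toList  // materialize before closing the connection
  conn.close()
  res.iterator
}

Note that the toList materializes the whole partition in memory, which is exactly where the OOM risk described above comes from.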

12. Difference between foreach and foreachPartition

In common: foreach and foreachPartition are both Action operators

Differences:

  • foreach processes one record of the RDD at a time

  • foreachPartition processes the iterator of one partition of the RDD at a time

/**
* Applies a function f to each partition of this RDD.
*/

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
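
A common usage sketch, writing each partition through a single connection (createConnection and write are hypothetical placeholders; rdd is any existing RDD):

rdd.foreachPartition { iter =>
  val conn = createConnection()              // one connection per partition instead of one per record
  iter.foreach(record => write(conn, record))
  conn.close()
}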

13. Differences between groupByKey, reduceByKey, and combineByKey

Question 9 above already covered the differences and usage of reduceByKey vs groupByKey.

  1. groupByKey

  • operates per key and gathers the values of each key into a Sequence

  • groupByKey itself cannot take a user-defined function

  • it moves every key-value pair across the cluster without any local merge

  • this creates heavy traffic between cluster nodes and adds transfer latency

  2. reduceByKey

  • merges the multiple values of each key

  • the operator performs the merge locally (map-side) first

  • the merge function is user-defined

  3. combineByKey

  • combineByKey is a relatively low-level operator

  • reduceByKey is implemented on top of combineByKey, or more precisely combineByKeyWithClassTag (see the sketch below)
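
A minimal combineByKey sketch computing a per-key average (assuming a spark-shell session where sc is the SparkContext):

val scores = sc.parallelize(Seq(("a", 90.0), ("a", 70.0), ("b", 80.0)))

val avg = scores.combineByKey(
  (v: Double) => (v, 1),                                             // createCombiner: first value of a key in a partition
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold further values within a partition
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge partial results across partitions
).mapValues { case (sum, n) => sum / n }

avg.collect()   // Array((a,80.0), (b,80.0))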

14. Is sortByKey a global sort?

Yes, sortByKey is a global sort. RDD#sortBy:

/**
* Return this RDD sorted by the given key function.
*/

def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}

OrderedRDDFunctions#sortByKey

/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/

// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
  1. Before sorting, the data is split by key range with a Partitioner (after keyBy).

  2. This guarantees that all data in partition p1 is smaller than that in p2, all data in p2 is smaller than that in p3, and so on (p1 to pn are the partition ids).

  3. Each partition is then sorted individually, so the data ends up globally sorted (a small example follows).
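
A minimal sketch showing the global order and the per-partition key ranges (assuming a spark-shell session where sc is the SparkContext):

val data = sc.parallelize(Seq((5, "e"), (1, "a"), (3, "c"), (2, "b"), (4, "d")), 2)

val sorted = data.sortByKey(ascending = true, numPartitions = 2)

sorted.collect()                          // globally ordered: (1,a), (2,b), (3,c), (4,d), (5,e)
sorted.glom().map(_.map(_._1)).collect()  // e.g. Array(Array(1, 2), Array(3, 4, 5)):
                                          // each partition holds a sorted, non-overlapping key range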

15. coalesce vs repartition in Spark

First, the source code:

coalesce (shuffle: Boolean = false by default)

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions. If a larger number
* of partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @note With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner. The optional partition coalescer
* passed in must be serializable.
*/

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}

repartition (calls coalesce with shuffle = true)

/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}

It is commonly assumed that coalesce, since it avoids a shuffle, is more efficient than repartition. In practice it depends on the concrete case: coalesce is not always faster and can even be a trap, so use it with care.

Both operators re-divide the partitions of an RDD. repartition simply calls coalesce and sets the shuffle parameter, which defaults to false, to true.

Suppose an RDD has N partitions and needs to be re-divided into M partitions (a small sketch follows the list below):

  • If N < M: the N partitions usually hold unevenly distributed data, and a HashPartitioner is used to repartition the data into M partitions; in this case shuffle must be set to true.

  • If N > M and N and M are of the same order (say N is 1000 and M is 100): several of the N partitions can be merged into each new partition, ending with M partitions, so shuffle can be set to false. If M > N, coalesce is ineffective: without a shuffle the parent and child RDDs have a narrow dependency and the number of partitions cannot grow. In other words, with shuffle = false and a requested number larger than the current partition count, the partition count stays the same; you cannot increase the number of partitions of an RDD without a shuffle.

  • If N > M and N and M differ enormously, consider the relation between the number of executors and the target number of partitions. If the executor count is less than or equal to the target partition count, coalesce is efficient; otherwise coalesce may leave (executor count minus target partition count) executors idle and reduce efficiency. When M is 1, shuffle can be set to true so that the operations before the coalesce still run with good parallelism.
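
A minimal sketch of the two behaviours (assuming a spark-shell session where sc is the SparkContext):

val rdd = sc.parallelize(1 to 10000, 1000)

rdd.coalesce(100).getNumPartitions                   // 100: narrow dependency, no shuffle
rdd.coalesce(2000).getNumPartitions                  // 1000: cannot grow without a shuffle
rdd.coalesce(2000, shuffle = true).getNumPartitions  // 2000: a shuffle step is added
rdd.repartition(2000).getNumPartitions               // 2000: same as coalesce(2000, shuffle = true)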

