Spark Core Basic Interview Questions: A Summary (Part 1)

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650230000&%3Bidx=1&%3Bsn=3c01001ccc64ae0fac605e9350299e52

1. Spark deployment modes and their characteristics


// Set the cluster manager
val clusterManager: Int = args.master match {
  case "yarn" => YARN
  case m if m.startsWith("spark") => STANDALONE
  case m if m.startsWith("mesos") => MESOS
  case m if m.startsWith("k8s") => KUBERNETES
  case m if m.startsWith("local") => LOCAL
  case _ =>
    error("Master must either be yarn or start with spark, mesos, k8s, or local")
    -1
}

  1. Local mode


  • local runs Spark in a single JVM with one worker thread

  • local[k] runs with k worker threads

  • local[*] runs with as many worker threads as there are logical CPU cores
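
As a rough illustration of how the three local master strings above differ, the following sketch maps each one to a worker-thread count. It is a simplified stand-in written for this article, not Spark's own parsing code: the names LocalMasterSketch and localThreads are hypothetical, and other forms of the local master string are deliberately ignored.

```scala
object LocalMasterSketch {
  // Matches local[k] and local[*]; the captured group is either digits or "*".
  private val LocalN = """local\[([0-9]+|\*)\]""".r

  /** Returns the number of worker threads requested by a local master string. */
  def localThreads(master: String, availableCores: Int): Option[Int] = master match {
    case "local"     => Some(1)
    case LocalN("*") => Some(availableCores)
    case LocalN(k)   => Some(k.toInt)
    case _           => None // not a local master (yarn, spark://, k8s://, ...)
  }

  def main(args: Array[String]): Unit = {
    val cores = Runtime.getRuntime.availableProcessors()
    Seq("local", "local[4]", "local[*]", "yarn").foreach { m =>
      println(s"$m -> ${localThreads(m, cores)}")
    }
  }
}
```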

  1. Standalone mode


  1. Spark on YARN


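  • cluster mode suits production: the Driver runs on a node inside the cluster and is fault tolerant

  • client mode suits debugging: the Driver runs on the client node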

  1. Spark on K8s



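  • Standalone mode

The first workable way to run Spark on a Kubernetes cluster was to run Spark in Standalone mode, but the community soon proposed a mode that uses the native Kubernetes scheduler, known as Native mode.

  • Kubernetes Native mode

In short, Native mode turns the Driver and the Executors into Pods. Instead of submitting a Spark job to YARN, the user submits it to the Kubernetes apiserver with the following command: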

$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \

Here master is the address of the Kubernetes apiserver. After submission the job runs as follows: the Driver is started as a Pod first, and the Driver then starts the Executor Pods.

  • Spark Operator

Google Cloud Platform (GCP) has open-sourced an Operator for Spark on GitHub. Repo address:




  1. Mesos (omitted)

2. What does the Driver program do?



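3. Hadoop MapReduce and Spark are both parallel computing frameworks. What do they have in common and how do they differ?


A task submitted by a Spark user is called an Application. Each Application corresponds to one SparkContext and contains multiple Jobs; every Action that is triggered produces one Job. Jobs can run in parallel or serially. Each Job contains multiple Stages, which the DAGScheduler derives by splitting the Job at Shuffle boundaries according to the dependencies between RDDs. Each Stage contains multiple Tasks, which form a TaskSet that the TaskScheduler dispatches to the Executors. An Executor lives as long as the Application, even when no Job is running, so Tasks can start quickly and read data from memory. Spark keeps iterative computation in memory, its API offers a large number of RDD operations such as join and group by, and the DAG (the lineage of RDDs) gives it good fault tolerance.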


4. RDDs in Spark

An RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents an immutable, partitioned collection whose elements can be computed in parallel.

* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit.
* Internally, each RDD is characterized by five main properties:
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
* for more details on RDD internals.

abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging


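  • A list of partitions: the data of an RDD is split across a list of partitions

  • A function for computing each split: the function applied to each partition

  • A list of dependencies on other RDDs: an RDD depends on one or more parent RDDs (the basis of the RDD fault-tolerance mechanism)

  • Optionally, a Partitioner for key-value RDDs: applies only to RDDs of key-value pairs

  • Optionally, a list of preferred locations to compute each split on: data locality, that is, the closest location of the data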
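
The five properties can be pictured with a toy, single-JVM model. This is only a sketch for illustration: MiniRdd, SeqRdd, MappedRdd and collect are hypothetical names invented here and are not Spark classes, and the partitioner is reduced to a plain function.

```scala
object MiniRddSketch {
  // A toy model of the five properties; not Spark's real RDD class.
  trait MiniRdd[T] {
    def partitions: Seq[Int]                              // 1. list of partitions (by index)
    def compute(split: Int): Iterator[T]                  // 2. function computing each split
    def dependencies: Seq[MiniRdd[_]]                     // 3. parent RDDs (lineage)
    def partitioner: Option[Any => Int] = None            // 4. optional key partitioner
    def preferredLocations(split: Int): Seq[String] = Nil // 5. optional locality hints
  }

  /** Source RDD backed by in-memory chunks, one chunk per partition. */
  class SeqRdd[T](chunks: Seq[Seq[T]]) extends MiniRdd[T] {
    def partitions: Seq[Int] = chunks.indices
    def compute(split: Int): Iterator[T] = chunks(split).iterator
    def dependencies: Seq[MiniRdd[_]] = Nil
  }

  /** Child RDD: same partitions as the parent, computed lazily from it. */
  class MappedRdd[T, U](parent: MiniRdd[T], f: T => U) extends MiniRdd[U] {
    def partitions: Seq[Int] = parent.partitions
    def compute(split: Int): Iterator[U] = parent.compute(split).map(f)
    def dependencies: Seq[MiniRdd[_]] = Seq(parent)
  }

  /** Evaluates every partition in order and concatenates the results. */
  def collect[T](rdd: MiniRdd[T]): Seq[T] =
    rdd.partitions.flatMap(p => rdd.compute(p).toSeq)
}
```

Because MappedRdd only remembers its parent and the function, a lost partition can be rebuilt by calling compute again, which is the idea behind lineage-based fault tolerance.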

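5. Wide and narrow dependencies, and the dependency type of the five operators groupByKey, reduceByKey, map, filter and union

  1. Narrow dependency


  2. Wide dependency


In essence, a Shuffle is a group by: data of the same type or matching the same rule is brought together, and classifying it costs disk or network I/O.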
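
The "Shuffle is a group by" idea can be sketched on plain Scala collections. The code below is an illustration under simplifying assumptions, not Spark code: partitions are in-memory sequences, ShuffleSketch and its methods are hypothetical names, and the hash routing only imitates what a hash partitioner does.

```scala
object ShuffleSketch {
  type Partition[K, V] = Seq[(K, V)]

  /** Narrow: each output partition is derived from exactly one input partition. */
  def mapValuesNarrow[K, V, W](parts: Seq[Partition[K, V]])(f: V => W): Seq[Partition[K, W]] =
    parts.map(_.map { case (k, v) => (k, f(v)) })

  /** Wide: records are regrouped by key hash, so an output partition may read every input. */
  def shuffleByKey[K, V](parts: Seq[Partition[K, V]], numOut: Int): Seq[Partition[K, V]] = {
    val all = parts.flatten
    (0 until numOut).map { i =>
      all.filter { case (k, _) => Math.floorMod(k.hashCode, numOut) == i }
    }
  }
}
```

After shuffleByKey, all records that share a key sit in the same output partition, which is what operators such as groupByKey and reduceByKey need and what map and filter do not.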

  1. Wide and narrow dependencies of the operators




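6. How does Spark prevent out-of-memory errors?

  1. Out-of-memory on the Driver side


# default 1G


  1. Out-of-memory caused by a map that creates a large number of objects

This kind of overflow happens when a single map creates a large number of objects, for example: rdd.map(x => for(i <-1 to 10000) yield i.toString). Here every element of the RDD produces 10000 objects, which easily leads to an out-of-memory error. Without adding memory, you can reduce the amount of data each Task handles so that the Executor's memory can hold the objects even when a Task creates many of them. Concretely, call repartition before the map that creates the objects, so that smaller partitions are passed to the map, for example: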

rdd.repartition(10000).map(x => for(i <-1 to 10000) yield i.toString)

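Note: rdd.coalesce cannot be used here. With its default shuffle = false it can only reduce the number of partitions, not increase it, and it performs no Shuffle.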

  1. Out-of-memory caused by data skew


  2. Out-of-memory after a Shuffle


# only effective for HashPartitioner
# Spark SQL uses the parameter below



  1. Use rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) instead of rdd.cache()


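7. Differences between Stage, Task and Job, and how they are divided

  • Job: a parallel computation made up of multiple tasks. A Job is created whenever an Action on an RDD has to be executed

  • Stage: each Job is split into smaller groups of Tasks called Stages. Stages depend on one another and run in dependency order, while the operations inside one Stage are pipelined

  • Task: a unit of work sent to an Executor, and the execution unit of a Stage. In general a Stage has as many Tasks as its RDD has Partitions, because each Task processes the data of exactly one Partition.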
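
The Job, Stage and Task relationship described above can be sketched as cutting a linear chain of operators at every shuffle. This is a deliberately simplified model and not the DAGScheduler's algorithm: it assumes a single linear lineage, and Op and splitIntoStages are hypothetical names.

```scala
object StageSplitSketch {
  /** One operator in a linear lineage; `wide` marks a shuffle dependency on its input. */
  final case class Op(name: String, wide: Boolean)

  /** Cuts the lineage before every wide operator; each group is pipelined as one stage. */
  def splitIntoStages(ops: Seq[Op]): Seq[Seq[String]] =
    ops.foldLeft(Vector.empty[Vector[String]]) { (stages, op) =>
      if (stages.isEmpty || op.wide) stages :+ Vector(op.name) // start a new stage
      else stages.init :+ (stages.last :+ op.name)             // pipeline into the current stage
    }

  def main(args: Array[String]): Unit = {
    val job = Seq(Op("textFile", wide = false), Op("flatMap", wide = false),
      Op("map", wide = false), Op("reduceByKey", wide = true), Op("map", wide = false))
    splitIntoStages(job).zipWithIndex.foreach { case (stage, i) =>
      println(s"Stage $i: ${stage.mkString(" -> ")}")
    }
  }
}
```

In this model a word-count style chain yields two stages, and each stage would then run one Task per partition.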


private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val firstJobId: Int,
val callSite: CallSite,
val resourceProfileId: Int

extends Logging {

val numPartitions = rdd.partitions.length

/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0

val name: String = callSite.shortForm
val details: String = callSite.longForm

* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).

private var _latestInfo: StageInfo =
StageInfo.fromStage(this, nextAttemptId, resourceProfileId = resourceProfileId)

* Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
* endless retries if a stage keeps failing.
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).

val failedAttemptIds = new HashSet[Int]

private[scheduler] def clearFailures() : Unit = {

/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
resourceProfileId = resourceProfileId)
nextAttemptId += 1

/** Returns the StageInfo for the most recent attempt for this stage. */
def latestInfo: StageInfo = _latestInfo

override final def hashCode(): Int = id

override final def equals(other: Any): Boolean = other match {
case stage: Stage => stage != null && stage.id == id
case _ => false

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
def findMissingPartitions(): Seq[Int]

def isIndeterminate: Boolean = {
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE


private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
@transient var localProperties: Properties = new Properties,
// The default value is only used in tests.
serializedTaskMetrics: Array[Byte] =

val jobId: Option[Int] = None,
val appId: Option[String] = None,
val appAttemptId: Option[String] = None,
val isBarrier: Boolean = false) extends Serializable {

@transient lazy val metrics: TaskMetrics =

* Called by [[org.apache.spark.executor.Executor]] to run this task.
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @param resources other host resources (like gpus) that this task attempt can access
* @return the result of the task along with updates of Accumulators.

final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem,
resources: Map[String, ResourceInformation]): T = {


* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.

private[spark] class TaskSet(
val tasks: Array[Task[_]],
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
val properties: Properties,
val resourceProfileId: Int
val id: String = stageId + "." + stageAttemptId

override def toString: String = "TaskSet " + id

8. Spark job submission parameters



  • executor-cores: number of cores used by each executor, default 1; 2 to 5 is recommended, and 4 is a common choice

Spark standalone, YARN and Kubernetes only:
--executor-cores NUM Number of cores used by each executor. (Default: 1 in YARN and K8S modes, or all available cores on the worker in standalone mode).



"--executor-cores", "5",
  • num-executors: number of executors to launch, default 2

Spark on YARN and Kubernetes only:
--num-executors NUM Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of executors will be at least NUM.
  • executor-memory: memory per executor, default 1 GB

--executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G).
  • driver-cores: number of cores used by the Driver, default 1

Cluster deploy mode only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).
  • driver-memory: memory used by the Driver, default 1 GB

--driver-memory MEM  Memory for driver (e.g. 1000M, 2G) (Default: ${mem_mb}M).

For example, submitting a job on YARN:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-cores 2 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 2 \
--queue thequeue \
examples/jars/spark-examples*.jar \

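9. reduceByKey vs. groupByKey in Spark: differences and usage

reduceByKey merges the multiple Values of each Key. Most importantly, it can merge locally first (on the map side), and the merge operation is defined by a user-supplied function.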



* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.

def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.reduceByKey(partitioner, func))

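groupByKey also operates on the multiple Values of each Key, but it only collects them into a sequence. It does not take a custom function itself; any aggregation has to be applied afterwards through an additional map(func).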

* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* @note If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
* will provide much better performance.

def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =

* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
* @note If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
* will provide much better performance.

def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
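
To make the effect of the local merge concrete, the sketch below imitates both operators on in-memory partitions and counts how many records would cross the shuffle. It is an illustration on plain Scala collections, not Spark code, and CombineSketch, groupThenSum and reduceWithCombiner are hypothetical names.

```scala
object CombineSketch {
  type Part = Seq[(String, Int)]

  /** groupByKey-style: every record crosses the shuffle, then values are grouped and summed. */
  def groupThenSum(parts: Seq[Part]): (Map[String, Int], Int) = {
    val shuffled = parts.flatten
    val result = shuffled.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
    (result, shuffled.size)
  }

  /** reduceByKey-style: merge inside each partition first, then shuffle the partial results. */
  def reduceWithCombiner(parts: Seq[Part])(f: (Int, Int) => Int): (Map[String, Int], Int) = {
    val partials = parts.map(_.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) })
    val shuffled = partials.flatMap(_.toSeq)
    val result = shuffled.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    (result, shuffled.size)
  }
}
```

Both functions return the same sums, but the second one shuffles at most one record per key per partition, which is why the Scaladoc quoted above recommends reduceByKey for aggregations.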


10. Differences between foreach and map


// Actions (launch a job to return a value to the user program)

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}


// Transformations (return a new RDD)

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}



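  • foreach has no return value (more precisely, it returns Unit), while map returns a new RDD. foreach iterates over the elements for their side effects; map transforms one collection into another.

  • Both process partitions in parallel across Executors and the elements within one partition sequentially. The difference is that map is lazy and runs only when a later Action triggers a Job, whereas foreach triggers a Job immediately.

  • map is a Transformation operator and foreach is an Action operator.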
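
The Transformation versus Action distinction can be imitated with a tiny lazy wrapper around an iterator. This is only an analogy on a single JVM: Dataset here is a hypothetical class written for this sketch, not a Spark type.

```scala
object LazyVsActionSketch {
  /** A lazily evaluated dataset: nothing is computed until an action asks for it. */
  final class Dataset[T](compute: () => Iterator[T]) {
    /** Transformation: only records what to do and returns a new Dataset. */
    def map[U](f: T => U): Dataset[U] = new Dataset[U](() => compute().map(f))

    /** Action: evaluates the whole chain now and returns Unit. */
    def foreach(f: T => Unit): Unit = compute().foreach(f)
  }

  def main(args: Array[String]): Unit = {
    val mapped = new Dataset(() => Iterator(1, 2, 3)).map { x => println(s"mapping $x"); x * 2 }
    println("nothing has been mapped yet")
    mapped.foreach(x => println(s"got $x"))
  }
}
```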

11. Differences between map and mapPartitions


/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}



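  1. Essence

  • map operates on each element of the RDD

  • mapPartitions operates on the iterator of each partition of the RDD

  1. When each partition of the RDD holds a modest amount of data

  • map is slower. For example, if a partition holds 10,000 records, the function is invoked 10,000 times while that partition is processed.

  • mapPartitions is faster. A Task invokes the function only once and passes it all the data of the partition, so per-call and setup overhead is paid once per partition.

  1. When each partition of the RDD holds a very large amount of data, for example 1 million records per Partition

  • map completes normally.

  • mapPartitions hands the whole partition to one function call. If the function buffers the partition in memory, memory may run out and cause an OOM (out-of-memory error).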
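
The per-element versus per-partition cost can be illustrated by counting how often an expensive resource is created. The sketch runs on plain Scala collections rather than Spark, and Resource, mapStyle and mapPartitionsStyle are hypothetical names; Resource stands in for something like a database connection.

```scala
object MapPartitionsSketch {
  /** Hypothetical expensive resource; increments counter(0) every time it is created. */
  final class Resource(counter: Array[Int]) {
    counter(0) += 1
    def transform(x: Int): Int = x * 2
  }

  /** map-style: the function runs per element, so naive setup happens per element. */
  def mapStyle(parts: Seq[Seq[Int]], counter: Array[Int]): Seq[Seq[Int]] =
    parts.map(_.map(x => new Resource(counter).transform(x)))

  /** mapPartitions-style: the function runs once per partition and receives an iterator. */
  def mapPartitionsStyle(parts: Seq[Seq[Int]], counter: Array[Int]): Seq[Seq[Int]] =
    parts.map { part =>
      val res = new Resource(counter)            // one setup per partition
      part.iterator.map(res.transform).toList    // stream elements through the iterator
    }
}
```

Streaming through the iterator, as in the second function, keeps the setup saving without having to hold the whole partition in memory before processing starts.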

12. Differences between foreach and foreachPartition



  • foreach processes one record of the RDD at a time

  • foreachPartition processes the data in the iterator of one partition of the RDD at a time

/**
 * Applies a function f to each partition of this RDD.
 */
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

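13. Differences between groupByKey, reduceByKey and combineByKey

Question 9 above already covers the differences between reduceByKey and groupByKey and how to use them.

  1. groupByKey

  • Operates on each Key and collects its Values into a sequence

  • groupByKey itself does not take a custom function

  • Moves every key-value pair across the network without any local merge

  • Causes heavy traffic between cluster nodes and therefore transfer latency

  1. reduceByKey

  • Merges the multiple Values of each Key

  • Can perform the merge locally first

  • The merge operation is defined by a user-supplied function

  1. combineByKey

  • combineByKey is a lower-level operator

  • reduceByKey is implemented on top of combineByKey, more precisely combineByKeyWithClassTag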
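
combineByKey is driven by three functions: one that creates a combiner from the first value of a key, one that merges a further value into a combiner within a partition, and one that merges combiners from different partitions. The sketch below imitates that contract on in-memory partitions. It is an illustration only: the object name and the way partitions are passed in are assumptions of this sketch, not Spark's implementation.

```scala
object CombineByKeySketch {
  /** Single-JVM imitation of combineByKey's three functions over pre-split partitions. */
  def combineByKey[K, V, C](parts: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: build one combiner per key inside each partition.
    val perPartition: Seq[Map[K, C]] = parts.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
    }
    // Reduce side: merge the combiners that share a key.
    perPartition.flatMap(_.toSeq).groupBy(_._1).map { case (k, cs) =>
      k -> cs.map(_._2).reduce(mergeCombiners)
    }
  }
}
```

A typical use is a per-key average: the combiner is a (sum, count) pair, which a plain reduce over the values cannot express because the result type differs from the value type.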

14. Is sortByKey a global sort?


/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}


/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

  1. Before sorting, sortByKey uses a Partitioner (a RangePartitioner) to split the data by key range; sortBy first derives the keys with keyBy.

  2. As a result, all data in partition p1 is smaller than the data in p2, all data in p2 is smaller than the data in p3, and so on (p1 to pn are partition labels).

  3. The data inside each Partition is then sorted, so the data as a whole is sorted globally.
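
The three steps can be sketched on plain Scala collections: split the keys into ranges, sort each range, and read the partitions in order. This is a simplified illustration, not Spark code. In particular, the range bounds are passed in by hand here, which is an assumption of this sketch, and RangeSortSketch and its methods are hypothetical names.

```scala
object RangeSortSketch {
  /** Assigns each key to the first range whose upper bound is >= the key. */
  def rangePartition(keys: Seq[Int], bounds: Seq[Int]): Seq[Seq[Int]] = {
    val numParts = bounds.size + 1
    def indexOf(k: Int): Int = {
      val i = bounds.indexWhere(b => k <= b)
      if (i < 0) bounds.size else i // larger than every bound: last partition
    }
    (0 until numParts).map(p => keys.filter(k => indexOf(k) == p))
  }

  /** Sort inside every partition; concatenating p0, p1, ... is then globally ordered. */
  def sortByKey(keys: Seq[Int], bounds: Seq[Int]): Seq[Seq[Int]] =
    rangePartition(keys, bounds).map(_.sorted)
}
```

Because no key in one partition is larger than any key in the next, sorting each partition independently is enough to obtain a total order across partitions.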

15. coalesce vs. repartition in Spark


coalesce (shuffle: Boolean = false)

* Return a new RDD that is reduced into `numPartitions` partitions.
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions. If a larger number
* of partitions is requested, it will stay at the current number of partitions.
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
* @note With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner. The optional partition coalescer
* passed in must be serializable.

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)

repartition (shuffle = true)

/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}




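  • If N < M, where N is the current number of partitions and M the target number: data is usually distributed unevenly across the N partitions, and a HashPartitioner redistributes it into M partitions. This requires setting shuffle to true.

  • If N > M and the two do not differ drastically (say N is 1000 and M is 100): several of the N partitions can be merged into one new partition until M partitions remain, so shuffle can be set to false. With shuffle = false, coalesce has no effect when M > N: no Shuffle takes place, the parent and child RDDs are linked by a narrow dependency, and the number of partitions (and output files) cannot grow. In short, when shuffle is false and the requested number exceeds the current number of partitions, the partition count stays unchanged. In other words, the number of partitions of an RDD cannot be increased without a Shuffle.

  • If N > M and the gap is very large: the outcome depends on how the number of executors relates to the number of partitions to produce. If the number of executors <= the number of partitions to produce, coalesce is efficient; otherwise coalesce may leave (number of executors - number of partitions to produce) executors idle and reduce efficiency. When M is 1, shuffle can be set to true so that the operations before the coalesce keep better parallelism.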
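
The partition-count rules above can be checked with a small model on in-memory partitions. This sketch is not Spark's implementation: Spark's coalesce also takes data locality into account and its shuffle path uses a hash partitioner, whereas the hypothetical functions below merge neighbouring partitions and deal records out round-robin.

```scala
object CoalesceSketch {
  /** shuffle = false: merge whole parent partitions; the count can only go down. */
  def coalesceNoShuffle[T](parts: Seq[Seq[T]], m: Int): Seq[Seq[T]] = {
    require(m > 0, "number of partitions must be positive")
    if (m >= parts.size) parts // asking for more partitions has no effect
    else parts.zipWithIndex
      .groupBy { case (_, i) => i * m / parts.size } // parent i -> child bucket
      .toSeq.sortBy(_._1)
      .map { case (_, group) => group.flatMap(_._1) }
  }

  /** shuffle = true: redistribute individual records, so m may exceed parts.size. */
  def repartition[T](parts: Seq[Seq[T]], m: Int): Seq[Seq[T]] = {
    require(m > 0, "number of partitions must be positive")
    val indexed = parts.flatten.zipWithIndex
    (0 until m).map(p => indexed.collect { case (t, i) if i % m == p => t })
  }
}
```

In the first function every child partition is built from a fixed set of whole parent partitions (a narrow dependency), while the second one may place records from any parent partition into any child partition (a wide dependency).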










