# Making Clever Use of localCheckpoint to Speed Up Iterative Computation in Spark (Breaking the Lineage)


## Spark and Iterative Computation

Spark is a computation framework built on lineage: instead of eagerly materializing every intermediate result, an RDD records the chain of transformations that produced it, so any partition can be recomputed on failure. An iterative algorithm repeatedly derives a new state `X` from the previous state and the input data `D`:

```
X = f(X, D)
```
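The consequence of lineage-based evaluation for iterative jobs can be sketched with a toy model (plain Python, not Spark; the `ToyRDD` class is invented for illustration): each iteration wraps the previous state in a new node, so the dependency chain grows by one link per iteration.

```python
class ToyRDD:
    """Minimal stand-in for an RDD: records only its parent in the lineage."""
    def __init__(self, parent=None):
        self.parent = parent

    def lineage_depth(self):
        # walk the parent chain, the way Spark's scheduler walks the DAG
        depth, node = 0, self
        while node.parent is not None:
            depth += 1
            node = node.parent
        return depth

# X = f(X, D): every iteration wraps the previous RDD in a new one
x = ToyRDD()
for _ in range(100):
    x = ToyRDD(parent=x)

print(x.lineage_depth())  # 100: the chain is now 100 links long
```

In real Spark the chain is the RDD dependency DAG, and it can be inspected with `rdd.toDebugString()`.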

## The Problem of Overlong Lineage

Take PageRank as an example. Each of the 100 iterations derives a new rank RDD `r` from the previous one through `join`, `flatMap`, and `reduceByKey`, so the lineage grows by several stages per iteration and each iteration takes longer than the last:

```python
from operator import add
import time

def compute(dests, rank):
    # spread this node's rank evenly over its outgoing edges
    n = len(dests)
    for d in dests:
        yield (d, rank / n)

graph = ...
# graph = sc.parallelize([(0,[1,2]),(1,[0,2,3]),(2,[1,2]),(3,[2,0])])
# initialize ranks of all nodes to 1.0
r = graph.map(lambda v: (v[0], 1.0))
p0 = r.count() * 1.0  # initial progress: every rank starts at 1.0
# compute
for i in range(100):
    t = time.time()
    # compute the contribution of each edge
    c = graph.join(r).flatMap(lambda klr: compute(klr[1][0], klr[1][1]))
    # compute the new rank
    r = c.reduceByKey(add)
    # current progress
    p = r.map(lambda v: v[1]**2).sum()
    t = time.time() - t
    print('iteration: %d, time: %f, progress: %f' % (i, t, p))
```

One brute-force workaround is to pull the RDD back to the driver and re-distribute it, which starts a fresh lineage but funnels the entire dataset through the driver:

```python
r = sc.parallelize(r.collect())
```

### The cache() Function

Calling `cache()` on an RDD forces the system to keep a copy of that RDD in memory, so subsequent operations that use it can read the cached result directly instead of recomputing it. Note, however, that caching avoids recomputation but does not shorten the lineage itself. For example:

```python
r.cache()
```

### The localCheckpoint() Function

RDD's `localCheckpoint()` and `checkpoint()` serve completely different purposes: `localCheckpoint()` is designed to truncate the lineage, whereas `checkpoint()` exists to provide fault tolerance. `localCheckpoint()` saves the RDD to the executors' local storage (reusing cached partitions when available), so it is cheap but the data does not survive executor failure; `checkpoint()` writes to reliable storage such as HDFS.
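The contrast with `cache()` can be sketched with a toy model (plain Python, not Spark; `ToyRDD` and its methods are hypothetical stand-ins): caching only marks the data as materialized and keeps the parent chain for recovery, while a local checkpoint cuts the chain.

```python
class ToyRDD:
    """Plain-Python stand-in for an RDD; illustration only, not Spark."""
    def __init__(self, parent=None):
        self.parent = parent
        self.cached = False

    def cache(self):
        # cache() materializes the data but keeps the lineage for recovery
        self.cached = True
        return self

    def local_checkpoint(self):
        # localCheckpoint() makes the data itself the new lineage root
        self.parent = None
        return self

    def lineage_depth(self):
        depth, node = 0, self
        while node.parent is not None:
            depth += 1
            node = node.parent
        return depth

x = ToyRDD()
for _ in range(10):
    x = ToyRDD(parent=x)

x.cache()
print(x.lineage_depth())   # still 10: caching does not truncate the chain
x.local_checkpoint()
print(x.lineage_depth())   # 0: the chain is cut here
```

This is also why the real pattern below calls `cache()` immediately before `localCheckpoint()`: the local checkpoint can then be served from the cached partitions instead of recomputing them.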

Breaking the lineage every few iterations keeps the chain short for the rest of the job:

```python
for i in range(100):
    t = time.time()
    # compute the contribution of each edge
    c = graph.join(r).flatMap(lambda klr: compute(klr[1][0], klr[1][1]))
    # compute the new rank
    r = c.reduceByKey(add)
    # break the lineage every 5 iterations
    if i % 5 == 1:
        r.cache()
        r.localCheckpoint()
    # current progress
    p = r.map(lambda v: v[1]**2).sum()
    t = time.time() - t
    print('iteration: %d, time: %f, progress: %f' % (i, t, p))
```
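A back-of-the-envelope model (plain Python; `total_planning_cost` is a made-up cost function, not a Spark API) shows why periodic truncation helps: if the per-iteration planning cost is proportional to the current lineage depth, an untruncated job pays a quadratically growing total, while truncating every `k` iterations keeps the total linear.

```python
def total_planning_cost(iterations, truncate_every=None):
    """Sum of lineage depths walked at planning time across all iterations."""
    cost, depth = 0, 0
    for i in range(iterations):
        depth += 1            # this iteration appends one stage to the lineage
        cost += depth         # planning walks the whole chain
        if truncate_every and (i + 1) % truncate_every == 0:
            depth = 0         # break the lineage, as localCheckpoint does
    return cost

print(total_planning_cost(100))                    # 5050: quadratic growth
print(total_planning_cost(100, truncate_every=5))  # 300: bounded by the period
```

In a real job the savings show up as shorter job-planning and task-serialization time per iteration; the checkpoint period trades that against the cost of materializing the RDD each time the lineage is cut.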

## References

• https://zhuanlan.zhihu.com/p/87983748
• https://livebook.manning.com/book/spark-in-action-second-edition/16-cache-and-checkpoint-enhancing-spark-s-performances/v-14/
• https://medium.com/swlh/scaling-iterative-algorithms-in-spark-3b2127de32c6
• http://spark.apache.org/docs/latest/api/python/pyspark.html