# Using localCheckpoint to Speed Up Iterative Computation on Spark (Breaking the Lineage)

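## Spark and Iterative Computation

Spark is a lineage-based computing framework: each RDD records the chain of transformations that produced it. An iterative computation repeatedly applies an update of the following form: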

```
X = f(X, D)
```
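As a minimal sketch of this update rule in plain Python (no Spark involved), the loop below reads `X` as the state being refined (node ranks) and `D` as fixed input data (the graph). That reading is an assumption, as are the names `GRAPH`, `f`, and `iterate`; the toy graph is the one from the commented-out example later in this article.

```python
# Toy graph from the commented-out example in this article:
# node -> list of nodes it links to.
GRAPH = {0: [1, 2], 1: [0, 2, 3], 2: [1, 2], 3: [2, 0]}


def f(ranks, graph):
    """One update step X = f(X, D): spread each node's rank evenly over its out-links."""
    new_ranks = {node: 0.0 for node in graph}
    for node, links in graph.items():
        share = ranks[node] / len(links)
        for dest in links:
            new_ranks[dest] += share
    return new_ranks


def iterate(x0, data, steps):
    """Apply x = f(x, data) `steps` times; `data` is never modified."""
    x = x0
    for _ in range(steps):
        x = f(x, data)
    return x


ranks = iterate({node: 1.0 for node in GRAPH}, GRAPH, steps=20)
print(ranks)
```

Each new `X` depends on the previous one, which is why the dependency chain grows with every iteration when the same loop is expressed as RDD transformations.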

## The Long-Lineage Problem

```python
from operator import add
import time

def compute(links, rank):
    # split a node's rank evenly among the nodes it links to
    n = len(links)
    for d in links:
        yield (d, rank/n)

graph=...
# graph=sc.parallelize([(0,[1,2]),(1,[0,2,3]),(2,[1,2]),(3,[2,0])])
# initialize ranks of all nodes to 1.0
r = graph.map(lambda v:(v[0], 1.0))
po = r.count() * 1.0
# compute
for i in range(100):
    t = time.time()
    # compute the contribution of each edge
    c = graph.join(r).flatMap(lambda klr:compute(klr[1][0], klr[1][1]))
    # compute the new rank
    r = c.reduceByKey(add)
    # current progress
    p = r.map(lambda v:v[1]**2).sum()
    t = time.time() - t
    print('iteration: %d, time: %f, progress: %f' % (i, t, p))
```

One blunt way to cut the lineage is to collect the RDD to the driver and redistribute it:

```python
r = sc.parallelize(r.collect())
```
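The effect of rebuilding an RDD from its collected contents can be illustrated with a toy model in plain Python. This is only a sketch under stated assumptions: `ToyDataset`, `depth`, and `rebuild` are hypothetical names, not Spark APIs, and unlike a real RDD the toy `map` runs eagerly. It shows only how a parent chain grows by one link per iteration and disappears when the data is rebuilt from scratch.

```python
class ToyDataset:
    """Hypothetical stand-in for an RDD: holds data plus a link to its parent."""

    def __init__(self, data, parent=None):
        self.data = data
        self.parent = parent

    def map(self, fn):
        # A derived dataset remembers the dataset it was computed from.
        return ToyDataset([fn(v) for v in self.data], parent=self)

    def depth(self):
        # Walk the parent chain to count how long the lineage is.
        n, node = 0, self
        while node.parent is not None:
            n, node = n + 1, node.parent
        return n

    def rebuild(self):
        # Analogue of sc.parallelize(r.collect()): same data, no history.
        return ToyDataset(list(self.data))


r = ToyDataset([1.0, 1.0, 1.0, 1.0])
for i in range(10):
    r = r.map(lambda v: v * 0.5)

depth_before = r.depth()  # one link added per iteration
r = r.rebuild()
depth_after = r.depth()   # history dropped, data kept
print(depth_before, depth_after)
```

In real Spark the cost of this approach is that all data passes through the driver, which is what makes a lineage-truncating call on the executors attractive.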

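### The cache function

An RDD's `cache()` function asks Spark to keep a copy of the RDD in memory once it has been computed, so that later operations that use it can read the cached result directly instead of recomputing it. Caching alone does not shorten the lineage. For example: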

```python
r.cache()
```

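### The localCheckpoint function

An RDD's `localCheckpoint()` and `checkpoint()` functions serve different purposes. `localCheckpoint()` is designed to truncate the lineage cheaply, keeping the data in Spark's caching layer on the executors, whereas `checkpoint()` writes the RDD to reliable storage and is the one to use when fault tolerance matters.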

```python
for i in range(100):
    t = time.time()
    # compute the contribution of each edge
    c = graph.join(r).flatMap(lambda klr:compute(klr[1][0], klr[1][1]))
    # compute the new rank
    r = c.reduceByKey(add)
    # break the lineage every 5 iterations
    if i % 5 == 1:
        r.cache()
        r.localCheckpoint()
    # current progress
    p = r.map(lambda v:v[1]**2).sum()
    t = time.time() - t
    print('iteration: %d, time: %f, progress: %f' % (i, t, p))
```

