58

PySpark 通过Arrow加速

 5 years ago
source link: http://www.jianshu.com/p/9c35da38a3c1?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

前言

PySpark是Spark 实现 Unify BigData && Machine Learning目标的基石之一。通过PySpark,我们可以用Python在一个脚本里完成数据加载,处理,训练,预测等完整Pipeline,加上DB良好的notebook的支持,数据科学家们会觉得非常开心。当然缺点也是有的,就是带来了比较大的性能损耗。

性能损耗点分析

如果使用PySpark,大概处理流程是这样的(注意,这些都是对用户透明的)

  1. python通过socket调用Spark API(py4j完成),一些计算逻辑,python会在调用时将其序列化,一并发送给Spark。
  2. Spark 触发计算,比如加载数据,然后把数据转成内部存储格式InternalRow,接着启动Python Deamon, Python Deamon再启动多个Worker,
  3. 数据通过socket协议发送给Python Worker(不跨网络),期间需要将InternalRow转化为Java对象,然后再用Java Pickle进行序列化(一次),这个时候才能通过网络发送给Worker
  4. Worker接收后,一条一条反序列化(python pickle,两次),然后转化为Python对象进行处理。拿到前面序列化好的函数反序列化,接着用这个函数对这些数据处理,处理完成后,再用pickle进行序列化(三次),发送给Java Executor.
  5. Java Executor获取数据后,需要反序列化(四次),然后转化为InternalRow继续进行处理。

所以可以看到,前后需要四次编码/解码动作。序列化反序列化耗时应该占用额外耗时的70%左右。我们说,有的时候把序列化框架设置为Kyro之后,速度明显快了很多,可见序列化的额外耗时是非常明显的。

前面是一个点,第二个点是,数据是按行进行处理的,一条一条,显然性能不好。

第三个点是,Socket协议通讯其实还是很快的,而且不跨网络,只要能克服前面两个问题,那么性能就会得到很大的提升。 另外可以跟大家说的是,Python如果使用一些C库的扩展,比如Numpy,本身也是非常快的。

如何开启Arrow进行加速,以及背后原理

开启方式很简单,启动时加上一个配置即可:

if __name__ == '__main__':
    conf = SparkConf()
    conf.set("spark.sql.execution.arrow.enabled", "true")

你也可以在submit命令行里添加。

那么Arrow是如何加快速度的呢?主要是有两点:

  1. 序列化友好
  2. 向量化

序列化友好指的是,Arrow提供了一个内存格式,该格式本身是跨应用的,无论你放到哪,都是这个格式,中间如果需要网络传输这个格式,那么也是序列化友好的,只要做下格式调整(不是序列化)就可以将数据发送到另外一个应用里。这样就大大的降低了序列化开销。

向量化指的是,首先Arrow是将数据按block进行传输的,其次是可以对立面的数据按列进行处理的。这样就极大的加快了处理速度。

实测效果

为了方便测试,我定义了一个基类:

from pyspark import SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os

os.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"

class _SparkBase(object):
    @classmethod
    def start(cls, conf=SparkConf()):
        cls.sc = SparkContext(master='local[*]', appName=cls.__name__, conf=conf)
        cls.sql = SQLContext(cls.sc)
        cls.session = SparkSession.builder.getOrCreate()
        cls.dataDir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"

    @classmethod
    def shutdown(cls):
        cls.session.stop()
        cls.session = None
        cls.sc.stop()
        cls.sc = None

接着提供了一个性能测试辅助类:

import time
from functools import wraps
import logging

logger = logging.getLogger(__name__)

PROF_DATA = {}


def profile(fn):
    @wraps(fn)
    def with_profiling(*args, **kwargs):
        start_time = time.time()

        ret = fn(*args, **kwargs)

        elapsed_time = time.time() - start_time

        if fn.__name__ not in PROF_DATA:
            PROF_DATA[fn.__name__] = [0, []]
        PROF_DATA[fn.__name__][0] += 1
        PROF_DATA[fn.__name__][1].append(elapsed_time)

        return ret

    return with_profiling


def print_prof_data(clear):
    for fname, data in PROF_DATA.items():
        max_time = max(data[1])
        avg_time = sum(data[1]) / len(data[1])
        logger.warn("Function %s called %d times. " % (fname, data[0]))
        logger.warn('Execution time max: %.3f, average: %.3f' % (max_time, avg_time))
    if clear:
        clear_prof_data()


def clear_prof_data():
    global PROF_DATA
    PROF_DATA = {}

很简单,就是wrap一下实际的函数,然后进行时间计算。现在,我们写一个PySpark的类:

import logging
from random import Random

import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql.types import *

from example.allwefantasy.base.spark_base import _SparkBase
import example.allwefantasy.time_profile as TimeProfile
import pandas as pd

logger = logging.getLogger(__name__)
class PySparkOptimize(_SparkBase):
    def trick1(self):   
        pass 

if __name__ == '__main__':
    conf = SparkConf()
    conf.set("spark.sql.execution.arrow.enabled", "true")
    PySparkOptimize.start(conf=conf)
    PySparkOptimize().trick1()
    PySparkOptimize.shutdown()

这样骨架就搭建好了。

我们写第一个方法,trick1,做一个简单的计数:

def trick1(self):
        df = self.session.range(0, 1000000).select("id", F.rand(seed=10).alias("uniform"),
                                                   F.randn(seed=27).alias("normal"))
        # 更少的内存和更快的速度
        TimeProfile.profile(lambda: df.toPandas())()
        TimeProfile.print_prof_data(clear=True)

并且将前面的arrow设置为false.结果如下:

Function <lambda> called 1 times. 
Execution time max: 6.716, average: 6.716

然后同样的代码,我们把arrow设置为true,是不是会好一些呢?

Function <lambda> called 1 times. 
Execution time max: 2.067, average: 2.067

当然我这个测试并不严谨,但是对于这种非常简单的示例,提升还是有效三倍的,不是么?而这,只是改个配置就可以达成了。

分组聚合使用Pandas处理

另外值得一提的是,PySpark是不支持自定义聚合函数的,现在如果是数据处理,可以把group by的小集合发给pandas处理,pandas再返回,比如

def trick7(self):
        df = self.session.createDataFrame(
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

        @F.pandas_udf("id long", F.PandasUDFType.GROUPED_MAP)  
        def normalize(pdf):
            v = pdf.v
            return pdf.assign(v=(v - v.mean()) / v.std())[["id"]]

        df.groupby("id").apply(normalize).show()

这里是id进行gourp by ,这样就得到一张id列都是1的小表,接着呢把这个小表转化为pandas dataframe处理,处理完成后,还是返回一张小表,表结构则在注解里定义,比如只返回id字段,id字段是long类型。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK