
Distributed Machine Learning: A Parallel Implementation of Logistic Regression (PySpark) - orion-orion

source link: https://www.cnblogs.com/orion-orion/p/16318810.html

1. Deriving the Gradient

As we noted in the post 《统计学习:逻辑回归与交叉熵损失(Pytorch实现)》, let $w$ be the weight vector (its last dimension being the bias), let $N$ be the total number of samples, and let $\{(x_i, y_i)\}_{i=1}^{N}$ be the training set. Each sample has dimension $D$, with $x_i \in \mathbb{R}^{D+1}$ (the last dimension is the augmented constant) and $y_i \in \{0, 1\}$. The logistic regression loss, i.e. the negative log-likelihood, is then:

$$\ell(w) = -\sum_{i=1}^{N}\left[y_i \log \pi_w(x_i) + (1 - y_i)\log\left(1 - \pi_w(x_i)\right)\right]$$

$$\pi_w(x) = p(y = 1 \mid x; w) = \frac{1}{1 + \exp(-w^T x)}$$

Written in this form, the loss can already be handed to an automatic-differentiation tool such as PyTorch and minimized by gradient descent. If we want to evaluate the gradient directly from an explicit expression, however, we first simplify the loss to:

$$\ell(w) = -\sum_{i=1}^{N}\left(y_i w^T x_i - \log\left(1 + \exp(w^T x_i)\right)\right)$$

from which the gradient follows as:

$$\nabla_w \ell(w) = -\sum_{i=1}^{N}\left(y_i - \frac{1}{1 + \exp(-w^T x_i)}\right) x_i = -\sum_{i=1}^{N}\left(y_i - \pi_w(x_i)\right) x_i$$
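As a quick sanity check on this closed-form expression, here is a small NumPy sketch (the data and the names sigmoid, loss, and grad are illustrative, not from the original post) that compares it against a central finite-difference approximation of the loss:

import numpy as np

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def loss(w, X, y):
    # Negative log-likelihood: -sum_i (y_i * w^T x_i - log(1 + exp(w^T x_i)))
    z = X.dot(w)
    return -np.sum(y * z - np.logaddexp(0, z))

def grad(w, X, y):
    # Closed-form gradient: -sum_i (y_i - sigmoid(w^T x_i)) * x_i
    return -X.T.dot(y - sigmoid(X.dot(w)))

rng = np.random.default_rng(0)
X = rng.normal(size=(5, 3))
y = rng.integers(0, 2, size=5).astype(float)
w = rng.normal(size=3)

# Central finite differences of the loss should match the closed form
eps = 1e-6
numeric = np.array([
    (loss(w + eps * e_j, X, y) - loss(w - eps * e_j, X, y)) / (2 * eps)
    for e_j in np.eye(3)])
print(np.allclose(numeric, grad(w, X, y)))  # expected: True

Note that np.logaddexp(0, z) computes $\log(1 + \exp(z))$ without overflowing for large $z$.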

2. Parallel Implementation with Spark

The logistic regression objective is usually minimized by gradient descent, which parallelizes naturally under the following Map-Reduce architecture:

[Figure: synchronous gradient-summation algorithm]

In round $t$, the driver first broadcasts the current weights to all workers; each worker computes a local gradient over its share of the data (the map step); the per-node gradients are then aggregated (the reduce step); and finally the parameters are updated.
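In PySpark the broadcast step can be made explicit with sc.broadcast(). Below is a minimal sketch of one iteration, assuming sc = spark.sparkContext and that points, alpha, w, and gradient() are defined as in Section 3:

from operator import add

w_bc = sc.broadcast(w)  # ship the current weights to every worker
# map step: per-sample gradients on the workers; reduce step: sum on the driver
g = points.map(lambda point: gradient(point, w_bc.value)).reduce(add)
w -= alpha * g          # parameter update on the driver
w_bc.unpersist()        # release the stale broadcast before the next round

The full code in Section 3 instead captures w directly in the map closure, which Spark re-serializes and ships each iteration; the explicit broadcast merely makes the distribution step visible.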

In Spark, each task corresponds to one partition, and the number of partitions determines the degree of parallelism (see our previous post 《Spark: 单词计数(Word Count)的MapReduce实现(Java/Python)》 for details on partitions). The implementation in Spark proceeds as follows:

  • Map phase: each task runs map() to compute a gradient $g_i$ for every sample $(x_i, y_i)$, and then aggregates these per-sample gradients locally to reduce the amount of data transferred later. For example, the first task performs a local reduce() to obtain $\tilde{g}_1 = \sum_{i=1}^{3} g_i$, as shown in the figure below:

  • Reduce phase: a global reduce() collects every task's partial result on the Driver, aggregates them, and then updates the parameters.

[Figure: per-task local aggregation over the points RDD, followed by a global reduce of the partial gradients to the Driver]

In the figure above, the training data is represented by points: ParallelCollectionRDD and the parameter vector by w. Note that the parameter vector is not an RDD; it is just an ordinary variable participating in the computation.
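The map-side local aggregation described above can also be written explicitly with mapPartitions(), so that each task emits a single partial gradient $\tilde{g}_k$ rather than one gradient per sample. A sketch, again reusing points, w, and gradient() from Section 3:

from operator import add
import numpy as np

def partition_gradient(iterator, w):
    # Local reduce: sum the gradients of every sample in this partition
    g_local = np.zeros_like(w)
    for point in iterator:
        g_local += gradient(point, w)
    yield g_local

# One partial gradient per task, then a cheap global sum on the driver
g = points.mapPartitions(lambda it: partition_gradient(it, w)).reduce(add)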

One further caveat: although each task aggregates locally, if there are many tasks and each task's locally aggregated result (a single gradient vector) is large, shipping all of them to the Driver can still create a single-point network bottleneck. To mitigate this, Spark provides the more efficient treeAggregate() operation, which aggregates along a tree to reduce both network and computation latency.
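Here is a sketch of the same gradient sum written with treeAggregate(), again using the Section 3 names (depth=2 is PySpark's default tree depth):

import numpy as np

g = points.treeAggregate(
    np.zeros(D + 1),                              # zero value of the accumulator
    lambda acc, point: acc + gradient(point, w),  # seqOp: fold samples within a partition
    lambda acc1, acc2: acc1 + acc2,               # combOp: merge partial sums up the tree
    depth=2)                                      # two-level tree instead of all-to-driver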

3. PySpark Implementation

The complete PySpark implementation is as follows:

from sklearn.datasets import load_breast_cancer
import numpy as np
from pyspark.sql import SparkSession
from operator import add
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

n_slices = 3  # number of partitions (slices)
n_iterations = 300  # number of gradient descent iterations
alpha = 0.01  # step size (learning rate)


def logistic_f(x, w):
    return 1 / (np.exp(-x.dot(w)) + 1)


def gradient(point: np.ndarray, w: np.ndarray) -> np.ndarray:
    """ Compute the logistic regression gradient for a single data point
    """
    y = point[-1]    # point label
    x = point[:-1]   # point coordinate
    # Per-sample gradient of the negative log-likelihood w.r.t. w;
    # the reduce step sums these across all samples
    return - (y - logistic_f(x, w)) * x


if __name__ == "__main__":

    X, y = load_breast_cancer(return_X_y=True)

    D = X.shape[1]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=0)
    n_train, n_test = X_train.shape[0], X_test.shape[0]

    spark = SparkSession\
        .builder\
        .appName("Logistic Regression")\
        .getOrCreate()

    matrix = np.concatenate(
        [X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)

    points = spark.sparkContext.parallelize(matrix, n_slices).cache()

    # Initialize w to a random value
    w = 2 * np.random.ranf(size=D + 1) - 1
    print("Initial w: " + str(w))

    for t in range(n_iterations):
        print("On iteration %d" % (t + 1))
        g = points.map(lambda point: gradient(point, w)).reduce(add)
        w -= alpha * g

        y_pred = logistic_f(np.concatenate(
            [X_test, np.ones((n_test, 1))], axis=1), w)
        pred_label = np.where(y_pred < 0.5, 0, 1)
        acc = accuracy_score(y_test, pred_label)
        print("iterations: %d, accuracy: %f" % (t, acc))

    print("Final w: %s " % w)
    print("Final acc: %f" % acc)

    spark.stop()

Note that n_slices in spark.sparkContext.parallelize(matrix, n_slices) is exactly the number of partitions in Spark. The code trains and tests on the breast cancer dataset, a binary classification dataset, and the model weights are initialized randomly.
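To run the script, it can be submitted to a local or cluster Spark deployment, for example (the file name is a placeholder):

spark-submit logistic_regression.py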

Finally, let us look at the algorithm's output.

The initial weights:

Initial w: [-0.0575882   0.79680833  0.96928013  0.98983501 -0.59487909 -0.23279241
 -0.34157571  0.93084048 -0.10126002  0.19124314  0.7163746  -0.49597826
 -0.50197367  0.81784642  0.96319482  0.06248513 -0.46138666  0.76500396
  0.30422518 -0.21588114 -0.90260279 -0.07102884 -0.98577817 -0.09454256
  0.07157487  0.9879555   0.36608845 -0.9740067   0.69620032 -0.97704433
 -0.30932467]

The final model weights and the accuracy on the test set:

Final w: [ 8.22414803e+02  1.48384087e+03  4.97062125e+03  4.47845441e+03
  7.71390166e+00  1.21510016e+00 -7.67338147e+00 -2.54147183e+00
  1.55496346e+01  6.52930570e+00  2.02480712e+00  1.09860082e+02
 -8.82480263e+00 -2.32991671e+03  1.61742379e+00  8.57741145e-01
  1.30270454e-01  1.16399854e+00  2.09101988e+00  5.30845885e-02
  8.28547658e+02  1.90597805e+03  4.93391021e+03 -4.69112527e+03
  1.10030574e+01  1.49957834e+00 -1.02290791e+01 -3.11020744e+00
  2.37012097e+01  5.97116694e+00  1.03680530e+02] 
Final acc: 0.923977

As we can see, the algorithm converges nicely.
