37

[译] 浅显易懂的分布式 TensorFlow 入门教程

 5 years ago
source link: https://mp.weixin.qq.com/s/mibROQsKrBCQDmXxT2sCGQ?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【导读】分布式TensorFlow可以有效地提神经网络训练速度,但它的使用并不简单。虽然官方提供了文档和示例,如链接【1】,但是它们太难懂了。本文是一篇浅显易懂的分布式TensorFlow入门教程,可以让你对分布式TensorFlow的原理和使用有一定的了解。

1.https://github.com/tensorflow/tensorflow/blob/r1.3/tensorflow/python/training/monitored_session.py

作者 | Tutorials by Malo Marrec 

编译 | 专知

整理 | Hujun

Bb6ZnuU.jpg!web

How to Write Distributed TensorFlow Code

分布式机器学习策略

模型并行化

当模型过大以至于一台及其的内存承受不住时,可以将计算图的不同部分放到不同的机器中,模型参数的存储和更新都在这些机器中进行。

一个最基本的方法是:把网络第一层放在一台机器上,第二层放在另一台机器上。然而,这样并不好,在前向传播时,较深的层需要等待较浅的层,在发现传播时,较浅的层需要等待较深的层。当模型中有并行的操作时(如GooLeNet),这些操作可以在不同的机器上运行,避免这样的瓶颈。

QRbuiif.jpg!web

数据并行化

整个计算图被保存在一个或多个参数服务器(ps)中。训练操作在多个机器上被执行,这些机器被称作worker。这些worker读取不同的数据(data batches),计算梯度,并将更新操作发送给参数服务器。

JZjEJzQ.jpg!web

数据并行化有两种主要的方案:

  • 同步训练:所有的worker服务器同时读取参数,执行训练操作,等待所有的worker服务器都完成当前训练操作后,梯度被平均后变成一个单独的更新请求并被发送到参数服务器中。所以在任何时候,每个worker服务器看到的计算图参数都是相同的。

  • 异步训练:worker服务器会异步地从参数服务器中读取参数,执行训练操作,并将更新请求异步地发送。在任何时间,两台worker服务器可能会看到参数不同的计算图。

本文会聚焦于如何在数据并行化模型中使用异步训练方案。

构建数据并行化模型

如前面所述,我们的系统会包含三种类型的节点:

  • 一个或多个参数服务器,用来存放模型

  • 一个主worker,用来协调训练操作,负责模型的初始化,为训练步骤计数,保存模型到checkpoints中,从checkpoints中读取模型,向TensorBoard中保存summaries(需要展示的信息)。主worker还要负责分布式计算的容错机制(如果参数服务器或worker服务器崩溃)。

  • worker服务器(包括主worker服务器),用来执行训练操作,并向参数服务器发送更新操作。

也就是说最小的集群需要包含一个主worker服务器和一个参数服务器。可以将它扩展为一个主worker服务器,多个参数服务器和多个worker服务器。

最好有多个参数服务器,因为worker服务器和参数服务器之间有大量的I/O通信。如果只有2个worker服务器,可能1个参数服务器可以扛得住所有的读取和更新请求。但如果你有10个worker而且你的模型非常大,一个参数服务器可能就不够了。

在分布式TensorFlow中,同样的代码会被发送到所有的节点。虽然你的main.py、train.py等会被同时发送到worker服务器和参数服务器,每个节点会依据自己的环境变量来执行不同的代码块。

分布式TensorFlow代码的准备包括三个阶段:

  1. 定义tf.trainClusterSpec和tf.train.Server

  2. 将模型赋给参数服务器和worker服务器

  3. 配置和启动tf.train.MonitoredTrainingSession

1. 定义tf.trainClusterSpec和tf.train.Server

tf.train.ClusterSpec object将任务映射到机器,它被用在tf.train.Server的构造函数中来构造tf.train.Server,在每台机器上创建一个或多个server,并确保每台机器能知道其他的机器在做什么。它包含设备的集合(某台机器上可用的设备),以及一个tf.Session object(tf.Session object会被tf.train.MonitoredTrainingSession 用于执行计算图)。

通常情况下,一台机器上有一个任务,除非你的机器有多个GPU,在这种情况下,你会给每个GPU分配一个任务。

从TensorFlow教程中摘取:

一个tf.train.ClusterSpec表示参与分布式TensorFlow计算的进程的集合。每个tf.train.Server都在一个集群中被构建。

一个tf.train.Server实例包含了设备的集合,和一个可以参与分布式训练的tf.Session目标。一台服务器属于一个集群(由tf.train.ClusterSpec指定)

A server belongs to a cluster (specified by a  ),并且对应一个任务。服务器可以和所在集群中的所有其他服务器进行通信。

2. 为worker服务器指定模型的变量和操作

用 with tf.device 命令,你可以将节点(无论是操作还是变量)指定到一个任务或工作中。例如:

with tf.device("/job:ps/task:0"):
    X = tf.placeholder(tf.float32, [100,128,128,3], 
name="X")
with tf.device("/job:worker/task:0"):
... #training ops definition
   train_step = (
            tf.train.AdamOptimizer(learning_rate)
            .minimize(loss, global_step=global_step)
            )

不在with tf.device块内的节点,会被TensorFlow自动地分配给一个设备。

在数据并行化框架中,节点会被分配到参数服务器中,操作会被分配到worker服务器中。手动进行分配不具有扩展性(设想你有10台参数服务器,你不会想手动地为每一台分配变量)。TensorFlow提供了方便的tf.train.replica_device_setter,它可以自动地为设备分配操作。

它以一个tf.train.ClusterSpec对象作为输入,并返回一个用于传给tf.device的函数。

在我们的模型中,变量操作被存放在参数服务器中,训练操作被存放在worker服务器中。

上面定义计算图的操作变为:

with tf.device(tf.train.replica_device_setter
(cluster_spec)):
... #model definition
   X = tf.placeholder(tf.float32, [100,128,128,3], 
name="X")
... #training ops definition
   train_step = (
            tf.train.AdamOptimizer(learning_rate)
            .minimize(loss, global_step=global_step)
            )

3. 配置和启动tf.train.MonitoredTrainingSession

tf.train.MonitoredTrainingSession是tf.Session在分布式训练中的等价物。它负责设置一个主worker节点,它会:

  • 初始化计算图

  • 读取和保存checkpoints

  • 导出TensorBoard展示所需信息(summaries)

  • 启动/停止会话

参数:

tf.train.MonitoredTrainingSession的参数包含主节点、checkpoints路径、保存checkpoints以及导出TensorBoard展示所需信息的频率。

with tf.train.MonitoredTrainingSession(
        master=server.target, # as defined with tf.train.
Server
        is_chief= ..., #boolean, is this node the master?
        checkpoint_dir=..., #path to checkpoint
/tensorboard dir
        hooks = hooks    #see next section
) as sess:

对于is_chief,你需要在代码中某处定义某个节点是主节点,例如你可以从集群部署系统中获取。

设置训练步数

我猜,你曾经在tf.Session块中使用了循环,并在循环中的每个迭代中,使用一个或多个sess.run指令。

这不是MonitoredTrainingSession执行的方式,所有的实例需要合理地被终止和同步,一个checkpoint需要被保存。因此,训练的步数通过一个SessionRunHook对象列表,被直接传入MonitoredTrainingSession。

向MonitoredTrainingSession对象传入一个tf.train.StopAtStepHook钩子,这个钩子定义了训练的最后一步,之后参数服务器和worker服务器会被关闭。

注意:有一些其他类型的钩子,你可以基于tf.train.SessionRunHook定义自己的钩子,这里不详细介绍了。

代码如下:

hooks = [tf.train.StopAtStepHook(last_step = 100000)]
with tf.train.MonitoredTrainingSession(...) as sess:
   sess.run(loss)
   #run your ops here

图如下:

JZjEJzQ.jpg!web

在Clusterone中构建数据并行化模型

现在我们了解了分布式TensorFlow代码中的组件,我来提供一些在Clusterone中运行分布式TensorFlow的高层次的代码片段:

# Notes:
# You need to have the clusterone package installed 
(pip install tensorport)
# Export logs and outputs to /logs, your data is in /data.

import tensorflow as tf
from clusterone import get_data_path, get_logs_path

# Get the environment parameters for distributed 
TensorFlow
try:
    job_name = os.environ['JOB_NAME']
    task_index = os.environ['TASK_INDEX']
    ps_hosts = os.environ['PS_HOSTS']
    worker_hosts = os.environ['WORKER_HOSTS']
except:  # we are not on TensorPort, assuming local, 
single node
    task_index = 0
    ps_hosts = None
    worker_hosts = None


# This function defines the master, ClusterSpecs and 
device setters
def device_and_target():
    # If FLAGS.job_name is not set, we're running 
single-machine TensorFlow.
    # Don't set a device.
    if FLAGS.job_name is None:
        print("Running single-machine training")
        return (None, "")

    # Otherwise we're running distributed TensorFlow.
    print("Running distributed training")
    if FLAGS.task_index is None or FLAGS.task_index == "":
        raise ValueError("Must specify an explicit 
`task_index`")
    if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
        raise ValueError("Must specify an explicit 
`ps_hosts`")
    if FLAGS.worker_hosts is None or FLAGS.worker_hosts
     == "":
        raise ValueError("Must specify an explicit 
`worker_hosts`")

    cluster_spec = tf.train.ClusterSpec({
        "ps": FLAGS.ps_hosts.split(","),
        "worker": FLAGS.worker_hosts.split(","),
    })
    server = tf.train.Server(
        cluster_spec, job_name=FLAGS.job_name, 
task_index=FLAGS.task_index)
    if FLAGS.job_name == "ps":
        server.join()

    worker_device = "/job:worker/task:{}".
    format(FLAGS.task_index)
    # The device setter will automatically place Variables 
ops on separate
    # parameter servers (ps). The non-Variable ops will 
    be placed on the workers.
    return (
        tf.train.replica_device_setter(
            worker_device=worker_device,
            cluster=cluster_spec),
        server.target,
    )

    device, target = device_and_target()


# Defining graph
with tf.device(device):
    # TODO define your graph here
    ...

# Defining the number of training steps
hooks = [tf.train.StopAtStepHook(last_step=100000)]

with tf.train.MonitoredTrainingSession(master=target,
     is_chief=(FLAGS.task_index == 0),
     checkpoint_dir=FLAGS.logs_dir,
     hooks=hooks) as sess:
    while not sess.should_stop():
        # execute training step here (read data, 
feed_dict, session)
        # TODO define training ops
        data_batch = ...
        feed_dict = {...}
        loss, _ = sess.run(...)

原文链接:

https://clusterone.com/blog/2017/09/13/distributed-tensorflow-clusterone/

-END-

专 · 知

人工智能领域26个主题知识资料全集获取 加入专知人工智能服务群:   欢迎微信扫一扫加入专知人工智能知识星球群,获取专业知识教程视频资料和与专家交流咨询!

Z7JZ3qE.jpg!web

请PC登录 www.zhuanzhi.ai 或者点击 阅读原文 ,注册登录专知,获取更多AI知识资料

JVZJFjr.jpg!web

请加专知小助手 微信(扫一扫如下二维码添加),加入专知主题群(请备注主题类型:AI、NLP、CV、 KG等)交流~

jEfQnuI.jpg!web

关注专知公众号,获取人工智能的专业知识!

点击“ 阅读原文 ”,使用 专知


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK