46

Ray 之设计动机及使用

 3 years ago
source link: https://mp.weixin.qq.com/s/pqSGzIdhZtAsZIyVzqSIaw
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

What is Ray?

关于 Ray,其官网有一个比较简单的介绍:

Ray is a fast and simple framework for building and running distributed applications. Ray 是一个用于快速、简单构建和运行分布式应用的框架。

Ray 是继 Spark 之后由 UC Berkeley AMP 实验室又推出一重磅高性能 AI 计算引擎,它在设计时主要是为了解决在 Reinforcement Learning(RL, 强化学习 [1] )场景下遇到的一些问题。

机器学习三种不同的训练方法的区别(这里介绍这个背景,主要为了理解 Ray 最初设计的动机或初衷):

1. 监督学习:有标签、直接反馈、预测未来结果; 2. 非监督学习:无标签、无反馈、寻找隐藏的结构; 3. 强化学习:决策流程、奖励系统、学习一系列的行动(典型的);

Ray 设计的动机

Ray 设计的初衷是为了解决 RL 中遇到的问题,所以要想理解 Ray 的设计动机,就需要先对 RL 场景有一个简单的认识。维基百科对 RL 有简单的介绍:强化学习是机器学习中的一个领域,强调如何基于环境而行动,以取得最大化的预期利益。即有机体(agent)如何在环境给予的奖励或惩罚的刺激下,逐步形成对刺激的预期,产生能获得最大利益的习惯性行为。

强化学习 RL 背景

整个强化学习系统通常是由:agent、State(状态)、Reward(奖赏)、Action(动作)和 Environment(环境)五部分组成:

iQV7R3E.png!mobile

1. Agent:它是整个 RL 的核心,它能够感知 Environment 的 State,并且根据 Environment 提供的 Reward,通过学习选择一个合适的 Action 来最大化长期的 Reward 值; 2. Environment:它会接收 Agent 执行的一系列的 Action,并且对这一系列的 Action 的结果做相应的评价,换成一种可量化的 Reward 反馈给 Agent; 3. Reward:它是 Environment 提供给 Agent 的一个量化的反馈信号,用于评价 Agent 在某一时刻所做 Action 的结果;

简单总结一下,RL 本质上就是利用 Environment 提供的 Reward 作为反馈,学习一系列的环境状态(State)到动作(Action)的映射,Action 选择的原则是最大化未来累积的 Reward 的概率。

强化学习 RL 遇到的问题

RL 会在一个未知的环境连续运行,并且会依赖环境的反馈做处理。RL 应用的核心是通过学习到的环境状态(State)到动作(Action)的映射来最大化 Reward 的概率,现实中应用场景如游戏比赛等。要想实现这一目的,RL 应用就需要三种功能:

1. RL 依赖于  Simulation (仿真模式)来评估策略,Simulation 可以探索更多的 Action 选择来协助训练; 2. RL 算法需要依赖 分布式训练 根据 Simulation 的结果来改进策略; 3. RL 还需要  Serving  根据训练好的策略及当前的环境做出 Action 决策;

上面这是一个典型的 RL 应用场景,在 Ray 的论文 Ray: A Distributed Framework for Emerging AI Applications [2] 中有下面一个示例:

BjUnIvB.png!mobile

根据场景的需求,对系统也提出了新的要求:

1. 支持并行、实时处理大量的数据; 2. 支持多种不同运行时长(几分钟或长期运行)和资源需求(CPU or GPU)的任务类型; 3. 支持动态的 task graph,因为 Simulation 或环境的结果都可能会随时会改变 task graph;

简单来说: we need a dynamic computation framework that handles millions of heterogeneous tasks per second at millisecond-level latencies

针对这种需求,首先现有的框架是无法满足的,但是也有相应的解决方案,比如:使用 Horovod 做分布式训练、使用 Clipper 做 Serving 决策、使用 CIEL 做 Simulation,将这些系统耦合在一起去提供服务,但是在实际的使用中,这会带来很多的问题,比如:调度、容错、数据交互等,当使用多个系统时,这些问题就变得异常复杂。因此,UC Berkeley AMP 实验室设计了 Ray 来解决这个问题。

Ray 核心能力

根据论文及官网的介绍,Ray 当前提供的能力有:

1. Dynamic task graph :相比于 Spark、Flink,Ray 可以支持动态 Graph,这个让 Ray 具有了 FAAS 的一些能力,使得 Ray 的扩展性比较强; 2. Stateless tasks and actors combined :Ray 提供了 task(无状态)模型和 Actor(有状态)模型的统一抽象,做到了支持有状态和无状态的计算; 3. Shared object store with zero copy deserialization :每个节点通过共享存储维护了一块局部的对象存储,并且利用专门的 Apache Arrow 格式来进行不同节点间的数据交换; 4. Bottom-up scheduling for low latency :Ray 在实现时,调度部分分为本地调度器和全局调度器,通过这种方式 Ray 可以做到每秒百万级 task 调度(声称); 5. Clean Python API :Ray 本就是为了解决 RL 的问题而设计,一开始就选择了 Python 作为对外的接口。

从上面的总结中,可以看出 Ray 在设计上还是一款比较优秀的分布式计算引擎,提供的能力很强大。

How to use Ray?

Ray 由于最开始是为了 RL 场景而设计,所以一开始就选用了 Python 作为其面向用户的 API 接口,在 Ray 中有两个核心的概念:

1. Function:普通的 remote 函数,只能处理无状态的计算需求; 2. Actor:有状态的计算;

Ray API

简单看下 Ray 的相关 API(参考: 高性能分布式执行框架——Ray [3] ):

Ray API 一个比较大的特点以及设计理念就是:期望用单机编程的体验来实现分布式的效果

1. ray.init() :在 PythonShell 中,使用  ray.init() 可以在本地启动 ray,包括 Driver、HeadNode(Master)和若干 Slave; 2. @ray.remote :在 python 中使用这个注解表示声明了一个 remote function,它是 Ray 的基本任务调度单元,它在定义后,会被立马序列化存储到 RedisServer 中,并且分配一个唯一的 ID,这样就能保证集群所有节点都能看到这个函数的定义; 3. ray.put() :将 python 对象存储本地 ObjectStore 中,并且异步返回一个唯一的 ObjectID,通过这个 ID,Ray 可以访问集群中任何一个节点上的对象; 4. ray.get() :使用这个方法可以通过 ObjectID 获取 ObjectStore 内的对象并将之转换为 Python 对象,这个方法是阻塞的,会等到结果返回; 5. ray.wait() :操作支持批量的任务等待,基于此可以实现一次性获取多个 ObjectID 对应的数据; 6. ray.error_info() :使用  ray.error_info()  可以获取任务执行时产生的错误信息;

Ray Function

Ray Function 的示例如下:

import ray

# 本地启动 ray,如果想指定已有集群,在 init 方法中指定 RedisServer 即可

ray.init()



# 可以声明一个 remote function;

# remote 函数是 Ray 的基本任务调度单元,remote 函数定义后会立即被序列化存储到 RedisServer中,并且分配了一个唯一的 ID,这样就保证了集群的所有节点都可以看到这个函数的定义

@ray.remote

def f1(x):

return x * x



@ray.remote

def f2(y):

return y +10



@ray.remote

def f3(x, y):

return x + y



# 这里拿到的都是 future,相当于异步调用,只要调用 get 接口才会去拿计算的结果;通过 function.remote() 的方式调用这个函数

# 这里的 x/y 实际上拿到的都是 Object IDs

x = f1.remote(2)

y = f2.remote(3)

# remote function 可以组合在一起使用

z = f3.remote(x, y)



# get 接口可以通过 ObjectID 获取 ObjectStore 内的对象并将之转换为 Python 对象

# get 接口在调用时会阻塞,知道获取结果

print(ray.get(z))



####### 输出结果 ########

17

Ray Actor

Actor 与 remote function 的区别是,actor 是有状态的计算,在计算中会记录相应的状态(在 Python 的 class 定义前使用 @ray.remote 可以声明 Actor)。

import ray
ray.init()



# 在 Python 的 class 定义前使用 @ray.remote 来声明 Actor
# actor 会记录相应的状态
@ray.remote
class Counter(object):
def __init__(self):
self.n = 0



def increment(self):
self.n += 1



def read(self):
return self.n



# 使用 Class.remote() 创建 actor
counters = Counter.remote() for i in range(4)]



# 使用 actor.function.remote() 调用 actor 的方法,其他与普通的 remote function 就很类似了
# 这里的 features 也是 Object IDs;
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))



[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))



[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))



############# 输出结果 #########
# 这里调用三次,可以看到对应 actor 的状态是有记录的
[1, 1, 1, 1]
[2, 2, 2, 2]
[3, 3, 3, 3]

References

[1] 强化学习:  https://zh.wikipedia.org/wiki/%e5%bc%ba%e5%8c%96%e5%ad%a6%e4%b9%a0

[2] Ray: A Distributed Framework for Emerging AI Applications:  https://www.usenix.org/system/files/osdi18-moritz.pdf

[3] 高性能分布式执行框架——Ray:  https://www.cnblogs.com/fanzhidongyzby/p/7901139.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK