41

Ray源码解析之整体逻辑结构

 5 years ago
source link: http://whatbeg.com/2019/04/16/raysource-overall.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Ray Version 0.4.0

以一个Ray示例程序来说明,Ray执行多进程/分布式程序的过程。

import ray
import time

@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()
results = ray.get([f.remote() for i in range(4)])
print(results)

结果输出如下:

Process STDOUT and STDERR is being redirected to /tmp/raylogs/.
Waiting for redis server at 127.0.0.1:35084 to respond...
Waiting for redis server at 127.0.0.1:59058 to respond...
Starting local scheduler with the following resources: {'CPU': 8, 'GPU': 0}.

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui26399.ipynb?token=7e19e5b2051ef36474500c26427d304da95835e4f0933992
======================================================================

[1, 1, 1, 1]

Process finished with exit code 0

首先我们定义 remote 函数 f ,将一个普通的Python函数变为 remote 函数只需在其上加上 @ray.remote 装饰器。

remote 是一个装饰工厂函数,返回修饰函数的装饰器,主要定义代码如下:

# ray/python/ray/worker.py
def remote(*args, **kwargs):

    worker = global_worker
    
    def make_remote_decorator(num_return_vals, num_cpus, num_gpus, resources,
                              max_calls, checkpoint_interval, func_id=None):  # 装饰器工厂函数
        # 装饰器,装饰函数,做Actor和function的区分
        def remote_decorator(func_or_class):                                  
            if inspect.isfunction(func_or_class) or is_cython(func_or_class):
                ...
                return remote_function_decorator(..)   # 是函数,调用远程函数装饰器
            if inspect.isclass(func_or_class):
                ...
                return worker.make_actor(..)           # 是actor,由全局worker创建一个actor
            raise Exception
        # 装饰器,参数是函数,起装饰函数的作用
        def remote_function_decorator(func, function_properties):
            def func_call(*args, **kwargs):
                return _submit(args=args, kwargs=kwargs)

            def _submit(...):
                ...
            def func_executor(arguments):
                """This gets run when the remote function is executed."""
                result = func(*arguments)
                return result

            def func_invoker(*args, **kwargs):
                raise Exception
            func_invoker.remote = func_call              # func.remote() 直接调用func_call
            func_invoker._submit = _submit
            func_invoker.executor = func_executor
            func_invoker.is_remote = True
            func_name = "{}.{}".format(func.__module__, func.__name__)
            func_invoker.func_name = func_name
            ...
            return func_invoker

        return remote_decorator
    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
        # 不带参数的 @ray.remote 装饰
        return make_remote_decorator(
            num_return_vals, num_cpus, num_gpus, resources,
            max_calls, checkpoint_interval)(args[0])
    else:
        # 带参数的 @ray.remote(xx=x) 装饰
        ...
        return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
                                     resources, max_calls, checkpoint_interval)

【先验知识:Python装饰器的概念】

由此可见, remote 是一个通用的装饰器,可以装饰普通的Python函数,或者是Python的class。

进入 remote 装饰器体,首先得到全局Worker,然后定义了一个 make_remote_decorator 装饰器工厂函数,然后判断是无参装饰还是带参装饰。

如果是无参装饰,那么

@ray.remote
def f():
    time.sleep(1)
    return 1

等价于

def f():
    ...
    
f = ray.remote(f)

此时,remote的参数只有一个,那就是 f 本身,也即 args[0]

所以上述代码返回 make_remote_decorator(...)(args[0]) ,即调用过的 make_remote_decorator ,参数是 f

否则remote函数定义等价于:

def f():
    ...

f = ray.remote(num_cpus=1, ..)(f)

所以 remote 装饰器返回一个未调用的,将会在 f 上调用的 make_remote_decorator 函数。

make_remote_decorator 中再嵌套了一层装饰,本身提供对函数和actor的区分。

如果是函数,则进入 remote_function_decorator 远程函数装饰器;

否则是class,由全局worker创建一个actor。

远程函数装饰器 remote_function_decorator 的责任就是接受函数参数,返回一个函数,这个函数就是远程函数,不能直接传参调用(第29行)。将 remote() 绑定到 func_call ,接受参数后,提交任务( _submit_task )运行这个函数,最后得到结果,这个结果也是 f.remote() 调用的结果,是一个 object id ,因为返回结果存在object store中。

至此,远程函数就定义好了。我们在原始的普通Python函数 f 上,装饰了一下,得到了一个可以通过 f.remote() 来调用的远程函数,如此调用将会立马提交一个任务,供Ray引擎调度执行,返回结果。

下面是 ray.init() 过程。可以理解为初始化Ray引擎的过程,类似于启动Tensorflow的Session的过程。

ray.init() 也有带参版本和无参版本。

带参版本用于已经存在并启动一个Ray集群的情况下,直接填入该集群的redis地址,即可连接到集群,就初始化好了。

无参版本适用于单机多进程的运行,这种情况下会创建一个Ray环境,默认启动一个local scheduler,一个global scheduler,一个或多个redis server, 一个object store和一个object store manager,和若干worker进程(默认为CPU核数个)。

init() 主要逻辑为:

# ray/python/ray/worker.py
init()
  _init()
    if PYTHON_MODE:
      pass
    elif start_ray_local:  # 本地开启一个Ray主节点进程
      address_info = services.start_ray_head(..)
    else:  # 连接到已有集群
      address_info = get_address_info_from_redis(redis_address, node_ip_address)
    # 将全局worker连接到 local scheduler, Plasma 和 Redis
    connect(driver_address_info, object_id_seed=object_id_seed,
            mode=driver_mode, worker=global_worker)

四个模式:

SCRIPT_MODE:如果Worker是driver,且由Python脚本启动或者在shell中交互式运行的话,使用脚本模式。会打印任务失败信息。
WORKER_MODE:如果Worker不是driver,只是slave的话,启动WORKER_MODE,不打印关于task的任何信息。
PYTHON_MODE:如果要顺序运行或是调试,可以使用PYTHON_MODE,此时的Worker即是driver。此模式下,不会发送remote函数到调度器,而是直接以阻塞的形式执行。
SILENT_MODE:测试的时候使用SILENT_MODE。不会打印error信息,因为许多测试时故意失败的。

我们的示例代码中, ray.init() 是无参的,代表我们会在本地开启一个ray head节点进程。

此部分代码简要逻辑如下:

# ray/python/ray/services.py
start_ray_head
  | start_ray_processes
    | print("Process STDOUT and STDERR is being redirected to /tmp/raylogs/.")  # 程序输出中第一行的来源
    | if redis_address is None:
    |   start_redis(...)
        |  start_redis_instance(..)
          |  创建redis_shards个redis server
          # 等待redis server可用并响应,程序输出第2,3行的来源
          |  wait_for_redis_to_start("127.0.0.1", port)   
    | if include_log_monitor:
    |   start_log_monitor(..)
    | if include_global_scheduler:
    |   start_global_scheduler(...)
        # 开启local_scheduler并打印 Starting local scheduler ..,程序第4行的来源
        | local_scheduler_name, pid = ray.local_scheduler.start_local_scheduler(...)
    | for i in range(num_local_schedulers - len(object_store_addresses)):
    |   start_objstore(...)
        | ray.plasma.start_plasma_store(..)
        | ray.plasma.start_plasma_manager(..)
    | for i in range(len(local_scheduler_socket_names), num_local_schedulers):
    |   start_local_scheduler(...)
    # 每个local scheduler默认搭配CPU核数个workers,因此workers_per_local_scheduler[i] = #cpus
    | for i, num_local_scheduler_workers in enumerate(workers_per_local_scheduler):
    |   for j in range(num_local_scheduler_workers):
    |     start_worker(...)
    | if include_webui:
    |   start_ui(...)
    # 开启UI会打印输出中UI的部分

可以看到, start_ray_head 的过程配套启动了redis, global scheduler, local scheduler及其workers,UI等。

这些都是ray执行快速的分布式任务分发的基本组件,其中redis用来存储全局系统状态,global scheduler和local scheduler分数两级调度器,负责快速的任务调度,workers负责执行远程函数,UI负责观察运行状态,不过目前UI做的还比较简陋。

每个Worker执行一个主循环 main_loop ,循环不断地接受任务,处理任务返回……

这部分代码见 ray/python/ray/workers/default_worker.py

main_loop 的代码如下:

# ray/python/ray/worker.py
def main_loop(self):
    def exit(signum, frame):
        cleanup(worker=self)
        sys.exit(0)
    signal.signal(signal.SIGTERM, exit)
    check_main_thread()
    while True:
        # 此处调用self.local_scheduler_client.get_task()获得任务
        task = self._get_next_task_from_local_scheduler()
        self._wait_for_and_process_task(task)
        |  self._wait_for_function(function_id, task.driver_id().id())
        |  with self.lock:
           |  self._process_task(task)

初始化好以后,就可以运行 f.remote() 了,运行后还是回到装饰器里面的 _submit_task 函数,

# ray/python/ray/worker.py
def _submit(args=None, kwargs=None, num_return_vals=None,
            num_cpus=None, num_gpus=None, resources=None):
    check_connected()       # 检查worker是否连接
    check_main_thread()     # 检查是否主线程,不允许非主线程提交任务
    kwargs = {} if kwargs is None else kwargs
    args = signature.extend_args(function_signature, args, kwargs)

    if _mode() == PYTHON_MODE:
        # PYTHON模式下,并不提交任务,而是串行执行,拷贝参数以防修改
        result = func(*copy.deepcopy(args))
        return result
    # 提交任务,返回结果的object id或者一组object ids
    object_ids = _submit_task(function_id, args,
                              num_return_vals=num_return_vals,
                              num_cpus=num_cpus, num_gpus=num_gpus,
                              resources=resources)
    if len(object_ids) == 1:
        return object_ids[0]
    elif len(object_ids) > 1:
        return object_ids

代码中调用的 _submit_task 是对 worker.submit_task 的一个封装:

# ray/python/ray/worker.py
def _submit_task(function_id, *args, **kwargs):
    """This is a wrapper around worker.submit_task.

    We use this wrapper so that in the remote decorator, we can call
    _submit_task instead of worker.submit_task. The difference is that when we
    attempt to serialize remote functions, we don't attempt to serialize the
    worker object, which cannot be serialized. 【这样搞一下就不需要序列化worker对象了?】
    """
    return global_worker.submit_task(function_id, *args, **kwargs)

最终,Worker的 submit_task 函数如下:

# ray/python/ray/worker.py
def submit_task(self, function_id, args, ...):
    with log_span("ray:submit_task", worker=self):
        check_main_thread()
        ...
        # 将参数put进object store,注意,如果多个函数使用的是相同的输入,直接调用的话仍然会put多次
        # 一个方法是先在调用前put参数,然后传入put后的ObjectID对象。
        args_for_local_scheduler = []
        for arg in args:
            if isinstance(arg, ray.local_scheduler.ObjectID):
                args_for_local_scheduler.append(arg)
            elif isinstance(arg, ray.actor.ActorHandleParent):
                args_for_local_scheduler.append(put(
                    ray.actor.wrap_actor_handle(arg)))
            elif ray.local_scheduler.check_simple_value(arg):
                args_for_local_scheduler.append(arg)
            else:
                args_for_local_scheduler.append(put(arg))
        ...
        # Submit the task to local scheduler.
        task = ray.local_scheduler.Task(
            self.task_driver_id,
            ray.local_scheduler.ObjectID(function_id.id()),
            args_for_local_scheduler,
            ...)
        ...
        self.task_index += 1
        self.local_scheduler_client.submit(task)

        return task.returns()

也就是说, [f.remote() for i in range(4)] 这一句,默认的全局worker会提交4个任务给local scheduler,然后local scheduler将这些任务调度到嗷嗷待哺的各个worker,前面的代码说过,默认会启动CPU核数个Worker。

运行完毕后,列表中就是返回的值的object id,我们需要使用 ray.get(id) 从object store中将真正的数据拿出来。

最后就成了 [1, 1, 1, 1] ,程序到此就结束了。

再回顾一下整个过程:

@ray.remote   # 装饰器
def f():
    time.sleep(1)
    return 1
              # 装饰完成,装饰过后的远程函数f已形成
ray.init()    # 初始化Ray引擎,会启动各个必要组件,包括调度,状态存储,对象存储和workers等
results = ray.get([f.remote() for i in range(4)])   # 提交任务,获得结果,从object store中取出
print(results)

Happy Reading!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK