4

[源码解析] TensorFlow 分布式环境(2)---Master 静态逻辑 - 罗西的思考

 2 years ago
source link: https://www.cnblogs.com/rossiXYZ/p/16024266.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

[源码解析] TensorFlow 分布式环境(2)---Master 静态逻辑

在具体介绍 TensorFlow 分布式的各种 Strategy 之前,我们首先需要看看分布式的基础:分布式环境。只有把基础打扎实了,才能在以后的分析工作之中最大程度的扫清障碍,事半功倍。本文梳理下 Master 的静态逻辑。

本系列其他文章是:

[翻译] TensorFlow 分布式之论文篇 "TensorFlow : Large-Scale Machine Learning on Heterogeneous Distributed Systems"

[翻译] TensorFlow 分布式之论文篇 "Implementation of Control Flow in TensorFlow"

[源码解析] TensorFlow 分布式环境(1) --- 总体架构

Server 上运行了两个 RPC 服务,分别是MasterService 和 WorkerService。如果 Client 接入到Server,那么Server 就是 Master 角色,Client 访问的就是 MasterService 服务(MasterService 同时负责协调和控制多个 WorkerService 的执行过程)。

Master 这个角色的具体实现是 Master Service。Master Service是一个GRPC service,用于与一系列远端的分布式设备进行交互来协调多个worker service。

  • Master Service 对应了 "//tensorflow/core/protobuf/master_service.proto",其内部有 CreateSession,RunStep 等接口,所有的 TensorFlow Server 都实现了 Master Service。
  • 客户端可以与 Master Service 交互以执行分布式 TensorFlow 计算。
  • 一个 Master Service 会跟踪多个 "主会话(master sessions)"。每个 master sessions 封装了一个计算图及其相关状态。
  • Master session 运行在 Master 之上,在会话建立后,master 返回一个句柄给客户端,该句柄可用于关联客户端和主会话。
  • 每个 Master session 通常对应一个 "客户会话(client session)"。客户端可以通过调用 CreateSession 向 master 发送一个初始图,通过调用 ExtendSession 向图添加节点。
  • 这里需要说明下,Master 即是一个概念角色,比如 Master 节点,也有一个具体 Master 类。

2.1 接口规范

Client 通过 GrpcSession 调用 Master Service,既然是 RPC 服务,那么 Client 和 MasterService 之间就需要有一个接口规范。这个规范定义在 master_service.proto 文件中,其定义了各个接口的消息体。

service MasterService {
  // Creates a session.
  rpc CreateSession(CreateSessionRequest) returns (CreateSessionResponse);

  // Extends a session.
  rpc ExtendSession(ExtendSessionRequest) returns (ExtendSessionResponse);

  // Prepares future partial run calls.
  rpc PartialRunSetup(PartialRunSetupRequest) returns (PartialRunSetupResponse);

  // Drives the graph computation.
  rpc RunStep(RunStepRequest) returns (RunStepResponse);

  // Closes a session.
  rpc CloseSession(CloseSessionRequest) returns (CloseSessionResponse);

  // List the devices usable by the master.
  rpc ListDevices(ListDevicesRequest) returns (ListDevicesResponse);

  // Close and abandon all existing sessions.  Ongoing computations
  // will no longer affect fresh ones via the resources in containers listed in
  // the ResetRequest.  See ResetRequest for more details.
  rpc Reset(ResetRequest) returns (ResetResponse);

  // Registers a callable for execution with RunCallable.
  rpc MakeCallable(MakeCallableRequest) returns (MakeCallableResponse);

  // Executes a callable registered with MakeCallable.
  rpc RunCallable(RunCallableRequest) returns (RunCallableResponse);

  // Frees resources associated with a callable registered with MakeCallable.
  rpc ReleaseCallable(ReleaseCallableRequest) returns (ReleaseCallableResponse);
}

2.2 MasterInterface

Client 使用接口 MasterInterface 获取远端 MasterService 的服务。MasterInterface 是接口类,是 Client 与 TensorFlow Master service 进行通信的抽象接口。这个接口既支持基于 RPC 的 master 实现,也支持不需要 RPC 往返的进程内部的 master 实现。MasterInterface 所有接口都是同步接口,这样 Client 就像调用本地函数一样调用远端 MasterService 提供的服务。

MasterInterface有两种实现,都是用来和 Master service 进行通信,

  • LocalMaster 用于进程间的直接通信,此时 Client 和 Master 在同一个进程。
  • GrpcRemoteMaster 则使用 Grpc 来和 Master service 进行通信,此时 Client 和 Master 分别部署在两个不同进程。
    • 可以调用工厂方法 NewGrpcMaster 生成 GrpcRemoteMaster 实例。
    • GrpcRemoteMaster 其实就实现了 gRPC 客户端,它通过 Stub 访问远端 Master 上的 MasterService 服务,具体服务是 GrpcMasterService。
    • 因为 MasterInterface 都是同步接口,所以 Client 就好像访问本地函数一样访问 MasterService。
class MasterInterface {
 public:
  virtual ~MasterInterface() {}
  virtual Status CreateSession(CallOptions* call_options,
                               const CreateSessionRequest* request,
                               CreateSessionResponse* response) = 0;

  virtual Status ExtendSession(CallOptions* call_options,
                               const ExtendSessionRequest* request,
                               ExtendSessionResponse* response) = 0;

  virtual Status PartialRunSetup(CallOptions* call_options,
                                 const PartialRunSetupRequest* request,
                                 PartialRunSetupResponse* response) {
    return errors::Unimplemented("Partial run not implemented for this master");
  }

  virtual Status RunStep(CallOptions* call_options,
                         RunStepRequestWrapper* request,
                         MutableRunStepResponseWrapper* response) = 0;

  virtual Status RunStep(CallOptions* call_options,
                         const RunStepRequest* request,
                         RunStepResponse* response) {
    std::unique_ptr<RunStepRequestWrapper> wrapped_request(
        new ProtoRunStepRequest(request));
    std::unique_ptr<MutableRunStepResponseWrapper> wrapped_response(
        new NonOwnedProtoRunStepResponse(response));
    return RunStep(call_options, wrapped_request.get(), wrapped_response.get());
  }

  virtual MutableRunStepRequestWrapper* CreateRunStepRequest() {
    MutableProtoRunStepRequest* ret = new MutableProtoRunStepRequest;
    ret->request_.set_request_id(GetUniqueRequestId());
    return ret;
  }

  virtual MutableRunStepResponseWrapper* CreateRunStepResponse() {
    return new OwnedProtoRunStepResponse;
  }

  virtual Status CloseSession(CallOptions* call_options,
                              const CloseSessionRequest* request,
                              CloseSessionResponse* response) = 0;

  virtual Status ListDevices(CallOptions* call_options,
                             const ListDevicesRequest* request,
                             ListDevicesResponse* response) = 0;

  virtual Status Reset(CallOptions* call_options, const ResetRequest* request,
                       ResetResponse* response) = 0;

  virtual Status MakeCallable(CallOptions* call_options,
                              const MakeCallableRequest* request,
                              MakeCallableResponse* response) = 0;
  virtual Status RunCallable(CallOptions* call_options,
                             const RunCallableRequest* request,
                             RunCallableResponse* response) = 0;
  virtual Status ReleaseCallable(CallOptions* call_options,
                                 const ReleaseCallableRequest* request,
                                 ReleaseCallableResponse* response) = 0;

 protected:
  // NOTE: This should only be called by implementations of this
  // interface whose CreateRunStepResponse() method returns a
  // proto-based wrappers for the RunStepResponse message.
  RunStepResponse* get_proto_from_wrapper(
      MutableRunStepResponseWrapper* wrapper) {
    return wrapper->get_proto();
  }
};

具体使用如下,如果 Client 和 Master 在同一个进程,则直接使用 LocalMaster,否则使用 GrpcRemoteMaster 来利用 gRPC 访问远程 GrpcMasterService。图上两个矩形封装的 Master 代表实际的 Master 类,此类实现了具体 Master 功能。

图 1 Master 逻辑结构

2.3 调用

下面的伪代码说明了客户端如何与 master 交互,这其实就是分布式模式之中,使用 GrpcRemoteMaster 来通过 gRPC 与远端 MasterSerivce 服务交互的过程。

stub = NewStub("/job:mnist/replica:0/task:0")
{handle} = stub->CreateSession({graph_def})
  
do {
   stub->RunStep({handle, {feeds}, {fetches}})
   // The client can evaluate a predicate locally, based on the
   // result of fetches, to determine whether to terminate. For
   // example, it might fetch the loss and evaluate whether it is less
   // than some threshold.
} while (!should_stop({fetches}));

stub->CloseSession({handle})

3. LocalMaster

当 Client 调用时候,GrpcSession 使用 LocalMaster 获取本地master,如果没有得到,则才使用 GrpcRemoteMaster。此时 Client 和 master 没有跨节点,LocalMaster 使客户端和master之间能够直接进行进程内通信,这样就可以给同进程内部的Client提供更高效的Master服务。

3.1 定义

LocalMaster 定义如下,主要成员变量就是 master_impl_。LocalMaster 其实就是一个壳而已,直接转发给master_impl_。master_impl_ 是当 Client 和 master 没有跨节点时候,本地直接调用的类。

class LocalMaster : public MasterInterface {
 private:
  Master* master_impl_;  // Not owned.
  const int64 default_timeout_in_ms_;

  // See LocalMaster::Lookup for the factory function that creates
  // objects of this type.
  LocalMaster(Master* master_impl, const int64 default_timeout_in_ms);

  TF_DISALLOW_COPY_AND_ASSIGN(LocalMaster);
};

3.2 注册

LocalMaster 有一个静态变量 local_master_registry_ 用来注册。

typedef std::unordered_map<string, MasterInfo> LocalMasterRegistry;

LocalMasterRegistry* local_master_registry() {
  static LocalMasterRegistry* local_master_registry_ = new LocalMasterRegistry;
  return local_master_registry_;
}

在 GrpcServer 初始化时候,调用如下代码把 target="grpc://" 生成的 Master 注册到本地 LocalMaster。

LocalMaster::Register(target(), master_impl_.get(), config.operation_timeout_in_ms());

就是把 master 注册到这个static变量 local_master_registry_ 之中。

/* static */
void LocalMaster::Register(const string& target, Master* master,
                           int64 default_timeout_in_ms) {
  mutex_lock l(*get_local_master_registry_lock());
  local_master_registry()->insert(
      {target, MasterInfo(master, default_timeout_in_ms)});
}

3.3 查找

当调用 GrpcSession::Create 方法时候,如果 Client 和 Master 在同一个进程,Lookup 在本地能够找到注册的 Master,则会生成一个 LocalMaster 返回,同时 LocalMaster 的 master_impl_ 就配置成找到的 Master。如果找不到,就返回空,则 GrpcSession::Create 方法会创建一个 GrpcRemoterMaster,这样就同远端 Master 进行交互。

/* static */
std::unique_ptr<LocalMaster> LocalMaster::Lookup(const string& target) {
  std::unique_ptr<LocalMaster> ret;
  mutex_lock l(*get_local_master_registry_lock());
  auto iter = local_master_registry()->find(target);
  if (iter != local_master_registry()->end()) {
    ret.reset(new LocalMaster(iter->second.master,
                              iter->second.default_timeout_in_ms));
  }
  return ret;
}

以下是同一个进程,Lookup 可以找到的情况,生成 LocalMaster 进行本地操作。

图 2 同进程 master 操作

我们看看不同进程的情况。此时进程 1 之中的 LocalMaster 没有指向任何 Master,因为本地没有启动 Server,所以 GrpcSession::Create 方法第一步 Lookup 调用失败,返回 Null,GrpcSession::Create 方法执行第二步骤,创建 GrpcRemoteMaster,进行远程交互。进程 2 之中,LocalMaster 因为没有客户端调用 GrpcSession::Create 方法,所以也没有指向任何 Master。

图 3 跨进程 master 操作

3.4 功能

LocalMaster 调用到其内部成员变量 master_impl_ 来完成业务功能。

Status LocalMaster::CreateSession(CallOptions* call_options,
                                  const CreateSessionRequest* request,
                                  CreateSessionResponse* response) {
  Notification n;
  Status ret;
  master_impl_->CreateSession(request, response, [&n, &ret](const Status& s) {
    ret.Update(s);
    n.Notify();
  });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}

Status LocalMaster::ExtendSession(CallOptions* call_options,
                                  const ExtendSessionRequest* request,
                                  ExtendSessionResponse* response) {
  Notification n;
  Status ret;
  master_impl_->ExtendSession(request, response, [&n, &ret](const Status& s) {
    ret.Update(s);
    n.Notify();
  });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}

Status LocalMaster::RunStep(CallOptions* call_options,
                            RunStepRequestWrapper* request,
                            MutableRunStepResponseWrapper* response) {
  Notification n;
  Status ret;
  master_impl_->RunStep(call_options, request, response,
                        [&n, &ret](const Status& s) {
                          ret.Update(s);
                          n.Notify();
                        });
  TF_RETURN_IF_ERROR(
      WaitForNotification(call_options, default_timeout_in_ms_, &n));
  return ret;
}

4. GrpcRemoteMaster

GrpcRemoteMaster 是 gRPC 客户端的一种实现, 其终通过 Stub 调用远端 Master 上的 GrpcMasterService 服务,这样调用行为就犹如本地函数调用一样。远端 GrpcMasterService 实现了 MasterService 服务定义的所有接口,是 MasterService 服务的真正实体。当创建 GrpcRemoteMaster 实例时候,需要通过 target 来指定 Master 服务的地址和端口,并且创建对应的 RPC 通道。GrpcSession 和 GrpcRemoteMaster 从严格意义上讲都是 Client 实现的一部分。

4.1 定义

GrpcRemoteMaster 具体定义如下,主要是使用了MasterServiceStub。

// GrpcRemoteMaster is an implementation of the MasterInterface
// that uses gRPC to talk to the Master service.
class GrpcRemoteMaster : public MasterInterface {
  using MasterServiceStub = grpc::MasterService::Stub;

 public:
  explicit GrpcRemoteMaster(const SharedGrpcChannelPtr& client_channel)
      : stub_(grpc::MasterService::NewStub(client_channel)) {}

  ~GrpcRemoteMaster() override {}

  std::unique_ptr<MasterServiceStub> stub_;
};

4.2 功能

GrpcRemoteMaster 的功能很简单,就是通过 gRPC 的一 个 stub 调用远端 Master 服务的相应接口。

4.2.1 CreateSession

我们使用 CreateSession 为例看看,是使用 CallWithRetry 完成功能。

Status CreateSession(CallOptions* call_options,
                     const CreateSessionRequest* request,
                     CreateSessionResponse* response) override {
  return CallWithRetry(call_options, request, response,
                       &MasterServiceStub::CreateSession);
}

CallWithRetry 代码如下,其又是调用 s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response)) 获取 Stub 来完成功能。

template <typename Request, typename Response>
Status CallWithRetry(CallOptions* call_options, const Request* request,
                     Response* response,
                     ::grpc::Status (MasterServiceStub::*pfunc)(
                         ::grpc::ClientContext*, const Request&, Response*),
                     string trace_string = {}) {
  absl::Duration timeout = absl::Milliseconds(call_options->GetTimeout());
  absl::Time expired_time = absl::FromUnixMicros(Env::Default()->NowMicros());
  if (timeout > absl::ZeroDuration()) {
    expired_time += timeout;
  }
  Status s;
  for (int num_retries = 0;; ++num_retries) {
    ::grpc::ClientContext ctx;
    std::unique_ptr<profiler::TraceMe> trace;
    if (!trace_string.empty()) {
      trace.reset(NewTraceRpc(trace_string, &ctx));
    }
    ctx.set_fail_fast(false);
    if (timeout > absl::ZeroDuration()) {
      // We do not modify the timeout here to match legacy behavior. However,
      // this could violate the contract of tensorflow::Session. If we retry
      // an RPC just before the deadline is exceeded, we will still set the
      // timeout to the original value. This leads to the overall timeout
      // being double what was expected.
      ctx.set_deadline(absl::ToChronoTime(absl::Now() + timeout));
    }
    s = FromGrpcStatus((stub_.get()->*pfunc)(&ctx, *request, response));
    if (!errors::IsUnavailable(s)) {
      return s;
    }
    // TODO(b/117162170): we may want to make this configurable.
    constexpr int kMaxRetries = 10;
    if (num_retries >= kMaxRetries) {
      return s;
    }
    absl::Time now = absl::FromUnixMicros(Env::Default()->NowMicros());
    const absl::Time deadline_with_backoff =
        now + absl::Microseconds(ComputeBackoffMicroseconds(num_retries));
    // Wait for a short period of time before retrying the RPC.  If our
    // backoff would put us past the RPC deadline, we truncate it to ensure
    // our RPC starts before the deadline.
    const auto backoff_until = (timeout <= absl::ZeroDuration() ||
                                expired_time > deadline_with_backoff)
                                   ? deadline_with_backoff
                                   : expired_time;
    Env::Default()->SleepForMicroseconds(
        absl::ToInt64Microseconds(backoff_until - now));
    now = absl::FromUnixMicros(Env::Default()->NowMicros());
    if (now > expired_time && timeout > absl::ZeroDuration()) {
      // If timeout_in_ms is set, exit the retry loop on timeout.
      return errors::DeadlineExceeded(ctx.debug_error_string());
    }
  }
}

4.2.2 Master Service Stub

接下来我们看看 Stub,这是依据 "//tensorflow/core/protobuf/master_service.proto" 来使用 grpc 实现的。

class Stub final : public StubInterface {
 public:
  Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
  ::grpc::Status CreateSession(::grpc::ClientContext* context,
                               const CreateSessionRequest& request,
                               CreateSessionResponse* response) override;
  ::grpc::Status ExtendSession(::grpc::ClientContext* context,
                               const ExtendSessionRequest& request,
                               ExtendSessionResponse* response) override;
  ::grpc::Status PartialRunSetup(::grpc::ClientContext* context,
                                 const PartialRunSetupRequest& request,
                                 PartialRunSetupResponse* response) override;
  ::grpc::Status RunStep(::grpc::ClientContext* context,
                         const RunStepRequest& request,
                         RunStepResponse* response) override;
  ::grpc::Status CloseSession(::grpc::ClientContext* context,
                              const CloseSessionRequest& request,
                              CloseSessionResponse* response) override;
  ::grpc::Status ListDevices(::grpc::ClientContext* context,
                             const ListDevicesRequest& request,
                             ListDevicesResponse* response) override;
  ::grpc::Status Reset(::grpc::ClientContext* context,
                       const ResetRequest& request,
                       ResetResponse* response) override;
  ::grpc::Status MakeCallable(::grpc::ClientContext* context,
                              const MakeCallableRequest& request,
                              MakeCallableResponse* response) override;
  ::grpc::Status RunCallable(::grpc::ClientContext* context,
                             const RunCallableRequest& request,
                             RunCallableResponse* response) override;
  ::grpc::Status ReleaseCallable(::grpc::ClientContext* context,
                                 const ReleaseCallableRequest& request,
                                 ReleaseCallableResponse* response) override;

 private:
  std::shared_ptr< ::grpc::ChannelInterface> channel_;
  const ::grpc::internal::RpcMethod rpcmethod_CreateSession_;
  const ::grpc::internal::RpcMethod rpcmethod_ExtendSession_;
  const ::grpc::internal::RpcMethod rpcmethod_PartialRunSetup_;
  const ::grpc::internal::RpcMethod rpcmethod_RunStep_;
  const ::grpc::internal::RpcMethod rpcmethod_CloseSession_;
  const ::grpc::internal::RpcMethod rpcmethod_ListDevices_;
  const ::grpc::internal::RpcMethod rpcmethod_Reset_;
  const ::grpc::internal::RpcMethod rpcmethod_MakeCallable_;
  const ::grpc::internal::RpcMethod rpcmethod_RunCallable_;
  const ::grpc::internal::RpcMethod rpcmethod_ReleaseCallable_;
};

具体远端的对应方法是:

static const char* grpcMasterService_method_names[] = {
    "/tensorflow.MasterService/CreateSession",
    "/tensorflow.MasterService/ExtendSession",
    "/tensorflow.MasterService/PartialRunSetup",
    "/tensorflow.MasterService/RunStep",
    "/tensorflow.MasterService/CloseSession",
    "/tensorflow.MasterService/ListDevices",
    "/tensorflow.MasterService/Reset",
    "/tensorflow.MasterService/MakeCallable",
    "/tensorflow.MasterService/RunCallable",
    "/tensorflow.MasterService/ReleaseCallable",
};

std::unique_ptr<MasterService::Stub> MasterService::NewStub(
    const std::shared_ptr< ::grpc::ChannelInterface>& channel,
    const ::grpc::StubOptions& options) {
  std::unique_ptr<MasterService::Stub> stub(new MasterService::Stub(channel));
  return stub;
}

Stub 内部调用 grpc 完成发送功能。

::grpc::Status MasterService::Stub::CreateSession(
    ::grpc::ClientContext* context, const CreateSessionRequest& request,
    CreateSessionResponse* response) {
  return ::grpc::internal::BlockingUnaryCall(
      channel_.get(), rpcmethod_CreateSession_, context, request, response);
}

所以,如果是 GrpcRemoteMaster,则调用流程应该是:GrpcRemoteMaster 接收到 grpc session 的请求,转交给 grpc master service,这期间经历了 GrpcSession -> GrpcRemoteMaster -> GrpcMasterService -> Master -> MasterSession 一系列流程。

4.3 创建

当建立 GrpcSession 时候,create 方法之中会先查找有没有 Master。如果找到了就直接返回 LocalMaster,这部分我们前面介绍过。如果 Lookup 找不到。所以会调用 NewGrpcMaster 生成一个 GrpcRemoteMaster。

/* static */
Status GrpcSession::Create(const SessionOptions& options,
                           std::unique_ptr<GrpcSession>* out_session) {
  std::unique_ptr<GrpcSession> session(new GrpcSession(options));
  std::unique_ptr<MasterInterface> master;
  // For testing, we enable the client to disable the use of the local
  // master registry, so that the RPC stack is exercised.
  if (!options.config.rpc_options().use_rpc_for_inprocess_master()) {
    master = LocalMaster::Lookup(options.target); 
  }
  if (!master) {
    SharedGrpcChannelPtr master_channel;
    TF_RETURN_IF_ERROR(
        NewHostPortGrpcChannel(options.target.substr(kSchemePrefixLength),
                               &options.config.rpc_options(), &master_channel));
    // 建立 GrpcRemoteMaster,与远端 Master 交互
    master.reset(NewGrpcMaster(master_channel));
  } else {
    session->is_local_ = true;
  }
  session->SetRemoteMaster(std::move(master));
  *out_session = std::move(session);
  return Status::OK();
}

NewGrpcMaster 方法具体如下:

MasterInterface* NewGrpcMaster(const SharedGrpcChannelPtr& channel) {
  return new GrpcRemoteMaster(channel);
}

5. GrpcMasterService

GrpcMasterService 实现了 RPC 对应的 MasterService。GrpcMasterService 会:

  • 预先了解有哪些本地设备可以给客户使用,也会发现远端设备并且跟踪其统计数据。
  • 维护/管理实时计算图会话(MasterSession),这些会话将调用本地或者远端设备来对收到的计算图进行计算。
  • 会话功能是:对收到的计算图进行分析,剪枝,把节点放到可用设备上,通过调用 RunGraph 在工作者上进行图计算。

5.1 创建

GrpcServer 之中,master_service_ 是 GrpcMasterService 类型的变量。

  // 创建 Master 以及对应的 GrpcMasterService
  master_impl_ = CreateMaster(&master_env_);
  master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);

GrpcServer 使用 master_thread_ 线程来执行 GrpcMasterService 的 HandleRPCsLoop方法。

master_thread_.reset(
    env_->StartThread(ThreadOptions(), "TF_master_service",
                      [this] { master_service_->HandleRPCsLoop(); }));

5.2 定义

GrpcMasterService 定义如下,master_impl_ 是 Server 传入的 master 指针,是一个 Master 类的实例:

class GrpcMasterService : public AsyncServiceInterface {
  Master* master_impl_ = nullptr;  // Not owned.
  std::unique_ptr<::grpc::ServerCompletionQueue> cq_;
  grpc::MasterService::AsyncService master_service_;

  mutex mu_;
  bool is_shutdown_ TF_GUARDED_BY(mu_);
  const ConfigProto default_session_config_;
  ::grpc::Alarm* shutdown_alarm_ = nullptr;

  template <class RequestMessage, class ResponseMessage>
  using MasterCall = Call<GrpcMasterService, grpc::MasterService::AsyncService,
                          RequestMessage, ResponseMessage>;
}

GrpcMasterService 初始化时候,会得到 grpc 的消息队列 cq_。

GrpcMasterService(Master* master, const ConfigProto& default_session_config,
                  ::grpc::ServerBuilder* builder)
    : master_impl_(master),
      is_shutdown_(false),
      default_session_config_(default_session_config) {
  builder->RegisterService(&master_service_);
  cq_ = builder->AddCompletionQueue();
}

5.3 主循环

前面提到了,master_thread_ 线程来执行 GrpcMasterService 的 HandleRPCsLoop 方法。HandleRPCsLoop 会调用 GrpcMasterService 内部函数来进行处理RPC消息。主循环 HandleRPCsLoop 代码如下:

void HandleRPCsLoop() override {
  ENQUEUE_REQUEST(CreateSession, true);
  ENQUEUE_REQUEST(ExtendSession, false);
  for (int i = 0; i < 100; ++i) {
    ENQUEUE_REQUEST(PartialRunSetup, false);
    ENQUEUE_REQUEST(RunStep, true);
  }
  ENQUEUE_REQUEST(CloseSession, false);
  ENQUEUE_REQUEST(ListDevices, false);
  ENQUEUE_REQUEST(Reset, false);
  ENQUEUE_REQUEST(MakeCallable, false);
  for (int i = 0; i < 100; ++i) {
    ENQUEUE_REQUEST(RunCallable, true);
  }
  ENQUEUE_REQUEST(ReleaseCallable, false);

  void* tag;
  bool ok;
  while (cq_->Next(&tag, &ok)) {
    UntypedCall<GrpcMasterService>::Tag* callback_tag =
        static_cast<UntypedCall<GrpcMasterService>::Tag*>(tag);
    if (callback_tag) {
      callback_tag->OnCompleted(this, ok);
    } else {
      // NOTE(mrry): A null callback_tag indicates that this is
      // the shutdown alarm.
      cq_->Shutdown();
    }
  }
}

上面代码之中有一些最佳实践,具体就是围绕 ENQUEUE_REQUEST 做了一些处理:

  • this->cq_ 是 grpc 队列。
  • ENQUEUE_REQUEST 宏会为给定的 RPC 方法名称创建一个新请求(比如 ENQUEUE_REQUEST(GetStatus, false) 就会生成一个 GetStatus 请求),这些请求将在 this->cq_ 之上进行排队。
  • 预先把一定数量的要处理的任务放入 cq_,如果任务被任务响应 handler 调用,则 handler 会调用ENQUEUE_REQUEST() 往队列之中补充一个同样的调用,这样可以确保完成队列 cq_ 有足够的任务来处理传入的请求,这样处理将不会阻塞,整体处理速度会提高。
  • 代码最后的 while 循环将读取 gRPC 队列中的内容,就是 gRPC 调用之后的收尾工作。
#define ENQUEUE_REQUEST(method, supports_cancel)                              \
  do {                                                                        \
    mutex_lock l(mu_);                                                        \
    if (!is_shutdown_) {                                                      \
      Call<GrpcMasterService, grpc::MasterService::AsyncService,              \
           method##Request, method##Response>::                               \
          EnqueueRequest(&master_service_, cq_.get(),                         \
                         &grpc::MasterService::AsyncService::Request##method, \
                         &GrpcMasterService::method##Handler,                 \
                         (supports_cancel));                                  \
    }                                                                         \
  } while (0)

5.4 消息处理

在具体消息响应之中,会调用 master_impl_ 进行处理,当 Master 处理完成之后,处理函数将回调一个 lambda 表达式,向 Client 返回的响应消息。可以看到,代码在最后会使用 ENQUEUE_REQUEST 再插入一个同样类型的请求,比如下面最后会返回给 Client 一个 CreateSessionResponse。

// RPC handler for creating a session.
void CreateSessionHandler(
    MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
  CreateSessionRequest* rewritten_req = new CreateSessionRequest;
  rewritten_req->mutable_config()->MergeFrom(default_session_config_);
  rewritten_req->MergeFrom(call->request);
  master_impl_->CreateSession(rewritten_req, &call->response,
                              [call, rewritten_req](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                                delete rewritten_req;
                              });
  ENQUEUE_REQUEST(CreateSession, true);
}

5.5 功能

GrpcMasterService 提供的 API 如下:

static const char* grpcMasterService_method_names[] = {
    "/tensorflow.MasterService/CreateSession",
    "/tensorflow.MasterService/ExtendSession",
    "/tensorflow.MasterService/PartialRunSetup",
    "/tensorflow.MasterService/RunStep",
    "/tensorflow.MasterService/CloseSession",
    "/tensorflow.MasterService/ListDevices",
    "/tensorflow.MasterService/Reset",
    "/tensorflow.MasterService/MakeCallable",
    "/tensorflow.MasterService/RunCallable",
    "/tensorflow.MasterService/ReleaseCallable",
};

我们举出三个具体功能分析一下:

5.5.1 CreateSession

CreateSessionRequest 消息之中会带有 Client 设定的计算图和配置信息。Master 接收到请求之后,为这个 Client 建立一个 MasterSession 实例,并建立一个唯一地标识该 MasterSession 实例的 session_handle。这是通过 Master 类成员变量 std::unordered_map<string, MasterSession*> sessions_ 来完成的,session_handle 就是 string 类型。

Master 返回消息 CreateSessionResponse 给 Client。CreateSessionResponse 消息中携带:

  • session_handle。Client 的 GrpcSession 据此和 Master 端的 MasterSession 建立关联,后续交互之中,Client 在消息内均会携带此 session_handle,随后,Client 与 Master 的所有交互中,在请求 消息中通过携带 session_handle,Master 通过它在 std::unordered_map<string, MasterSession*> sessions_ 会找到相对应的 MasterSession 实例。
  • 初始 graph_version。用于后续发起 ExtendSession 操作,往原始的计算图中追加新的节点。

图 4 CreateSession

具体响应代码如下:

// RPC handler for creating a session.
void CreateSessionHandler(
    MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
  CreateSessionRequest* rewritten_req = new CreateSessionRequest;
  rewritten_req->mutable_config()->MergeFrom(default_session_config_);
  rewritten_req->MergeFrom(call->request);
  master_impl_->CreateSession(rewritten_req, &call->response,
                              [call, rewritten_req](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                                delete rewritten_req;
                              });
  ENQUEUE_REQUEST(CreateSession, true);
}

5.5.2 ExtendSession

当建立 Session 之后,Client 可以通过 ExtendSession 告诉 Master 我需要拓展原有计算图的规模 (只能追加子图,不能修改或删除)。

在请求消息 ExtendSessionRequest 中有:

  • session_handle :用来查找哪一个 MasterSession 实例;
  • graph_def :需要加到计算图上的节点;
  • current_graph_version :需要拓展的计算图版本号;

在在响应消息 ExtendSessionResponse 中返回 new_graph_version,其用于下一此 ExtendSession 操作。

图 5 ExtendSession

具体代码如下:

// RPC handler for extending a session.
void ExtendSessionHandler(
    MasterCall<ExtendSessionRequest, ExtendSessionResponse>* call) {
  master_impl_->ExtendSession(&call->request, &call->response,
                              [call](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                              });
  ENQUEUE_REQUEST(ExtendSession, false);
}

5.5.3 RunStep

客户端会迭代执行 RunStep,请求消息 RunStepRequest 的变量较多,比如:

  • session_handle :用来查找哪一个 MasterSession 实例;
  • feed :输入的 NamedTensor 列表;
  • fetch :待输出 Tensor 的名称列表;
  • target :执行节点列表;

响应消息 RunStepResponse 主要携带:

  • tensor :输出的 Tensor 列表;

图 6 RunStep

消息定义具体如下:

message RunStepRequest {
  // REQUIRED: session_handle must be returned by a CreateSession call
  // to the same master service.
  string session_handle = 1;

  // Tensors to be fed in the step. Each feed is a named tensor.
  repeated NamedTensorProto feed = 2;

  // Fetches. A list of tensor names. The caller expects a tensor to
  // be returned for each fetch[i] (see RunStepResponse.tensor). The
  // order of specified fetches does not change the execution order.
  repeated string fetch = 3;

  // Target Nodes. A list of node names. The named nodes will be run
  // to but their outputs will not be fetched.
  repeated string target = 4;

  // Options for the run call.
  RunOptions options = 5;

  // Partial run handle (optional). If specified, this will be a partial run
  // execution, run up to the specified fetches.
  string partial_run_handle = 6;

  // If true then some errors, e.g., execution errors that have long
  // error messages, may return an OK RunStepResponse with the actual
  // error saved in the status_code/status_error_message fields of the
  // response body. This is a workaround since the RPC subsystem may
  // truncate long metadata messages.
  bool store_errors_in_response_body = 7;

  // Unique identifier for this request. Every RunStepRequest must
  // have a unique request_id, and retried RunStepRequest must have
  // the same request_id. If request_id is zero, retry detection is disabled.
  int64 request_id = 8;
}

message RunStepResponse {
  // NOTE: The order of the returned tensors may or may not match
  // the fetch order specified in RunStepRequest.
  repeated NamedTensorProto tensor = 1;

  // Returned metadata if requested in the options.
  RunMetadata metadata = 2;

  // If store_errors_in_response_body is true in the request, then
  // optionally the server may return an OK status for the RPC and
  // fill the true status into the fields below, to allow for messages
  // that are too long to fit in metadata.
  error.Code status_code = 3;
  string status_error_message = 4;
}

具体代码如下:

// RPC handler for running one step in a session.
void RunStepHandler(MasterCall<RunStepRequest, RunStepResponse>* call) {
  auto* trace = TraceRpc("RunStep/Server", call->client_metadata());
  CallOptions* call_opts = new CallOptions;
  if (call->request.options().timeout_in_ms() > 0) {
    call_opts->SetTimeout(call->request.options().timeout_in_ms());
  } else {
    call_opts->SetTimeout(default_session_config_.operation_timeout_in_ms());
  }
  RunStepRequestWrapper* wrapped_request =
      new ProtoRunStepRequest(&call->request);
  MutableRunStepResponseWrapper* wrapped_response =
      new NonOwnedProtoRunStepResponse(&call->response);
  call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); });
  master_impl_->RunStep(
      call_opts, wrapped_request, wrapped_response,
      [call, call_opts, wrapped_request, trace](const Status& status) {
        call->ClearCancelCallback();
        delete call_opts;
        delete wrapped_request;
        delete trace;
        if (call->request.store_errors_in_response_body() && !status.ok()) {
          call->response.set_status_code(status.code());
          call->response.set_status_error_message(status.error_message());
          call->SendResponse(ToGrpcStatus(Status::OK()));
        } else {
          call->SendResponse(ToGrpcStatus(status));
        }
      });
  ENQUEUE_REQUEST(RunStep, true);
}

6. 业务实现 Master 类

6.1 创建

前面提到了,GrpcServer 之中建立的是 Master 类的实例。

std::unique_ptr<Master> GrpcServer::CreateMaster(MasterEnv* master_env) {
  return std::unique_ptr<Master>(new Master(master_env, 0.0));
}

这样,在收到 Client 的消息后,在具体消息响应之中,GrpcMasterService 的线程会调用 master_impl_ 进行处理,就是把业务逻辑委托给 Master 类来实现。所以我们接下来就看看 Master 如何处理。

// RPC handler for creating a session.
void CreateSessionHandler(
    MasterCall<CreateSessionRequest, CreateSessionResponse>* call) {
  CreateSessionRequest* rewritten_req = new CreateSessionRequest;
  rewritten_req->mutable_config()->MergeFrom(default_session_config_);
  rewritten_req->MergeFrom(call->request);
  master_impl_->CreateSession(rewritten_req, &call->response,
                              [call, rewritten_req](const Status& status) {
                                call->SendResponse(ToGrpcStatus(status));
                                delete rewritten_req;
                              });
  ENQUEUE_REQUEST(CreateSession, true);
}

6.2 定义

Master 其实不是 MasterInterface 的派生类,其定义在tensorflow/core/distributed_runtime/master.cc。可以从成员变量 sessions_ 上看出来,主要就是管理 MasterSession。

class Master {

 private:
  typedef Master ME;

  // Not owned.
  MasterEnv* env_ = nullptr;

  // Owned.
  mutex mu_;

  // shutdown_ is set to true by the dtor.
  condition_variable shutdown_cv_;
  bool shutdown_ TF_GUARDED_BY(mu_) = false;
  Thread* gc_thread_;

  // Maps session handles to sessions.
  std::unordered_map<string, MasterSession*> sessions_ TF_GUARDED_BY(mu_);

  // Moving average of step times.
  MovingAverage last_1000_steps_ TF_GUARDED_BY(mu_);

  // Cumulative number of steps executed.
  int64 step_count_ TF_GUARDED_BY(mu_);

  // If a session is not active for this many seconds, it will be
  // closed automatically.
  const double session_gc_seconds_;

  // Used to track ids for incoming requests so we can detect duplicates.
  RecentRequestIds recent_request_ids_;
};

6.3 功能

我们回忆一下之前提到的。

分布式运行的核心是如何操作计算图,但是计算功能被拆分为 Client,Master 和 Worker 三个角色。

Client 负责构造计算图,Worker 负责执行具体计算,但是 Worker 怎么知道应该计算什么?TensorFlow 在两者之间插入了一个 Master 角色来负责协调,调度。

虽然 Master 不是 MasterInterface 的派生类,但时其实现了 MasterService 的具体业务。Master 具体负责:

  • Master 预先知道本地有哪些设备可以作为客户使用的设备,也会发现远程设备,并跟踪这些远程设备的统计数据。
  • 一个 Master 包含多个 "主会话(master sessions)"。每个 master sessions 封装了一个计算图及其相关状态。
  • 主会话将:
    • 精简优化计算图,比如剪枝/分割/插入发送和接受算子。
    • 协调/调度资源。比如哪个计算应该在哪个设备运行,具体就是按照 graph -> Partition -> Device 这个策略把子图划分到硬件设备之上。
    • 把分割之后的各个子图发送给各个 worker,具体每一个子图对应一个 MasterSession。并最终通过在工作者上启动 RunGraph 来驱动图的计算。
  • Master 维护实时图计算会话的状态。

至此,Master 的静态结构我们已经介绍完毕,具体 Master 功能我们将在后文 Session 部分进行具体介绍。

最后,强烈推荐两个大神:

0xFF 参考

TensorFlow Internals

TensorFlow架构与设计:概述

TensorFlow内核剖析

TensorFlow架构与设计:OP本质论

[译] TensorFlow 白皮书

2017TensorFlow开发者峰会

https://jcf94.com/2018/02/28/2018-02-28-tfunpacking3/

TensorFlow 拆包(五):Distributed

TensorFlow Architecture

『深度长文』Tensorflow代码解析(五)

什么是in-graph replication和between-graph replication?

[腾讯机智] TensorFlow源码解析(1): 创建会话

05tensorflow分布式会话

第八节,配置分布式TensorFlow

TensorFlow 分布式(Distributed TensorFlow)

tensorflow源码解析之distributed_runtime

Distributed TensorFlow: A Gentle Introduction

一文说清楚Tensorflow分布式训练必备知识

TensorFlow中的Placement启发式算法模块——Placer

TensorFlow的图切割模块——Graph Partitioner

TensorFlow中的通信机制——Rendezvous(一)本地传输

TensorFlow分布式采坑记

TensorFlow技术内幕(九):模型优化之分布式执行

Tensorflow架构流程]


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK