
Hadoop YARN: A Source-Code Walkthrough of ApplicationMaster–ResourceManager Interaction

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650228889&%3Bidx=1&%3Bsn=19fa5502ebc68e5d7d0608540a538708

Follow the WeChat account “知了小巷” above, and pin or star it to get new posts first.

ApplicationMaster<-->ResourceManager

Roles involved in a “generic” YARN application, and how they interact:

RM:ResourceManager

AM:ApplicationMaster

NM:NodeManager

Main communication protocols used in these interactions:

ApplicationClientProtocol

ApplicationMasterProtocol

ContainerManagementProtocol
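As a quick reference, the three protocols can be tabulated by which role calls which, together with the client wrapper this article names for each link. The enum below is a hypothetical illustration: `YarnProtocolMap` is not a Hadoop type. Only the protocol and client class names come from Hadoop.

```java
// Hypothetical quick-reference enum (not a Hadoop type): which YARN role
// calls which over each protocol, and the client wrapper typically used.
enum YarnProtocolMap {
    APPLICATION_CLIENT_PROTOCOL("Client", "ResourceManager", "YarnClient"),
    APPLICATION_MASTER_PROTOCOL("ApplicationMaster", "ResourceManager", "AMRMClientAsync"),
    CONTAINER_MANAGEMENT_PROTOCOL("ApplicationMaster", "NodeManager", "NMClientAsync");

    final String caller;      // role that initiates the RPC
    final String callee;      // role that serves the RPC
    final String clientClass; // wrapper class the caller usually goes through

    YarnProtocolMap(String caller, String callee, String clientClass) {
        this.caller = caller;
        this.callee = callee;
        this.clientClass = clientClass;
    }

    // Human-readable summary, e.g. "Client -> ResourceManager via YarnClient"
    String describe() {
        return caller + " -> " + callee + " via " + clientClass;
    }
}
```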

Client<-->ResourceManager

The client program interacts with the RM through a YarnClient object.

ApplicationMaster<-->ResourceManager

The AM interacts with the RM through an AMRMClientAsync object. The resulting events are handled asynchronously by an AMRMClientAsync.CallbackHandler.

ApplicationMaster<-->NodeManager

The AM interacts with the NM through an NMClientAsync object, mainly to launch containers. Container events are handled asynchronously by an NMClientAsync.CallbackHandler.
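The asynchronous pattern can be sketched with JDK classes alone. A "start container" call returns immediately, and the outcome arrives later on another thread through the handler. This is a hypothetical model, not Hadoop code:

- `ToyNmCallback` and `ToyNmClientAsync` are invented names.
- The `Predicate` stands in for the NodeManager RPC.
- The returned `CompletableFuture` exists only so callers can wait. The real `NMClientAsync.startContainerAsync` returns `void`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

// Hypothetical callback interface; the method names echo NMClientAsync's handler
interface ToyNmCallback {
    void onContainerStarted(String containerId);
    void onStartContainerError(String containerId, Throwable t);
}

// Hypothetical async NodeManager client: startContainerAsync returns at once and
// the outcome is reported later, on a worker thread, through the callback handler.
class ToyNmClientAsync {
    private final ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "toy-nm-client");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });
    private final ToyNmCallback handler;
    // Stand-in for the NodeManager's container-launch RPC: true = container launched
    private final Predicate<String> nodeManager;

    ToyNmClientAsync(ToyNmCallback handler, Predicate<String> nodeManager) {
        this.handler = handler;
        this.nodeManager = nodeManager;
    }

    // The future is only for waiting in demos/tests; results go through the handler
    CompletableFuture<Void> startContainerAsync(String containerId) {
        return CompletableFuture.runAsync(() -> {
            if (nodeManager.test(containerId)) {
                handler.onContainerStarted(containerId);
            } else {
                handler.onStartContainerError(containerId,
                        new IllegalStateException("NodeManager refused " + containerId));
            }
        }, pool);
    }
}
```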

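The proto messages used by the interface requests and responses are defined under hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/. The shared record types are in yarn_protos.proto, and the request/response messages themselves are in yarn_service_protos.proto.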

Hadoop version: 3.2.1

Flink version: 1.10

1. Example: the JobManager process YarnJobClusterEntrypoint in Flink's YARN per-job mode

// The starting point is the YarnJobClusterEntrypoint#main method

// The end point is YarnResourceManager

[Image: call path from YarnJobClusterEntrypoint#main to YarnResourceManager]

/**
 * The yarn implementation of the resource manager. Used when the system is started
 * via the resource framework YARN.
 */
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
        implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {

    // This class is the ApplicationMaster we keep hearing about
    ...

    /** Client to communicate with the Resource Manager (YARN's master). */
    // resourceManagerClient handles all interaction with the YARN ResourceManager
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

    /** Client to communicate with the Node manager and launch TaskExecutor processes. */
    // nodeManagerClient handles all interaction with the NodeManagers
    private NMClientAsync nodeManagerClient;

    ...
}

AMRMClientAsync

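An abstract class that handles communication with the ResourceManager. It provides asynchronous updates on events such as container allocations and completions, and it contains a thread that periodically sends heartbeats to the ResourceManager.

Applications normally do not subclass it themselves. Instead, they obtain an instance through the static factory AMRMClientAsync.createAMRMClientAsync(...), whose implementation is AMRMClientAsyncImpl. What the application does implement is the callback handler: AMRMClientAsync.CallbackHandler, or in Hadoop 3.x preferably AMRMClientAsync.AbstractCallbackHandler, since the CallbackHandler interface is deprecated. AMRMClientAsync then calls this handler.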
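The "thread that periodically heartbeats the RM" can be pictured with a plain `ScheduledExecutorService`. This is a minimal sketch under stated assumptions, not Hadoop code:

- `ToyHeartbeater` is a hypothetical name.
- The `Runnable` stands in for the `allocate()` RPC that doubles as the heartbeat.
- In Hadoop, the interval is the `intervalMs` argument passed to `createAMRMClientAsync`, and the thread is Hadoop's own rather than an executor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one background thread runs a "heartbeat" action at a
// fixed interval, the way AMRMClientAsync keeps calling allocate() on the RM.
class ToyHeartbeater {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "toy-heartbeat");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
    private final AtomicInteger beats = new AtomicInteger();

    // heartbeat stands in for the allocate() RPC. In this sketch an exception
    // thrown by it would silently cancel all later beats.
    void start(Runnable heartbeat, long intervalMs) {
        scheduler.scheduleAtFixedRate(() -> {
            heartbeat.run();
            beats.incrementAndGet();
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Polls until at least n heartbeats have completed or the timeout expires
    boolean awaitBeats(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (beats.get() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return beats.get() >= n;
    }

    int beatCount() {
        return beats.get();
    }

    // Stops the heartbeat thread
    void stop() {
        scheduler.shutdownNow();
    }
}
```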

2. A simple example: MyCallbackHandler

[Image: MyCallbackHandler example code]
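A self-contained stand-in for such a handler might look like the following. It is a sketch, not the author's code. `ToyCallbackHandler` is a hypothetical, trimmed-down copy of the callback interface. Its method names mirror Hadoop's (`onContainersAllocated`, `onContainersCompleted`, `onShutdownRequest`, `onError`, `getProgress`), but containers are plain `String` ids instead of Hadoop's `Container`/`ContainerStatus` records, so it compiles without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down callback interface. Method names mirror Hadoop's
// AMRMClientAsync callback handler, but containers are just String ids here.
interface ToyCallbackHandler {
    void onContainersAllocated(List<String> containerIds);
    void onContainersCompleted(List<String> containerIds);
    void onShutdownRequest();
    void onError(Throwable e);
    float getProgress();
}

// Example handler: records allocated containers and reports progress
// as completed / requested.
class MyCallbackHandler implements ToyCallbackHandler {
    private final int requested;
    private final List<String> running = new ArrayList<>();
    private int completed;
    private volatile boolean shutdown;

    MyCallbackHandler(int requested) {
        this.requested = requested;
    }

    @Override
    public synchronized void onContainersAllocated(List<String> containerIds) {
        // A real AM would now ask the NodeManager (e.g. via NMClientAsync) to start each one
        running.addAll(containerIds);
    }

    @Override
    public synchronized void onContainersCompleted(List<String> containerIds) {
        for (String id : containerIds) {
            if (running.remove(id)) {
                completed++;
            }
        }
    }

    @Override
    public void onShutdownRequest() {
        shutdown = true; // the RM asked this AM to shut down
    }

    @Override
    public void onError(Throwable e) {
        shutdown = true; // treat any client error as fatal in this sketch
    }

    @Override
    public synchronized float getProgress() {
        return requested == 0 ? 1.0f : (float) completed / requested;
    }

    synchronized int runningCount() {
        return running.size();
    }

    boolean isShutdown() {
        return shutdown;
    }
}
```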

Lifecycle of the AMRMClientAsync client

[Image: AMRMClientAsync client lifecycle]
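The usual call order on the client is: init, start, registerApplicationMaster, zero or more addContainerRequest calls while heartbeats run, unregisterApplicationMaster, and finally stop. The hypothetical class below encodes only that happy path as a small state machine, to make the ordering explicit. `ToyAmrmLifecycle` is invented, and the real service is more permissive (for example, stop() can be called from other states).

```java
// Hypothetical state machine for the typical AMRMClientAsync call order:
// init -> start -> registerApplicationMaster -> addContainerRequest (0..n)
// -> unregisterApplicationMaster -> stop. Not a Hadoop class.
class ToyAmrmLifecycle {
    enum State { CREATED, INITED, STARTED, REGISTERED, UNREGISTERED, STOPPED }

    private State state = State.CREATED;
    private int pendingRequests;

    // Moves to 'next' only if we are currently in 'expected'
    private void move(State expected, State next) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
        state = next;
    }

    void init() { move(State.CREATED, State.INITED); }

    void start() { move(State.INITED, State.STARTED); }

    void registerApplicationMaster() { move(State.STARTED, State.REGISTERED); }

    // In this sketch, container requests are accepted only while registered
    void addContainerRequest() {
        if (state != State.REGISTERED) {
            throw new IllegalStateException("register before requesting containers");
        }
        pendingRequests++;
    }

    void unregisterApplicationMaster() { move(State.REGISTERED, State.UNREGISTERED); }

    void stop() { move(State.UNREGISTERED, State.STOPPED); }

    State state() { return state; }

    int pendingRequests() { return pendingRequests; }
}
```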

3. AMRMClientAsync source excerpt

[Images: AMRMClientAsync source excerpts]

4. AMRMClientAsyncImpl source excerpt

[Image: AMRMClientAsyncImpl source excerpt]
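AMRMClientAsyncImpl splits its work between two threads. A heartbeat thread calls allocate() periodically and puts each response on a blocking queue. A callback-handler thread takes responses off that queue and invokes the user's handler, so slow callbacks do not delay heartbeats.

The sketch below models that producer/consumer split with JDK classes only. `ToyAsyncClient` is hypothetical, and a plain `int` ("containers allocated in this response") stands in for `AllocateResponse`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

// Hypothetical two-thread model of an async AM-RM client:
// - heartbeat thread: calls the "allocate" RPC and enqueues each response
// - handler thread:   dequeues responses and runs the user callback
class ToyAsyncClient {
    private final BlockingQueue<Integer> responseQueue = new LinkedBlockingQueue<>();
    private final Thread heartbeatThread;
    private final Thread handlerThread;
    private volatile boolean running = true;

    // allocate:    stand-in for allocate(); returns containers granted this round
    // onAllocated: stand-in for onContainersAllocated(...)
    ToyAsyncClient(IntSupplier allocate, IntConsumer onAllocated, long intervalMs) {
        heartbeatThread = new Thread(() -> {
            while (running) {
                responseQueue.add(allocate.getAsInt());
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException e) {
                    return; // stop() interrupts the sleep
                }
            }
        }, "toy-heartbeat");
        handlerThread = new Thread(() -> {
            while (running) {
                try {
                    onAllocated.accept(responseQueue.take()); // blocks until a response arrives
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "toy-callback-handler");
        heartbeatThread.setDaemon(true);
        handlerThread.setDaemon(true);
    }

    void start() {
        heartbeatThread.start();
        handlerThread.start();
    }

    void stop() {
        running = false;
        heartbeatThread.interrupt();
        handlerThread.interrupt();
    }
}
```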

5. AMRMClient source excerpt

package org.apache.hadoop.yarn.client.api;

import ...

// Abstract class AMRMClient
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
        AbstractService {
    ...
}

6. AMRMClientImpl source excerpt

[Image: AMRMClientImpl source excerpt]

7. ApplicationMasterProtocol source excerpt

The ApplicationMasterProtocol interface is quite simple: it has only three methods.

package org.apache.hadoop.yarn.api;

import ...

// Interface ApplicationMasterProtocol
public interface ApplicationMasterProtocol {

    // Registers this AM with the RM
    public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request)
            throws YarnException, IOException;

    // Asks the RM to unregister this AM; the AM may have finished successfully,
    // or the application may have failed
    public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request)
            throws YarnException, IOException;

    // The main AM<->RM method: processes an AllocateRequest and returns an AllocateResponse.
    // This is how containers are requested, with requests and responses in batches
    // (e.g. a Flink JobManager may ask for 3 TaskManagers at once).
    // Each call also serves as the AM's heartbeat to the RM.
    // Executed at most once, so containers are neither requested twice nor over-allocated
    public AllocateResponse allocate(AllocateRequest request)
            throws YarnException, IOException;
}
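On the RM side, the "at most once" behaviour relies on a response id carried in each allocate request and response. Roughly:

- A request whose id is one behind the last response is treated as a retried heartbeat and gets the cached previous response back.
- Any other mismatch is rejected.
- On the client side, AMRMClientImpl remembers the id from the last response and sends it with the next request.

Below is a simplified, hypothetical model of that check. `ToyAllocateService`, `ToyResponse` and the container counting are invented for illustration. The real logic lives in ApplicationMasterService and also handles locking, authorization and the scheduler.

```java
// Hypothetical, simplified model of the responseId check the RM applies to
// allocate() calls. Not Hadoop code: ids, types and counts are illustrative.
class ToyAllocateService {
    // Minimal stand-in for AllocateResponse
    static final class ToyResponse {
        final int responseId;
        final int allocated;

        ToyResponse(int responseId, int allocated) {
            this.responseId = responseId;
            this.allocated = allocated;
        }
    }

    private ToyResponse last = new ToyResponse(0, 0); // state right after registration
    private int totalAllocated;

    synchronized ToyResponse allocate(int requestResponseId, int containersAsked) {
        if (requestResponseId + 1 == last.responseId) {
            // Retried heartbeat: replay the cached response, allocate nothing new
            return last;
        }
        if (requestResponseId != last.responseId) {
            throw new IllegalArgumentException("invalid responseId " + requestResponseId
                    + ", expected " + last.responseId);
        }
        totalAllocated += containersAsked;
        last = new ToyResponse(last.responseId + 1, containersAsked);
        return last;
    }

    synchronized int totalAllocated() {
        return totalAllocated;
    }
}
```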

8. ApplicationMasterProtocolPBClientImpl source excerpt

package org.apache.hadoop.yarn.api.impl.pb.client;

import ...

// Client-side implementation of the ApplicationMasterProtocol interface
public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {

    private ApplicationMasterProtocolPB proxy;

    public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
            Configuration conf) throws IOException {
        RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
        // Under the hood this ends up calling java.lang.reflect.Proxy#newProxyInstance
        proxy =
            (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, clientVersion,
                addr, conf);
    }

    ...
}
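The JDK mechanism behind `RPC.getProxy` is a dynamic proxy: every call on the interface is funneled into an `InvocationHandler`. In Hadoop, that handler belongs to the RPC engine (ProtobufRpcEngine here), which turns the call into a protobuf request on the wire. The sketch below shows only the proxy mechanics:

- `ToyAmProtocol` and `ToyRpc` are hypothetical.
- The `transport` function is an in-process stand-in for the network.
- No sockets or protobuf are involved.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

// Hypothetical protocol interface, standing in for ApplicationMasterProtocolPB
interface ToyAmProtocol {
    String registerApplicationMaster(String host);
    String allocate(String request);
}

class ToyRpc {
    // Builds a client-side proxy: each interface call becomes a (methodName, args)
    // message handed to 'transport', which plays the role of the network + server.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> protocol, BiFunction<String, Object[], Object> transport) {
        InvocationHandler handler =
                (proxy, method, args) -> transport.apply(method.getName(), args);
        return (T) Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler);
    }
}
```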

9. ApplicationMasterProtocolPB

package org.apache.hadoop.yarn.api;

import ...

@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
    protocolVersion = 1)
public interface ApplicationMasterProtocolPB extends ApplicationMasterProtocolService.BlockingInterface {

}

10. Definition of ApplicationMasterProtocolService

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto

option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ApplicationMasterProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "yarn_service_protos.proto";

service ApplicationMasterProtocolService {
  rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
  rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
  rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}

11. ApplicationMasterProtocolPBServiceImpl source excerpt

The server-side (RM) implementation of the ApplicationMasterProtocolPB interface:

package org.apache.hadoop.yarn.api.impl.pb.service;

import ...

@Private
public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {

    private ApplicationMasterProtocol real;

    // When the ResourceManager starts, it creates this object through this constructor,
    // passing in the real implementation (ApplicationMasterService, see section 12)
    public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
        this.real = impl;
    }

    ...
}
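On the server side, each method of such a PB service implementation follows the same adapter shape:

1. Wrap the incoming wire message in the Java record type.
2. Delegate to `real`.
3. Unwrap the result.
4. Convert checked exceptions into the RPC layer's exception type.

The hypothetical sketch below shows that shape with strings in place of protobuf messages. `ToyRealProtocol`, `ToyServiceException`, `ToyPbServiceImpl` and the `asked=`/`granted=` encoding are all invented.

```java
import java.io.IOException;

// Hypothetical "real" protocol working on domain values (cf. ApplicationMasterProtocol)
interface ToyRealProtocol {
    int allocate(int containersAsked) throws IOException;
}

// Hypothetical wire-level exception, standing in for the RPC layer's ServiceException
class ToyServiceException extends RuntimeException {
    ToyServiceException(Throwable cause) {
        super(cause);
    }
}

// Hypothetical server-side translator (cf. ApplicationMasterProtocolPBServiceImpl):
// decode the wire message, call the real implementation, encode the result.
class ToyPbServiceImpl {
    private final ToyRealProtocol real;

    ToyPbServiceImpl(ToyRealProtocol impl) {
        this.real = impl; // in the RM, this would be the real service object
    }

    String allocate(String wireRequest) {
        int asked = Integer.parseInt(wireRequest.substring("asked=".length()));
        try {
            int granted = real.allocate(asked);
            return "granted=" + granted;
        } catch (IOException e) {
            // Checked exceptions are wrapped so they can cross the RPC boundary
            throw new ToyServiceException(e);
        }
    }
}
```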

12. ApplicationMasterService source excerpt

package org.apache.hadoop.yarn.server.resourcemanager;

import ...

@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
        ApplicationMasterProtocol {
    // AM calls ultimately arrive at the methods here, which compute and return the results
    ...
}

【END】
