Hadoop YARN: A Source Code Walkthrough of the ApplicationMaster and ResourceManager Interaction
source link: http://mp.weixin.qq.com/s?__biz=MzA4NzA5NzE5Ng%3D%3D&%3Bmid=2650228889&%3Bidx=1&%3Bsn=19fa5502ebc68e5d7d0608540a538708
ApplicationMaster<-->ResourceManager
Roles involved in a "generic" YARN application and how they interact:
RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
Main communication protocols used in these interactions:
ApplicationClientProtocol
ApplicationMasterProtocol
ContainerManagementProtocol
Client<-->ResourceManager
The client program interacts with the RM through a YarnClient object.
ApplicationMaster<-->ResourceManager
The AM interacts with the RM through an AMRMClientAsync object;
an AMRMClientAsync.CallbackHandler processes the resulting events asynchronously.
ApplicationMaster<-->NodeManager
The AM interacts with the NMs through an NMClientAsync object, mainly to start Containers;
an NMClientAsync.CallbackHandler processes Container events asynchronously.
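Proto message definitions for the interfaces: the shared record types are in
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto; the RPC request and response messages are defined next to it in yarn_service_protos.proto.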
Hadoop version: 3.2.1
Flink version: 1.10
1. Example: the JobManager process YarnJobClusterEntrypoint in Flink's YARN per-job mode
// Starting point: YarnJobClusterEntrypoint#main
// Destination: YarnResourceManager
/**
 * The yarn implementation of the resource manager. Used when the system is started
 * via the resource framework YARN.
 */
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
        implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
    // This class plays the ApplicationMaster role
    ...
    /** Client to communicate with the Resource Manager (YARN's master). */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
    /** Client to communicate with the Node manager and launch TaskExecutor processes. */
    private NMClientAsync nodeManagerClient;
    ...
}
AMRMClientAsync
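AMRMClientAsync is an abstract class that handles communication with the ResourceManager and delivers events asynchronously, such as Container allocations and Container completions. It runs a thread that periodically sends heartbeats to the ResourceManager. Applications normally do not subclass it; they obtain an instance from the static factory AMRMClientAsync.createAMRMClientAsync (the default implementation is AMRMClientAsyncImpl) and supply their own callback handler.
That handler is an application-provided implementation of the AMRMClientAsync.CallbackHandler callback interface.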
2. A simple example: MyCallbackHandler
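The heartbeat-plus-callback pattern can be tried out without Hadoop on the classpath. The sketch below is a simplified, stdlib-only model and not the Hadoop API: SketchCallbackHandler, SketchResponse, SketchHeartbeatClient, MyCallbackHandlerSketch and HeartbeatDemo are all hypothetical names, the callback interface is reduced to two methods, and the "RM" is a scripted Supplier.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical, reduced stand-in for AMRMClientAsync.CallbackHandler. */
interface SketchCallbackHandler {
    void onContainersAllocated(List<String> containerIds);
    void onContainersCompleted(List<String> containerIds);
}

/** Hypothetical stand-in for what one heartbeat brings back from the RM. */
final class SketchResponse {
    final List<String> allocated;
    final List<String> completed;
    SketchResponse(List<String> allocated, List<String> completed) {
        this.allocated = allocated;
        this.completed = completed;
    }
}

/** Models the heartbeat thread: poll the RM at a fixed delay, dispatch events to the handler. */
final class SketchHeartbeatClient {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Supplier<SketchResponse> rm;
    private final SketchCallbackHandler handler;

    SketchHeartbeatClient(Supplier<SketchResponse> rm, SketchCallbackHandler handler) {
        this.rm = rm;
        this.handler = handler;
    }

    void start(long intervalMs) {
        scheduler.scheduleWithFixedDelay(this::heartbeat, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    private void heartbeat() {
        SketchResponse response = rm.get();
        if (!response.allocated.isEmpty()) {
            handler.onContainersAllocated(response.allocated);
        }
        if (!response.completed.isEmpty()) {
            handler.onContainersCompleted(response.completed);
        }
    }
}

/** A handler in the spirit of MyCallbackHandler: it only records what it is told. */
final class MyCallbackHandlerSketch implements SketchCallbackHandler {
    final List<String> allocated = new CopyOnWriteArrayList<>();
    final List<String> completed = new CopyOnWriteArrayList<>();
    final CountDownLatch sawCompletion = new CountDownLatch(1);

    @Override
    public void onContainersAllocated(List<String> containerIds) {
        allocated.addAll(containerIds);
    }

    @Override
    public void onContainersCompleted(List<String> containerIds) {
        completed.addAll(containerIds);
        sawCompletion.countDown();
    }
}

final class HeartbeatDemo {
    /** Scripted fake RM: beat 1 allocates two containers, beat 2 completes one, then nothing. */
    static Supplier<SketchResponse> scriptedRm() {
        AtomicInteger beat = new AtomicInteger();
        List<String> none = Collections.emptyList();
        return () -> {
            int n = beat.incrementAndGet();
            if (n == 1) return new SketchResponse(Arrays.asList("container_1", "container_2"), none);
            if (n == 2) return new SketchResponse(none, Arrays.asList("container_1"));
            return new SketchResponse(none, none);
        };
    }

    /** Runs the client until the first completion event arrives (or 5 seconds pass). */
    static MyCallbackHandlerSketch run() {
        MyCallbackHandlerSketch handler = new MyCallbackHandlerSketch();
        SketchHeartbeatClient client = new SketchHeartbeatClient(scriptedRm(), handler);
        client.start(10);
        try {
            handler.sawCompletion.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            client.stop();
        }
        return handler;
    }
}
```

Calling HeartbeatDemo.run() returns the handler with the events it received. The application code never polls the RM itself; it only reacts inside the callbacks, which is the division of labour the real AMRMClientAsync and its CallbackHandler are built around.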
AMRMClientAsync client lifecycle
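Usage examples of the client typically follow the order init, start, registerApplicationMaster, addContainerRequest (repeated as needed), unregisterApplicationMaster, stop. The sketch below models only that ordering as a small state machine. It is a hypothetical class, not Hadoop code: the REGISTERED and UNREGISTERED states are added here for illustration, and the model is stricter than the real service lifecycle, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the call order of an AM-side client; not a Hadoop class. */
final class AmRmLifecycleSketch {
    enum State { NOTINITED, INITED, STARTED, REGISTERED, UNREGISTERED, STOPPED }

    private State state = State.NOTINITED;
    private final List<String> calls = new ArrayList<>();

    /** Performs a transition, rejecting calls that arrive in the wrong state. */
    private void move(State expected, State next, String call) {
        if (state != expected) {
            throw new IllegalStateException(call + " is not allowed in state " + state);
        }
        state = next;
        calls.add(call);
    }

    void init() {
        move(State.NOTINITED, State.INITED, "init");
    }

    void start() {
        move(State.INITED, State.STARTED, "start");
    }

    void registerApplicationMaster() {
        move(State.STARTED, State.REGISTERED, "registerApplicationMaster");
    }

    /** Container requests are only accepted between register and unregister in this model. */
    void addContainerRequest(String request) {
        move(State.REGISTERED, State.REGISTERED, "addContainerRequest:" + request);
    }

    void unregisterApplicationMaster() {
        move(State.REGISTERED, State.UNREGISTERED, "unregisterApplicationMaster");
    }

    void stop() {
        move(State.UNREGISTERED, State.STOPPED, "stop");
    }

    State state() {
        return state;
    }

    List<String> calls() {
        return calls;
    }
}
```

Walking an instance through the six calls in order ends in STOPPED; calling addContainerRequest on a fresh instance throws IllegalStateException.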
3. AMRMClientAsync source excerpt
4. AMRMClientAsyncImpl source excerpt
5. AMRMClient source excerpt
package org.apache.hadoop.yarn.client.api;
import ...
// The abstract class AMRMClient
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
        AbstractService {
    ...
}
6. AMRMClientImpl source excerpt
7. ApplicationMasterProtocol source excerpt
The ApplicationMasterProtocol interface is fairly simple; it has only three methods.
package org.apache.hadoop.yarn.api;
import ...
// The ApplicationMasterProtocol interface
public interface ApplicationMasterProtocol {
    // Register this AM with the RM
    public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request)
            throws YarnException, IOException;
    // Ask the RM to unregister this AM; the AM may have finished successfully, or the application may have failed
    public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request)
            throws YarnException, IOException;
    // The main interface (method) between the AM and the RM: takes an AllocateRequest and returns an AllocateResponse.
    // This is the call that requests Containers; requests and responses are batched
    // (for example, a Flink JobManager asking for 3 TaskManagers at once).
    // Executed at most once, so a retried call does not cause duplicate or excess allocation.
    public AllocateResponse allocate(AllocateRequest request)
            throws YarnException, IOException;
}
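To make the three calls and the at-most-once remark concrete, here is a minimal in-memory model. It is a sketch under stated assumptions, not the RM's implementation: InMemoryRmSketch and AllocateResultSketch are hypothetical names, containers are plain strings, and the duplicate check via a response id is only loosely modelled on the idea that a retried allocate replays the previous response. The model also grants containers immediately, whereas a real RM hands them out over later heartbeats.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for an AllocateResponse: a response id plus the granted containers. */
final class AllocateResultSketch {
    final int responseId;
    final List<String> containers;

    AllocateResultSketch(int responseId, List<String> containers) {
        this.responseId = responseId;
        this.containers = containers;
    }
}

/** Hypothetical in-memory RM exposing the three protocol calls in simplified form. */
final class InMemoryRmSketch {
    private boolean registered;
    private int nextContainerNumber = 1;
    private AllocateResultSketch lastResult;

    void registerApplicationMaster(String amHost) {
        if (registered) {
            throw new IllegalStateException("AM is already registered");
        }
        registered = true;
        lastResult = new AllocateResultSketch(0, Collections.emptyList());
    }

    /** lastSeenResponseId is the id of the last response the AM received. */
    AllocateResultSketch allocate(int lastSeenResponseId, int containersWanted) {
        if (!registered) {
            throw new IllegalStateException("AM is not registered");
        }
        if (lastSeenResponseId + 1 == lastResult.responseId) {
            // Retry of the previous call: replay the old response, allocate nothing new.
            return lastResult;
        }
        if (lastSeenResponseId != lastResult.responseId) {
            throw new IllegalStateException("unexpected response id " + lastSeenResponseId);
        }
        List<String> granted = new ArrayList<>();
        for (int i = 0; i < containersWanted; i++) {
            granted.add("container_" + nextContainerNumber++);
        }
        lastResult = new AllocateResultSketch(
                lastResult.responseId + 1, Collections.unmodifiableList(granted));
        return lastResult;
    }

    void finishApplicationMaster(String finalStatus) {
        if (!registered) {
            throw new IllegalStateException("AM is not registered");
        }
        registered = false;
    }

    int containersHandedOut() {
        return nextContainerNumber - 1;
    }
}
```

In this model a batch request such as allocate(0, 3) grants three containers in one response, and repeating the same call with the stale id returns the same response object instead of granting three more.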
8. ApplicationMasterProtocolPBClientImpl source excerpt
package org.apache.hadoop.yarn.api.impl.pb.client;
import ...
// Client-side implementation of the ApplicationMasterProtocol interface
public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {
    private ApplicationMasterProtocolPB proxy;
    public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
            Configuration conf) throws IOException {
        RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
        // Under the hood this ends up calling java.lang.reflect.Proxy#newProxyInstance
        proxy =
            (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, clientVersion,
                addr, conf);
    }
    ...
}
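The comment about java.lang.reflect.Proxy#newProxyInstance is the key to how a plain interface call becomes an RPC. The following stdlib-only sketch shows the mechanism in isolation. ToyProtocol, RecordingInvoker and ProxyDemo are hypothetical names, and the handler merely records the call where a real RPC engine would serialize it and send it to the server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical protocol interface; there is no hand-written implementation of it. */
interface ToyProtocol {
    String register(String host);
    String allocate(int containers);
}

/** Receives every call made on the proxy; a real RPC engine would do the network I/O here. */
final class RecordingInvoker implements InvocationHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        String call = method.getName() + Arrays.toString(args);
        calls.add(call);
        return "sent:" + call;
    }
}

final class ProxyDemo {
    /** Builds a ToyProtocol instance whose methods are all routed to the given handler. */
    static ToyProtocol newProxy(InvocationHandler handler) {
        return (ToyProtocol) Proxy.newProxyInstance(
                ToyProtocol.class.getClassLoader(),
                new Class<?>[] {ToyProtocol.class},
                handler);
    }
}
```

With a RecordingInvoker as handler, ProxyDemo.newProxy(handler).register("host-1") returns "sent:register[host-1]", even though no class implements ToyProtocol directly. The caller only sees the interface; where the call goes is decided entirely by the handler.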
9. ApplicationMasterProtocolPB
package org.apache.hadoop.yarn.api;
import ...
@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
    protocolVersion = 1)
public interface ApplicationMasterProtocolPB extends ApplicationMasterProtocolService.BlockingInterface {
}
10. Definition of ApplicationMasterProtocolService
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ApplicationMasterProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_service_protos.proto";
service ApplicationMasterProtocolService {
    rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
    rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
    rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}
11. ApplicationMasterProtocolPBServiceImpl source excerpt
The server-side (RM) implementation of the ApplicationMasterProtocolPB interface
package org.apache.hadoop.yarn.api.impl.pb.service;
import ...
@Private
public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {
    private ApplicationMasterProtocol real;
    // When the ResourceManager starts, the real object is initialized through this constructor
    public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
        this.real = impl;
    }
    ...
}
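The constructor above hints at a delegation pattern: the wire-level service holds a reference to the "real" protocol implementation, unwraps each incoming message, calls the real method and wraps the result again. A minimal sketch of that shape follows. It is an illustration only: DomainProtocol, WireProtocol and WireToDomainAdapter are hypothetical names, and plain strings of the form ask=N and granted=N stand in for the protobuf messages.

```java
/** Hypothetical domain-level protocol (stand-in for ApplicationMasterProtocol). */
interface DomainProtocol {
    /** Returns how many of the requested containers are granted. */
    int allocate(int containersWanted);
}

/** Hypothetical wire-level protocol (stand-in for ApplicationMasterProtocolPB). */
interface WireProtocol {
    String allocate(String requestMessage);
}

/** Server-side adapter: decodes the wire message, delegates to the real object, encodes the reply. */
final class WireToDomainAdapter implements WireProtocol {
    private static final String REQUEST_PREFIX = "ask=";

    private final DomainProtocol real;

    WireToDomainAdapter(DomainProtocol real) {
        this.real = real;
    }

    @Override
    public String allocate(String requestMessage) {
        if (!requestMessage.startsWith(REQUEST_PREFIX)) {
            throw new IllegalArgumentException("malformed request: " + requestMessage);
        }
        int wanted = Integer.parseInt(requestMessage.substring(REQUEST_PREFIX.length()));
        int granted = real.allocate(wanted);
        return "granted=" + granted;
    }
}
```

For example, new WireToDomainAdapter(n -> Math.min(n, 2)).allocate("ask=5") returns "granted=2". The adapter contains no scheduling logic of its own, which keeps the serialization concern separate from the service that actually decides what to grant.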
12. ApplicationMasterService source excerpt
package org.apache.hadoop.yarn.server.resourcemanager;
import ...
@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
        ApplicationMasterProtocol {
    // Calls from the AM ultimately land in the methods of this class, which return the result
    ...
}