

【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC - 大数据王小皮
source link: https://www.cnblogs.com/shuofxz/p/16874715.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RPC(Remote Procedure Call) 是 Hadoop 服务通信的关键库,支撑上层分布式环境下复杂的进程间(Inter-Process Communication, IPC)通信逻辑,是分布式系统的基础。允许运行于一台计算机上的程序像调用本地方法一样,调用另一台计算机的子程序。
由于 RPC 服务整体知识较多,本节仅针对对 Yarn RPC 进行简略介绍,详细内容会后续开专栏介绍。
一、RPC 通信模型介绍
为什么会有 RPC 框架?
在分布式或微服务情境下,会有大量的服务间交互,如果用传统的 HTTP 协议端口来通信,需要耗费大量时间处理网络数据交换上,还要考虑编解码等问题。如下图所示。
- 客户端通过 RPC 框架的动态代理得到一个代理类实例,称为 Stub(桩)
- 客户端调用接口方法(实际是 Stub 对应的方法),Stub 会构造一个请求,包括函数名和参数
- 服务端收到这个请求后,先将服务名(函数)解析出来,查找是否有对应的服务提供者
- 服务端找到对应的实现类后,会传入参数调用
- 服务端 RPC 框架得到返回结果后,再进行封装返回给客户端
- 客户端的 Stub 收到返回值后,进行解析,返回给调用者,完成 RPC 调用。
二、Hadoop RPC 介绍
Hadoop RPC 是 Hadoop 自己实现的一个 RPC 框架,主要有以下几个特点:
- 透明性:像调用本地方法一样调用远程方法。
- 高性能:Hadoop 各个系统均采用 Master/Slave 结构,Master 是一个 RPC Server 用于处理各个 Slave 节点发送的请求,需要有高性能。
- 可控性:由于 JDK 中的 RPC 框架 RMI 重量级过大,且封装度太高,不方便控制和修改。因此实现了自己的 RPC 框架,以保证轻量级、高性能、可控性。
框架原理和整体执行流程与第一节介绍的 RPC 框架一致,感兴趣可深入源码进行了解。
二)总体架构
Hadoop RPC 架构底层依靠 Java 的 nio、反射、动态代理等功能实现「客户端 - 服务器(C/S)」通信模型。
上层封装供程序调用的 RPC 接口。
三、案例 demo
下面两个案例的 demo 已上传至 github。有帮助的话点个⭐️。
https://github.com/Simon-Ace/hadoop_rpc_demo
一)RPC Writable 案例实现
1、新建一个 maven 工程,添加依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
2、定义 RPC 协议
public interface BusinessProtocol {
void mkdir(String path);
String getName(String name);
long versionID = 345043000L;
}
3、定义协议实现
public class BusinessIMPL implements BusinessProtocol {
@Override
public void mkdir(String path) {
System.out.println("成功创建了文件夹 :" + path);
}
@Override
public String getName(String name) {
System.out.println("成功打了招呼: hello :" + name);
return "bigdata";
}
}
4、通过 Hadoop RPC 构建一个 RPC 服务端
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
public class MyServer {
public static void main(String[] args) {
try {
// 构建一个 RPC server 端,提供了一个 BussinessProtocol 协议的 BusinessIMPL 服务实现
RPC.Server server = new RPC.Builder(new Configuration())
.setProtocol(BusinessProtocol.class)
.setInstance(new BusinessIMPL())
.setBindAddress("localhost")
.setPort(6789)
.build();
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
5、构建一个 RPC 客户端
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.net.InetSocketAddress;
public class MyClient {
public static void main(String[] args) {
try {
// 获取代理类实例,也就是 Stub
BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,
new InetSocketAddress("localhost", 6789), new Configuration());
// 通过 Stub 发送请求,实际使用就像调用本地方法一样
proxy.mkdir("/tmp/ABC");
String res = proxy.getName("Simon");
System.out.println("从 RPC 服务端接收到的返回值:" + res);
} catch (IOException e) {
e.printStackTrace();
}
}
}
6、测试,先启动服务端,再启动客户端
服务端输出
成功创建了文件夹 :/tmp/ABC
成功打了招呼: hello :Simon
客户端输出
从 RPC 服务端接收到的返回值:bigdata
二)RPC Protobuf 案例实现
对 proto 文件格式不熟悉的同学,参考上一篇文章《2-1 Yarn 基础库概述》
MyResourceTrackerMessage.proto
定义数据格式
syntax = "proto3";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
message MyRegisterNodeManagerRequestProto {
string hostname = 1;
int32 cpu = 2;
int32 memory = 3;
}
message MyRegisterNodeManagerResponseProto {
string flag = 1;
}
MyResourceTracker.proto
定义 rpc 接口
syntax = "proto3";
import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
service MyResourceTrackerService {
rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}
2、对 proto 文件编译,生成 java 类
# 在项目根目录执行,路径按照自己的进行修改
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResource.proto
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto
3、定义调用方法接口 MyResourceTracker
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;
public interface MyResourceTracker {
MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}
4、对调用方法接口的实现(服务端)
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
public class MyResourceTrackerImpl implements MyResourceTracker {
@Override
public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {
// 输出注册的消息
String hostname = request.getHostname();
int cpu = request.getCpu();
int memory = request.getMemory();
System.out.println("NodeManager 的注册消息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);
// 省略处理逻辑
// 构建一个响应对象,用于返回
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();
// 直接返回 True
builder.setFlag("true");
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();
return response;
}
}
5、编写 proto 的协议接口
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import org.apache.hadoop.ipc.ProtocolInfo;
@ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1)
public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {
}
6、编写 proto 的协议接口实现(服务端)
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {
final private MyResourceTracker server;
public MyResourceTrackerServerSidePB(MyResourceTracker server) {
this.server = server;
}
@Override
public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {
try {
return server.registerNodeManager(request);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
7、RPC Server 的实现
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import java.io.IOException;
public class ProtobufRpcServer {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);
// 构建 Rpc Server
RPC.Server server = new RPC.Builder(conf)
.setProtocol(MyResourceTrackerPB.class)
.setInstance(MyResourceTrackerProto.MyResourceTrackerService
.newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl())))
.setBindAddress("localhost")
.setPort(9998)
.setNumHandlers(1)
.setVerbose(true)
.build();
// Rpc Server 启动
server.start();
}
}
8、RPC Client 的实现
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import java.io.IOException;
import java.net.InetSocketAddress;
public class ProtobufRpcClient {
public static void main(String[] args) throws IOException {
// 设置 RPC 引擎为 ProtobufRpcEngine
Configuration conf = new Configuration();
String hostname = "localhost";
int port = 9998;
RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);
// 获取代理
MyResourceTrackerPB protocolProxy = RPC
.getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);
// 构建请求对象
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();
MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =
builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();
// 发送 RPC 请求,获取响应
MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;
try {
response = protocolProxy.registerNodeManager(null, bigdata02);
} catch (ServiceException e) {
e.printStackTrace();
}
// 处理响应
String flag = response.getFlag();
System.out.println("最终注册结果: flag = " + flag);
}
}
9、测试
先启动服务端,在启动客户端。
本节介绍了 Hadoop 底层通信库 RPC。首先介绍了 RPC 的框架和原理,之后对 Hadoop 自己实现的 RPC 进行了介绍,并给出了两个 demo 实践。
强烈建议了解基础知识后,跟着 demo 实现一个案例出来,可以更好的帮助你理解。
文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo
参考文章:
YARN-RPC网络通信架构设计
YARN-高并发RPC源码实现
Hadoop3.2.1 【 HDFS 】源码分析 : RPC原理 [八] Client端实现&源码
Hadoop RPC机制详解
Hadoop2源码分析-RPC探索实战
《Hadoop 技术内幕 - 深入解析 Yarn 结构设计与实现原理》3.3 节
__EOF__
Recommend
-
2
了解 Yarn 基础库是后面阅读 Yarn 源码的基础,本节对 Yarn 基础库做总体的介绍。并对其中使用的第三方库 Protocol Buffers 和 Avro 是什么、怎么用做简要的介绍。 一、主要使用的库 Protocol Buffers
-
6
一个庞大的分布式系统,各个组件间是如何协调工作的?组件是如何解耦的?线程运行如何更高效,减少阻塞带来的低效问题?本节将对 Yarn 的服务库和事件库进行介绍,看看 Yarn 是如何解决这些问题的。 一、服务库 对于生命周期较长的对...
-
3
当一个服务拥有太多处理逻辑时,会导致代码结构异常的混乱,很难分辨一段逻辑是在哪个阶段发挥作用的。 这时就可以引入状态机模型,帮助代码结构变得清晰。 一、状态机库概述 状态机由一组状态组成: 【初始状态 -...
-
13
本篇文章继续介绍 Yarn Application 中 ApplicationMaster 部分的编写方法。 一、Application Master 编写方法 上一节讲了 Client 提交任务给 RM 的全流程,RM 收到任务后,由 ApplicationsManager...
-
9
前面几篇文章对 Yarn 基本架构、程序基础库、应用设计方法等进行了介绍。之后几篇将开始对 Yarn 核心组件进行剖析。 ResourceManager(RM)是 Yarn 的核心管理服务,负责集群管理、任务调度、状态机管理等,本篇将对 RM 总体架构进行介绍。
-
13
本篇继续对 RM 中管理 NodeManager 的部分进行深入的讲解。主要有三个部分:检查 NM 是否存活;管理 NM 的黑白名单;响应 NM RPC 请求。 一、简介
-
10
在 YARN 中,Application 是指应用程序,它可能启动多个运行实例,每个运行实例由 —个 ApplicationMaster 与一组该 ApplicationMaster 启动的任务组成,它拥有名称、队列、优先级等属性,是一个比较宽泛的概念,可以是一个 MepReduce 作业、一个 DAG 应用程序等。YAR...
-
7
本节开始将对 Yarn 中的 NodeManager 服务进行剖析。 NodeManager 需要在每个计算节点上运行,与 ResourceManager 和 ApplicationMaster 进行交互。管理节点的计算资源以及调度容器。后续将对 NM 的功能职责、状态机、容器生命周期和资源隔离等方面进行讲解。...
-
2
一、简介# NodeManager(NM)中的状态机分为三类:Application、Container 和 LocalizedResource,它们均直接或者间接参与维护一...
-
8
不要跳过这部分知识,对了解 NodeManager 本地目录结构,和熟悉 Container 启动流程有帮助。 一、分布式缓存介绍
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK