0

【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC - 大数据王小皮

 1 year ago
source link: https://www.cnblogs.com/shuofxz/p/16874715.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RPC(Remote Procedure Call) 是 Hadoop 服务通信的关键库,支撑上层分布式环境下复杂的进程间(Inter-Process Communication, IPC)通信逻辑,是分布式系统的基础。允许运行于一台计算机上的程序像调用本地方法一样,调用另一台计算机的子程序。
由于 RPC 服务整体知识较多,本节仅针对对 Yarn RPC 进行简略介绍,详细内容会后续开专栏介绍。

一、RPC 通信模型介绍

为什么会有 RPC 框架?
在分布式或微服务情境下,会有大量的服务间交互,如果用传统的 HTTP 协议端口来通信,需要耗费大量时间处理网络数据交换上,还要考虑编解码等问题。如下图所示。

image.png
  • 客户端通过 RPC 框架的动态代理得到一个代理类实例,称为 Stub(桩)
  • 客户端调用接口方法(实际是 Stub 对应的方法),Stub 会构造一个请求,包括函数名和参数
  • 服务端收到这个请求后,先将服务名(函数)解析出来,查找是否有对应的服务提供者
  • 服务端找到对应的实现类后,会传入参数调用
  • 服务端 RPC 框架得到返回结果后,再进行封装返回给客户端
  • 客户端的 Stub 收到返回值后,进行解析,返回给调用者,完成 RPC 调用。

二、Hadoop RPC 介绍

Hadoop RPC 是 Hadoop 自己实现的一个 RPC 框架,主要有以下几个特点:

  • 透明性:像调用本地方法一样调用远程方法。
  • 高性能:Hadoop 各个系统均采用 Master/Slave 结构,Master 是一个 RPC Server 用于处理各个 Slave 节点发送的请求,需要有高性能。
  • 可控性:由于 JDK 中的 RPC 框架 RMI 重量级过大,且封装度太高,不方便控制和修改。因此实现了自己的 RPC 框架,以保证轻量级、高性能、可控性。

框架原理和整体执行流程与第一节介绍的 RPC 框架一致,感兴趣可深入源码进行了解。

二)总体架构

Hadoop RPC 架构底层依靠 Java 的 nio、反射、动态代理等功能实现「客户端 - 服务器(C/S)」通信模型。
上层封装供程序调用的 RPC 接口。

image.png

三、案例 demo

下面两个案例的 demo 已上传至 github。有帮助的话点个⭐️。
https://github.com/Simon-Ace/hadoop_rpc_demo

一)RPC Writable 案例实现

1、新建一个 maven 工程,添加依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
</dependency>

2、定义 RPC 协议

public interface BusinessProtocol {
    void mkdir(String path);
    String getName(String name);
    long versionID = 345043000L;
}

3、定义协议实现

public class BusinessIMPL implements BusinessProtocol {
    @Override
    public void mkdir(String path) {
        System.out.println("成功创建了文件夹 :" + path);
    }

    @Override
    public String getName(String name) {
        System.out.println("成功打了招呼: hello :" + name);
        return "bigdata";
    }
}

4、通过 Hadoop RPC 构建一个 RPC 服务端

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

public class MyServer {
    public static void main(String[] args) {
        try {
            // 构建一个 RPC server 端,提供了一个 BussinessProtocol 协议的 BusinessIMPL 服务实现
            RPC.Server server = new RPC.Builder(new Configuration())
                    .setProtocol(BusinessProtocol.class)
                    .setInstance(new BusinessIMPL())
                    .setBindAddress("localhost")
                    .setPort(6789)
                    .build();

            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

5、构建一个 RPC 客户端

import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.net.InetSocketAddress;

public class MyClient {
    public static void main(String[] args) {
        try {
        	// 获取代理类实例,也就是 Stub
            BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,
                    new InetSocketAddress("localhost", 6789), new Configuration());

            // 通过 Stub 发送请求,实际使用就像调用本地方法一样
            proxy.mkdir("/tmp/ABC");
            String res = proxy.getName("Simon");
            System.out.println("从 RPC 服务端接收到的返回值:" + res);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

6、测试,先启动服务端,再启动客户端
服务端输出

成功创建了文件夹 :/tmp/ABC
成功打了招呼: hello :Simon

客户端输出

从 RPC 服务端接收到的返回值:bigdata

二)RPC Protobuf 案例实现

项目结构如下

image.png

对 proto 文件格式不熟悉的同学,参考上一篇文章《2-1 Yarn 基础库概述》

MyResourceTrackerMessage.proto 定义数据格式

syntax = "proto3";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

message MyRegisterNodeManagerRequestProto {
    string hostname = 1;
    int32 cpu = 2;
    int32 memory = 3;
}

message MyRegisterNodeManagerResponseProto {
    string flag = 1;
}

MyResourceTracker.proto 定义 rpc 接口

syntax = "proto3";

import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

service MyResourceTrackerService {
    rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}

2、对 proto 文件编译,生成 java 类

# 在项目根目录执行,路径按照自己的进行修改
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResource.proto

protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto

3、定义调用方法接口 MyResourceTracker

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;

public interface MyResourceTracker {
    MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}

4、对调用方法接口的实现(服务端)

import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;

public class MyResourceTrackerImpl implements MyResourceTracker {
    @Override
    public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
            MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {

        // 输出注册的消息
        String hostname = request.getHostname();
        int cpu = request.getCpu();
        int memory = request.getMemory();
        System.out.println("NodeManager 的注册消息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);

        // 省略处理逻辑
        // 构建一个响应对象,用于返回
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =
                MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();
        // 直接返回 True
        builder.setFlag("true");
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();
        return response;
    }
}

5、编写 proto 的协议接口

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import org.apache.hadoop.ipc.ProtocolInfo;

@ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1)
public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {
}

6、编写 proto 的协议接口实现(服务端)

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;

public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {
    final private MyResourceTracker server;

    public MyResourceTrackerServerSidePB(MyResourceTracker server) {
        this.server = server;
    }

    @Override
    public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
            RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {
        try {
            return server.registerNodeManager(request);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

7、RPC Server 的实现

import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;

import java.io.IOException;

public class ProtobufRpcServer {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // 构建 Rpc Server
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(MyResourceTrackerPB.class)
                .setInstance(MyResourceTrackerProto.MyResourceTrackerService
                        .newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl())))
                .setBindAddress("localhost")
                .setPort(9998)
                .setNumHandlers(1)
                .setVerbose(true)
                .build();

        // Rpc Server 启动
        server.start();
    }
}

8、RPC Client 的实现

import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;

import java.io.IOException;
import java.net.InetSocketAddress;

public class ProtobufRpcClient {
    public static void main(String[] args) throws IOException {
        // 设置 RPC 引擎为 ProtobufRpcEngine
        Configuration conf = new Configuration();
        String hostname = "localhost";
        int port = 9998;
        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // 获取代理
        MyResourceTrackerPB protocolProxy = RPC
                .getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);

        // 构建请求对象
        MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =
                MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();
        MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =
                builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();

        // 发送 RPC 请求,获取响应
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;
        try {
            response = protocolProxy.registerNodeManager(null, bigdata02);
        } catch (ServiceException e) {
            e.printStackTrace();
        }

        // 处理响应
        String flag = response.getFlag();
        System.out.println("最终注册结果: flag = " + flag);
    }
}

9、测试
先启动服务端,在启动客户端。

本节介绍了 Hadoop 底层通信库 RPC。首先介绍了 RPC 的框架和原理,之后对 Hadoop 自己实现的 RPC 进行了介绍,并给出了两个 demo 实践。
强烈建议了解基础知识后,跟着 demo 实现一个案例出来,可以更好的帮助你理解。
文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo


参考文章:
YARN-RPC网络通信架构设计
YARN-高并发RPC源码实现
Hadoop3.2.1 【 HDFS 】源码分析 : RPC原理 [八] Client端实现&源码
Hadoop RPC机制详解
Hadoop2源码分析-RPC探索实战
《Hadoop 技术内幕 - 深入解析 Yarn 结构设计与实现原理》3.3 节

__EOF__


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK