
A Deep Dive into the gRPC Communication Framework

Source: https://blog.51cto.com/u_15327484/5429193

Alluxio uses gRPC as its underlying communication framework, so understanding Alluxio's server-side threading model requires some command of the gRPC code. This article first covers the HTTP/2 foundations beneath gRPC, then uses a small gRPC project to dig into the gRPC source code and explore its threading model.

2. gRPC Overview

gRPC is an RPC framework from Google with support for many languages. It defines four call patterns:

Unary RPC

The client sends a single request and gets back a single response: it calls a method on the server through its stub and waits for the response to come back, just like an ordinary function call. For example:

rpc SayHello (HelloRequest) returns (HelloReply) {}

Server-streaming RPC

The client sends a request to the server and gets a stream back, from which it reads a sequence of messages until none remain. In my view this fits when the client needs to actively pull a series of data from the server. For example:

rpc ListFeatures(Rectangle) returns (stream Feature) {}

Client-streaming RPC

The client writes a sequence of messages and sends them to the server, again over a stream. Once it has finished writing, it waits for the server to read them all and return its response. In my view this fits when the client needs to push a series of data to the server. For example:

rpc RecordRoute(stream Point) returns (RouteSummary) {}

Bidirectional-streaming RPC

Both sides send a sequence of messages over read and write streams. The two streams operate independently, so the client and server can read and write in whatever order they like: the server can wait for all of the client's messages before writing its responses, alternate between reading and writing, or use any other combination. For example:

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

3. The Evolution of HTTP Multiplexing

Suppose a client wants to issue 100 HTTP requests. By default it sends the first request, waits for its response, and only then sends the next one, which is very inefficient. As HTTP evolved, this behavior was optimized step by step, eventually arriving at a multiplexing model.

3.1 HTTP/1.0

Under HTTP/1.0, sending 100 HTTP requests in a row goes through the following steps:

  1. Every HTTP request needs its own TCP connection, complete with a three-way handshake.
  2. Every TCP connection goes through congestion control: slow start probes the network, and only gradually does the congestion window grow from its small initial value to the maximum (the window bounds how much data can be in flight at once).
  3. HTTP/1.x gives responses no identifier to correlate them with requests; they can only be matched by order. The client therefore must wait for the previous response before sending the next request.

Steps 1 and 2 show why HTTP/1.0's performance is poor: connection setup plus slow start adds overhead to every single request. Step 3 prevents requests from being sent concurrently. Optimizing HTTP therefore means attacking both the per-request overhead and the lack of concurrency.

3.2 HTTP/1.1

HTTP/1.1 tackles the overhead with connection reuse: setting the Connection: keep-alive request header lets requests to the same server share one TCP connection, avoiding repeated connection setup. Even with a reused connection, though, traffic still follows the request → response → request → response pattern, so multiple HTTP requests remain serial.


To send requests in parallel, i.e. fire off all HTTP requests at once and receive the responses in sending order, HTTP/1.1 introduced pipelining: the client sends all of its requests up front, and the server processes them in order and returns the responses in order.


Pipelining has a fatal flaw, head-of-line blocking: if the server is slow to process one request, every request behind it is stuck waiting, so performance suffers. Because of this flaw, HTTP/1.1 pipelining never saw wide adoption.

3.3 HTTP/2

HTTP/2 improves on pipelining by dividing HTTP requests into streams, each identified by a stream ID. Requests within one stream are processed in order, while requests on different streams can be processed in parallel. Even if one stream's requests block, requests on other streams are unaffected.


All streams share a single TCP connection: requests on different streams are sent concurrently, while requests within the same stream are sent in order.


HTTP/2's advantages:

  1. A client talking to one server port uses a single TCP connection, no matter how many requests it sends.
  2. Requests and responses are sent in parallel, interleaved by stream ID, achieving concurrent HTTP traffic.

4. Sample Project

The project defines a proto file and compiles it into Java message classes and gRPC stub code, then issues requests through the generated stubs. By debugging this project we can work our way, step by step, into the gRPC communication framework.

4.1 Maven Dependencies

 <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <grpc.version>1.6.1</grpc.version>
    <protobuf.version>3.3.0</protobuf.version>
 </properties>

 <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.52.Final</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.9.0</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <protoSourceRoot>${project.basedir}/src/main/java/proto</protoSourceRoot>
                    <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                    <clearOutputDirectory>false</clearOutputDirectory>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
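
With this configuration, running mvn compile invokes protoc on the .proto files under src/main/java/proto: the compile goal generates the protobuf message classes, and the compile-custom goal runs the protoc-gen-grpc-java plugin to generate the stubs, both written into src/main/java.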

4.2 Proto Definition

syntax = "proto3";
option java_outer_classname = "Hello";
package protobuf;
option java_generic_services = true;
service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
  rpc SayHi (stream HelloRequest) returns (HelloResponse);
  rpc SayGood (HelloRequest) returns (stream HelloResponse);
  rpc SayBad (stream HelloRequest) returns (stream HelloResponse);
  rpc SayOK (stream HelloRequest) returns (stream HelloResponse);
}
message HelloRequest {
  string greeting = 1;
}
message HelloResponse {
  string reply = 1;
}

Compilation produces two classes:

1. Hello is the serialization class, containing the request class HelloRequest and the response class HelloResponse.
2. HelloServiceGrpc is the proxy class; the client calls methods on the server remotely through it.

The proxy class offers two kinds of stubs:

  1. HelloServiceBlockingStub makes blocking calls: the caller blocks until the response arrives.
  2. HelloServiceStub makes asynchronous calls: the caller passes in a callback and does not block waiting for the response; when an async thread receives the response, it invokes the callback directly.

4.3 Server Code

The server's main method listens on port 19999 and registers StreamHelloServiceImpl as the request-handling logic:

public class StreamServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        io.grpc.Server server = ServerBuilder.forPort(19999).addService(new StreamHelloServiceImpl()).build().start();
        System.out.println("start server");
        server.awaitTermination();
    }
}

The server-side logic consists of two methods:

  1. When the client remotely invokes sayBad, the server's sayBad method handles the request; for every message the client sends, the server replies three times.
  2. When the client remotely invokes sayOK, the server's sayOK method handles the request in the same way: one incoming message, three replies.
public class StreamHelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
    @Override
    public StreamObserver<Hello.HelloRequest> sayBad(StreamObserver<Hello.HelloResponse> responseObserver) {
        return new StreamObserver<Hello.HelloRequest>() {
            @Override
            public void onNext(Hello.HelloRequest value) {
                System.out.println("receive : " + value.getGreeting());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad1: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad2: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad3: " + value.getGreeting()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error");
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public StreamObserver<Hello.HelloRequest> sayOK(StreamObserver<Hello.HelloResponse> responseObserver) {
        return new StreamObserver<Hello.HelloRequest>() {
            @Override
            public void onNext(Hello.HelloRequest value) {
                System.out.println("receive : " + value.getGreeting());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok1: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok2: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok3: " + value.getGreeting()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error");
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
                responseObserver.onCompleted();
            }
        };
    }
}
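
For comparison, a unary method such as sayHello from the proto above would be implemented roughly as follows (a sketch; this override is not part of the sample project):

// Hypothetical unary implementation of the SayHello RPC declared in the proto.
// A unary handler receives the request directly and must emit exactly one
// response followed by onCompleted().
@Override
public void sayHello(Hello.HelloRequest request,
                     StreamObserver<Hello.HelloResponse> responseObserver) {
    responseObserver.onNext(Hello.HelloResponse.newBuilder()
        .setReply("hello: " + request.getGreeting())
        .build());
    responseObserver.onCompleted();
}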

This article walks through only the bidirectional streaming call; the other call types work the same way.

4.4 Client Code

The client defines a callback object, streamObserver, that prints the server's responses. In main it issues the sayBad and sayOK RPCs, sending two messages on each:

public class StreamClient {

    static StreamObserver<Hello.HelloResponse> streamObserver = new StreamObserver<Hello.HelloResponse>(){


        @Override
        public void onNext(Hello.HelloResponse value) {
           System.out.println(value.getReply());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(t.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
        }
    };

    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 19999).usePlaintext(true).build();
        HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(channel);
        StreamObserver<Hello.HelloRequest> helloRequestStreamObserver = helloServiceStub.sayBad(streamObserver);
        helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm sad").build());
        helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm happy").build());
        StreamObserver<Hello.HelloRequest> helloOKStreamObserver = helloServiceStub.sayOK(streamObserver);
        helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm sad").build());
        helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm happy").build());
        Thread.sleep(1000);
        //channel.shutdown();
    }
}
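
Note that main never calls onCompleted() on the request observers, so the server's onCompleted callback will not fire for these streams; the Thread.sleep(1000) just keeps the JVM alive long enough for the responses to arrive. A cleaner ending for main might look like this sketch (it needs an import of java.util.concurrent.TimeUnit):

// Sketch: finish both request streams so the server's onCompleted() fires,
// then shut the channel down instead of sleeping.
helloRequestStreamObserver.onCompleted();
helloOKStreamObserver.onCompleted();
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);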

4.5 Running Results

The server prints each request it receives ("receive : hello: i'm sad" and so on), and the client prints the three replies generated for each message ("bad1: ..." through "bad3: ..." for sayBad, "ok1: ..." through "ok3: ..." for sayOK).

5. Client Execution Flow

5.1 Creating the ManagedChannelImpl

First, the ManagedChannel implementation is created:

ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 19999).usePlaintext(true).build();

forAddress takes the server's address and port:

public static ManagedChannelBuilder<?> forAddress(String name, int port) {
  return ManagedChannelProvider.provider().builderForAddress(name, port);
}

ManagedChannelProvider.provider() locates the class that supplies the ManagedChannel implementation:

public static ManagedChannelProvider provider() {
  if (provider == null) {
    throw new ProviderNotFoundException("No functional channel service provider found. "
        + "Try adding a dependency on the grpc-okhttp or grpc-netty artifact");
  }
  return provider;
}

As shown below, the ManagedChannelProvider implementation is loaded via SPI by default:

private static final ManagedChannelProvider provider
    = load(ManagedChannelProvider.class.getClassLoader());

The load method ultimately discovers implementations through Java's ServiceLoader:

public static Iterable<ManagedChannelProvider> getCandidatesViaServiceLoader(
    ClassLoader classLoader) {
  return ServiceLoader.load(ManagedChannelProvider.class, classLoader);
}
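
ServiceLoader is the standard Java SPI mechanism: it scans every META-INF/services/io.grpc.ManagedChannelProvider file on the classpath and instantiates the classes listed there. A small sketch (my own demo, not gRPC code) that performs the same lookup and prints what it finds:

import java.util.ServiceLoader;
import io.grpc.ManagedChannelProvider;

// Demonstrates the SPI lookup gRPC performs internally: ServiceLoader reads
// META-INF/services/io.grpc.ManagedChannelProvider from the classpath and
// instantiates each listed provider. Demo code, not part of gRPC.
public class SpiDemo {
    public static void main(String[] args) {
        ServiceLoader<ManagedChannelProvider> loader =
            ServiceLoader.load(ManagedChannelProvider.class, SpiDemo.class.getClassLoader());
        for (ManagedChannelProvider provider : loader) {
            // With grpc-netty on the classpath this prints io.grpc.netty.NettyChannelProvider
            System.out.println(provider.getClass().getName());
        }
    }
}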

The grpc-netty artifact provides the implementation: io.grpc.netty.NettyChannelProvider. A NettyChannelBuilder is then created through the NettyChannelProvider; it is responsible for building the Netty-backed channel. builderForAddress records the target address and port:

public NettyChannelBuilder builderForAddress(String name, int port) {
  return NettyChannelBuilder.forAddress(name, port);
}

Debugging shows that builderForAddress initializes the target string field on NettyChannelBuilder's grandparent class, AbstractManagedChannelImplBuilder.

usePlaintext(true) then selects plaintext transport, so the data is not encrypted:

public NettyChannelBuilder usePlaintext(boolean skipNegotiation) {
  if (skipNegotiation) {
    negotiationType(NegotiationType.PLAINTEXT);
  } else {
    negotiationType(NegotiationType.PLAINTEXT_UPGRADE);
  }
  return this;
}

Debugging shows that usePlaintext ultimately sets an enum field on NettyChannelBuilder to NegotiationType.PLAINTEXT.

Finally, NettyChannelBuilder#build() creates the ManagedChannelImpl, passing the builder itself, which carries the target address, port, and other settings, as a constructor argument:

public ManagedChannel build() {
  return new ManagedChannelImpl(
      this,
      buildTransportFactory(),
      // TODO(carl-mastrangelo): Allow clients to pass this in
      new ExponentialBackoffPolicy.Provider(),
      SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
      GrpcUtil.STOPWATCH_SUPPLIER,
      getEffectiveInterceptors());
}

NettyChannelBuilder thus returns a ManagedChannelImpl, whose constructor takes quite a few parameters:

ManagedChannelImpl(
    AbstractManagedChannelImplBuilder<?> builder,
    ClientTransportFactory clientTransportFactory,
    BackoffPolicy.Provider backoffPolicyProvider,
    ObjectPool<? extends Executor> oobExecutorPool,
    Supplier<Stopwatch> stopwatchSupplier,
    List<ClientInterceptor> interceptors) {
  this.target = checkNotNull(builder.target, "target");
  this.nameResolverFactory = builder.getNameResolverFactory();
  this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
  this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
  this.loadBalancerFactory =
      checkNotNull(builder.loadBalancerFactory, "loadBalancerFactory");
  this.executorPool = checkNotNull(builder.executorPool, "executorPool");
  this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
  this.executor = checkNotNull(executorPool.getObject(), "executor");
  this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
  this.delayedTransport.start(delayedTransportListener);
  this.backoffPolicyProvider = backoffPolicyProvider;
  this.transportFactory =
      new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
  this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
  this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
  if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
    this.idleTimeoutMillis = builder.idleTimeoutMillis;
  } else {
    checkArgument(
        builder.idleTimeoutMillis
            >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
        "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
    this.idleTimeoutMillis = builder.idleTimeoutMillis;
  }
  this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
  this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
  this.userAgent = builder.userAgent;

  log.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
}

Several of the parameters that NettyChannelBuilder passes to ManagedChannelImpl are critical; let's look at them one by one.

5.1.1 First parameter: AbstractManagedChannelImplBuilder

The first parameter has type AbstractManagedChannelImplBuilder, an abstract class whose concrete implementation here is NettyChannelBuilder. After initialization it carries the server's address and port, and NegotiationType.PLAINTEXT marks the transport as plaintext.

5.1.2 Second parameter: ClientTransportFactory

grpc-netty creates a NettyTransportFactory through NettyChannelBuilder#buildTransportFactory. NettyTransportFactory implements ClientTransportFactory and is responsible for creating NettyClientTransport instances, i.e. for using Netty as the data-transport layer:

protected ClientTransportFactory buildTransportFactory() {
  return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
      negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
      maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls);
}

NettyTransportFactory takes its parameters from NettyChannelBuilder and creates NettyClientTransport objects in NettyTransportFactory#newClientTransport. One NettyClientTransport manages this client's connection to one server address and port:

    public ConnectionClientTransport newClientTransport(
        SocketAddress serverAddress, String authority, @Nullable String userAgent) {
      checkState(!closed, "The transport factory is closed.");

      TransportCreationParamsFilter dparams =
          transportCreationParamsFilterFactory.create(serverAddress, authority, userAgent);

      final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState();
      Runnable tooManyPingsRunnable = new Runnable() {
        @Override
        public void run() {
          keepAliveTimeNanosState.backoff();
        }
      };
      NettyClientTransport transport = new NettyClientTransport(
          dparams.getTargetServerAddress(), channelType, channelOptions, group,
          dparams.getProtocolNegotiator(), flowControlWindow,
          maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
          keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
          tooManyPingsRunnable);
      return transport;
    }

NettyClientTransport holds the Netty-related fields:

class NettyClientTransport implements ConnectionClientTransport {
  private final LogId logId = LogId.allocate(getClass().getName());
  private final Map<ChannelOption<?>, ?> channelOptions;
  // socket address of the RPC server
  private final SocketAddress address;
  // the NioSocketChannel class
  private final Class<? extends Channel> channelType;
  // Netty's NioEventLoopGroup
  private final EventLoopGroup group;
  private final ProtocolNegotiator negotiator;
  private final AsciiString authority;
  private final AsciiString userAgent;
  private final int flowControlWindow;
  private final int maxMessageSize;
  private final int maxHeaderListSize;
  private KeepAliveManager keepAliveManager;
  private final long keepAliveTimeNanos;
  private final long keepAliveTimeoutNanos;
  private final boolean keepAliveWithoutCalls;
  private final Runnable tooManyPingsRunnable;

  private ProtocolNegotiator.Handler negotiationHandler;
  // the client-side handler
  private NettyClientHandler handler;
  // We should not send on the channel until negotiation completes. This is a hard requirement
  // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
  // the NioSocketChannel instance
  private Channel channel;
  /** If {@link #start} has been called, non-{@code null} if channel is {@code null}. */
  private Status statusExplainingWhyTheChannelIsNull;
}

NettyClientTransport has two important methods, start and newStream. start connects to the server through Netty:

 public Runnable start(Listener transportListener) {
    lifecycleManager = new ClientTransportLifecycleManager(
        Preconditions.checkNotNull(transportListener, "listener"));
    EventLoop eventLoop = group.next();
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
      keepAliveManager = new KeepAliveManager(
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
          keepAliveWithoutCalls);
    }

    handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
        maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
    NettyHandlerSettings.setAutoWindow(handler);

    negotiationHandler = negotiator.newHandler(handler);

    Bootstrap b = new Bootstrap();
    b.group(eventLoop);
    b.channel(channelType);
    if (NioSocketChannel.class.isAssignableFrom(channelType)) {
      b.option(SO_KEEPALIVE, true);
    }
    for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
      // Every entry in the map is obtained from
      // NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
      // so it is safe to pass the key-value pair to b.option().
      b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
    }

    /**
     * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
     * is executed in the event loop and we need this handler to be in the pipeline immediately so
     * that it may begin buffering writes.
     */
    b.handler(negotiationHandler);
    ChannelFuture regFuture = b.register();
    channel = regFuture.channel();
    if (channel == null) {
      // Initialization has failed badly. All new streams should be made to fail.
      Throwable t = regFuture.cause();
      if (t == null) {
        t = new IllegalStateException("Channel is null, but future doesn't have a cause");
      }
      statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
      // Use a Runnable since lifecycleManager calls transportListener
      return new Runnable() {
        @Override
        public void run() {
          // NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
          // an event loop in this case, so nothing should be accessing the lifecycleManager. We
          // could use GlobalEventExecutor (which is what regFuture would use for notifying
          // listeners in this case), but avoiding on-demand thread creation in an error case seems
          // a good idea and is probably clearer threading.
          lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
        }
      };
    }
    // Start the write queue as soon as the channel is constructed
    handler.startWriteQueue(channel);
    // Start the connection operation to the server.
    channel.connect(address);
    // This write will have no effect, yet it will only complete once the negotiationHandler
    // flushes any pending writes.
    channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
          // Need to notify of this failure, because NettyClientHandler may not have been added to
          // the pipeline before the error occurred.
          lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
        }
      }
    });
    // Handle transport shutdown when the channel is closed.
    channel.closeFuture().addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        // Typically we should have noticed shutdown before this point.
        lifecycleManager.notifyTerminated(
            Status.INTERNAL.withDescription("Connection closed with unknown cause"));
      }
    });

    if (keepAliveManager != null) {
      keepAliveManager.onTransportStarted();
    }

    return null;
  }

During connection setup, NettyClientHandler is registered. It is the heart of gRPC's communication: it defines both how HTTP/2 frames are sent and how incoming frames are handled. Through Netty's Http2ConnectionHandler hierarchy, NettyClientHandler acts as both a ChannelInboundHandler and a ChannelOutboundHandler.

NettyClientHandler overrides the ChannelOutboundHandler write method and dispatches on the type of the outgoing message:

  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
          throws Exception {
    if (msg instanceof CreateStreamCommand) {
      createStream((CreateStreamCommand) msg, promise);
    } else if (msg instanceof SendGrpcFrameCommand) {
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
    } else if (msg instanceof CancelClientStreamCommand) {
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
    } else if (msg instanceof SendPingCommand) {
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
    } else if (msg instanceof GracefulCloseCommand) {
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
    } else if (msg instanceof ForcefulCloseCommand) {
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
    } else if (msg == NOOP_MESSAGE) {
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
    } else {
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
    }
  }
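
These command objects are not raw bytes: gRPC enqueues them on the channel, Netty's pipeline delivers them to this write method, and each one is translated into real HTTP/2 frames. The sketch below (my own, with a hypothetical EchoCommand type) shows the same outbound command-dispatch pattern in a plain Netty handler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;

// Same pattern as NettyClientHandler#write: application code writes typed
// command objects into the pipeline, and the handler turns each into
// protocol-level bytes. EchoCommand is hypothetical; sketch only.
public class CommandDispatchHandler extends ChannelDuplexHandler {
    static final class EchoCommand {
        final String text;
        EchoCommand(String text) { this.text = text; }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
            throws Exception {
        if (msg instanceof EchoCommand) {
            ByteBuf buf = Unpooled.copiedBuffer(((EchoCommand) msg).text, CharsetUtil.UTF_8);
            ctx.write(buf, promise);        // translate the command into bytes
        } else {
            super.write(ctx, msg, promise); // pass anything else through
        }
    }
}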

Most importantly, sendGrpcFrame writes a data frame to the remote peer:

  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
      ChannelPromise promise) {
    // Call the base class to write the HTTP/2 DATA frame.
    // Note: no need to flush since this is handled by the outbound flow controller.
    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
  }

The send path is fairly involved. Roughly, DefaultHttp2RemoteFlowController applies flow control to bound how much data is written, and DefaultHttp2FrameWriter writes the frames, which here come in two kinds: HEADERS frames and DATA (payload) frames. writeHeaders sends a HEADERS frame:

    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
            Http2Headers headers, int streamDependency, short weight, boolean exclusive,
            int padding, boolean endStream, ChannelPromise promise) {
        return writeHeadersInternal(ctx, streamId, headers, padding, endStream,
                true, streamDependency, weight, exclusive, promise);
    }

writeHeadersInternal defines the HEADERS send logic; simplified, it boils down to:

   writeFrameHeaderInternal(buf, payloadLength, HEADERS, flags, streamId);
   writePaddingLength(buf, padding);

writeFrameHeaderInternal is shared by HEADERS and DATA frames. It writes the frame's length, type, flags, and stream ID; a HEADERS frame uses the HEADERS type byte:

    static void writeFrameHeaderInternal(ByteBuf out, int payloadLength, byte type,
            Http2Flags flags, int streamId) {
        out.writeMedium(payloadLength);
        out.writeByte(type);
        out.writeByte(flags.value());
        out.writeInt(streamId);
    }
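
The writes above correspond to the fixed 9-byte HTTP/2 frame header: a 3-byte payload length (writeMedium), a 1-byte frame type, a 1-byte flags field, and a 4-byte stream identifier whose high bit is reserved. A minimal sketch (mine, for illustration) that decodes such a header from a ByteBuf:

import io.netty.buffer.ByteBuf;

// Decodes the fixed 9-byte HTTP/2 frame header that writeFrameHeaderInternal
// writes above. Illustration only, not part of the gRPC/Netty code base.
static void readFrameHeader(ByteBuf in) {
    int payloadLength = in.readUnsignedMedium(); // 3 bytes: payload length
    byte type = in.readByte();                   // 1 byte: frame type (DATA=0x0, HEADERS=0x1, ...)
    byte flags = in.readByte();                  // 1 byte: flags (e.g. END_STREAM, END_HEADERS)
    int streamId = in.readInt() & 0x7FFFFFFF;    // 4 bytes: stream id, high bit reserved
    System.out.printf("len=%d type=0x%x flags=0x%x stream=%d%n",
        payloadLength, type, flags, streamId);
}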

When a payload frame is sent, writeData is called; it invokes writeFrameHeaderInternal with the DATA type to mark a payload frame:

    writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId);
    ctx.write(frameHeader2, promiseAggregator.newPromise());
    // Write the payload.
    ByteBuf lastFrame = data.readSlice(remainingData);
    data = null;
    ctx.write(lastFrame, promiseAggregator.newPromise());

5.2 Creating and Executing the RPC Proxy

In the client's main method, the RPC proxy object is created:

   HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(channel);
   StreamObserver<Hello.HelloRequest> helloRequestStreamObserver = helloServiceStub.sayBad(streamObserver);

HelloServiceGrpc is the RPC proxy class generated by protoc; it exposes one stub method per RPC declared in the proto. The two methods the client calls are:

    public io.grpc.stub.StreamObserver<protobuf.Hello.HelloRequest> sayBad(
        io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse> responseObserver) {
      return asyncBidiStreamingCall(
          getChannel().newCall(METHOD_SAY_BAD, getCallOptions()), responseObserver);
    }

    public io.grpc.stub.StreamObserver<protobuf.Hello.HelloRequest> sayOK(
        io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse> responseObserver) {
      return asyncBidiStreamingCall(
          getChannel().newCall(METHOD_SAY_OK, getCallOptions()), responseObserver);
    }

Both delegate to asyncBidiStreamingCall. Its first argument is the ClientCallImpl created through the ManagedChannelImpl, and the second, responseObserver, is our custom callback. They are passed on to asyncStreamingRequestCall:

  private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
      ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver,
      boolean streamingResponse) {
    CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call);
    startCall(
        call,
        new StreamObserverToCallListenerAdapter<ReqT, RespT>(
            responseObserver, adapter, streamingResponse),
        streamingResponse);
    return adapter;
  }

ManagedChannelImpl$RealChannel#newCall creates the call object, passing the method to invoke and the executor pool to run on:

public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
  private class RealChannel extends Channel {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
        CallOptions callOptions) {
      Executor executor = callOptions.getExecutor();
      if (executor == null) {
        executor = ManagedChannelImpl.this.executor;
      }
      return new ClientCallImpl<ReqT, RespT>(
          method,
          executor,
          callOptions,
          transportProvider,
          terminated ? null : transportFactory.getScheduledExecutorService())
              .setDecompressorRegistry(decompressorRegistry)
              .setCompressorRegistry(compressorRegistry);
    }
}

Note that the executor comes from builder.executorPool:

    // the thread pool
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
    // executorPool.getObject() obtains an executor from the shared pool
    this.executor = checkNotNull(executorPool.getObject(), "executor");

executorPool defaults to DEFAULT_EXECUTOR_POOL, defined in AbstractManagedChannelImplBuilder:

public abstract class AbstractManagedChannelImplBuilder
        <T extends AbstractManagedChannelImplBuilder<T>> extends ManagedChannelBuilder<T> {
  private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
      SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
}

The executor's thread pool names its threads grpc-default-executor:

public final class GrpcUtil {
  public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
      new Resource<ExecutorService>() {
        private static final String NAME = "grpc-default-executor";
        @Override
        public ExecutorService create() {
          return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
        }

        @Override
        public void close(ExecutorService instance) {
          instance.shutdown();
        }

        @Override
        public String toString() {
          return NAME;
        }
      };
}
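
getThreadFactory builds daemon threads named with the grpc-default-executor-%d pattern. A roughly equivalent factory (a sketch, not the actual GrpcUtil helper):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Roughly what GrpcUtil.getThreadFactory(NAME + "-%d", true) produces:
// daemon threads with an incrementing numeric suffix. Sketch only.
static ThreadFactory namedDaemonFactory(final String nameFormat) {
    final AtomicInteger counter = new AtomicInteger();
    return runnable -> {
        Thread t = new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
        t.setDaemon(true);
        return t;
    };
}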

Once the ClientCallImpl exists, its start method runs; by this point responseObserver has been wrapped into a ClientCall.Listener:

public final class ClientCalls {
  private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
      ClientCall.Listener<RespT> responseListener, boolean streamingResponse) {
    call.start(responseListener, new Metadata());
    if (streamingResponse) {
      call.request(1);
    } else {
      // Initially ask for two responses from flow-control so that if a misbehaving server sends
      // more than one responses, we can catch it and fail it in the listener.
      call.request(2);
    }
  }
}

Whichever RPC method is invoked, ClientCallImpl#start executes. It creates and starts the client-side stream, initially a DelayedStream:

final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
  public void start(final Listener<RespT> observer, Metadata headers) {
    checkState(stream == null, "Already started");
    checkNotNull(observer, "observer");
    checkNotNull(headers, "headers");
    
    final String compressorName = callOptions.getCompressor();
    Compressor compressor = null;
    prepareHeaders(headers, decompressorRegistry, compressor);

    Deadline effectiveDeadline = effectiveDeadline();
    boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
    if (!deadlineExceeded) {
      updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
          context.getDeadline(), headers);
      ClientTransport transport = clientTransportProvider.get(
          new PickSubchannelArgsImpl(method, headers, callOptions));
      Context origContext = context.attach();
      try {
        // create the client stream
        stream = transport.newStream(method, headers, callOptions);
      } finally {
        context.detach(origContext);
      }
    } else {
      stream = new FailingClientStream(DEADLINE_EXCEEDED);
    }

    if (callOptions.getAuthority() != null) {
      stream.setAuthority(callOptions.getAuthority());
    }
    if (callOptions.getMaxInboundMessageSize() != null) {
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
    }
    if (callOptions.getMaxOutboundMessageSize() != null) {
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
    }
    stream.setCompressor(compressor);
    stream.setDecompressorRegistry(decompressorRegistry);
    // start the client stream
    stream.start(new ClientStreamListenerImpl(observer));

    // Delay any sources of cancellation after start(), because most of the transports are broken if
    // they receive cancel before start. Issue #1343 has more details

    // Propagate later Context cancellation to the remote side.
    context.addListener(cancellationListener, directExecutor());
    if (effectiveDeadline != null
        // If the context has the effective deadline, we don't need to schedule an extra task.
        && context.getDeadline() != effectiveDeadline
        // If the channel has been terminated, we don't need to schedule an extra task.
        && deadlineCancellationExecutor != null) {
      deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
    }
    if (cancelListenersShouldBeRemoved) {
      // Race detected! ClientStreamListener.closed may have been called before
      // deadlineCancellationFuture was set / context listener added, thereby preventing the future
      // and listener from being cancelled. Go ahead and cancel again, just to be sure it
      // was cancelled.
      removeContextListenerAndCancelDeadlineFuture();
    }
  }

Starting the stream invokes DelayedStream#start. Note that this method stores the listener wrapping our streamObserver in a field, so it can be invoked later when messages are read:

  public void start(ClientStreamListener listener) {
    checkState(this.listener == null, "already started");

    Status savedError;
    boolean savedPassThrough;
    synchronized (this) {
      this.listener = checkNotNull(listener, "listener");
      // If error != null, then cancel() has been called and was unable to close the listener
      savedError = error;
      savedPassThrough = passThrough;
      if (!savedPassThrough) {
        listener = delayedListener = new DelayedStreamListener(listener);
      }
    }
    if (savedError != null) {
      listener.closed(savedError, new Metadata());
      return;
    }

    if (savedPassThrough) {
      realStream.start(listener);
    } else {
      final ClientStreamListener finalListener = listener;
      delayOrExecute(new Runnable() {
        @Override
        public void run() {
          realStream.start(finalListener);
        }
      });
    }
  }

When the client receives the server's response, the logic defined in streamObserver ultimately runs on an async thread. At that moment the EventLoop thread is executing NettyClientHandler, whose ancestor class ByteToMessageDecoder defines the receive path in channelRead:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                // decode the message
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                // omitted
            }
        }
    }
}
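
ByteToMessageDecoder accumulates incoming ByteBuf fragments (the cumulation field above) and repeatedly calls decode until no complete message remains, which is how partial TCP reads are handled. A minimal custom decoder in the same style (my sketch, not gRPC code):

import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

// Waits until a whole length-prefixed record has accumulated, then emits it.
// Mirrors how DefaultHttp2FrameReader simply returns when fewer than
// payloadLength bytes are readable. Sketch only.
public class LengthPrefixedDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;                            // length prefix not here yet
        }
        in.markReaderIndex();
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex();             // wait for the full payload
            return;
        }
        out.add(in.readRetainedSlice(length)); // emit one complete record
    }
}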

Inside callDecode, DefaultHttp2FrameReader#processPayloadState parses the frames returned by the server:

public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSizePolicy, Configuration {
    private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
                    throws Http2Exception {
        if (in.readableBytes() < payloadLength) {
            // Wait until the entire payload has been read.
            return;
        }

        // Only process up to payloadLength bytes.
        int payloadEndIndex = in.readerIndex() + payloadLength;

        // We have consumed the data, next time we read we will be expecting to read a frame header.
        readingHeaders = true;

        // Read the payload and fire the frame event to the listener.
        switch (frameType) {
            case DATA:
                readDataFrame(ctx, in, payloadEndIndex, listener);
                break;
            case HEADERS:
                readHeadersFrame(ctx, in, payloadEndIndex, listener);
                break;
            case PRIORITY:
                readPriorityFrame(ctx, in, listener);
                break;
            case RST_STREAM:
                readRstStreamFrame(ctx, in, listener);
                break;
            case SETTINGS:
                readSettingsFrame(ctx, in, listener);
                break;
            case PUSH_PROMISE:
                readPushPromiseFrame(ctx, in, payloadEndIndex, listener);
                break;
            case PING:
                readPingFrame(ctx, in.readLong(), listener);
                break;
            case GO_AWAY:
                readGoAwayFrame(ctx, in, payloadEndIndex, listener);
                break;
            case WINDOW_UPDATE:
                readWindowUpdateFrame(ctx, in, listener);
                break;
            case CONTINUATION:
                readContinuationFrame(in, payloadEndIndex, listener);
                break;
            default:
                readUnknownFrame(ctx, in, payloadEndIndex, listener);
                break;
        }
        in.readerIndex(payloadEndIndex);
    }
}

Handling depends on the frame type. For a HEADERS frame, for example, AbstractClientStream$TransportState#inboundHeadersReceived runs along the way:

public abstract class AbstractClientStream extends AbstractStream
    implements ClientStream, MessageFramer.Sink {
  protected abstract static class TransportState extends AbstractStream.TransportState {
    protected void inboundHeadersReceived(Metadata headers) {
      Preconditions.checkState(!statusReported, "Received headers on closed stream");
      statsTraceCtx.clientInboundHeaders();

      Decompressor decompressor = Codec.Identity.NONE;
      String encoding = headers.get(MESSAGE_ENCODING_KEY);
      if (encoding != null) {
        decompressor = decompressorRegistry.lookupDecompressor(encoding);
        if (decompressor == null) {
          deframeFailed(Status.INTERNAL.withDescription(
              String.format("Can't find decompressor for %s", encoding)).asRuntimeException());
          return;
        }
      }
      setDecompressor(decompressor);

      listener().headersRead(headers);
    }
  }
}

which eventually reaches the headersRead method inside ClientCallImpl:

final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
    public void headersRead(final Metadata headers) {
      class HeadersRead extends ContextRunnable {
        HeadersRead() {
          super(context);
        }

        @Override
        public final void runInContext() {
          try {
            if (closed) {
              return;
            }
            observer.onHeaders(headers);
          } catch (Throwable t) {
            Status status =
                Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
            stream.cancel(status);
            close(status, new Metadata());
          }
        }
      }

      callExecutor.execute(new HeadersRead());
    }
}

When observer.onHeaders(headers) executes, it lands in StreamObserverToCallListenerAdapter#onHeaders, which is a no-op:

public final class ClientCalls {
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
      extends ClientCall.Listener<RespT> {
    public void onHeaders(Metadata headers) {
    }
  }
}

When a DATA frame is read, readDataFrame is called, and AbstractClientStream$TransportState#inboundDataReceived runs along the way:

public abstract class AbstractClientStream extends AbstractStream
    implements ClientStream, MessageFramer.Sink {
  protected abstract static class TransportState extends AbstractStream.TransportState {
    protected void inboundDataReceived(ReadableBuffer frame) {
      Preconditions.checkNotNull(frame, "frame");
      boolean needToCloseFrame = true;
      try {
        if (statusReported) {
          log.log(Level.INFO, "Received data on closed stream");
          return;
        }

        needToCloseFrame = false;
        // deframe the received DATA frame
        deframe(frame);
      } finally {
        if (needToCloseFrame) {
          frame.close();
        }
      }
    }
  }
}

The DATA-frame path is a bit more involved: MessageDeframer#deliver delivers the DATA payload:

public class MessageDeframer implements Closeable, Deframer {
  private void deliver() {
    // We can have reentrancy here when using a direct executor, triggered by calls to
    // request more messages. This is safe as we simply loop until pendingDelivers = 0
    if (inDelivery) {
      return;
    }
    inDelivery = true;
    try {
      // Process the uncompressed bytes.
      while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
        switch (state) {
          // process the gRPC message header within the DATA frame
          case HEADER:
            processHeader();
            break;
          // process the gRPC message body within the DATA frame
          case BODY:
            // Read the body and deliver the message.
            processBody();
            // Since we've delivered a message, decrement the number of pending
            // deliveries remaining.
            pendingDeliveries--;
            break;
          default:
            throw new AssertionError("Invalid state: " + state);
        }
      }
    // omitted
  }
}

When the message body is reached, MessageDeframer#processBody runs:

public class MessageDeframer implements Closeable, Deframer {
  private void processBody() {
    InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
    nextFrame = null;
    listener.messagesAvailable(new SingleMessageProducer(stream));

    // Done with this frame, begin processing the next header.
    state = State.HEADER;
    requiredLength = HEADER_LENGTH;
  }
}
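
Within an HTTP/2 DATA frame, each gRPC message is itself length-prefixed: a 1-byte compressed flag followed by a 4-byte big-endian message length (the HEADER state above, with HEADER_LENGTH = 5) and then the serialized protobuf payload (the BODY state). A sketch of that framing (illustration only; MessageFramer and MessageDeframer do this internally):

import java.nio.ByteBuffer;

// gRPC's length-prefixed message framing inside an HTTP/2 DATA frame:
// 1-byte compressed flag + 4-byte big-endian length + payload. Sketch only.
static byte[] frameGrpcMessage(byte[] payload, boolean compressed) {
    ByteBuffer buf = ByteBuffer.allocate(5 + payload.length);
    buf.put((byte) (compressed ? 1 : 0)); // compressed flag
    buf.putInt(payload.length);           // message length (big-endian by default)
    buf.put(payload);                     // serialized protobuf message
    return buf.array();
}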

The deframer's listener then fires messagesAvailable; as the code shows, it is implemented by ClientCallImpl$ClientStreamListenerImpl:

final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
  private class ClientStreamListenerImpl implements ClientStreamListener {
    public void messagesAvailable(final MessageProducer producer) {
      class MessagesAvailable extends ContextRunnable {
        MessagesAvailable() {
          super(context);
        }

        @Override
        public final void runInContext() {
          if (closed) {
            GrpcUtil.closeQuietly(producer);
            return;
          }

          InputStream message;
          try {
            while ((message = producer.next()) != null) {
              try {
                // invoke the DATA handling logic
                observer.onMessage(method.parseResponse(message));
              } catch (Throwable t) {
                GrpcUtil.closeQuietly(message);
                throw t;
              }
              message.close();
            }
          } catch (Throwable t) {
            GrpcUtil.closeQuietly(producer);
            Status status =
                Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
            stream.cancel(status);
            close(status, new Metadata());
          }
        }
      }
      // wrap the message handling in a Runnable and run it on the thread pool
      callExecutor.execute(new MessagesAvailable());
    }
  }
}

callExecutor is the executor ClientCallImpl was initialized with: the SHARED_CHANNEL_EXECUTOR pool wrapped in a SerializingExecutor:

    this.callExecutor = executor == directExecutor()
        ? new SerializeReentrantCallsDirectExecutor()
        : new SerializingExecutor(executor);

SerializingExecutor runs the queued Runnables one after another on the backing pool, preserving submission order:

public SerializingExecutor(Executor executor) {
    Preconditions.checkNotNull(executor, "'executor' must not be null.");
    this.executor = executor;
  }

  /**
   * Runs the given runnable strictly after all Runnables that were submitted
   * before it, and using the {@code executor} passed to the constructor.     .
   */
  @Override
  public void execute(Runnable r) {
    runQueue.add(checkNotNull(r, "'r' must not be null."));
    schedule(r);
  }

  private void schedule(@Nullable Runnable removable) {
    if (running.compareAndSet(false, true)) {
      boolean success = false;
      try {
        executor.execute(this);
        success = true;
      } finally {
        // It is possible that at this point that there are still tasks in
        // the queue, it would be nice to keep trying but the error may not
        // be recoverable.  So we update our state and propagate so that if
        // our caller deems it recoverable we won't be stuck.
        if (!success) {
          if (removable != null) {
            // This case can only be reached if 'this' was not currently running, and we failed to
            // reschedule.  The item should still be in the queue for removal.
            // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
            // throw if the item to remove is null.  If removable is present in the queue twice,
            // the wrong one may be removed.  It doesn't seem possible for this case to exist today.
            // This is important to run in case of RejectedExectuionException, so that future calls
            // to execute don't succeed and accidentally run a previous runnable.
            runQueue.remove(removable);
          }
          running.set(false);
        }
      }
    }
  }
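
In other words, SerializingExecutor is a queue plus a drain loop: tasks submitted from any thread run one at a time, in order, on the wrapped executor. A minimal usage sketch, assuming access to the internal io.grpc.internal.SerializingExecutor class:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.grpc.internal.SerializingExecutor;

// Tasks submitted to a SerializingExecutor run strictly in submission order,
// even though the backing pool has many threads. Sketch only.
public class SerialDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        SerializingExecutor serial = new SerializingExecutor(pool);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            serial.execute(() -> System.out.println("task " + n)); // prints 0, 1, 2 in order
        }
        pool.shutdown();
    }
}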

The DATA handling thus runs on an async thread. observer.onMessage(method.parseResponse(message)) calls ClientCalls$StreamObserverToCallListenerAdapter#onMessage, which invokes observer.onNext(message), i.e. the user-defined handler:

public final class ClientCalls {
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
      extends ClientCall.Listener<RespT> {
    public void onMessage(RespT message) {
      if (firstResponseReceived && !streamingResponse) {
        throw Status.INTERNAL
            .withDescription("More than one responses received for unary or client-streaming call")
            .asRuntimeException();
      }
      firstResponseReceived = true;
      observer.onNext(message);

      if (streamingResponse && adapter.autoFlowControlEnabled) {
        // Request delivery of the next inbound message.
        adapter.request(1);
      }
    }
  }
}

That onNext is exactly the anonymous StreamObserver defined in the client startup class:

public class StreamClient {
   static StreamObserver<Hello.HelloResponse> streamObserver = new StreamObserver<Hello.HelloResponse>(){


        @Override
        public void onNext(Hello.HelloResponse value) {
           System.out.println(value.getReply());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(t.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
        }
    };
}

Handing the DATA processing to an async thread presumably guards against slow response handling on the client: if it ran on the EventLoop thread, a slow handler would block Netty from serving other sockets.

5.3 Netty Client Connecting to the Server

From section 5.2 we know that invoking the sayBad RPC runs ClientCallImpl#start along the way:

  public void start(final Listener<RespT> observer, Metadata headers) {
    // omitted
    prepareHeaders(headers, decompressorRegistry, compressor);

    Deadline effectiveDeadline = effectiveDeadline();
    boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
    if (!deadlineExceeded) {
      updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
          context.getDeadline(), headers);
      // obtain the transport (a DelayedClientTransport, ultimately backed by a NettyClientTransport)
      ClientTransport transport = clientTransportProvider.get(
          new PickSubchannelArgsImpl(method, headers, callOptions));
      Context origContext = context.attach();
      try {
        // create the DelayedStream
        stream = transport.newStream(method, headers, callOptions);
      } finally {
        context.detach(origContext);
      }
    } else {
      stream = new FailingClientStream(DEADLINE_EXCEEDED);
    }

    if (callOptions.getAuthority() != null) {
      stream.setAuthority(callOptions.getAuthority());
    }
    if (callOptions.getMaxInboundMessageSize() != null) {
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
    }
    if (callOptions.getMaxOutboundMessageSize() != null) {
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
    }
    stream.setCompressor(compressor);
    stream.setDecompressorRegistry(decompressorRegistry);
    stream.start(new ClientStreamListenerImpl(observer));

    // Delay any sources of cancellation after start(), because most of the transports are broken if
    // they receive cancel before start. Issue #1343 has more details

    // Propagate later Context cancellation to the remote side.
    context.addListener(cancellationListener, directExecutor());
  }

The custom get method returns the DelayedClientTransport:

public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
    private final ClientCallImpl.ClientTransportProvider transportProvider = new ClientCallImpl.ClientTransportProvider() {
        public ClientTransport get(LoadBalancer.PickSubchannelArgs args) {
            LoadBalancer.SubchannelPicker pickerCopy = ManagedChannelImpl.this.subchannelPicker;
            if (ManagedChannelImpl.this.shutdown.get()) {
                return ManagedChannelImpl.this.delayedTransport;
            } else if (pickerCopy == null) {
                ManagedChannelImpl.this.channelExecutor.executeLater(new Runnable() {
                    public void run() {
                        // exit idle mode
                        ManagedChannelImpl.this.exitIdleMode();
                    }
                }).drain();
                return ManagedChannelImpl.this.delayedTransport;
            } else {
                LoadBalancer.PickResult pickResult = pickerCopy.pickSubchannel(args);
                ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, args.getCallOptions().isWaitForReady());
                return (ClientTransport)(transport != null ? transport : ManagedChannelImpl.this.delayedTransport);
            }
        }
    };
}

Before returning the DelayedClientTransport, it calls ManagedChannelImpl.this.exitIdleMode() to leave idle mode:

    void exitIdleMode() {
        if (!this.shutdown.get()) {
            if (this.inUseStateAggregator.isInUse()) {
                this.cancelIdleTimer();
            } else {
                this.rescheduleIdleTimer();
            }

            if (this.lbHelper == null) {
                log.log(Level.FINE, "[{0}] Exiting idle mode", this.getLogId());
                this.lbHelper = new LbHelperImpl(this.nameResolver);
                this.lbHelper.lb = this.loadBalancerFactory.newLoadBalancer(this.lbHelper);
                NameResolverListenerImpl listener = new NameResolverListenerImpl(this.lbHelper);

                try {
                    // start the nameResolver
                    this.nameResolver.start(listener);
                } catch (Throwable var3) {
                    listener.onError(Status.fromThrowable(var3));
                }

            }
        }
    }

The nameResolver implementation is DnsNameResolver, which resolves a hostname into its equivalent IP addresses:

  public final synchronized void start(Listener listener) {
    Preconditions.checkState(this.listener == null, "already started");
    timerService = SharedResourceHolder.get(timerServiceResource);
    // thread pool: the GrpcUtil.SHARED_CHANNEL_EXECUTOR defined earlier
    executor = SharedResourceHolder.get(executorResource);
    this.listener = Preconditions.checkNotNull(listener, "listener");
    // resolve the hostname
    resolve();
  }


  private void resolve() {
    if (resolving || shutdown) {
      return;
    }
    executor.execute(resolutionRunnable);
  }

The resolution work runs asynchronously via resolutionRunnable:

  private final Runnable resolutionRunnable = new Runnable() {
      @Override
      public void run() {
        Listener savedListener;
        synchronized (DnsNameResolver.this) {
          // If this task is started by refresh(), there might already be a scheduled task.
          if (resolutionTask != null) {
            resolutionTask.cancel(false);
            resolutionTask = null;
          }
          if (shutdown) {
            return;
          }
          savedListener = listener;
          resolving = true;
        }
        try {
          if (System.getenv("GRPC_PROXY_EXP") != null) {
            EquivalentAddressGroup server =
                new EquivalentAddressGroup(InetSocketAddress.createUnresolved(host, port));
            savedListener.onAddresses(Collections.singletonList(server), Attributes.EMPTY);
            return;
          }
          ResolutionResults resolvedInetAddrs;
          try {
            resolvedInetAddrs = delegateResolver.resolve(host);
          } catch (Exception e) {
            synchronized (DnsNameResolver.this) {
              if (shutdown) {
                return;
              }
              // Because timerService is the single-threaded GrpcUtil.TIMER_SERVICE in production,
              // we need to delegate the blocking work to the executor
              resolutionTask =
                  timerService.schedule(new LogExceptionRunnable(resolutionRunnableOnExecutor),
                      1, TimeUnit.MINUTES);
            }
            savedListener.onError(Status.UNAVAILABLE.withCause(e));
            return;
          }
          // Each address forms an EAG
          ArrayList<EquivalentAddressGroup> servers = new ArrayList<EquivalentAddressGroup>();
          for (InetAddress inetAddr : resolvedInetAddrs.addresses) {
            servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
          }
          // fire Listener#onAddresses to handle connections for these addresses
          savedListener.onAddresses(servers, Attributes.EMPTY);
        } finally {
          synchronized (DnsNameResolver.this) {
            resolving = false;
          }
        }
      }
    };
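
delegateResolver.resolve(host) ultimately performs an ordinary JDK address lookup; conceptually the loop above reduces to something like this sketch (my simplification, not the actual resolver):

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Conceptual equivalent of DnsNameResolver's resolution loop: look up every
// address for the host and pair each with the target port. Sketch only.
static List<InetSocketAddress> resolveAll(String host, int port) throws Exception {
    List<InetSocketAddress> servers = new ArrayList<>();
    for (InetAddress addr : InetAddress.getAllByName(host)) {
        servers.add(new InetSocketAddress(addr, port));
    }
    return servers;
}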

Execution then reaches ManagedChannelImpl#onAddresses:

    public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
      if (servers.isEmpty()) {
        onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
        return;
      }
      log.log(Level.FINE, "[{0}] resolved address: {1}, config={2}",
          new Object[] {getLogId(), servers, config});
      helper.runSerialized(new Runnable() {
          @Override
          public void run() {
            if (terminated) {
              return;
            }
            try {
              // hand the freshly resolved addresses to the load balancer
              balancer.handleResolvedAddressGroups(servers, config);
            } catch (Throwable e) {
              log.log(
                  Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);
              // It must be a bug! Push the exception back to LoadBalancer in the hope that it may
              // be propagated to the application.
              balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
                  .withDescription("Thrown from handleResolvedAddresses(): " + e));
            }
          }
        });
    }

PickFirstBalancerFactory$PickFirstBalancer#handleResolvedAddressGroups then handles the connection:

public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
  static final class PickFirstBalancer extends LoadBalancer {
    public void handleResolvedAddressGroups(
        List<EquivalentAddressGroup> servers, Attributes attributes) {
      // Flatten servers list received from name resolver into single address group. This means that
      // as far as load balancer is concerned, there's virtually one single server with multiple
      // addresses so the connection will be created only for the first address (pick first).
      EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers);
      // initially there is no subchannel, so create a connection
      if (subchannel == null) {
        subchannel = helper.createSubchannel(newEag, Attributes.EMPTY);

        // The channel state does not get updated when doing name resolving today, so for the moment
        // let LB report CONNECTION and call subchannel.requestConnection() immediately.
        helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
        // request the connection
        subchannel.requestConnection();
      } else {
        helper.updateSubchannelAddresses(subchannel, newEag);
      }
    }
  }
}

ManagedChannelImpl$LbHelperImpl#updateBalancingState → DelayedClientTransport#reprocess → DelayedClientTransport$PendingStream#createRealStream creates the NettyClientStream:

    private void createRealStream(ClientTransport transport) {
      ClientStream realStream;
      Context origContext = context.attach();
      try {
        realStream = transport.newStream(
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions());
      } finally {
        context.detach(origContext);
      }
      setStream(realStream);
    }

setStream stores realStream in a field and then drains any pending calls:

  final void setStream(ClientStream stream) {
    synchronized (this) {
      // If realStream != null, then either setStream() or cancel() has been called.
      if (realStream != null) {
        return;
      }
      realStream = checkNotNull(stream, "stream");
    }

    drainPendingCalls();
  }

Creating the connection goes through ManagedChannelImpl#requestConnection → InternalSubchannel#obtainActiveTransport → InternalSubchannel#startNewTransport:

  private void startNewTransport() {
    Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");

    if (addressIndex == 0) {
      connectingTimer.reset().start();
    }
    List<SocketAddress> addrs = addressGroup.getAddresses();
    final SocketAddress address = addrs.get(addressIndex);
    // create the NettyClientTransport
    ConnectionClientTransport transport =
        transportFactory.newClientTransport(address, authority, userAgent);
    if (log.isLoggable(Level.FINE)) {
      log.log(Level.FINE, "[{0}] Created {1} for {2}",
          new Object[] {logId, transport.getLogId(), address});
    }
    pendingTransport = transport;
    transports.add(transport);
    // Invoke NettyClientTransport#start
    Runnable runnable = transport.start(new TransportListener(transport, address));
    if (runnable != null) {
      channelExecutor.executeLater(runnable);
    }
  }

Stepping into NettyClientTransport#start, we can see that this is where the Netty I/O connection is initialized:

  public Runnable start(Listener transportListener) {
    lifecycleManager = new ClientTransportLifecycleManager(
        Preconditions.checkNotNull(transportListener, "listener"));
    EventLoop eventLoop = group.next();
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
      keepAliveManager = new KeepAliveManager(
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
          keepAliveWithoutCalls);
    }

    handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
        maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
    NettyHandlerSettings.setAutoWindow(handler);

    negotiationHandler = negotiator.newHandler(handler);

    Bootstrap b = new Bootstrap();
    b.group(eventLoop);
    b.channel(channelType);
    if (NioSocketChannel.class.isAssignableFrom(channelType)) {
      b.option(SO_KEEPALIVE, true);
    }
    for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
      // Every entry in the map is obtained from
      // NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
      // so it is safe to pass the key-value pair to b.option().
      b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
    }

    /**
     * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
     * is executed in the event loop and we need this handler to be in the pipeline immediately so
     * that it may begin buffering writes.
     */
    b.handler(negotiationHandler);
    ChannelFuture regFuture = b.register();
    channel = regFuture.channel();
    if (channel == null) {
      // Initialization has failed badly. All new streams should be made to fail.
      Throwable t = regFuture.cause();
      if (t == null) {
        t = new IllegalStateException("Channel is null, but future doesn't have a cause");
      }
      statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
      // Use a Runnable since lifecycleManager calls transportListener
      return new Runnable() {
        @Override
        public void run() {
          // NOTICE: we are not calling lifecycleManager from the event loop. But there isn't really
          // an event loop in this case, so nothing should be accessing the lifecycleManager. We
          // could use GlobalEventExecutor (which is what regFuture would use for notifying
          // listeners in this case), but avoiding on-demand thread creation in an error case seems
          // a good idea and is probably clearer threading.
          lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
        }
      };
    }
    // Start the write queue as soon as the channel is constructed
    handler.startWriteQueue(channel);
    // Start the connection operation to the server.
    channel.connect(address);
    // This write will have no effect, yet it will only complete once the negotiationHandler
    // flushes any pending writes.
    channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
          // Need to notify of this failure, because NettyClientHandler may not have been added to
          // the pipeline before the error occurred.
          lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
        }
      }
    });
    // Handle transport shutdown when the channel is closed.
    channel.closeFuture().addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        // Typically we should have noticed shutdown before this point.
        lifecycleManager.notifyTerminated(
            Status.INTERNAL.withDescription("Connection closed with unknown cause"));
      }
    });

    if (keepAliveManager != null) {
      keepAliveManager.onTransportStarted();
    }

    return null;
  }

At this point, the client has established a Netty connection to the server.

5.4 Client sends HTTP2 data requests

As shown below, the client sends two HTTP2 requests:

    helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm sad").build());
    helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: i'm happy").build());
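
For context, below is a minimal sketch of the client setup that precedes these two onNext calls. It is a reconstruction, not the project's actual client code: the port matches the server started in section 6.1, but the choice of sayHi as the streaming method and the response handling are illustrative assumptions.

    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 19999)
        .usePlaintext(true)  // grpc-java 1.6.x API; newer versions use usePlaintext()
        .build();
    HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(channel);
    StreamObserver<Hello.HelloResponse> streamObserver = new StreamObserver<Hello.HelloResponse>() {
        @Override public void onNext(Hello.HelloResponse value) { System.out.println(value.getReply()); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onCompleted() { System.out.println("completed"); }
    };
    StreamObserver<Hello.HelloRequest> helloRequestStreamObserver = helloServiceStub.sayHi(streamObserver);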

ClientCalls$CallToStreamObserverAdapter#onNext is called to send each request:

public final class ClientCalls {
  private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
    public void onNext(T value) {
      call.sendMessage(value);
    }
  }
}

ClientCallImpl#sendMessage converts the HelloRequest object into an InputStream and starts sending the data:

final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
  public void sendMessage(ReqT message) {
    Preconditions.checkState(stream != null, "Not started");
    Preconditions.checkState(!cancelCalled, "call was cancelled");
    Preconditions.checkState(!halfCloseCalled, "call was half-closed");
    try {
      // TODO(notcarl): Find out if messageIs needs to be closed.
      // Wrap the message into an InputStream
      InputStream messageIs = method.streamRequest(message);
      // Send the data
      stream.writeMessage(messageIs);
    } catch (Throwable e) {
      stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
      return;
    }
    // For unary requests, we don't flush since we know that halfClose should be coming soon. This
    // allows us to piggy-back the END_STREAM=true on the last message frame without opening the
    // possibility of broken applications forgetting to call halfClose without noticing.
    if (!unaryRequest) {
      stream.flush();
    }
  }
}

DelayedStream#writeMessage is then called to send the message:

  public void writeMessage(final InputStream message) {
    checkNotNull(message, "message");
    if (passThrough) {
      realStream.writeMessage(message);
    } else {
      delayOrExecute(new Runnable() {
        @Override
        public void run() {
          realStream.writeMessage(message);
        }
      });
    }
  }

Note that the message is not actually sent here. Instead, delayOrExecute puts every request into the pendingCalls queue, to be sent in order later:

  private void delayOrExecute(Runnable runnable) {
    synchronized (this) {
      if (!passThrough) {
        pendingCalls.add(runnable);
        return;
      }
    }
    runnable.run();
  }
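
To make the pattern concrete, here is a stripped-down sketch (deliberately not grpc's actual class) of the same buffer-until-ready-then-drain idea that DelayedStream implements:

    import java.util.ArrayList;
    import java.util.List;

    class DelayedWriter {
        interface Writer { void write(String msg); }

        private final List<Runnable> pending = new ArrayList<Runnable>();
        private Writer real;  // stays null until the transport-backed writer exists

        void write(final String msg) {
            synchronized (this) {
                if (real == null) {
                    // No real writer yet: buffer the write, just like pendingCalls above
                    pending.add(new Runnable() {
                        @Override public void run() { real.write(msg); }
                    });
                    return;
                }
            }
            real.write(msg);  // pass through once the real writer is in place
        }

        void setReal(Writer w) {
            List<Runnable> drained;
            synchronized (this) {
                real = w;
                drained = new ArrayList<Runnable>(pending);
                pending.clear();
            }
            for (Runnable r : drained) {
                r.run();  // replay buffered writes in their original order
            }
        }
    }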

As section 5.3 showed, once realStream (the NettyClientStream) has been created, setStream calls drainPendingCalls to execute all pending requests:

  final void setStream(ClientStream stream) {
    synchronized (this) {
      // If realStream != null, then either setStream() or cancel() has been called.
      if (realStream != null) {
        return;
      }
      realStream = checkNotNull(stream, "stream");
    }

    drainPendingCalls();
  }

Inside drainPendingCalls, the first task creates the stream; one RPC call corresponds to one stream:

public class DefaultHttp2Connection implements Http2Connection {
    private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
        public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
            State state = activeState(streamId, IDLE, isLocal(), halfClosed);
            checkNewStreamAllowed(streamId, state);
            // Create and initialize the stream.
            DefaultStream stream = new DefaultStream(streamId, state);
            incrementExpectedStreamId(streamId);
            addStream(stream);
            stream.activate();
            return stream;
        }
    }
}

After the stream is created, the request is sent through it via realStream.writeMessage(message).

writeMessage puts the outgoing message into the writeQueue; scheduleFlush then arranges for flush to run on the event loop, which sends the data to the server:

class WriteQueue {
  void scheduleFlush() {
    if (scheduled.compareAndSet(false, true)) {
      // Add the queue to the tail of the event loop so writes will be executed immediately
      // inside the event loop. Note DO NOT do channel.write outside the event loop as
      // it will not wake up immediately without a flush.
      channel.eventLoop().execute(later);
    }
  }
}

flush writes the queued requests out to the server:

  private void flush() {
    try {
      QueuedCommand cmd;
      int i = 0;
      boolean flushedOnce = false;
      while ((cmd = queue.poll()) != null) {
        channel.write(cmd, cmd.promise());
        if (++i == DEQUE_CHUNK_SIZE) {
          i = 0;
          // Flush each chunk so we are releasing buffers periodically. In theory this loop
          // might never end as new events are continuously added to the queue, if we never
          // flushed in that case we would be guaranteed to OOM.
          channel.flush();
          flushedOnce = true;
        }
      }
      // Must flush at least once, even if there were no writes.
      if (i != 0 || !flushedOnce) {
        channel.flush();
      }
    } finally {
      // Mark the write as done, if the queue is non-empty after marking trigger a new write.
      scheduled.set(false);
      if (!queue.isEmpty()) {
        scheduleFlush();
      }
    }
  }

5.5 Sending the second RPC request

    StreamObserver<Hello.HelloRequest> helloOKStreamObserver = helloServiceStub.sayOK(streamObserver);
    helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm sad").build());
    helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: i'm happy").build());

Under the hood, helloServiceStub.sayOK again creates a ClientCallImpl object, which opens a new stream; the next two requests are sent on that stream.
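
A sketch of what the generated sayOK stub method roughly looks like (following grpc-java 1.6.x codegen conventions; the exact generated code may differ) makes this clear: every invocation wraps a brand-new ClientCall, which is why each call gets its own stream:

    public io.grpc.stub.StreamObserver<Hello.HelloRequest> sayOK(
        io.grpc.stub.StreamObserver<Hello.HelloResponse> responseObserver) {
      // getChannel().newCall(...) builds a fresh ClientCallImpl, hence a fresh HTTP/2 stream
      return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
          getChannel().newCall(METHOD_SAY_OK, getCallOptions()), responseObserver);
    }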

5.6 Client thread model summary

Looking at the debug output, the sayOK RPC request uses streamId 3:

[debug screenshot]

while the sayBad RPC request uses streamId 5:

[debug screenshot]

The observed thread model is as follows:

[thread model diagram]

Here, grpc-default-executor-0 handles sending the HTTP packets of the sayOK RPC request, and grpc-default-executor-1 handles sending those of the sayBad RPC request; both belong to the same thread pool. grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-2 and grpc-default-worker-ELG-1-3 are NioEventLoop threads, of which grpc-default-worker-ELG-1-2 is the one actually active.

When server responses are received, a new thread is taken from the pool for the processing logic. As shown below, two new threads are created:

[debug screenshot]

Here, grpc-default-executor-2 handles the HTTP packets of the sayOK RPC's responses, and grpc-default-executor-3 handles those of the sayBad RPC's responses.

Client thread model in summary:

[thread model diagram]

Clearly, in GRPC each distinct RPC call corresponds to its own stream.

6. Server execution flow

6.1 Server-side Netty startup

Start the server:

io.grpc.Server server = ServerBuilder.forPort(19999).addService(new StreamHelloServiceImpl()).build().start();

NettyServerBuilder assembles the server parameters, and build constructs the server-side ServerImpl object. Among the arguments passed to the ServerImpl constructor, buildTransportServer creates the NettyServer object:

  public Server build() {
    ServerImpl server = new ServerImpl(
        this,
        buildTransportServer(Collections.unmodifiableList(getTracerFactories())),
        Context.ROOT);
    for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
      notifyTarget.notifyOnBuild(server);
    }
    return server;
  }

Specifically, NettyServerBuilder#buildTransportServer creates the server-side NettyServer object:

  protected NettyServer buildTransportServer(
      List<ServerStreamTracer.Factory> streamTracerFactories) {
    ProtocolNegotiator negotiator = protocolNegotiator;
    if (negotiator == null) {
      negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
              ProtocolNegotiators.serverPlaintext();
    }

    return new NettyServer(
        address, channelType, bossEventLoopGroup, workerEventLoopGroup,
        negotiator, streamTracerFactories, maxConcurrentCallsPerConnection, flowControlWindow,
        maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
        maxConnectionIdleInNanos,
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
  }

ServerImpl#start launches the grpc service. Startup consists of two parts: starting the NettyServer and initializing the executor:

  public ServerImpl start() throws IOException {
    synchronized (lock) {
      checkState(!started, "Already started");
      checkState(!shutdown, "Shutting down");
      // Start and wait for any port to actually be bound.
      transportServer.start(new ServerListenerImpl());
      executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
      started = true;
      return this;
    }
  }

While starting the Netty server, a childHandler is added. The childHandler creates a NettyServerTransport object and wraps it into a ServerTransportListenerImpl, which will be used heavily later on:

class NettyServer implements InternalServer {
  public void start(ServerListener serverListener) throws IOException {
    listener = checkNotNull(serverListener, "serverListener");

    // If using the shared groups, get references to them.
    allocateSharedGroups();

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup);
    b.channel(channelType);
    if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
      b.option(SO_BACKLOG, 128);
      b.childOption(SO_KEEPALIVE, true);
    }
    b.childHandler(new ChannelInitializer<Channel>() {
      @Override
      public void initChannel(Channel ch) throws Exception {

        long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
        if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
          // apply a random jitter of +/-10% to max connection age
          maxConnectionAgeInNanos =
              (long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
        }

        NettyServerTransport transport =
            new NettyServerTransport(
                ch, protocolNegotiator, streamTracerFactories, maxStreamsPerConnection,
                flowControlWindow, maxMessageSize, maxHeaderListSize,
                keepAliveTimeInNanos, keepAliveTimeoutInNanos,
                maxConnectionIdleInNanos,
                maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
                permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
        ServerTransportListener transportListener;
        // This is to order callbacks on the listener, not to guard access to channel.
        synchronized (NettyServer.this) {
          if (channel != null && !channel.isOpen()) {
            // Server already shutdown.
            ch.close();
            return;
          }
          // `channel` shutdown can race with `ch` initialization, so this is only safe to increment
          // inside the lock.
          eventLoopReferenceCounter.retain(); 
          // Wrap the transport into a listener; the listener is a ServerTransportListenerImpl
          transportListener = listener.transportCreated(transport);
        }
        transport.start(transportListener);
        ch.closeFuture().addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) {
            eventLoopReferenceCounter.release();
          }
        });
      }
    });
    // Bind and start to accept incoming connections.
    ChannelFuture future = b.bind(address);
    try {
      future.await();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted waiting for bind");
    }
    if (!future.isSuccess()) {
      throw new IOException("Failed to bind", future.cause());
    }
    channel = future.channel();
  }
}

Once the Netty server is up, executorPool.getObject() creates the executor. The executorPool is defined as follows:

  private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
      SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);

The executor that executorPool produces is the grpc-default-executor thread pool:

  public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
      new Resource<ExecutorService>() {
        private static final String NAME = "grpc-default-executor";
        @Override
        public ExecutorService create() {
          return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
        }

        @Override
        public void close(ExecutorService instance) {
          instance.shutdown();
        }

        @Override
        public String toString() {
          return NAME;
        }
      };
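
This shared cached pool is only the default. If an application prefers its RPC handlers to run on its own pool, ServerBuilder accepts a custom executor. A minimal sketch (the pool size of 8 is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    ExecutorService appExecutor = Executors.newFixedThreadPool(8);
    io.grpc.Server server = ServerBuilder.forPort(19999)
        .executor(appExecutor)  // replaces the shared grpc-default-executor pool
        .addService(new StreamHelloServiceImpl())
        .build()
        .start();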

6.2 Receiving HTTP2 requests

With the Netty server started and a client request accepted, the grpc-default-worker-ELG-3-1 thread appears; it is a NioEventLoop thread from the Netty workerGroup:

[debug screenshot]

On this thread, when an HTTP request packet is received, DefaultHttp2FrameReader#processHeaderState processes it and determines the frame type:

public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSizePolicy, Configuration {
    private void processHeaderState(ByteBuf in) throws Http2Exception {
        if (in.readableBytes() < FRAME_HEADER_LENGTH) {
            // Wait until the entire frame header has been read.
            return;
        }

        // Read the header and prepare the unmarshaller to read the frame.
        payloadLength = in.readUnsignedMedium();
        if (payloadLength > maxFrameSize) {
            throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,
                                  maxFrameSize);
        }
        frameType = in.readByte();
        flags = new Http2Flags(in.readUnsignedByte());
        streamId = readUnsignedInt(in);

        // We have consumed the data, next time we read we will be expecting to read the frame payload.
        readingHeaders = false;

        switch (frameType) {
            case DATA:
                verifyDataFrame();
                break;
            case HEADERS:
                verifyHeadersFrame();
                break;
            case PRIORITY:
                verifyPriorityFrame();
                break;
            case RST_STREAM:
                verifyRstStreamFrame();
                break;
            case SETTINGS:
                verifySettingsFrame();
                break;
            case PUSH_PROMISE:
                verifyPushPromiseFrame();
                break;
            case PING:
                verifyPingFrame();
                break;
            case GO_AWAY:
                verifyGoAwayFrame();
                break;
            case WINDOW_UPDATE:
                verifyWindowUpdateFrame();
                break;
            case CONTINUATION:
                verifyContinuationFrame();
                break;
            default:
                // Unknown frame type, could be an extension.
                verifyUnknownFrame();
                break;
        }
    }
}
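
The fields read above follow the fixed 9-byte HTTP/2 frame header defined in RFC 7540 §4.1: a 3-byte payload length, a 1-byte type, a 1-byte flags field, and a 4-byte stream id whose top bit is reserved. As a self-contained illustration (plain Java, not netty code), parsing one header by hand looks like this:

    // Parses one fixed 9-byte HTTP/2 frame header (RFC 7540 §4.1); buf is assumed
    // to hold at least 9 bytes. Illustration only.
    static void parseFrameHeader(byte[] buf) {
        int payloadLength = ((buf[0] & 0xFF) << 16) | ((buf[1] & 0xFF) << 8) | (buf[2] & 0xFF);
        int frameType = buf[3] & 0xFF;  // 0x0 = DATA, 0x1 = HEADERS, 0x4 = SETTINGS, ...
        int flags = buf[4] & 0xFF;      // e.g. END_STREAM, END_HEADERS bits
        int streamId = ((buf[5] & 0x7F) << 24)  // the high bit of the stream id is reserved
            | ((buf[6] & 0xFF) << 16) | ((buf[7] & 0xFF) << 8) | (buf[8] & 0xFF);
        System.out.printf("len=%d type=0x%x flags=0x%x streamId=%d%n",
            payloadLength, frameType, flags, streamId);
    }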

6.2.1 Receiving the SETTINGS frame

While debugging, the first HTTP frame turns out to be of type SETTINGS; it tells the server that HTTP2 is being used for the transport. SETTINGS frames use streamId 0, which is never shared with the streamIds of RPC requests:

[debug screenshot]

Frames of other types such as WINDOW_UPDATE follow; they are less important and are not covered in detail here.

6.2.2 Receiving RPC request frames

As shown below, a HEADERS frame arrives for the RPC request on streamId 3:

[debug screenshot]

At this point, DefaultHttp2FrameReader#readHeadersFrame is invoked directly to process the HEADERS frame:

    private void readHeadersFrame(final ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
            Http2FrameListener listener) throws Http2Exception {
        final int headersStreamId = streamId;
        final Http2Flags headersFlags = flags;
        final int padding = readPadding(payload);
        verifyPadding(padding);

        // The callback that is invoked is different depending on whether priority information
        // is present in the headers frame.
        if (flags.priorityPresent()) {
            long word1 = payload.readUnsignedInt();
            final boolean exclusive = (word1 & 0x80000000L) != 0;
            final int streamDependency = (int) (word1 & 0x7FFFFFFFL);
            if (streamDependency == streamId) {
                throw streamError(streamId, PROTOCOL_ERROR, "A stream cannot depend on itself.");
            }
            final short weight = (short) (payload.readUnsignedByte() + 1);
            final int lenToRead = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);

            // Create a handler that invokes the listener when the header block is complete.
            headersContinuation = new HeadersContinuation() {
                @Override
                public int getStreamId() {
                    return headersStreamId;
                }

                @Override
                public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
                        Http2FrameListener listener) throws Http2Exception {
                    final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
                    hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
                    if (endOfHeaders) {
                        listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), streamDependency,
                                weight, exclusive, padding, headersFlags.endOfStream());
                    }
                }
            };

            // Process the initial fragment, invoking the listener's callback if end of headers.
            headersContinuation.processFragment(flags.endOfHeaders(), payload, lenToRead, listener);
            resetHeadersContinuationIfEnd(flags.endOfHeaders());
            return;
        }

        // The priority fields are not present in the frame. Prepare a continuation that invokes
        // the listener callback without priority information.
        headersContinuation = new HeadersContinuation() {
            @Override
            public int getStreamId() {
                return headersStreamId;
            }

            @Override
            public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
                    Http2FrameListener listener) throws Http2Exception {
                final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
                hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
                if (endOfHeaders) {
                    listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), padding,
                                    headersFlags.endOfStream());
                }
            }
        };

        // Process the initial fragment, invoking the listener's callback if end of headers.
        int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
        // Process the headers fragment
        headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);
        resetHeadersContinuationIfEnd(flags.endOfHeaders());
    }

Finally, ServerImpl$ServerTransportListenerImpl#streamCreated is called; it looks up the RPC's target method via the method registry and invokes it on the asynchronous thread grpc-default-executor-0:

    public void streamCreated(
        final ServerStream stream, final String methodName, final Metadata headers) {
      if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
        String encoding = headers.get(MESSAGE_ENCODING_KEY);
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
        if (decompressor == null) {
          stream.close(
              Status.UNIMPLEMENTED.withDescription(
                  String.format("Can't find decompressor for %s", encoding)),
              new Metadata());
          return;
        }
        stream.setDecompressor(decompressor);
      }

      final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
          stream.statsTraceContext(), "statsTraceCtx not present from stream");

      final Context.CancellableContext context = createContext(stream, headers, statsTraceCtx);
      final Executor wrappedExecutor;
      // This is a performance optimization that avoids the synchronization and queuing overhead
      // that comes with SerializingExecutor.
      if (executor == directExecutor()) {
        wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
      } else {
        // Create the serializing executor used to run the rpc method asynchronously
        wrappedExecutor = new SerializingExecutor(executor);
      }

      final JumpToApplicationThreadServerStreamListener jumpListener
          = new JumpToApplicationThreadServerStreamListener(
              wrappedExecutor, executor, stream, context);
      stream.setListener(jumpListener);
      // Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
      // are delivered, including any errors. Callbacks can still be triggered, but they will be
      // queued.
      wrappedExecutor.execute(new ContextRunnable(context) {
          @Override
          public void runInContext() {
            ServerStreamListener listener = NOOP_LISTENER;
            try {
              // Look up the method definition by its name
              ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
              if (method == null) {
                method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
              }
              if (method == null) {
                Status status = Status.UNIMPLEMENTED.withDescription(
                    "Method not found: " + methodName);
                // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
                // memory as a map whose key is the method name, this would allow a misbehaving
                // client to blow up the server in-memory stats storage by sending large number of
                // distinct unimplemented method
                // names. (https://github.com/grpc/grpc-java/issues/2285)
                stream.close(status, new Metadata());
                context.cancel(null);
                return;
              }
              // Invoke the method
              listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
            } catch (RuntimeException e) {
              stream.close(Status.fromThrowable(e), new Metadata());
              context.cancel(null);
              throw e;
            } catch (Error e) {
              stream.close(Status.fromThrowable(e), new Metadata());
              context.cancel(null);
              throw e;
            } finally {
              jumpListener.setListener(listener);
            }
          }
        });
    }

We can see that the streamId is 3 and the requested method name is protobuf.HelloService/SayOK:

[debug screenshot]

The server invokes sayOK through ServerCalls$StreamingServerCallHandler#startCall:

    public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
      ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
          new ServerCallStreamObserverImpl<ReqT, RespT>(call);
      StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
      responseObserver.freeze();
      if (responseObserver.autoFlowControlEnabled) {
        call.request(1);
      }
      return new StreamingServerCallListener(requestObserver, responseObserver, call);
    }

HelloServiceGrpc$MethodHandlers#invoke dispatches to sayOK:

    public io.grpc.stub.StreamObserver<Req> invoke(
        io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        case METHODID_SAY_HI:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.sayHi(
              (io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse>) responseObserver);
        case METHODID_SAY_BAD:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.sayBad(
              (io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse>) responseObserver);
        case METHODID_SAY_OK:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.sayOK(
              (io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse>) responseObserver);
        default:
          throw new AssertionError();
      }
    }

Execution finally reaches the user-defined sayOK method, which returns an anonymous StreamObserver instance. The returned StreamObserver<Hello.HelloRequest> handles incoming HelloRequest messages; when its onNext is invoked, responses are sent back through responseObserver#onNext:

    public StreamObserver<Hello.HelloRequest> sayOK(StreamObserver<Hello.HelloResponse> responseObserver) {
        // The returned StreamObserver<Hello.HelloRequest> handles HelloRequest messages; inside onNext, responses are sent via responseObserver#onNext
        return new StreamObserver<Hello.HelloRequest>() {
            @Override
            public void onNext(Hello.HelloRequest value) {
                // For one request received, stream back three responses; all of them travel on the same stream
                System.out.println("receive : " + value.getGreeting());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok1: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok2: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok3: " + value.getGreeting()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error");
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
                responseObserver.onCompleted();
            }
        };
    }

Finally, the returned requestObserver and responseObserver are wrapped into a StreamingServerCallListener, which is subsequently wrapped into a JumpToApplicationThreadServerStreamListener.

6.3 Processing the HTTP request

In section 6.2, on receiving the RPC request, a requestObserver and a responseObserver were created to process the request data and return responses. When a request message arrives, ServerCalls$StreamingServerCallHandler$StreamingServerCallListener#onMessage is invoked, still running on grpc-default-executor-0:

      public void onMessage(ReqT request) {
        requestObserver.onNext(request);

        // Request delivery of the next inbound message.
        if (responseObserver.autoFlowControlEnabled) {
          call.request(1);
        }
      }

This runs the user-defined onNext method; responses are then sent through ServerCalls$ServerCallStreamObserverImpl#onNext:

    public void onNext(RespT response) {
      if (cancelled) {
        throw Status.CANCELLED.asRuntimeException();
      }
      if (!sentHeaders) {
        call.sendHeaders(new Metadata());
        sentHeaders = true;
      }
      call.sendMessage(response);
    }

The response frames are then written out by NettyServerHandler#write on the Netty workerGroup NioEventLoop.

When request is called, the deframer work runs on the event loop; if the caller is not already on it, the task is handed over to the event loop thread:

    public void request(final int numMessages) {
      if (channel.eventLoop().inEventLoop()) {
        // Processing data read in the event loop so can call into the deframer immediately
        transportState().requestMessagesFromDeframer(numMessages);
      } else {
        // Not on the event loop: hand the deframer request over to it
        channel.eventLoop().execute(new Runnable() {
          @Override
          public void run() {
            transportState().requestMessagesFromDeframer(numMessages);
          }
        });
      }
    }
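
The call.request(1) seen in onMessage above drives gRPC's inbound flow control: after consuming one message, the server asks the transport for one more. A handler can also take over this pacing itself. Below is a hedged sketch using the ServerCallStreamObserver API (an assumption that these methods behave this way in this grpc-java version; the cast is valid inside server-side handlers, and disableAutoInboundFlowControl must be called before any message is delivered):

    public StreamObserver<Hello.HelloRequest> sayOK(StreamObserver<Hello.HelloResponse> responseObserver) {
        final ServerCallStreamObserver<Hello.HelloResponse> serverObserver =
                (ServerCallStreamObserver<Hello.HelloResponse>) responseObserver;
        serverObserver.disableAutoInboundFlowControl();  // suppress the automatic call.request(1)
        serverObserver.request(1);                       // pull the first message explicitly
        return new StreamObserver<Hello.HelloRequest>() {
            @Override
            public void onNext(Hello.HelloRequest value) {
                serverObserver.onNext(Hello.HelloResponse.newBuilder()
                        .setReply("ok: " + value.getGreeting()).build());
                serverObserver.request(1);               // ready for the next inbound message
            }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { serverObserver.onCompleted(); }
        };
    }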

6.4 Server handling of multiple RPC requests

Different RPC requests correspond to different streams, and the server creates one thread per RPC request. As shown below, the sayBad and sayOK RPCs run on two separate threads:

[debug screenshot]

The sayBad method corresponds to the stream with streamId 5:

[debug screenshot]

The final output is shown below; the prints of different RPCs interleave, which confirms that different streams are processed in parallel:

[console output screenshot]

The client-server RPC thread model is illustrated below:

[thread model diagram]

7. Takeaways from studying the grpc code

Reading grpc took me about a month in total. Here are the pitfalls worth summarizing:

  1. If you try to follow the call chain step by step from the startup code all the way to the server side, even three months may not be enough. A far quicker route is to debug: locate the most essential code paths, set breakpoints there, and you can grasp grpc's execution flow quickly. The same applies when reading other frameworks.
  2. Before diving into the source, understand the underlying principles first, such as Netty and HTTP2. Then, while debugging, look directly at which threads the server and client start, what each thread is for, and why it is designed that way. Combined with that background knowledge, the component's runtime behavior becomes much clearer.
