22

【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!

 3 years ago
source link: http://www.cnblogs.com/wang-meng/p/13711756.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

前言

前面小飞已经讲解了 NIONetty 服务端启动,这一讲是 Client 的启动过程。

源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文 Netty 源码及以后的文章版本都基于: 4.1.22.Final

本篇是以 NettyClient 启动为切入点,带大家一步步进入 Netty 源码的世界。

Client启动流程揭秘

1、探秘的入口:netty-client demo

这里用 netty-exmaple 中的 EchoClient 来作为例子:

public final class EchoClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new EchoClientHandler());
                 }
             });

            ChannelFuture f = b.connect(HOST, PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

代码没有什么独特的地方,我们上一篇文章时也梳理过 Netty 网络编程的一些套路,这里就不再赘述了。

(忘记的小朋友可以查看 Netty 系列文章中查找~)

上面的客户端代码虽然简单, 但是却展示了 Netty 客户端初始化时所需的所有内容:

  • EventLoopGroupNetty 服务端或者客户端,都必须指定 EventLoopGroup ,客户端指定的是 NioEventLoopGroup
  • Bootstrap : Netty 客户端启动类,负责客户端的启动和初始化过程
  • channel() 类型:指定 Channel 的类型,因为这里是客户端,所以使用的是 NioSocketChannel ,服务端会使用 NioServerSocketChannel
  • Handler :设置数据的处理器
  • bootstrap.connect() : 客户端连接 netty 服务的方法

2、NioEventLoopGroup 流程解析

我们先从 NioEventLoopGroup 开始,一行行代码解析,先看看其类结构:

iYNBFnn.png!mobile

上面是大致的类结构,而 EventLoop 又继承自 EventLoopGroup ,所以类的大致结构我们可想而知。这里一些核心逻辑会在 MultithreadEventExecutorGroup 中,包含 EventLoopGroup 的创建和初始化操作等。

接着从 NioEventLoopGroup 构造方法开始看起,一步步往下跟( 代码都只展示重点的部分,省去很多暂时不需要关心的代码,以下代码都遵循这个原则 ):

EventLoopGroup group = new NioEventLoopGroup();

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里通过调用 this()super() 方法一路往下传递,期间会构造一些默认属性,一直传递到 MultithreadEventExecutorGroup 类中,接着往西看。

2.1、MultithreadEventExecutorGroup

上面构造函数有一个重要的参数传递: DEFAULT_EVENT_LOOP_THREADS ,这个值默认是 CPU核数 * 2

为什么要传递这个参数呢?我们之前说过 EventLoopGroup 可以理解成一个线程池, MultithreadEventExecutorGroup 有一个线程数组 EventExecutor[] children 属性,而传递过来的 DEFAULT_EVENT_LOOP_THREADS 就是数组的长度。

先看下 MultithreadEventExecutorGroup 中的构造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    
    children = new EventExecutor[nThreads];
    
    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(executor, args);
    }
    
    // ... 省略
}

这段代码执行逻辑可以理解为:

  • 通过 ThreadPerTaskExecutor 构造一个 Executor 执行器,后面会细说,里面包含了线程执行的 execute() 方法
  • 接着创建一个 EventExecutor 数组对象,大小为传递进来的 threads 数量,这个所谓的 EventExecutor 可以理解为我们的 EventLoop ,在这个demo中就是 NioEventLoop 对象
  • 最后调用 newChild 方法逐个初始化 EventLoopGroup 中的 EventLoop 对象

上面只是大概说了下 MultithreadEventExecutorGroup 中的构造方法做的事情,后面还会一个个详细展开,先不用着急,我们先有个整体的认知就好。

再回到 MultithreadEventExecutorGroup 中的构造方法入参中,有个 EventExecutorChooserFactory 对象,这里面是有个很亮眼的细节设计,通过它我们来洞悉 Netty 的良苦用心。

2.1、亮点设计:DefaultEventExecutorChooserFactory

Jr2Afum.png!mobile

EventExecutorChooserFactory 这个类的作用是用来选择 EventLoop 执行器的,我们知道 EventLoopGroup 是一个包含了 CPU * 2 个数量的 EventLoop 数组对象,那每次选择 EventLoop 来执行任务是选择数组中的哪一个呢?

我们看一下这个类的具体实现, 红框中 都是需要重点查看的地方:

zYZZfyz.png!mobile

DefaultEventExecutorChooserFactory 是一个选择器工厂类,调用里面的 next() 方法达到一个轮询选择的目的。

数组的长度是length,执行第n次,取数组中的哪个元素就是对length取余

j6JzqeJ.png!mobile

继续回到代码的实现,这里的优化就是在于先通过 isPowerOfTwo() 方法判断数组的长度是否为2的n次幂,判断的方式很巧妙,使用 val & -val == val ,这里我不做过多的解释,网上还有很多判断2的n次幂的优秀解法,我就不班门弄斧了。( 可参考: https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/ )

当然我认为这里还有更容易理解的一个算法: x & (x - 1) == 0 大家可以看下面的图就懂了,这里就不延展了:

YVj2QfB.png!mobile

BUT!!! 这里为什么要去煞费苦心的判断数组的长度是2的n次幂?

不知道小伙伴们是否还记得 大明湖畔HashMap ?一般我们要求 HashMap 数组的长度需要是2的n次幂,因为在 key 值寻找数组位置的方法: (n - 1) & hash n是数组长度,这里如果数组长度是2的n次幂就可以通过位运算来提升性能,当 length 为2的n次幂时下面公式是等价的:

n & (length - 1) <=> n % length

还记得上面说过,数组的长度默认都是 CPU * 2 ,而一般服务器CPU核心数都是2、4、8、16等等,所以这一个小优化就很实用了,再仔细想想,原来数组长度的初始化也是很讲究的。

这里位运算的好处就是效率远远高于与运算, Netty 针对于这个小细节都做了优化,真是太棒了。

2.3、线程执行器:ThreadPerTaskExecutor

接着看下 ThreadPerTaskExecutor 线程执行器,每次执行任务都会通过它来创建一个线程实体。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

传递进来的 threadFactoryDefaultThreadFactory ,这里面会构造 NioEventLoop 线程命名规则为 nioEventLoop-1-xxx ,我们就不细看这个了。当线程执行的时候会调用 execute() 方法,这里会创建一个 FastThreadLocalThread 线程,具体看代码:

public class DefaultThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
}

这里通过 newThread() 来创建一个线程,然后初始化线程对象数据,最终会调用到 Thread.init() 中。

2.4、EventLoop初始化

接着继续看 MultithreadEventExecutorGroup 构造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(executor, args);
        // .... 省略部分代码
    }
}

上面代码的最后一部分是 newChild 方法, 这个是一个抽象方法, 它的任务是实例化 EventLoop 对象. 我们跟踪一下它的代码, 可以发现, 这个方法在 NioEventLoopGroup 类中实现了, 其内容很简单:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

其实就是实例化一个 NioEventLoop 对象, 然后返回。 NioEventLoop 构造函数中会保存 provider 和事件轮询器 selector ,在其父类中还会创建一个 MpscQueue队列 ,然后保存线程执行器 executor

再回过头来想一想, MultithreadEventExecutorGroup 内部维护了一个 EventExecutor[] children 数组, NettyEventLoopGroup 的实现机制其实就建立在 MultithreadEventExecutorGroup 之上。

每当 Netty 需要一个 EventLoop 时, 会调用 next() 方法从 EventLoopGroup 数组中获取一个可用的 EventLoop 对象。其中 next 方法的实现是通过 NioEventLoopGroup.next() 来完成的,就是用的上面有过讲解的通过轮询算法来计算得出的。

最后总结一下整个 EventLoopGroup 的初始化过程:

JNNjU32.png!mobile

  • EventLoopGroup (其实是 MultithreadEventExecutorGroup ) 内部维护一个类型为 EventExecutor children 数组,数组长度是 nThreads
  • 如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是 处理器核心数 * 2
  • MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组
  • 抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.
  • NioEventLoop 属性:
    • SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
    • Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

2.5、NioSocketChannel

Netty 中, Channel 是对 Socket 的抽象,每当 Netty 建立一个连接后,都会有一个与其对应的 Channel 实例。

我们在开头的 Demo 中,设置了 channel(NioSocketChannel.class)NioSocketChannel 的类结构如下:

RFFFj2.png!mobile

接着分析代码,当我们调用 b.channel() 时实际上会进入 AbstractBootstrap.channel() 逻辑,接着看 AbstractBootstrap 中代码:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    if (clazz == null) {
        throw new NullPointerException("clazz");
    }
    this.clazz = clazz;
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

可以看到,这里 ReflectiveChannelFactory 其实就是返回我们指定的 channelClass:NioSocketChannel , 然后指定 AbstractBootstrap 中的 channelFactory = new ReflectiveChannelFactory()

2.6、Channel初始化流程

到了这一步,我们已经知道 NioEventLoopGroupchannel() 的流程,接着来看看 Channel 的 初始化流程,这也是 Netty 客户端启动的的核心流程之一:

ChannelFuture f = b.connect(HOST, PORT).sync();

接着就开始从 b.connect() 为入口一步步往后跟,先看下 NioSocketChannel 构造的整体流程:

Iz26FrM.png!mobile

connet 往后梳理下整体流程:

Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {
    Channel channel = channelFactory.newChannel();
    init(channel);
    
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}

为了更易读,这里代码都做了简化,只保留了一些重要的代码。

紧接着我们看看 channelFactory.newChannel() 做了什么,这里 channelFactoryReflectiveChannelFactory ,我们在上面的章节分析过:

@Override
public T newChannel() {
    try {
        return clazz.getConstructor().newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

这里的 clazzNioSocketChannel ,同样是在上面章节讲到过,这里是调用 NioSocketChannel 的构造函数然后初始化一个 Channel 实例。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }
}

这里其实也很简单,就是创建一个 Java NIO SocketChannel 而已,接着看看 NioSocketChannel 的父类还做了哪些事情,这里梳理下类的关系:

NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        ch.configureBlocking(false);
    }
}

这里会调用父类的构造参数,并且传递 readInterestOp = SelectionKey.OP_READ: ,这里还有一个很重要的点,配置 Java NIO SocketChannel 为非阻塞的,我们之前在 NIO 章节的时候讲解过,这里也不再赘述。

接着继续看 AbstractChannel 的构造函数:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
}

这里创建一个 ChannelId ,创建一个 Unsafe 对象,这里的 Unsafe 并不是Java中的Unsafe,后面也会讲到。然后创建一个 ChannelPipeline ,后面也会讲到,到了这里,一个完整的 NioSocketChannel 就初始化完成了,我们再来总结一下:

  • NettySocketChannel 会与 Java 原生的 SocketChannel 绑定在一起;
  • 会注册 Read 事件;
  • 会为每一个 Channel 分配一个 channelId
  • 会为每一个 Channel 创建一个 Unsafe 对象;
  • 会为每一个 Channel 分配一个 ChannelPipeline

2.7、Channel 注册流程

还是回到最上面 initAndRegister 方法,我们上面都是在分析里面 newChannel 的操作,这个方法是 NioSocketChannel 创建的一个流程,接着我们在继续跟 init()register() 的过程:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    final ChannelFuture initAndRegister() {
        Channel channel = channelFactory.newChannel();
        init(channel);
        ChannelFuture regFuture = config().group().register(channel);
    }
}

init() 就是将一些参数 optionsattrs 设置到 channel 中,我们重点需要看的是 register 方法,其调用链为:

AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register

这里最后到了 unsaferegister() 方法,最终调用到 AbstractNioChannel.doRegister() :

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        return;
    }
}

javaChannel() 就是 Java NIO 中的 SocketChannel ,这里是将 SocketChannel 注册到与 eventLoop 相关联的 selector 上。

7NVzuyv.png!mobile

最后我们整理一下服务启动的整体流程:

  1. initAndRegister() 初始化并注册什么呢?
  • channelFactory.newChannel()
  • 通过反射创建一个 NioSocketChannel
  • Java 原生 Channel 绑定到 NettyChannel
  • 注册 Read 事件
  • Channel 分配 id
  • Channel 创建 unsafe 对象
  • Channel 创建 ChannelPipeline (默认是 head<=>tail 的双向链表)
  1. `init(channel)``
  • Bootstrap 中的配置设置到 Channel
  1. register(channel)
  • Channel 绑定到一个 EventLoop
  • Java 原生 Channel、NettyChannel、Selector 绑定到 SelectionKey
  • 触发 Register 相关的事件

2.8 unsafe初始化

上面有提到过在初始化 Channel 的过程中会创建一个 Unsafe 的对象,然后绑定到 Channel 上:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

newUnsafe 直接调用到了 NioSocketChannel 中的方法:

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}

NioSocketChannelUnsafeNioSocketChannel 中的一个内部类,然后向上还有几个父类继承,这里主要是对应到相关 Java 底层的 Socket 操作。

2.9 pipeline初始化

我们还是回到 pipeline 初始化的过程,来看一下 newChannelPipeline() 的具体实现:

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

我们调用 DefaultChannelPipeline 的构造器, 传入了一个 channel , 而这个 channel 其实就是我们实例化的 NioSocketChannel

DefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在 channel 字段中. DefaultChannelPipeline 中, 还有两个特殊的字段, 即 headtail , 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的关键.

关于 DefaultChannelPipeline 中的双向链表以及它所起的作用, 我们会在后续章节详细讲解。这里只是对 pipeline 做个初步的认识。

HeadContext 的继承层次结构如下所示:

BjmE32q.png!mobile

TailContext 的继承层次结构如下所示:

7faeUbB.png!mobile

我们可以看到, 链表中 head 是一个 ChannelOutboundHandler , 而 tail 则是一个 ChannelInboundHandler .

3.0、客户端connect过程

客户端连接的入口方法还是在 Bootstrap.connect() 中,上面也分析过一部分内容,请求的具体流程是:

Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
            throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
                return socketChannel.connect(remoteAddress);
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

看到这里,还是用 Java NIO SocketChannel 发送的 connect 请求进行客户端连接请求。

总结

本篇文章以一个 Netty Client demo 为入口,然后解析了 NioEventLoopGroup 创建的流程、 Channel 的创建和注册的流程,以及客户端发起 connect 的具体流程,这里对于很多细节并没有很深的深入下去,这些会放到后续的源码分析文章,敬请期待~

u6VZbiU.png!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK