1

NioServerSocketChannel的注册源码解析

 2 years ago
source link: https://segmentfault.com/a/1190000040318455
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

我们上一章分析了Netty中NioServerSocketChaennl的创建于初始化,本章节将继续分析NioServerSocketChannel的分析,NioServerSocketChannel是Netty官方封装的一个通道对象,旨用来代替或者包装JDK原生的SocketChannel对象,那么他是如何讲NioServerSocketChannel于JDK的NIO相关代码关联起来的呢?

一、源码入口寻找

我们上一节课主要分析的源码方法是initAndRegister方法,其实从名字可以看出来,这里是做通道的初始化于注册的,我们继续回到这个方法,该方法的寻找,参照上一章节:

AbstractBootstrap#initAndRegister

我们跳过上节课已经分析的代码,直接来到注册相关的逻辑:

ChannelFuture regFuture = config().group().register(channel);

我们逐个方法进行分析:

config()

image-20210429084738842

现在我们创建的ServerBootstrap,所以为什么选这个我就不多说了:

private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

我们可以看到,他返回的是这个对象,该对象是再创建ServerBootstrap的时候自动创建的,我们看,他构造方法里面穿了一个this,证明他持有一个ServerBootstrap的引用,这代表着他可以通过这个对象,获取ServerBootstrap内所有的属性和方法!获取到这个类之后干嘛了呢?

config().group()

估计大家很多都已经猜出来了,我们直接点进group里面去验证一下:

@SuppressWarnings("deprecation")
public final EventLoopGroup group() {
    return bootstrap.group();
}

该代码是获取到了我们再构建ServerBootstrap的时候设置的bossGroup对象,有兴趣的可以追一下,这里比较简单就不做太多的阐述了,我们继续回到主线,

config().group().register(channel);

我们通过上述代码的分析,知道了group方法返回的是NioEventLoopGroup,我们进入到register方法:

image-20210429090641133

我们发现这里并没有NioEventLoopGroup,但是通过前几章我们的学习,我们知道NioEventLoopGroup是MultithreadEventLoopGroup的子类,所以我们子类没有往父类找,我们进入到MultithreadEventLoopGroup源码里面:

@Override
public ChannelFuture register(Channel channel) {
    //一般来说这里获取的NioEventLoop 他有继承与  SingleThreadEventLoop
    return next().register(channel);
}

在这里,我们看到了一个我们前面分析过得代码,next(),他调用的是chooser.next();, chooser是我们在构建NioEventLoopGroup的时候创建的一个执行器的选择器,next方法的功能是轮训的返回一个线程执行器:NioEventLoop!记不太清的同学可以回头看NioEventLoopGroup初始化源码解析的那一章代码!

现在我们根据前几章的基础,我们知道了next()方法返回的是一个NioEventLoop类,我们进入到register()方法查看:

image-20210429095521189

但是,我们发现NioEventLoop相关的实现,但是我们根据前面所学,我们可以知道,NioEventLoop的父类是SingleThreadEventLoop,所以我们进入到 SingleThreadEventLoop#register(io.netty.channel.Channel):

@Override
public ChannelFuture register(Channel channel) {
    //调用本身的注册方法
    return register(new DefaultChannelPromise(channel, this));
}

//没什么可说的继续往下追
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

我们一定能够猜到,这里的主要代码是:promise.channel().unsafe().register(this, promise);

我们上一章分析过 unsafe是 NioMessageUnsafe, 但是register却没有他的实现:

image-20210429100442771

我们还是需要往父类追,进入到io.netty.channel.AbstractChannel.AbstractUnsafe#register(this, promise):

我们这里先关注一下参数 :

this: 传入的是他本身,他本身是个什么 NioEventLoop,也就是说,他传入了一个执行器

promise:NioServerSocketChannel的包装对象

我们进入到 register方法中,分析主要代码:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ......................暂时忽略不必要代码.............................
    AbstractChannel.this.eventLoop = eventLoop;
    //注意此时的thread = null 所以返回false
    if (eventLoop.inEventLoop()) {
        //实际的注册 注册selector 触发 handlerAdded事件和 channelRegistered事件
        register0(promise);
    } else {
        .......................暂时忽略不必要代码......................
    }
}
AbstractChannel.this.eventLoop = eventLoop;

首先我们将上一步获取的执行器保存在NioServerSocketChannel中! 这行代码有力的证明了,每一个Channel绑定一个NioEventLoop对象!

if (eventLoop.inEventLoop()) {
    //实际的注册 注册selector 触发 handlerAdded事件和 channelRegistered事件
    register0(promise);
}

注意:这里我需要澄清一点,真实的调试过程中,并不会走这个分支,而是会走else分支异步进行注册,这里为了更方便大家理解,我就依照if分支进行源码分析,其实没有太大变化,都是调用register0方法进行注册,只不过一个同步一个异步,关于异步,是Netty中及其重要的一个知识点,我将放到后面单独开一章进行讲解!

我们进入到register0源码里面:

private void register0(ChannelPromise promise) {
    try {
        ..............忽略代码..................
]
        //实际的注册  调用jdk底层的数据注册selector
        // 调用 JDK 底层的 register() 进行注册
        //io.netty.channel.nio.AbstractNioChannel.doRegister
        doRegister();
        neverRegistered = false;
        registered = true;

        //通知管道  传播handlerAdded事件
        //触发 handlerAdded 事件 触发任务 add事件
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        //通知管道  传播channelRegistered事件
        // 触发 channelRegistered 事件
        pipeline.fireChannelRegistered();
        // 如果从未注册过频道,则仅触发channelActive。
        // 如果取消注册并重新注册通道,则多个通道处于活动状态。
        //isActive() 返回false
        // 此时 Channel 还未注册绑定地址,所以处于非活跃状态
        if (isActive()) {
            ....................忽略不必要代码..................
        }
    } catch (Throwable t) {
        // 直接关闭通道以避免FD泄漏。
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

二、源码解析

doRegister();

doRegister();

真正的注册方法,该方法是将Netty本身的NioServerSocket与JDK连接起来的最重要的一个类!

image-20210429142545350

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

javaChannel()方法是返回JDK原生的SocketChannel,他是再NioServerSocketChannel初始化的时候被保存的,还记得我们再讲述NIO开发Socket的时候的流程吗

image-20210429143500583

我们重点关注一下javaChannel().register的参数:

eventLoop().unwrappedSelector():NioEventLoop再创建的时候,会保存两个选择器,一个是JDK的原始的选择器,一个是经过Netty包装的选择器,这里返回的是原生的选择器!

0:不关注任何事件

this:this代表着当前类,他是NioServerSocketChannel类型的,他将一个NioServerSocketChannel的对象,绑定到了JDK原生的选择器,后续只需要通过SelectionKey.attachment(),就能获取到NioServerSocketChannel,而一个NioServerSocketChannel里面又包含一个JDK原生的Channel对象,就可以基于该jdk原生的Channel来进行各种读写操作!

到现在为止,我们就完成JDK中的NIO的将通道绑定到选择器上,我们回到上一步:

pipeline.invokeHandlerAddedIfNeeded

pipeline.invokeHandlerAddedIfNeeded();

开始回调pipeline通道里面添加自定义事件:

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // 现在,我们已注册到EventLoop。现在该调用ChannelHandler的回调了,
        // 在完成注册之前添加的内容。
        callHandlerAddedForAllHandlers();
    }
}

//callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
    //task = PendingHandlerAddedTask
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

需要注意的是 PendingHandlerCallback task 是PendingHandlerAddedTask类型的,他是什么时候加载的呢?实在我们初始化NioServerSocketChannel的时候调用addLast方法的时候被赋的值,有兴趣的小伙伴可以自己去跟一下源码,这里直接进入到:

image-20210429155614576

if (executor.inEventLoop()) {
    callHandlerAdded0(ctx);
}

//进入到 callHandlerAdded0源码逻辑
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try{
        ctx.callHandlerAdded();
    }
    .......................
}

//进入到ctx.callHandlerAdded();
final void callHandlerAdded() throws Exception {
    if (setAddComplete()) {
        handler().handlerAdded(this);
    }
}

还接记得handler()吗,我再NioServerSocketChannel初始化的时候说过,当时程序向pipeline中添加了一个ChannelInitializer,这里返回的就是那个ChannelInitializer! 我们进入到ChannelInitializer#handlerAdded方法里面:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}

首先我们重点关注一个 initChannel(ctx)

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // 防止再次进入。
    if (initMap.add(ctx)) {
        try {
            // 调用 ChannelInitializer 实现的 initChannel() 方法
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            ................................
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // 将 ChannelInitializer 自身从 Pipeline 中移出
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
initChannel((C) ctx.channel());

该方法会回调ChannelInitializer的抽象方法initChannel,该抽象方法在我们初始化的时候完成,我们就要找到实现这个抽象方法的地方,我们回到上一节课的代码: io.netty.bootstrap.ServerBootstrap#init

void init(Channel channel) {
    ..........................;
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            //将用户自定义的handler添加进管道  handler 是在构建ServerBootStr的时候传入的  handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(() -> {
                pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            });
    }    
}

上一节课讲的时候,我们将这一段逻辑略过了,只说是会向通道中添加一个ChannelInitializer实现,现在开始回调他的initChannel方法了:

ChannelHandler handler = config.handler();
if (handler != null) {
    pipeline.addLast(handler);
}

这段代码会将客户再构建ServerBootstrap的时候传入的handler添加进通道,我们为了方便理解,假设用户没有设置handler,所以这个handler判断不通过,跳过,我们继续往下:

ch.eventLoop().execute(() -> {
    pipeline.addLast(new ServerBootstrapAcceptor(
        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
});

这里异步的向管道流注册一个默认的Handler, 为什么说是异步的,我们暂且不说,我们暂且认为是同步的进行add,此时我们的通道如下:

image-20210429162226540

ServerBootstrapAcceptor的作用是专门用于新连接接入的,

ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
    this.childGroup = childGroup;
    this.childHandler = childHandler;
    this.childOptions = childOptions;
    this.childAttrs = childAttrs;

    enableAutoReadTask = new Runnable() {
        @Override
        public void run() {
            channel.config().setAutoRead(true);
        }
    };
}

我们可以看到,他会保存一系列的参数,包括WorkGroup、childHandler、childOptions、childAttrs这些参数都是我们再创建serverBootstrap的时候传入的参数,这也证明了,这些参数是作用于客户端Socket连接的!

有关ServerBootstrapAcceptor,后续会进行一个详细的分析,我们接着说,这里需要重点讲一下addLast方法,

@Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        ........................忽略.........................
        //通知添加方法回调
        callHandlerAdded0(newCtx);
        return this;
    }

在进行添加的时候,他会回调内部产生的handlerAdded方法,还记得,我们在介绍Netty的基本架构的业务通道章节吗?

image-20210429163456014

再调用addLast之后,该方法会被回调!

这样就讲所有的方法注册完毕了,我们继续回到ChannelInitializer#handlerAdded方法,当initChannel(ctx)调用完了之后:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // 防止再次进入。
    if (initMap.add(ctx)) {
        try {
            // 调用 ChannelInitializer 实现的 initChannel() 方法
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            ................................
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // 将 ChannelInitializer 自身从 Pipeline 中移出
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

我们会进入到finally里面,我们会看到,此时会做一个操作,删除当前的类,当前的类是谁,是ChannelInitializer,所以删除完毕后,此时管道对象的结构如图所示:

image-20210429173529722

至此 invokeHandlerAddedIfNeeded 分析完毕

pipeline.fireChannelRegistered();

@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

这行代码其实没什么可说的,大家可以自己跟一下调试一下代码,这个代码的意义是从HeadContext节点开始传播channelRegistered方法:

image-20210429174602462

至此,NioServerSocketChannel的注册基本就分析完了,有的同学可能觉得少分析了一段:

if (isActive()) {
    if (firstRegistration) {
        //Channel 当前状态为活跃时,触发 channelActive 事件
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        // 该通道已注册,并已设置autoRead()。这意味着我们需要开始阅读
        // 再次,以便我们处理入站数据。
        //
        // See https://github.com/netty/netty/issues/4805
        //开始读事件
        beginRead();
    }
}

这段代码再第一次启动的时候并不会被调用,因为此时通道还没有绑定端口正式启动起了,所以这里isActive会返回false,有关逻辑,会在新连接接入讲解的时候进行分析!

  1. Netty会调用JDK底层的注册方法,同时将本身的NioServerSocketChannel作为att绑定到选择事件上!
  2. 当注册完成后会回调 handlerAdded方法
  3. Netty会回调再初始化NioServerSocketChannel的时候注册的Channelinitialization<Channel>, 添加一个新连接接入器ServerBootstrapAcceptor,并删除本身!
  4. 当注册完成后会回调Channelregistered方法

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK