35

Netty源码解析 -- 服务端启动过程

 3 years ago
source link: http://www.cnblogs.com/binecy/p/13908698.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文通过阅读Netty源码,解析Netty服务端启动过程。

源码分析基于Netty 4.1

Netty是一个高性能的网络通信框架,支持NIO,OIO等多种IO模式。通常,我们都是使用NIO模式,该系列文章也是解析Netty下NIO模式的实现。

首先,看一个NIO网络通信示意图

VVZBvy3.png!mobile

Netty中NIO网络通信过程在此基础上实现,下面来看一下具体实现。

Channel

首先,看一下Netty中的通道Channel,它代表了一个能完成IO操作的通道,提供read, write, connect, bind等方法。

Channel中维护了一个Unsafe对象,用于完成数据传输操作(这类操作通常由IO事件触发,而不是用户触发)。

SocketChannel代表Socket连接的网络通道,面向流,支持读写操作。

ServerChannel表示可以监听新连接的通道,ServerSocketChannel代表实现TCP/IP协议的ServerChannel。

AbstractChannel提供基础逻辑实现,它维护了Unsafe和ChannelPipeline对象,并委托这两个对象完成实际工作。同时,它也提供newUnsafe,newChannelPipeline方法给子类构造他们需要的对象。

AbstractUnsafe是AbstractChannel的内部类,实现了register,bind,disconnect等方法的基础逻辑。

ChannelPipeline可以理解为拦截器链表,维护了一个ChannelHandler链表,ChannelHandler即具体拦截器,负责逻辑处理。

DefaultChannelPipeline是ChannelPipeline接口的默认实现。Netty中Nio相关的Channel都使用它。

可以这样理解,Unsafe负责数据传输,而ChannelPipeline负责逻辑处理。

AbstractNioChannel实现了NIO基础逻辑,如维护(jvm)SelectableChannel,(jvm)SelectionKey等对象,还有一个很关键的selectionKey,代表关注的NIO事件。

AbstractNioUnsafe是AbstractNioChannel内部类,继承于AbstractUnsafe,并实现Unsafe另一个子接口NioUnsafe,添加了SelectableChannel相关的方法,如finishConnect,read。

AbstractNioChannel的子类可以分成ServerChannel实现类和SocketChannel实现类。

ServerChannel实现类是AbstractNioMessageChannel,newUnsafe方法返回的NioMessageUnsafe。

NioServerSocketChannel是AbstractNioMessageChannel子类,实现TCP/IP协议。

SocketChannel实现类是AbstractNioByteChannel,newUnsafe方法返回的NioByteUnsafe。

NioSocketChannel是AbstractNioByteChannel子类,实现TCP/IP协议。

Channel各实现类关系如下

mi2AJrR.png!mobile

Netty中将接口划分得很细微,最好大家可以按功能层次理解各接口代表含义以及实现类的​逻辑。​

以免后续看源码时混淆各接口功能。

服务端启动

首先简单了解一下EventLoop,可以理解为它负责处理网络事件和异步任务,后面有对应文章详细解析。

EventLoopGroup则是一组EventLoop集合,它会将操作委托给其中一个EventLoop处理。

Netty的服务端启动引导类ServerBootstrap中维护了两个EventLoopGroup,EventLoopGroup#childGroup和AbstractBootstrap#group。

AbstractBootstrap#group负责管理注册于其上的ServerChannel,处理这些Channel上发生的Accept事件,并将生成的SocketChannel注册到EventLoopGroup#childGroup。

EventLoopGroup#childGroup处理这些SocketChannel上发生的Read,Write事件。

为了方便,下文我将AbstractBootstrap#group称为AcceptGroup,ServerBootstrap#childGroup称为ReadGroup。

这些设计来自Reactor模式,详细可以见java.util.concurrent包的作者Doug Lea的 《Scalable IO in Java》

AbstractBootstrap#bind -> AbstractBootstrap#doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
	// #1
	final ChannelFuture regFuture = initAndRegister();	
	final Channel channel = regFuture.channel();
	if (regFuture.cause() != null) {
		return regFuture;
	}

	if (regFuture.isDone()) {
		ChannelPromise promise = channel.newPromise();
		// #2
		doBind0(regFuture, channel, localAddress, promise);
		return promise;
	} else {
		...
	}
}

#1 初始化及注册ServerChannel。

initAndRegister方法返回 ChannelFuture ,ChannelFuture继承了(jvm)Future,代表IO异步处理结果,并且可以绑定回调函数,异步IO处理完成Netty后会触发这些回调函数。

我们要有这个意识, Netty是一个异步框架 ,所有的IO操作都是异步的(充分利用cpu),IO方法不会等待实际IO操作完成,而是返回ChannelFuture。

待实际IO完成后,Netty再触发ChannelFuture中的回调函数处理后续逻辑。

ChannelPromise是一种特殊的ChannelFuture,提供更新操作结果的方法(setSuccess,setFailure方法),一般提供给IO方法作为参数(Unsafe中很多方法都有该参数),IO操作完成后,会调用这些方法更新操作结果。

#2 注册完成后,绑定ServerChannel监听端口。

AbstractBootstrap#initAndRegister

final ChannelFuture initAndRegister() {
	Channel channel = null;
	try {
		// #1
		channel = channelFactory.newChannel();
		// #2
		init(channel);
	} catch (Throwable t) {
		...
	}
	// #3
	ChannelFuture regFuture = config().group().register(channel);
	
	// #4
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
	return regFuture;
}

#1 构造ServerChannel

AbstractBootstrap#channelFactory是一个ReflectiveChannelFactory对象,他通过反射生成Channel。

ServerBootstrap#channel方法负责构造ReflectiveChannelFactory,并指定具体的ServerChannel类。

(所以我们要通过该方法指定NioServerSocketChannel.class -- new ServerBootstrap().channel(NioServerSocketChannel.class)

#2 初始化ServerChannel,该方法由子类实现

#3 注册Channel到AcceptGroup,注意, config().group() 返回AcceptGroup。

#4 如果IO操作发生了异常,需要关闭Channel。

NioServerSocketChannel#构造方法 -> NioServerSocketChannel#newSocket方法

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

使用(jvm)SelectorProvider,构造一个(jvm)ServerSocketChannel。

这里完成了NIO网络通信第一步。

ServerBootstrap#init

void init(Channel channel) throws Exception {
	// #1
	...

	ChannelPipeline p = channel.pipeline();

	final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

	// #2
	p.addLast(new ChannelInitializer<Channel>() {
		public void initChannel(final Channel ch) throws Exception {
			final ChannelPipeline pipeline = ch.pipeline();
			ChannelHandler handler = config.handler();
			if (handler != null) {
				pipeline.addLast(handler);
			}

			ch.eventLoop().execute(new Runnable() {
				public void run() {
					// #3
					pipeline.addLast(new ServerBootstrapAcceptor(
							ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));	
				}
			});
		}
	});
}

#1 设置ServerChannel的Option和Attribute属性。

#2 给ServerChannel的ChannelPipeline添加一个ChannelInitializer。

ChannelInitializer是一种特殊的ChannelHandler,initChannel方法负责完成一些Channel初始化工作,该方法的触发可以参考下文的延迟任务。

#3 上一步骤的ChannelInitializer负责给ServerChannel的ChannelPipeline添加一个ServerBootstrapAcceptor,并将SocketChannel的相关配置(childHandler,currentChildHandler,currentChildOptions,currentChildAttrs)交给它,ServerBootstrapAcceptor用于处理Accept事件,文章后面会解析。

AbstractBootstrap#initAndRegister方法 #3 步骤 -> SingleThreadEventLoop#register ->(通过Channel调用Unsafe)AbstractUnsafe#register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...

    AbstractChannel.this.eventLoop = eventLoop;
	// #1
    if (eventLoop.inEventLoop()) {
        register0(promise);	
    } else {
        try {
            eventLoop.execute(new Runnable() {	
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

eventLoop.inEventLoop() 判断当前线程是否为EventLoop执行线程。

如果是,直接执行操作 -- 调用register0方法处理。

否则,提交一个任务给EventLoop。

这是Netty中提交异步任务的通用格式,Netty中有大量类似代码。

注意,这里是异步的关键,将当前操作作为一个异步任务,提交给EventLoop处理,而不需要阻塞当前线程。

EventLoop实际上是一个(jvm)EventExecutor,通过execute方法可以给它任务。

AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
	try {
		if (!promise.setUncancellable() || !ensureOpen(promise)) {
			return;
		}
		boolean firstRegistration = neverRegistered;
		// #1
		doRegister();
		neverRegistered = false;
		registered = true;

		// #2
		pipeline.invokeHandlerAddedIfNeeded();
		// #3
		safeSetSuccess(promise);
		// #4
		pipeline.fireChannelRegistered();
		// #5
		if (isActive()) {
			if (firstRegistration) {
				pipeline.fireChannelActive();
			} else if (config().isAutoRead()) {
				beginRead();
			}
		}
	} catch (Throwable t) {
		// #6
		closeForcibly();
		closeFuture.setClosed();
		safeSetFailure(promise, t);
	}
}

#1 由子类实现具体注册操作

#2 执行DefaultChannelPipeline中的延迟任务

#3 设置promise状态为Success

#4 触发ChannelPipeline#fireChannelRegistered

#5 如果是首次注册,触发ChannelPipeline#fireChannelActive

isActive() 方法判断当前Channel是否活跃

NioSocketChannel中调用SocketChannel#isOpen和SocketChannel#isConnected判断

NioServerSocketChannel中调用SelectableChannel#isOpen和ServerSocket#isBound方法判断

#6 异常处理,关闭Channel,设置promise状态为Failure。

AbstractUnsafe#doRegister -> AbstractNioChannel#doRegister

protected void doRegister() throws Exception {
	boolean selected = false;
	for (;;) {
		try {
			// #1
			selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
			return;
		} catch (CancelledKeyException e) {
			...
		}
	}
}

#1 javaChannel() 获取(jvm)SelectableChannel,

eventLoop().unwrappedSelector() 获取AcceptGroup维护的Selector(jvm)

这里将(jvm)ServerSocketChannel注册到(jvm)Selector,但还没有注册关注事件Key。

从Netty层面看,将Channel注册到EventLoop中。

注意,这里将当前NioServerSocketChannel作为channle#attachment,后面使用它来判断是否为IO事件。

AbstractUnsafe#register0方法 #5 步骤 -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive

这里涉及ChannelPipeline的事件传播,后面解析ChannelPipeline时详细说明。

HeadContext#channelActive会调用readIfIsAutoRead方法,判断是否开启autoRead,开启则自动触发read事件处理方法。

HeadContext#readIfIsAutoRead -> DefaultChannelPipeline#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {
	// #1
	final SelectionKey selectionKey = this.selectionKey;
	if (!selectionKey.isValid()) {
		return;
	}

	readPending = true;

	final int interestOps = selectionKey.interestOps();
	// #2
	if ((interestOps & readInterestOp) == 0) {
		selectionKey.interestOps(interestOps | readInterestOp);
	}
}

#1 selectionKey是Selector中关注事件集合(由AbstractNioChannel#doRegister方法中生成)

#2 这里注册了关注事件readInterestOp。

那么readInterestOp的值是什么呢? 它在AbstractNioChannel#构造方法中赋值,真正的值来自NioServerSocketChannel构造方法,可以看到,它在ServerChannel中固定为SelectionKey.OP_ACCEPT。

到这里,注册ServerChannel的关注事件OP_ACCEPT。

这里完成NIO网络通信第二步,注册关注事件。

AbstractBootstrap.doBind0 -> AbstractChannel#bind -> DefaultChannelPipeline#bind -> HeadContext#bind -> AbstractUnsafe#bind -> NioServerSocketChannel#doBind

protected void doBind(SocketAddress localAddress) throws Exception {
	if (PlatformDependent.javaVersion() >= 7) {
		javaChannel().bind(localAddress, config.getBacklog());
	} else {
		javaChannel().socket().bind(localAddress, config.getBacklog());
	}
}

根据不同JDK版本,调用不同的bind方法。

这里完成了NIO网络通信第三步,分配套接字地址,开始socket监听。

Accept事件处理

下面我们来看一下AcceptGroup中如何处理ServerChannel上监听到的accept事件。

这里涉及EventLoop的相关内容,后面有对应解析文章。

现在直接看Accept事件的处理方法NioMessageUnsafe#read

public void read() {
	...
	try {
		try {
			do {
				// #1
				int localRead = doReadMessages(readBuf);
				if (localRead == 0) {
					break;
				}
				if (localRead < 0) {
					closed = true;
					break;
				}

				allocHandle.incMessagesRead(localRead);
				
			} while (allocHandle.continueReading());
		} catch (Throwable t) {
			exception = t;
		}

		int size = readBuf.size();
		for (int i = 0; i < size; i ++) {
			readPending = false;
			// #2
			pipeline.fireChannelRead(readBuf.get(i));
		}
		readBuf.clear();
		allocHandle.readComplete();
		// #3
		pipeline.fireChannelReadComplete();

		...
	} ...
}

#1 调用NioServerSocketChannel#doReadMessages,处理Accept事件。

注意,readBuf是一个 List<Object> ,用于接收处理结果。

allocHandle.continueReading() ,判断是否需要继续执行,这里都是返回false

#2 触发DefaultChannelPipeline#fireChannelRead

#3 触发DefaultChannelPipeline#fireChannelReadComplete

NioServerSocketChannel#doReadMessages

protected int doReadMessages(List<Object> buf) throws Exception {
	// #1
	SocketChannel ch = SocketUtils.accept(javaChannel());

	try {
		if (ch != null) {
			// #2
			buf.add(new NioSocketChannel(this, ch));
			return 1;
		}
	} catch (Throwable t) {
		...
	}

	return 0;
}

#1 调用(jvm)ServerSocketChannel#accept方法,生成的(jvm)SocketChannel

#2 使用(jvm)SocketChannel构造NioSocketChannel

前面说过,ServerChannel注册到AcceptGroup时,会给ServerChannel的ChannelPipeline添加一个ServerBootstrapAcceptor,用于处理accept事件。

NioMessageUnsafe#read方法 #2 步骤 -> DefaultChannelPipeline#fireChannelRead -> ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
	// #1
	final Channel child = (Channel) msg;

	child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

	try {
		// #2
		childGroup.register(child).addListener(new ChannelFutureListener() {
			public void operationComplete(ChannelFuture future) throws Exception {
				if (!future.isSuccess()) {
					forceClose(child, future.cause());
				}
			}
		});
	} catch (Throwable t) {
		forceClose(child, t);
	}
}

#1 注意msg参数,就是NioServerSocketChannel#doReadMessages方法中生成的NioSocketChannel。

上面说了,ServerBootstrap#init方法中会将ServerBootstrap中SocketChannel相关配置交给ServerBootstrapAcceptor。

这里配置NioSocketChannel的Options,Attribute,并将childHandler添加给pipeline。

#2 将NioSocketChannel注册到ReadGroup中,注册过程类似于NioServerSocketChannel注册到AcceptGroup,调用AbstractUnsafe#register方法实现。

但有一点不同,调用AbstractNioChannel#doBeginRead方法注册关注事件时,关注事件(即AbstractNioChannel#readInterestOp),是来自子类AbstractNioByteChannel#构造方法,固定为SelectionKey.OP_READ。

到这里,(jvm)SocketChannel已经注册到ReadGroupo维护中(jvm)Selector,关注的事件Key为read。

延迟任务

前面说了,ServerBootstrap#init方法 #2 步骤中ChannelInitializer#initChannel方法由延迟任务触发。现在看一下延迟任务的实现。

添加延迟任务

DefaultChannelPipeline#addFirst

public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
	final AbstractChannelHandlerContext newCtx;
	synchronized (this) {
		checkMultiplicity(handler);
		name = filterName(name, handler);
		// #1
		newCtx = newContext(group, name, handler);
		addFirst0(newCtx);
		// #2
		if (!registered) {
			newCtx.setAddPending();
			callHandlerCallbackLater(newCtx, true);
			return this;
		}
		// #3
		EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
	}
	callHandlerAdded0(newCtx);
	return this;
}

#1 构造一个ChannelHandlerContext并添加到拦截链表首部位置

#2 当前Channel未注册,调用DefaultChannelPipeline#callHandlerCallbackLater,添加一个延迟任务

#3 当前Channel已注册,调用DefaultChannelPipeline#callHandlerAdded0,完成ChannelHandler添加扩展操作。

DefaultChannelPipeline#callHandlerCallbackLater方法,将当前ChannelHandlerContext转化为一个延迟任务PendingHandlerAddedTask或者PendingHandlerRemovedTask,加到DefaultChannelPipeline#pendingHandlerCallbackHead列表中。

DefaultChannelPipeline#addLast/removeFirst/removeLast同样有类似处理延迟任务的逻辑。

执行延迟任务

AbstractUnsafe#register0方法 #2 步骤 -> DefaultChannelPipeline#callHandlerAddedForAllHandlers,该方法会执行pendingHandlerCallbackHead列表所有任务,调用其execute方法。

PendingHandlerAddedTask#execute会调用ChannelHandler#handlerAdded,完成ChannelHandler添加扩展工作。

PendingHandlerRemovedTask#execute则调用ChannelHandler#handlerRemoved,完成ChannelHandler移除善后工作。

ServerBootstrap#init方法 #2 步骤给ServerChannel的ChannelPipeline添加一个ChannelInitializer,它是Netty提供的工具类,实现了ChannelHandler#handlerAdded方法,实现逻辑是如果当前Channel已注册,则调用initChannel方法,否则不处理(所以我们常常利用该接口在注册完成后添加新的ChannelHandler给ChannelHandler)。

回到ServerBootstrap#init方法,由于该方法执行时Channel未注册,所以会生成延迟任务,由AbstractUnsafe#register0方法 #2 步骤触发完成实际操作,将ServerBootstrapAcceptor添加到ServerChannel的ChannelPipeline中。

最后说一下本文提到的netty组件。

Channel,通信通道,是Netty通信的基础组件。

EventLoop,ChannelPipeline是Netty中比较重要的组件,后面有对应的文章解析。

Unsafe负责实际数据传输工作,在解析Netty流程时会注解介绍它。

ChannelFuture,ChannelPromise代表Netty异步IO结果,通过回调函数执行后续操作。

如果您觉得本文不错,欢迎关注我的微信公众号,后续提供系列文章pdf下载。您的关注是我坚持的动力!

qYVJn2u.jpg!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK