

Netty线程模型
source link: https://zofun.github.io/2020/05/16/Netty%E7%BA%BF%E7%A8%8B%E6%A8%A1%E5%9E%8B/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Reactor线程模型
Netty的线程模型实际上就是Reactor模型的一种实现。
Reactor模型是基于事件驱动开发的,核心组成部分是一个Reactor和一个线程池,其中Reactor负责监听和分配事件,线程池负责处理事件。根据Reactor的数量有线程池的数量,又可以将Reactor分为三种模型:
- 单线程模型(单Reactor,单线程)
- 多线程模型(单Reactor,多线程)
- 主从多线程模型(多Reactor,多线程)
单线程模型
- Reactor内部通过
selector
轮询连接,收到事件后,通过dispatch
进行分发。 - 如果是连接事件,则分发给
Acceptor
处理,Accepter
通过accept
接受连接,并创建一个Headler
来处理连接后的各种事件。 - 如果是读写事件,那么直接交由对应的
Headelr
进行处理。
多线程模型
- 主线程中,Reactor对象通过
selector
监控连接事件,收到事件后通过dispatch
进行分发。 - 如果是建立连接的事件,则
Accepter
负责处理,它会通过accept
接受请求,并创建一个Headler
来处理后序事件,而Headler
只负责相应事件,不进行业务操作,也就是只进行read
读取数据和write
写出数据,业务处理是交给线程池进行处理。 - 线程池分配一个线程来进行业务的处理,处理结果交由对应的
Handler
进行转发。
主从多线程模型
- 存在多个
Reactor
,每个Reactor
都有自己的selector
选择器,线程和dispatch
- 主线程中的
mainReactor
通过自己的selector
监控连接建立事件,收到事件后通过Accepter
接受,将任务分配给某个子线程。 - 子线程中的
subReactor
将mainReactor
分配的连接加入连接队列中通过自己的selector
进行监听,并创建一个Handler
用于处理后序事件。 Handler
完成read
->业务处理->send
的完整业务流程。
Netty中的线程模型与Reactor的联系
在Netty中主要是通过NioEventLoopGroup
线程池来实现具体的线程模型的。
单线程模型
单线程模型就是指定一个线程执行客户端连接和读写操作,也就是在一个Reactor
中完成。对应的实现方式就是将NioEventLoopGroup
线程数设置为1.
Netty中是这样构造单线程模型的:
NioEventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
多线程模型
多线程模型就是当Reactor
进行客户端的连接处理,然后业务处理交由线程池来执行。
Netty中是这样构造多线程模型的:
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
主从多线程模型(最常使用)
主从多线程模型是有多个Reactor
,也就是有多个selector
,所以我们定义一个bossGroup
和一个workGroup
在Netty中是这样构建主从多线程模型的:
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
相较于多线程模型,主从多线程模型不会遇到处理连接的瓶颈问题。在多线程模型下,因为只有一个NIO的Acceptor
来处理连接请求,所以会出现性能瓶颈。
NioEventLoop源码分析
在Netty线程模型中,NioEventLoop
是比较关键的类。下面我们对它的实现进行分析。
它的继承关系图如下:
NioEventLoop
需要处理网络IO请求,因此有一个多路复用器Selector
:
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
并且在构造方法中完成了初始化:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
在NioEventLoop
中run()
方法比较的关键:
protected void run() {
for (;;) {
try {
try {
//通过hasTasks方法判断队列中是否还有未处理的方法
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
//没有任务则执行,select()执行网络IO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
//如果本轮Selector的轮询结果为null,那么可能触发了jdk epoll的bug
//该bug会导致IO线程处于100%的状态,需要重建Selector来解决
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
//处理IO事件所需的事件和花费在处理task的时间的比例,默认为50%
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//如果比例为100.则表示每次处理完IO后,才开始处理task
processSelectedKeys();
} finally {
// 执行task任务
runAllTasks();
}
} else {
//记录处理IO的开始时间
final long ioStartTime = System.nanoTime();
try {
//处理IO请求
processSelectedKeys();
} finally {
//计算IO请求的耗时
final long ioTime = System.nanoTime() - ioStartTime;
//执行task。判断执行task任务时间是否超过配置的比例,如果超过则停止执行task
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
重建Selector
的方法如下:
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
//创建一个新的Selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
//将原Selector上注册的所有SelectionKey转移到新的Selector
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
//用新的Selector替换旧的
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
//关闭旧的Selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
处理IO请求的是由processSelectedKey
完成的,它的实现如下:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略代码 ......
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- 首先获取 Channel 的 NioUnsafe,所有的读写等操作都在 Channel 的 unsafe 类中操作。
- 获取 SelectionKey 就绪事件,如果是 OP_CONNECT,则说明已经连接成功,并把注册的 OP_CONNECT 事件取消。
- 如果是 OP_WRITE 事件,说明可以继续向 Channel 中写入数据,当写完数据后用户自己吧 OP_WRITE 事件取消掉。
- 如果是 OP_READ 或 OP_ACCEPT 事件,则调用 unsafe.read() 进行读取数据。unsafe.read() 中会调用到 ChannelPipeline 进行读取数据。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
// 省略代码 ......
// 获取 Channel 对应的 ChannelPipeline
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
// 省略代码 ......
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 委托给 pipeline 中的 Handler 进行读取数据
pipeline.fireChannelRead(readBuf.get(i));
}
当 NioEventLoop 读取数据的时候会委托给 Channel 中的 unsafe 对象进行读取数据。
Unsafe中真正读取数据是交由 ChannelPipeline 来处理。
ChannelPipeline 中是注册的我们自定义的 Handler,然后由 ChannelPipeline中的 Handler 一个接一个的处理请求的数据。
作者:jijs
链接:https://www.jianshu.com/p/9e5e45a23309
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK