6

Netty之EventLoop

 3 years ago
source link: https://1fishman.github.io/2019/06/06/Netty%E4%B9%8BEventLoop/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

EventLoop

Netty线程模型是被精心设计的,提升了框架的并发性能,并且在很大程度上避免锁.下面来讲一下Netty中的线程模型和它的EventLoop.

一说到线程模型,很容易能够想到Reactor线程模型.Reactor又有单线程模型,多线程模型和主从多线程模型.

单线程模型

单线程模型,显而易见就是只有一个线程,即所有的I/O操作都在同一个NIO线程模型.对于此I/O模型的职责则有很多:

  • 作为服务端,接受TCP连接
  • 作为客户端,发起TCP连接
  • 读取通信对端的请求和应答消息
  • 向通信对端发送请求或应答消息

这种模型由于只有一个线程处理I/O,因此当并发量提高之后,一个线程很难处理那么多的请求,系统的吞吐量将变的很低.这个时候就演进出了Reactor多线程模型.使用多个线程来处理请求

多线程模型

Reactor多线程模型和单线程模型的最大的区别就是有一组I/O线程来处理I/O请求.多线程模型的特点如下:

  • 有专门一个线程用来监听服务端,接受客户端的TCP连接请求
  • 网络I/O操作读写等由一个NIO线程池负责.通过很多的NIO线程能够处理高并发的网络读写请求
  • 一个NIO线程可以同时处理N条连接,但是一个连接只对应与一个NIO线程,防止发生并发操作问题.

这个样子很好的解决了一般并发的问题.但是当如果一个时候有数百万个连接过来,或者服务端需要对客户端握手进行安全验证,但是验证本身非常损耗性能.这个时候,一个Acceptor线程会存在性能不足的问题,这个时候就又出现了第三种Reactor线程模型,主从多线程模型.

主从Reactor多线程模型

此线程模型的特点就是服务端接受客户端连接的不在是一个单独的NIO线程,而是一个独立的NIO线程池.Acceptor接受到客户端TCP连接请求后,将新创建的SocketChannel注册到I/O线程池中的某个线程上.接下来又此线程来处理网络读写问题.

Netty的线程模型

Netty的线程模型并不是一成不变的.它实际拒绝域用户的启动参数配置.通过设置不同的启动参数,Netty可以同事支持Reactor单线程模型,多线程模型和主从多线程模型.
比如下面三种方式:

单线程模式

EventLoopGroup work = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());

只有一个线程,并且没有线程池,设置EventLoopGroup的线程数为1.将上面代码改成如下形式:

Reactor多线程模式

EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());

主从Reactor多线程模式

EventLoopGroup boss = new NioEventLoopGroup(8);
EventLoopGroup work = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.childHandler(new StringEncoder());

只需要将BossGroup的参数设置为多个就可以了. 里面的参数就是用几个线程来处理.

一般来说,都使用主从Reactor多线程模型.主要分为两个线程组,如上所示:boss线程组主要用来接受连接,然后注册到work线程池上的某个线程上.work线程主要用来处理读写请求,并且执行系统调用Task和定时任务.如心跳检测.

NioEventLoop源码分析

Netty中的NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O操作外,还兼顾处理定时任务和系统任务.在Netty中有很多系统Task任务,主要就是为了在I/O线程和用户线程同事操作网络资源的时候,为了防止并发操作导致锁的竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程负责执行.这样实现了局部无锁化. 说完了EventLoop的功能,来看一下它的实现,这里NioEventLoop是它的一个子类,也是最常用的,因此主要分析一下此类.

先来看一下构造方法

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类方法初始化线程池,和回绝策略
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 这里打开一个Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

Selector

作为Nio的多路复用线程,必须得有一个多路复用器,先来看一下它的多路复用器.

// 经过Netty包装过的Selector
private Selector selector;
// 没有包装过的selector
private Selector unwrappedSelector;
private final SelectorProvider provider;

Selector的初始化特别简单,就是直接调用Selector.open()创建一个新的Selector.而Netty对Selector的selectedKeys进行了优化,可以通过io.netty.noKeySetOptimization开关来决定是否启用优化,默认不打开selectedKeys优化功能. 下面来看一下代码:

private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 一个为包装的Selector,通过provider打开一个Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

// 如果没有开启优化,则直接返回一个SelectorTuple对象
// 这个SelectorTuple就是未包装的Selector和包装的Selector的集合.
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 省略部分代码 这里就是将Netty自己的selectedKeys将JDk的selectedKeys替换掉.
// 主要是通过反射来从Selector实例中获取selectedKeys和publicSelectedKeys,将上述
// 两个变量设置为可写,在通过反射替换掉.

}

上面就是Selector的初始化. 初始化Selector完成后就应该

run()方法

在EventLoop中,所有的逻辑操作都在for循环体中执行,只有当NioEventLoop接受到退出指令的时候,才会退出循环.

下面看一方法实现

protected void run{
try {
//通过 select/selectNow 调用查询当前是否有就绪的 IO 事件
// 当 selectStrategy.calculateStrategy() 返回的是 CONTINUE, 就结束此轮循环,进入下一轮循环;
// 当返回的是 SELECT, 就表示任务队列为空,就调用select(Boolean);
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())){
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 调用Select方法
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
// 如果发生异常就重建Selector并且处理异常
rebuildSelector0();
handleLoopException(e);
continue;
}
// 省略部分代码,待下面分析
}

首先向将wakenUp设置为false,之后在调用select()方法,并且将之前的wakenUp作为参数传进去.
来看一下select方法

private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 设置选择次数为0
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 设置select执行的终止时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 设置超时时间
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超时时间到了,并且还没有选择过,就立即选择一次,这里selectNow()会立即触发Selector的选择操作,如果有准备就绪的Channel,则返回就绪的Channel的集合,否则返回0.
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果有任务,并且wakenUp之前是false,则立即调用一次选择方法
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 这里调用带超时的select()方法
int selectedKeys = selector.select(timeoutMillis);
// 循环中每次选择都会将selectCnt加1.
selectCnt ++;
// 如果有事件发生或odlWakeup为true或者线程被系统唤醒或者有任务都会直接跳出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
// 如果循环次数过多,超过了默认次数,则说明出现了JDK的空轮询BUG.
// 需要重建Selector.
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}

这个selector的方法的就是选择出已经发生的事件.具体逻辑就是:

首先计算下一个将要触发定时任务的剩余超时时间,转换为毫秒,为超时时间增加0.5毫秒的调整值.对剩余的超时时间进行判断,如果需要立即执行或者已经超时,则调用selector.selectNow()进行轮询操作,将selectCnt设置为1.并且退出循环.

只有将定时任务剩余的超时时间作为参数进行select操作,没完成一次select操作就对selectCnt加1.

Select操作完成之后对结果进行判断,如果出现一下集中情况就退出循环:

  • 有Channel处于就绪状态,就是selectedKeys不为0,说明有读写事件需要处理
  • oldWakenUp为true
  • 系统或者用户调用了wakeup操作,唤醒当前的多路复用器
  • 系统队列中有新的任务需要处理.

如果本次selector的轮询结果为空,也没有wakeup操作或者是新的消息需要处理,则说明是个空轮询.将计数器加1. 这里会有一个判断,就是如果在一段时间内空轮询次数过多,则说明导致了一个JDk的空轮询bug,这里解决办法就是调用selectRebuildSelector()方法,来重建Selector.

看一下selectRebuildSelector()方法

private Selector selectRebuildSelector(int selectCnt) throws IOException {
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 这里执行重建操作
rebuildSelector();
Selector selector = this.selector;

// 重建完成后立即选择一次
selector.selectNow();
return selector;
}

private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}

重建的逻辑也特别简单,首先新打开一个Selector,接着在将原来的Selector中的Channel取消注册,并且注册到新的Selector中,

在继续来看run方法中的循环

protected void run() {
cancelledKeys = 0;
needsToSelectAgain = false;
//ioRatio表示:此线程分配给IO操作所占的时间比(即运行processSelectedKeys耗时在整个循环中所占用的时间).
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//查询就绪的 IO 事件, 然后处理它;
processSelectedKeys();
} finally {
//运行 taskQueue 中的任务.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//查询就绪的 IO 事件, 然后处理它;
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
看到在run的方法中,执行完上面的select方法逻辑后,会开始执行I/O事件和Task.这里有一个ioRatio值,这个值就代表着线程分配给IO操作所占的时间比,例如ioRatio值为50,则IO执行的时间和执行task的时间为1:1.

在来看一下processSelectedKeys()方法
~~~java
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

这里有两种情况,一种是有I/O事件发生和没有I/O事件发生.重点来看一下有事件发生的时候也就是processSelectedKeysOptimized()方法;

private void processSelectedKeysOptimized() {
// 对每个准备好的I/O通道进行处理
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

首先是获取到SelectionKey上附加的对象,这个对象是在注册通道的时候将自己附加上去的.也就是获取了一个NioChannel.之后继续执行procesSelectedKey()方法

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取到unsafe实例
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 判断此键是否无效,如果是无效的,则使用unsafe来关闭连接.
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
// 获取准备好的事件
int readyOps = k.readyOps();
// 如果是连接事件,将连接事件取消,表明已经简历好连接,
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 调用finishConnect方法传播连接完成事件
unsafe.finishConnect();
}
// 如果是可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 直接调用强制刷新函数来写数据到通道中
ch.unsafe().forceFlush();
}
// 如果是读事件或者接受连接事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 调用读方法,来处理
unsafe.read();
}
// 发生异常就关闭通道.
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

具体的逻辑,注释也写的很清楚了,这里不在细说了.大致就是处理应有的事件.在Netty中的事件都是通过Channel的内部类unsafe实现的.这里就调用了unsafe的不同的方法来处理不同的事件.对于unsafe的处理这里就不多说了,可以看我之前的博文对于unsafe的讲解.

到这里,NioEventLoop就说完了.说白了,这个EventLoop就是通过在一个循环中不断检测是否有事件发生,如果有I/O事件发生或者有task在队列中,就去执行任务. 并且在循环等待I/O事件发生时候,会记录空轮询的次数,如果在一定的时间内,空轮询的次数超过一定限制,则会从新建立一个Selector,并且把之前的Selector中的Channel重新注册到新的Selector上.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK