1

游戏服务器中的Netty应用以及源码浅析

 2 years ago
source link: https://gameinstitute.qq.com/community/detail/133719
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

最近因为工作需要,学习了一段时间Netty的源码,并做了一个简单的分享,研究还不是特别深入,继续努力。因为分享也不涉及公司业务,所以这里也把这次对源码的研究成果分享出来
以下都是在游戏服务器开发中针对Netty使用需要了解知识点以及相关优化

这次分享主要设计以下内容 

1.Netty线程模型 
2.Netty对TCP相关参数的配置和具体含义 
3.Netty对Epoll的封装 
4.Netty的优雅关闭

一、Reactor模式和Netty线程模型

客户端连接数的限制

  • CPU资源
  • 端口号资源cat /proc/sys/net/ipv4/ip_local_port_range
  • 文件描述符资源系统级:当前系统可打开的最大数量,通过 cat /proc/sys/fs/file-max 查看 用户级:指定用户可打开的最大数量,通过 cat /etc/security/limits.conf 查看 进程级:单个进程可打开的最大数量,通过 cat /proc/sys/fs/nr_open 查看
  • 线程资源
    BIO/NIO

1. BIO模型

  • 所有操作都是同步阻塞(accept,read)
  • 客户端连接数与服务器线程数比例是1:1
XCIg8JryAbvSfW3x5gdi.png

2. NIO模型

  • 非阻塞IO
  • 通过selector实现可以一个线程管理多个连接
  • 通过selector的事件注册(OP_READ/OP_WRITE/OP_CONNECT/OP_ACCEPT),处理自己感兴趣的事件
  • 客户端连接数与服务器线程数比例是n:1
UoE2OadQPMJuNzJBJ4E2.png

3. Reacor模型

①. 单Reacor单线程模型

  •     所有IO在同一个NIO线程完成(处理连接,分派请求,编码,解码,逻辑运算,发送)

优点

  • 不存在共享资源竞争

缺点

  • 单线程处理大量链路时,性能无法支撑,不能合理利用多核处理
  • 线程过载后,处理速度变慢,会导致消息积压
  • 一旦线程挂掉,整个通信层不可用
    redis使用的就是reactor单进程模型,redis由于都是内存级操作,所以使用此模式没什么问题

reactor单线程模型图

WitROqcyR9eqYUMMTAGs.png

netty reactor单线程模型图

6zscJvE0U2oMDxHP2R71.png

// Netty对应实现方式:创建io线程组是,boss和worker,使用同一个线程组,并且线程数为1

EventLoopGroupioGroup = newNioEventLoopGroup(1);

b.group(ioGroup, ioGroup)

        .channel(NioServerSocketChannel.class)

        .childHandler(initializer);

ChannelFuturef = b.bind(portNumner);

cf = f.sync();

f.get();

②. 单Reactor多线程模型

根据单线程模型,io处理中最耗时的编码,解码,逻辑运算等cpu消耗较多的部分,可提取出来使用多线程实现,并充分利用多核cpu的优势

  • 优点
    • 多线程处理逻辑运算,提高多核CPU利用率
  • 缺点
    • 对于单Reactor来说,大量链接的IO事件处理依然是性能瓶颈
  • reactor多线程模型图
caEoYL6V4n28vXEYNOMe.png
  • netty reactor多线程模型图
nmuJqhTP2SCAtSexadw7.png

// Netty对应实现方式:创建io线程组是,boss和worker,使用同一个线程组,并且线程数为1,把逻辑运算部分投递到用户自定义线程处理

EventLoopGroupioGroup = newNioEventLoopGroup(1);

b.group(ioGroup, ioGroup)

        .channel(NioServerSocketChannel.class)

        .childHandler(initializer);

ChannelFuturef = b.bind(portNumner);

cf = f.sync();

f.get();

③. 主从Reactor多线程模型

根据多线程模型,可把它的性能瓶颈做进一步优化,即把reactor由单个改为reactor线程池,把原来的reactor分为mainReactor和subReactor

优点

  • 解决单Reactor的性能瓶颈问题(Netty/Nginx采用这种设计)

reactor主从多线程模型图

Iee2S3fDWJYq0KQFTqSD.png

netty reactor主从多线程模型图

EezpIOwQUQV1rEpEnpzQ.png

// Netty对应实现方式:创建io线程组boss和worker,boss线程数为1,work线程数为cpu*2(一般IO密集可设置为2倍cpu核数)

EventLoopGroupbossGroup = newNioEventLoopGroup(1);

EventLoopGroupworkerGroup = newNioEventLoopGroup();

b.group(bossGroup, workerGroup)

        .channel(NioServerSocketChannel.class)

        .childHandler(initializer);

ChannelFuturef = b.bind(portNumner);

cf = f.sync();

f.get();

④. 部分源码分析

1.创建group实例

// 1.构造参数不传或传0,默认取系统参数配置,没有参数配置,取CPU核数*2

super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

privatestaticfinalintDEFAULT_EVENT_LOOP_THREADS;

static {

    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(

            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

}

// 2.不同版本的JDK会有不同版本的SelectorProvider实现,Windows下的是WindowsSelectorProvider

publicNioEventLoopGroup(int nThreads, Executor executor) {

    //默认selector,最终实现类似:https://github.com/frohoff/jdk8u-jdk/blob/master/src/macosx/classes/sun/nio/ch/DefaultSelectorProvider.java

    //basic flow: 1 java.nio.channels.spi.SelectorProvider 2 META-INF/services 3 default

    this(nThreads, executor, SelectorProvider.provider());

}

// 3.创建nThread个EventExecutor,并封装到选择器chooser,chooser会根据线程数分别有两种实现(GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser,算法不同,但实现逻辑一样,就是均匀的分配线程处理)

EventExecutorChooserFactory.EventExecutorChooserchooser;

children = newEventExecutor[nThreads];

for (inti = 0; i < nThreads; i ++) {

    // ...

    children[i] = newChild(executor, args);

    // ...

}

chooser = chooserFactory.newChooser(children);

2.设置group

// 两种方式设置group

// parent和child使用同一个group,调用仍然是分别设置parent和child

@Override

publicServerBootstrapgroup(EventLoopGroup group) {

    returngroup(group, group);

}

ServerBootstrap.group(EventLoopGroup parentGroup, EventLoopGroup childGroup){

    // 具体代码略,可直接参考源码

    // 里面实现内容是把parentGroup绑定到this.group,把childGroup绑定到this.childGroup

}

3.Netty启动

// 调用顺序

ServerBootstrap:bind() ->doBind() ->initAndRegister()

privateChannelFuturedoBind(finalSocketAddress localAddress) {

    finalChannelFutureregFuture = initAndRegister();

    // ...

    doBind0(regFuture, channel, localAddress, promise);

    // ...

}

finalChannelFutureinitAndRegister() {

    // 创建ServerSocketChannel

    Channelchannel = channelFactory.newChannel();

    // ...

    // 开始register

    ChannelFutureregFuture = config().group().register(channel);

    // register调用顺序

    // next().register(channel) -> (EventLoop) super.next() -> chooser.next()

    // ...

}

由以上源码可得知,bind只在起服调用一次,因此bossGroup仅调用一次regist,也就是仅调用一次next,因此只有一根线程是有用的,其余线程都是废弃的,所以bossGroup线程数设置为1即可

// 启动BossGroup线程并绑定本地SocketAddress

privatestaticvoiddoBind0(

        finalChannelFuture regFuture, finalChannel channel,

        finalSocketAddress localAddress, finalChannelPromise promise) {

    channel.eventLoop().execute(newRunnable() {

        @Override

        publicvoidrun() {

            if (regFuture.isSuccess()) {

                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

             } else {

                promise.setFailure(regFuture.cause());

             }

         }

     });

}

4.客户端连接

// 消息事件读取

NioEventLoop.run() ->processSelectedKeys() -> ... ->ServerBootstrapAcceptor.channelRead

// ServerBootstrapAcceptor.channelRead处理客户端连接事件

// 最后一行的childGroup.register的逻辑和上面的代码调用处一样

publicvoidchannelRead(ChannelHandlerContext ctx, Object msg) {

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    setAttributes(child, childAttrs);

    childGroup.register(child)

}

二、select/poll和epoll

  • select(时间复杂度O(n)):用一个fd数组保存所有的socket,然后通过死循环遍历调用操作系统的select方法找到就绪的fd
  • while(1) {  nready = select(list);  // 用户层依然要遍历,只不过少了很多无效的系统调用  for(fd <-- fdlist) {    if(fd != -1) {      // 只读已就绪的文件描述符      read(fd, buf);      // 总共只有 nready 个已就绪描述符,不用过多遍历      if(--nready == 0) break;     }   } }
  • poll(时间复杂度O(n)):同select,不过把fd数组换成了fd链表,去掉了fd最大连接数(1024个)的数量限制
  • epoll(时间复杂度O(1)):解决了select/poll的几个缺陷
    • 调用需传入整个fd数组或fd链表,需要拷贝数据到内核
    • 内核层需要遍历检查文件描述符的就绪状态
    • 内核仅返回可读文件描述符个数,用户仍需自己遍历所有fd
  • epoll是操作系统基于事件关联fd,做了以下优化:
    • 内核中保存一份文件描述符集合,无需用户每次都重新传入,只需告诉内核修改的部分即可。(epoll_ctl)
    • 内核不再通过轮询的方式找到就绪的文件描述符,而是通过异步 IO 事件唤醒。(epoll_wait)
    • 内核仅会将有 IO 事件的文件描述符返回给用户,用户也无需遍历整个文件描述符集合。

epoll仅在Linux系统上支持

2.jdk提供selector

// DefaultSelectorProvider.create方法在不同版本的jdk下有不同实现,创建不同Selector

// Windows版本的jdk,其实现中调用的是native的poll方法

publicstaticSelectorProvidercreate() {

    returnnewWindowsSelectorProvider();

}

// Linux版本的jdk

publicstaticSelectorProvidercreate() {

    Stringstr = (String)AccessController.doPrivileged(newGetPropertyAction("os.name"));

    if (str.equals("SunOS")) {

        returncreateProvider("sun.nio.ch.DevPollSelectorProvider");

     }

    if (str.equals("Linux")) {

        returncreateProvider("sun.nio.ch.EPollSelectorProvider");

     }

    returnnewPollSelectorProvider();

}

3.Netty提供的Epoll封装

netty依然基于epoll做了一层封装,主要做了以下事情:

(1)java的nio默认使用水平触发,Netty的Epoll默认使用边缘触发,且可配置

  • 边缘触发:当状态变化时才会发生io事件。
  • 水平触发:只要满足条件,就触发一个事件(只要有数据没有被获取,内核就不断通知你)

(2)Netty的Epoll提供更多的nio的可配参数。
(3)调用c代码,更少gc,更少synchronized
具体可以参考源码NioEventLoop.run和EpollEventLoop.run进行对比

4.Netty相关类图

  • 线程组类图
BLd4Pnr3gJw1bUfwHTbW.png
  • channel类图
nhyvxb50CePsDapeQAii.png

5.配置Netty为EpollEventLoop

// 创建指定的EventLoopGroup

bossGroup = newEpollEventLoopGroup(1, newDefaultThreadFactory("BOSS_LOOP"));

workerGroup = newEpollEventLoopGroup(32, newDefaultThreadFactory("IO_LOOP"));

b.group(bossGroup, workerGroup)

        // 指定channel的class

        .channel(EpollServerSocketChannel.class)

        .childHandler(initializer);

// 其中channel(clz)方法是通过class来new一个反射ServerSocketChannel创建工厂类

publicBchannel(Class<? extends C> channelClass) {

    if (channelClass == null) {

        thrownewNullPointerException("channelClass");

     }

    returnchannelFactory(newReflectiveChannelFactory<C>(channelClass));

}

finalChannelFutureinitAndRegister() {

    // ...

    Channelchannel = channelFactory.newChannel();

    // ...

}

三、Netty相关参数

1.SO_KEEPALIVE

childOption(ChannelOption.SO_KEEPALIVE, true)

TCP链路探活

2.SO_REUSEADDR

option(ChannelOption.SO_REUSEADDR, true)

重用处于TIME_WAIT但是未完全关闭的socket地址,让端口释放后可立即被重用。默认关闭,需要手动开启

3.TCP_NODELAY

childOption(ChannelOption.TCP_NODELAY, true)

IP报文格式

wwuoLsTjz6m9FU0oXGBH.png

TCP报文格式

GDU5qbULve7Jffpek4nu.png

开启则禁用TCP Negal算法,优点低延时,缺点在大量小数据包的情况下,网络利用率低

关闭则开启TCP Negal算法,优点提高网络利用率(数据缓存到一定量才发送),缺点延时高

Negal算法

  1. 如果包长度达到MSS(maximum segment size最大分段长度),则允许发送;
  2. 如果该包含有FIN,则允许发送;
  3. 设置了TCP_NODELAY选项,则允许发送;
  4. 未设置TCP_CORK选项(是否阻塞不完整报文)时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
  5. 上述条件都未满足,但发生了超时(一般为200ms),则立即发送。

MSS计算规则
MSS的值是在TCP三次握手建立连接的过程中,经通信双方协商确定的
802.3标准里,规定了一个以太帧的数据部分(Payload)的最大长度是1500个字节(MTU)

  MSS = MTU - IP首部 - TCP首部

  以太网环境下:

    MTU = 1500字节

  IP首部 = 32*5/4 = 160bit = 20字节

  TCP首部 = 32*5/4 = 160bit = 20字节

  最终得出MSS = 1460字节

结论:因为游戏服务器的实时性要求,在网络带宽足够的情况下,建议开启TCP_NODELAY,关闭Negal算法,带宽可以浪费,响应必须及时

注意:需要客户端服务器均关闭Negal算法,否则仍然会有延迟发送,影响传输速度

4.SO_BACKLOG

option(ChannelOption.SO_BACKLOG, 100)

操作系统内核中维护的两个队列

  • syns queue:保存syn到达,但没完成三次握手的半连接

cat /proc/sys/net/ipv4/tcp_max_syn_backlog

  • accpet queue:保存完成三次握手,内核等待accept调用的连接

cat /proc/sys/net/core/somaxconn

netty对于backlog的默认值设置在NetUtil类253行

 SOMAXCONN = AccessController.doPrivileged(newPrivilegedAction<Integer>() {

    @Override

    publicIntegerrun() {

        // 1.设置默认值

        intsomaxconn = PlatformDependent.isWindows() ?200:128;

        Filefile = newFile("/proc/sys/net/core/somaxconn");

        if (file.exists()) {

            // 2.文件存在,读取操作系统配置

            in = newBufferedReader(newFileReader(file));

            somaxconn = Integer.parseInt(in.readLine());

         } else {

            // 3.文件不存在,从各个参数中读取

            if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {

                tmp = sysctlGetInt("kern.ipc.somaxconn");

                if (tmp == null) {

                    tmp = sysctlGetInt("kern.ipc.soacceptqueue");

                    if (tmp != null) {

                        somaxconn = tmp;

                     }

                 } else {

                    somaxconn = tmp;

                 }

             }

         }

     }

}

结论:Linux下/proc/sys/net/core/somaxconn一定存在,所以backlog一定取得它的值,我参考prod机器的参数配置的65535,也就是不设置backlog的情况下,服务器运行缓存65535个全连接

5.ALLOCATOR和RCVBUF_ALLOCATOR

rw2KJKBQakl19w8h1hD5.png

默认分配ByteBuffAllocator赋值如下:
ByteBufUtil.java

static {

    //以io.netty.allocator.type为准,没有的话,安卓平台用非池化实现,其他用池化实现

    StringallocType = SystemPropertyUtil.get(

            "io.netty.allocator.type", PlatformDependent.isAndroid() ?"unpooled":"pooled");

    allocType = allocType.toLowerCase(Locale.US).trim();

    ByteBufAllocatoralloc;

    if ("unpooled".equals(allocType)) {

        alloc = UnpooledByteBufAllocator.DEFAULT;

        logger.debug("-Dio.netty.allocator.type: { }", allocType);

     } elseif ("pooled".equals(allocType)) {

        alloc = PooledByteBufAllocator.DEFAULT;

        logger.debug("-Dio.netty.allocator.type: { }", allocType);

     } else {

        //io.netty.allocator.type设置的不是"unpooled"或者"pooled",就用池化实现。

        alloc = PooledByteBufAllocator.DEFAULT;

        logger.debug("-Dio.netty.allocator.type: pooled (unknown: { })", allocType);

     }

    DEFAULT_ALLOCATOR = alloc;

}

RCVBUF_ALLOCATOR默认AdaptiveRecvByteBufAllocator

public class DefaultChannelConfigimplementsChannelConfig {

    // ...

    publicDefaultChannelConfig(Channelchannel) {

        this(channel, newAdaptiveRecvByteBufAllocator());

     }

    // ...

}

四、Netty相关参数

/** 
* Shortcut method for { @link #shutdownGracefully(long, long, TimeUnit) } with sensible default values. 

* @return the { @link #terminationFuture() } 
*/ 
Future<?> shutdownGracefully(); 

/** 
* Signals this executor that the caller wants the executor to be shut down.  Once this method is called, 
* { @link #isShuttingDown() } starts to return { @code true }, and the executor prepares to shut itself down. 
* Unlike { @link #shutdown() }, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i> 
* (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period, 
* it is guaranteed to be accepted and the quiet period will start over. 

* @param quietPeriod the quiet period as described in the documentation 
                    静默期:在此期间,仍然可以提交任务 
* @param timeout     the maximum amount of time to wait until the executor is { @linkplain #shutdown() } 
*                    regardless if a task was submitted during the quiet period 
                    超时时间:等待所有任务执行完的最大时间 
* @param unit        the unit of { @code quietPeriod } and { @code timeout } 

* @return the { @link #terminationFuture() } 
*/ 
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); 

// 抽象类中的实现 
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; 
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; 
@Override 
public Future<?> shutdownGracefully() {  
   return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); 
}

1.  把NIO线程的状态位设置成ST_SHUTTING_DOWN状态,不再处理新的消息(不允许再对外发送消息); 
2.  退出前的预处理操作:把发送队列中尚未发送或者正在发送的消息发送完、把已经到期或者在退出超时之前到期的定时任务执行完成、把用户注册到NIO线程的退出Hook任务执行完成; 
3.  资源的释放操作:所有Channel的释放、多路复用器的去注册和关闭、所有队列和定时任务的清空取消,最后是NIO线程的退出。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK