21

搭建生产级的Netty项目

 4 years ago
source link: http://66yr.cn/2020/04/01/netty/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Netty是Trustin Lee在2004年开发的一款高性能的网络应用程序框架。相比于JDK自带的NIO,Netty做了相当多的增强,且隔离了JDK NIO的实现细节,API也比较友好,还支持流量整形等高级特性。在我们常见的一些开源项目中已经普遍地应用了Netty,比如Dubbo、Elasticsearch、Zookeeper等。

Netty的具体开发

提示:因代码相对较多,这里只展示其主要部分,至于项目中用到的编解码器、工具类,请直接拉到最后下载源码!也欢迎顺手给个Star~

需要的依赖

<!-- Gson. No <version> is declared here; presumably it is managed by a parent POM or BOM (TODO confirm). -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

<!-- Lombok; the Server class uses its @Slf4j annotation. Version presumably managed by a parent POM (TODO confirm). -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- Dropwizard Metrics core, pinned to 4.1.1. Presumably used by MetricsHandler, which is not shown in the article. -->
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>4.1.1</version>
</dependency>
<!-- Dropwizard Metrics JMX module; kept at the same version as metrics-core. -->
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-jmx</artifactId>
    <version>4.1.1</version>
</dependency>
<!-- Apache Commons Lang 3. Version presumably managed by a parent POM (TODO confirm). -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<!-- Netty all-in-one artifact, pinned to 4.1.29.Final; provides the io.netty.* classes used by Client and Server. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>

Client端代码

package com.example.nettydemo.client;

import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
import com.example.nettydemo.common.RequestMessage;
import com.example.nettydemo.common.string.StringOperation;
import com.example.nettydemo.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import javax.net.ssl.SSLException;
import java.util.concurrent.ExecutionException;

/**
 * Demo Netty client.
 *
 * <p>Connects to the demo server, registers one pending-result future for a freshly generated
 * stream id, writes the same request {@link #SEND_COUNT} times and then blocks until the
 * channel is closed.
 */
public class Client {

    /** Host of the demo server. */
    private static final String HOST = "127.0.0.1";

    /** Port of the demo server. */
    private static final int PORT = 8888;

    /** Maximum time allowed for establishing the connection: 30 s (this is also Netty's default). */
    private static final int CONNECT_TIMEOUT_MILLIS = 30 * 1000;

    /** Number of times the demo request is written to the channel. */
    private static final int SEND_COUNT = 10;

    /**
     * Runs the demo client until the connection is closed.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     *                              or for the channel to close
     * @throws ExecutionException   kept for signature compatibility
     * @throws SSLException         kept for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException {

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS);

        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            bootstrap.group(group);

            // Shared with the pipeline so that incoming responses can be matched to waiting futures.
            RequestPendingCenter requestPendingCenter = new RequestPendingCenter();

            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();

                    // Framing layer.
                    pipeline.addLast(new FrameDecoder());
                    pipeline.addLast(new FrameEncoder());

                    // Protocol (message <-> bytes) layer.
                    pipeline.addLast(new ProtocolEncoder());
                    pipeline.addLast(new ProtocolDecoder());

                    // Application layer: response dispatching and operation -> request conversion.
                    pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
                    pipeline.addLast(new OperationToRequestMessageEncoder());

                    // To trace traffic while debugging, append a LoggingHandler(LogLevel.INFO) here.
                }
            });

            // connect() is asynchronous; wait for it to complete before using the channel.
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT);
            channelFuture.sync();

            // Build the test request according to the demo protocol.
            long streamId = IdUtil.nextId();
            RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi"));

            // Register the future under the stream id before sending.
            OperationResultFuture operationResultFuture = new OperationResultFuture();
            requestPendingCenter.add(streamId, operationResultFuture);

            // NOTE(review): every write carries the same stream id while only one future is
            // registered; how repeated responses are handled depends on RequestPendingCenter and
            // ResponseDispatcherHandler, which are not shown here - confirm.
            for (int i = 0; i < SEND_COUNT; i++) {
                channelFuture.channel().writeAndFlush(requestMessage);
            }

            // To block for the reply, call operationResultFuture.get() here and print the result;
            // per the original author's note, ResponseDispatcherHandler sets the result on the
            // future once the response arrives.

            // Keep the client alive until the channel is closed (same idiom as the Server).
            channelFuture.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully();
        }

    }

}

Server端代码

package com.example.nettydemo.server;

import com.example.nettydemo.server.codec.FrameDecoder;
import com.example.nettydemo.server.codec.FrameEncoder;
import com.example.nettydemo.server.codec.ProtocolDecoder;
import com.example.nettydemo.server.codec.ProtocolEncoder;
import com.example.nettydemo.server.handler.MetricsHandler;
import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
import com.example.nettydemo.server.handler.ServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import lombok.extern.slf4j.Slf4j;

import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException;

/**
 * netty server 入口
 */
@Slf4j
public class Server {


    public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException {

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //设置channel模式,因为是server所以使用NioServerSocketChannel
        serverBootstrap.channel(NioServerSocketChannel.class);

        //最大的等待连接数量
        serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
        //设置是否启用 Nagle 算法:用将小的碎片数据连接成更大的报文 来提高发送效率。
        //如果需要发送一些较小的报文,则需要禁用该算法
        serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);

        //设置netty自带的log,并设置级别
        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));

        //thread
        //用户指定线程名
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
        NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
        UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));

        //只能使用一个线程,因GlobalTrafficShapingHandler比较轻量级
        NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS"));

        try {
            //设置react方式
            serverBootstrap.group(bossGroup, workGroup);

            //metrics
            MetricsHandler metricsHandler = new MetricsHandler();

            //trafficShaping流量整形
            //long writeLimit 写入时控制, long readLimit 读取时控制 具体设置看业务修改
            GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024);


            //log
            LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
            LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO);

            //设置childHandler,按执行顺序放
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {

                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast("debugLog", debugLogHandler);
                    pipeline.addLast("tsHandler", globalTrafficShapingHandler);
                    pipeline.addLast("metricHandler", metricsHandler);
                    pipeline.addLast("idleHandler", new ServerIdleCheckHandler());

                    pipeline.addLast("frameDecoder", new FrameDecoder());
                    pipeline.addLast("frameEncoder", new FrameEncoder());
                    pipeline.addLast("protocolDecoder", new ProtocolDecoder());
                    pipeline.addLast("protocolEncoder", new ProtocolEncoder());

                    pipeline.addLast("infoLog", infoLogHandler);
                    //对flush增强,减少flush次数牺牲延迟增强吞吐量
                    pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
                    //为业务处理指定单独的线程池
                    pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
                }
            });

            //绑定端口并阻塞启动
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();

            channelFuture.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            businessGroup.shutdownGracefully();
            eventLoopGroupForTrafficShaping.shutdownGracefully();
        }

    }

}

最后

以上介绍了Netty的基本用法,在代码中也做了一部分关键注释,但难免还有不少不足之处,也不可能满足所有人的要求,大家可根据自己的实际需求去改造此项目。附上源码地址:netty源码


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK