20

Netty 系列笔记之开篇

 3 years ago
source link: https://aysaml.com/articles/2020/09/21/1600675546715.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

100

一、引言

❀ 众所周知:

Netty 是一款基于 NIO 客户、服务器端的 Java 开源编程框架,提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

❀ 通俗来讲:

Netty 一个非常好用的处理 Socket 的 Jar 包,可以用它来开发服务器和客户端。

二、为什么要学习 Netty

Netty 作为一个优秀的网络通信框架,许多开源项目都使用它来构建通信层。比如 Hadoop、Cassandra、Spark、Dubbo、gRPC、RocketMQ、Zookeeper甚至我们常用的 Spring 等等。

更重要的是, Netty 是开发高性能 Java 服务器的必学框架。

可以说作为一个 Java 工程师,要了解 Java 服务器的高阶知识,Netty 是一个必须要学习的东西。

三、Netty 的特性

1、设计

  • 为不同的传输类型(阻塞和非阻塞)提供统一的 API
  • 基于灵活且可扩展的事件模型,可将关注点明确分离
  • 高度可定制的线程模型:单线程、一个或多个线程池
  • 可靠的无连接数据 Socket 支持(UDP)

2、易用

  • 完善的 JavaDoc ,用户指南和样例
  • 无需额外依赖,JDK 5 (Netty 3.x) 、JDK 6 (Netty 4.x)

3、性能

  • 更高的吞吐量,更低的延迟
  • 更省资源
  • 减少不必要的内存拷贝

4、安全

  • 完整的 SSL/TLS 和 STARTTLS 的支持

5、社区

  • 活跃的社区和众多的开源贡献者

四、初识 Netty

Talk is cheap, show me the code!

1、丢弃服务器

接下来从代码中感受一下 Netty,首先实现一个 discard(丢弃)服务器,即对收到的数据不做任何处理。

  • 实现 ChannelInBoundHandlerAdapter

    首先我们从 handler 的实现开始, Netty 使用 handler 来处理 I/O 事件。

    public class DiscardServerHandler extends ChannelInboundHandlerAdapter { 
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
          // 丢弃收到的数据
          ((ByteBuf) msg).release();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
          cause.printStackTrace();
          ctx.close();
        }
    }
    • 1 行,DiscardServerHandler 继承自 ChannelInboundHandlerAdapter ,这个类实现了 ChannelInboundHandler 接口,ChannelInboundHandler 提供了许多事件处理的接口方法。
    • 4 行,当收到新的消息时,就会调用 chanelRead() 方法。
    • 6 行,ByteBuf 是一个引用计数对象,这个对象必须显式地调用 release() 方法来释放。处理器的职责是释放所有传递到处理器的引用计数对象,下面是比较常见的 chanelRead() 方法实现:
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
      }
    • 10 行,exceptionCaught() 方法是在处理事件时发生异常调用的方法。
  • 启动 Handler

    实现 handler 后,我们需要一个 main() 方法来启动它。

    public class DiscardServer {
    
      private int port;
    
      public DiscardServer(int port) {
          this.port = port;
      }
    
      public void run() throws Exception {
          // 接收进来的连接
          EventLoopGroup boss = new NioEventLoopGroup();
          // 处理已经接收的连接
          EventLoopGroup worker = new NioEventLoopGroup();
          try {
              ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {
                      // 添加自定义的 handler
                      socketChannel.pipeline().addLast(new DiscardServerHandler());
                  }
              }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
              // 绑定端口,开始接收进来的连接
              ChannelFuture channelFuture = bootstrap.bind(port).sync();
              // 关闭
              channelFuture.channel().closeFuture().sync();
          } finally {
              boss.shutdownGracefully();
              worker.shutdownGracefully();
          }
      }
    
      public static void main(String[] args) throws Exception {
          int port = 8080;
          new DiscardServer(port).run();
      }
    }
    • 11 行, EventLoopGroup 是用来处理 I/O 操作的多线程事件循环器,Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。在本例我们实现了一个服务端应用,因此需要两个 EventLoopGroup 。第一个用来接收进来的连接,常被称作 boss ;第二个用来处理已经接收的连接,成为 worker。一旦 boss 接收到一个新进来的连接,就会把连接的信息注册到 worker 上面。
    • 15 行, ServerBootstrap 是一个启动 NIO 服务的辅助启动类。
    • 16 行,指定 NioServerSocketChannel 用来说明一个新的 Channel 如何接收进来的连接。
    • 20 行, ChannelInitializer 用来帮助使用者创建一个新的 channel ,同时可以使用 pipline 指定一些特定的处理器。
    • 22 行,通过这两个方法可以指定新配置的 channel 的一些参数配置。
  • 查看接收到的数据

    如此,一个基于 Netty 的服务端程序就完成了,但是现在启动起来我们看不到任何交互,所以我们稍微修改一下 DiscardServerHandler 类的 channelRead() 方法,可以查看到客户端发来的消息。

    @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          ByteBuf byteBuf = (ByteBuf) msg;
          try {
              while (byteBuf.isReadable()) {
                  System.out.print((char) byteBuf.readByte());
                  System.out.flush();
              }
          } finally {
              ReferenceCountUtil.release(msg);
          }
      }
  • 测试

    接下来我们启动 DiscardServer ,使用 telnet 来测试一下。

    100

    控制台接收到了命令行发来的消息:

    100

2、应答服务器

我们已经实现了服务器可以接收客户端发来的消息,通常服务器会对客户端发来的请求作出回应,下面就通过 ECHO 协议来实现对客户端的消息响应。

ECHO 协议即会把客户端发来的数据原样返回,所以也戏称“乒乓球”协议。

在上述代码的基础上面,我们只需对 DiscardServerHandler 类的 channelRead() 方法稍加修改:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);
        ctx.flush();
}
  • ChannelHandlerContext 对象提供了许多操作,使你能够触发各种各样的 I/O 事件和操作。这里我们调用了 write(Object) 方法来逐字地把接受到的消息写入。请注意不同于 DISCARD 的例子我们并没有释放接受到的消息,这是因为当写入的时候 Netty 已经帮我们释放了。
  • ctx.write(Object) 方法不会使消息写入到通道上,他被缓冲在了内部,你需要调用 ctx.flush() 方法来把缓冲区中数据强行输出。或者可以用更简洁的 cxt.writeAndFlush(msg) 以达到同样的目的。

再次运行 telnet 命令,就会接受到你发送的信息。

3、时间服务器

接下来我们基于 TIME 协议,实现构建和发送一个消息,然后在完成时关闭连接。和之前的例子不同的是在不接受任何请求时会发送一个含32位的整数的消息,并且一旦消息发送就会立即关闭连接。

TIME 协议可以提供机器可读的日期时间信息。

我们会在连接创建时发送时间消息,所以需要覆盖 channelActive() 方法:

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 分配空间
        final ByteBuf time = ctx.alloc().buffer(4);
        // 获取 32 位时间戳并写入
        time.writeInt((int) (System.currentTimeMillis() / 1000L));
        final ChannelFuture future = ctx.writeAndFlush(time);
        // 添加监听器
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                assert future == channelFuture;
                // 关闭连接
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • 4 行, channelActive() 方法将会在连接被建立并且准备进行通信时被调用。
  • 6 行,同 Java 的 NIO 类似,为了构建一个消息,需要为缓冲区分配空间。因为要发送一个 32 为的时间戳,所以至少 4 字节。
  • 8 行,消息构建完毕后,执行写入。回想使用 Java NIO 的 Buffer 时,在读写操作之间,需要调用 buffer.flip( ) 方法设置指针位置。但是在在 Netty 中不需要这样操作,原因是 Netty 提供了两个指针,一个读指针和一个写指针,在读写时两者不相互影响。再也不用担心忘记调用 flip( ) 方法时数据为空或者数据错误啦。
  • 11 行,在第 9 行执行完 ctx.writeAndFlush(time) 后会返回一个 ChannelFuture 对象,代表着还没有发生的一次 I/O 操作。 这意味着任何一个请求操作都不会马上被执行,因为在 Netty 里所有的操作都是异步的 。这样来看,我们想完成消息发送后关闭连接,直接在后边调用 ctx.close( ) 可能不能立刻关闭连接。返回的 ChannelFuture 对象在操作完成后会通知它的监听器,继续执行操作完成后的动作。

4、时间客户端

对于时间服务端不能直接用 telnet 的方式测试,因为不能靠人工把一个 32 位的二进制数据翻译成时间,所以下面将实现一个时间客户端。

与服务端的实现唯一不同的就是使用了不同的 Bootstrap 和 Channel 实现:

public class TimeClient {

    private String host;

    private int port;

    public TimeClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            // 启动
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 等待连接关闭
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        TimeClient timeClient = new TimeClient("localhost", 8080);
        timeClient.run();
    }

}
  • 13 行,对比 server 端只指定了一个 EventLoopGroup ,它即会作为 boss group 也会作为 worker group,尽管客户端不需要使用到 boss group。
  • 15 行,Bootstrap 和 ServerBootstrap 类似,Bootstrap 面向于飞服务端的 channel ,比如客户端和无连接传输模式的 channel。

再稍微改动一下 handler :

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	// 在 TCP/IP 中,Netty 会把读到的数据放入 ByteBuf 中
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            long time = byteBuf.readUnsignedInt() * 1000L;
            System.out.println(new Date(time));
            ctx.close();
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分别启动 TimeServer 和 TimeClient ,控制台打印出了当前时间:

100

然而,多次运行后处理器有时候会因为抛出 IndexOutOfBoundsException 而拒绝工作。带着这个问题,继续往下面看。

5、处理基于流的传输

比较典型的基于流传输的 TCP/IP 协议,也就是说,应用层两个不同的数据包,在 TCP/IP 协议传输时, 可能会组合或者拆分应用层协议的数据 。由于 两个数据包之间并无边界区分 ,可能导致消息的读取错误。

很多资料也称上述这种现象为 TCP 粘包,而值得注意的是:

1、TCP 协议本身设计就是面向流的,提供可靠传输。

2、正因为面向流,对于应用层的数据包而言,没有边界区分。这就需要应用层主动处理不同数据包之间的组装。

3、发生粘包现象不是 TCP 的缺陷,只是应用层没有主动做数据包的处理。

回到上面程序,这也就是上述异常发生的原因。一个 32 位整型是非常小的数据,它并不见得会被经常拆分到到不同的数据段内。然而,问题是它确实可能会被拆分到不同的数据段内。

比较常见的两种解决方案就是基于长度或者基于终结符,继续以上面的 TIME 协议程序为基础,着手解决这个问题。因为只发送一个 32 位的整形时间戳,我们采用基于数据长度的方式:

❀ 解决方案一

最简单的方案是构造一个内部的可积累的缓冲,直到4个字节全部接收到了内部缓冲。修改一下 TimeClientHandler 的代码:

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buf;

    private static final int CAPACITY = 4;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer(CAPACITY);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buf.release();
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        buf.writeBytes(byteBuf);
        byteBuf.release();
        // 数据大于或等于 4 字节
        if (buf.readableBytes() >= CAPACITY) {
            long time = buf.readUnsignedInt() * 1000L;
            System.out.println(new Date(time));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

其中覆盖了 handler 生命周期的两个方法:

  • 8 行, handlerAdded() :当检测到新的连接之后,调用 ch.pipeline().addLast(new LifeCycleTestHandler()) 之后的回调,表示当前的channel中已经成功添加了一个逻辑处理器
  • 13 行, handlerRemoved() :在连接关闭后把这条连接上的所有逻辑处理器全部移除掉。

❀ 解决方案二

尽管上述方案已经解决了 TIME 客户端的问题了,但是在处理器中增加了逻辑,我们可以把处理消息的部分抽取出来,成为一个单独的处理器,并且可以增加多个 ChannelHandler 到 ChannelPipline ,每个处理器各司其职,减少模块的复杂度。

由此,拆分出一个 TimeDecoder 用于处理消息:

public class TimeDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {
            out.add(in.readBytes(4));
        }
    }
}
  • ByteToMessageDecoder 继承自 ChannelInboundHandlerAdapter ,每当有新数据接收的时候, ByteToMessageDecoder 都会调用 decode() 方法来处理内部的那个累积缓冲。
  • 如果在 decode() 方法里增加了一个对象到 out 对象里,这意味着解码器解码消息成功。 ByteToMessageDecoder 将会丢弃在累积缓冲里已经被读过的数据。

最后,修改 TimeClient 的代码,将 TimeDecoder 加入 ChannelPipline :

bootstrap.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            }).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);

除此之外,Netty还提供了更多开箱即用的解码器使你可以更简单地实现更多的协议,帮助你避免开发一个难以维护的处理器实现,感兴趣的小伙伴可以自行了解。

6、将消息解码为自定义对象

上述的例子我们一直在使用 ByteBuf 作为协议消息的主要数据结构,但是实际使用中,需要传输的消息更加复杂,抽象为对象来处理更加方便。继续以 TIME 客户端和服务器为基础,使用自定义的对象代替 ByteBuf 。

  • 定义保存时间的对象 OurTime :

    public class OurTime {
    
      private final long value;
    
      public OurTime() {
          this(System.currentTimeMillis() / 1000L);
      }
    
      public OurTime(long value) {
          this.value = value;
      }
    
      public long value() {
          return value;
      }
    
      @Override
      public String toString() {
          return new Date(value() * 1000L).toString();
      }
    }
  • 修改 TimeDecoder 类,返回 OurTime 类:

    public class TimeDecoder extends ByteToMessageDecoder {
    
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
          if (in.readableBytes() >= 4) {
              out.add(new OurTime(in.readUnsignedInt()));
          }
      }
    }
  • 修改后的 TimeClientHandler 类,处理新消息更加简洁:

    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          OurTime ourTime = (OurTime) msg;
          System.out.println(ourTime);
          ctx.close();
      }
    
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          cause.printStackTrace();
          ctx.close();
      }
    }

而对于服务端来说,大同小异。

修改 TimeServerHandler 的代码:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
 }

现在,唯一缺少的功能是一个编码器,是 ChannelOutboundHandler 的实现,用来将 OurTime 对象重新转化为一个 ByteBuf。这是比编写一个解码器简单得多,因为没有需要处理的数据包编码消息时拆分和组装。

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (OurTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

在这几行代码里还有几个重要的事情。第一,通过 ChannelPromise ,当编码后的数据被写到了通道上 Netty 可以通过这个对象标记是成功还是失败。第二, 我们不需要调用 cxt.flush()。因为处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),如果像自己实现 flush() 方法内容可以自行覆盖这个方法。

进一步简化操作,你可以使用 MessageToByteEncode :

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
        @Override
        protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
            out.writeInt((int)msg.value());
        }
    }

最后在 TimeServerHandler 之前把 TimeEncoder 插入到ChannelPipeline。

五、总结

相信读完这篇文章的从头至尾,小伙伴们对使用 Netty 编写一个客户端和服务端有了大概的了解。后面我们将继续探究 Netty 的源码实现,并结合其涉及的基础知识进行了解、深入。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK