52

Netty Pipeline与ChannelHandler那些事

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzIwNTI2ODY5OA%3D%3D&%3Bmid=2649938675&%3Bidx=1&%3Bsn=5b974299bfe1dc806ee554f7d04ec880
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

f6R7V3m.png!web 戳蓝字「TopCoder 」关注我们哦!

QvyIRf7.jpg!web

Netty的Pipeline和ChannelHandler是Netty处理流程的重要组成部分,ChannelHandler对应一个个业务处理器,Pipeline则是负责将各个ChannelHandler串起来的“容器”,二者结合起来一起完成Netty的处理流程。

Pipeline

每个channel内部都会持有一个ChannelPipeline对象pipeline,pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。

yUf2M3U.jpg!web

channel的读写操作都会走到DefaultChannelPipeline中,当channel完成register、active、read、readComplete等操作时,会触发pipeline的相应方法。

  1. 当channel注册到selector后,触发pipeline的fireChannelRegistered方法;

  2. 当channel是可用时,触发pipeline的fireChannelActive方法。(fireChannelActive触发一般是在fireChannelRegistered之后触发的);

  3. 当客户端发送数据时,触发pipeline的fireChannelRead方法;

  4. 触发pipeline的fireChannelRead方法之后会触发pipeline的fireChannelReadComplete方法。

DefaultChannelPipeline 是Netty默认pipeline实现,对应代码如下:

 1public class DefaultChannelPipeline implements ChannelPipeline {
 2    // head和tail是handler的处理链/上下文
 3    final AbstractChannelHandlerContext head;
 4    final AbstractChannelHandlerContext tail;
 5    private final Channel channel;
 6
 7    protected DefaultChannelPipeline(Channel channel) {
 8        this.channel = ObjectUtil.checkNotNull(channel, "channel");
 9        succeededFuture = new SucceededChannelFuture(channel, null);
10        voidPromise =  new VoidChannelPromise(channel, true);
11
12        tail = new TailContext(this);
13        head = new HeadContext(this);
14        head.next = tail;
15        tail.prev = head;
16    }
17}

TailContext实现了ChannelOutboundHandler接口,HeadContext实现了ChannelInboundHandler和ChannelOutboundHandler接口,head和tail构成了一个链表。

对于Inbound操作,从head开始进行处理,向后遍历;对于OutBound操作,从tail开始处理,向前遍历。那么哪些操作是Inbound哪些是OutBound操作呢?

  • InBound:channelRegistered、channelActive、channelRead、channelReadComplete;

  • OutBound:bind、connect、close、flush等。

注意,HeadContext实现了 ChannelInboundHandlerChannelOutboundHandler 接口,对于OutBound操作,最后也是会走到HeadContext来处理的,其实TailContext只是一个浅封装,实际逻辑并不多。HeadContext 中包含了一个netty底层的socket操作类,对于 bind/connect/disconnect/close/deregister/beginRead/read/wirte/flush 操作都是由unsafe对象来完成的。

 1final class HeadContext extends AbstractChannelHandlerContext
 2        implements ChannelOutboundHandler, ChannelInboundHandler {
 3
 4    // netty的底层socket操作类
 5    private final Unsafe unsafe;
 6
 7    HeadContext(DefaultChannelPipeline pipeline) {
 8        super(pipeline, null, HEAD_NAME, false, true);
 9        unsafe = pipeline.channel().unsafe();
10        setAddComplete();
11    }
12    // ...
13}

channelPipeline的channelHandlerContext链表是“责任链”模式的体现,一个请求的处理可能会涉及到多个channelHandler,比如decodeHandler、自定义的业务channelHandler和encodeHandler。业务channelHandler示例如下:

 1public class EchoHandler extends ChannelInboundHandlerAdapter {
 2    @Override
 3    public void channelRead(ChannelHandlerContext ctx, Object msg) {
 4        ByteBuf in = (ByteBuf) msg;
 5        System.out.println(in.toString(CharsetUtil.UTF_8));
 6        ctx.write(msg);
 7    }
 8    @Override
 9    public void channelReadComplete(ChannelHandlerContext ctx) {
10        ctx.flush();
11    }
12    @Override
13    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
14        cause.printStackTrace();
15        ctx.close();
16    }
17}

在channelReadComplete方法中调用flush,其实会走到head.flush方法,最后调用unsafe.flush将数据发送出去。netty pipeline就是责任链(或者说是流水线)模式的体现,通过pipeline机制,使netty处理数据机制具有强大的扩展性和灵活性。

ChannelHandler

netty的channelHandler是channel处理器,基于netty的业务处理,不管多么复杂,都是由channelHandler来做的,可能涉及到多个channelHandler,channelHandler分为多种类型:encoder、decoder、业务处理等。

decoderHandler

decoderHandler大都是接收到数据之后进行转换或者处理的,基本都是ByteToMessageDecoder的子类,其类图如下:

BBzQziy.jpg!web

ByteToMessageDecoder中会有一个数据暂存缓冲区,如果接收到数据不完整,可以先暂存下等到下次接收到数据时再处理。

encoderHandler

encoderHandler大都是将message转换成bytebuf数据,基本都是MessageToByteEncoder的子类,其类图如下:

qeYZ7jv.jpg!web

业务channelHandler

业务处理channelHanler就是用户自定义的业务逻辑了,一般是在最后才addLast到channel.pipeline的,比如http处理逻辑如下:

 1ServerBootstrap boot = new ServerBootstrap();
 2boot.group(bossGroup, workerGroup)
 3    .channel(NioServerSocketChannel.class)
 4    .localAddress(8080)
 5    .childHandler(new ChannelInitializer<SocketChannel>() {
 6        @Override
 7        protected void initChannel(SocketChannel ch) throws Exception {
 8            ch.pipeline()
 9                    .addLast("decoder", new HttpRequestDecoder())
10                    .addLast("encoder", new HttpResponseEncoder())
11                    .addLast("aggregator", new HttpObjectAggregator(512 * 1024))
12                    .addLast("handler", new HttpHandler());
13        }
14    });

DefaultChannelPipeline中的headContext(实现了ChannelOutboundHandler和ChannelInboundHandler)、tailContext(实现了ChannelOutboundHandler)和自定义的channelHandler(decoderHandler、ecoderHandler、channelHandler等,一般实现ChannelInboundHandler),通过ChannelHandlerContext的链接,组成了一个请求处理链。

注意,ChannelOutboundHandler和ChannelInboundHandler的顺序如何添加的,其实只要记住一条:ChannelOutboundHandler之间要保证顺序,ChannelInboundHandler之间要保证顺序,二者之间无需保证顺序。

channelHandler的运行流程图:

mqEneuJ.png!web

TailContesxt 本身代码不多并且挺多方法都是"空实现",不过它的channelRead方法内部会执行 ReferenceCountUtil.release(msg) 释放msg占用的内存空间,也就是说在 未定义用户ChannelHandler 或者 用户ChannelHandler的channelRead继续传递后续ChannelHandler的channelRead 时,到TailContext的channelRead时会自动释放msg所占用内存。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK