4

Netty(三): 直接内存原理及应用

 3 years ago
source link: http://www.cnblogs.com/yougewe/p/14353111.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Netty作为一个流行的应用框架,它的强悍之处主要有两点:1. 是性能强悍,可以轻松承载数万并发; 2. 其编程模型简单,容易上手; 这就给大家打开了一扇通向高性能的大门。

我在rocketmq的一篇文章里阐述过,高性能的核心本质和实现原理。主要就是依赖于操作系统提供的高效io模型和内存控制。有兴趣的可以阅读我来之前的文章: RocketMQ(七):高性能探秘之MappedFile

高效io模型略去不说,我们今天主要来看看内存控制这块的强大之处。

1. 通常的内存模型概述

一般地,系统为了保证系统本身的安全性和健壮性,会将内存从逻辑上隔离成内核区域和用户区域,这很容易理解。因为用户行为不可控性太强,暴露得太多,就容易导致各种神奇的用法,超出系统的控制范围。当然,有的语言是支持直接控制内存的,比如C, 你可以用一个指针,访问内存中的几乎任意位置的数据(除了一些硬件地址)。而像汇编,则可以访问任意地址。而这些底层的语言,已经离我们越来越远了,它基本上和普通程序员关系不大了。

用户很多时候的编程控制,都是在用户区域进行的,比如我做一些加减乘除,如 Integer a = 2; Integer b = 3; Integer c = a * b; 这种操作, 所有操作就是在用户空间上完成的。这些操作,不会有内核区域的介入。但是有些操作,则必须由内核进行,比如对文件的读写,就是不同设备之间的数据交换,也就是io类操作。这类操作因为有非常的难度实现,所以一定是由操作系统来完成底层的操作的。那么,第一手的数据必定要经过内核区域。然而我们的代码是跑在用户区的,那么,通常情况下,就会存在内核区数据,拷贝到用户区数据的这么一个过程。这是一个读的过程,而写的过程则是一个相反的操作,从用户区拷贝数据到内核区,然后再由内核完成io操作。

直接将内存划分为内核区与用户区,实在是太泛了,不能说错,但有一种说了等于没说的感觉。

所以,对内存的划分,还需要再细点,即所谓的内存模型或者内存区域。各语言各场景各实现自然是百家争鸣,无可厚非。但大致就是按照一定的规则,切分成不同用途的区域,然后在需要的时候向该区域进行内存分配,并保存到相应的表或者标识中,以便后续可读或不可再分配。而这其中,还有个非常重要的点是,除了知道如何分配内存之外,还要知道如何回收内存。另外,如何保证内存的可见性,也是一个内存模型需要考虑的重要话题。

具体实现就不用说了,因为没有一个放之四海而皆准的说法,我也没那能耐讲清楚这事情。大家自行脑补吧。

2. Java中的直接内存原理

首先,来说说为什么java中会有直接内存这个概念?我们知道,java中有很重要的一个内存区域,即堆内存,几乎所有的对象都堆上进行分配,所以,大部分的GC工作,也是针对堆进行的。关联上一节所讲的事,堆内存我们可以划分到用户空间内存区域去。应该说,java只要将这一块内存管理好了,基本上就可以管理好java的对象的生命周期了。那么,到底什么直接内存?和堆内存又有啥关系?

直接内存是脱离掉堆空间的,它不属于java的堆,其他区域也不属于,即直接内存不受jvm管控。它属于受系统直接控制的一段内存区域。

为什么直接内存要脱离jvm的管控呢?因为jvm管控的是用户空间,而有的场景则必须要内核空间的介入,整个过程才能完成。而如果用户空间想要获取数据,则必须要像内核中请求复制数据,数据才对用户空间可见。而很多这种场景,复制数据的目的,仅仅是为了使用一次其数据,做了相应的转换后,就不再使用有关系,比如流数据的接入过程。这个复制的过程,则必定有不少的性能损耗,所以就有直接内存的出现。它的目的在于避免内核空间和用户空间之间进行无意义的数据复制,从而提升程序性能。

直接内存不受jvm管控,那么它受谁的管控呢?实际上,是由操作系统的底层进行管控的,在进行内存分配请求时,系统会申请一段共享区域。由内核和用户代码共享这里的数据写入,即内核写入的数据,用户代码可以直接访问,用户代码写入的数据,内核可以直接使用。在底层,是由mmap这种函数接口来实现的共享内存的。

而在java层面,则是使用DirectByteBuffer来呈现的,它的创建、使用、删除如下:

    // 创建直接内存空间实例
    ByteBuffer buffer = ByteBuffer.allocateDirect(1600);
    for (int i = 0; i < 90_0000; i++) {
        for (int j = 0; j < 199; j++) {
            // 数据的写入
            buffer.putInt(j);
        }
        buffer.flip();
        for (int j = 0; j < 199; j++) {
            // 数据的读取
            buffer.get();
        }
        // 数据清理
        buffer.clear();
    }

3. Netty中使用直接内存

知道了直接内存的使用过程,那么如何找到更好的场景,则是需要我们去发现的。netty作为一个高性能网络通信框架,重要的工作就是在处理网络io问题。那么,在它的场景里,使用上直接内存这一大杀器,则是再好不过了。那么,netty是如何利用它的呢?

两个场景:1. 向应用传递网络数据时(读过程); 2. 应用向远端传递数据时(写过程);

    // 写过程,将msg转换为直接内存存储的二进制数据
    // io.netty.handler.codec.MessageToByteEncoder#write
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                // 默认 preferDirect = true;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // 调用子类的实现,编码数据,以便实现私有协议
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    // 写数据到远端
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
    // io.netty.handler.codec.MessageToByteEncoder#allocateBuffer
    /**
     * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
     * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
     */
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                               boolean preferDirect) throws Exception {
        if (preferDirect) {
            // PooledByteBufAllocator
            return ctx.alloc().ioBuffer();
        } else {
            return ctx.alloc().heapBuffer();
        }
    }
    // io.netty.buffer.AbstractByteBufAllocator#ioBuffer()
    @Override
    public ByteBuf ioBuffer() {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(DEFAULT_INITIAL_CAPACITY);
        }
        return heapBuffer(DEFAULT_INITIAL_CAPACITY);
    }
    // io.netty.buffer.AbstractByteBufAllocator#directBuffer(int)
    @Override
    public ByteBuf directBuffer(int initialCapacity) {
        return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }
    @Override
    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);
        return newDirectBuffer(initialCapacity, maxCapacity);
    }
    // io.netty.buffer.PooledByteBufAllocator#newDirectBuffer
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    // io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int)
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }
        // io.netty.buffer.PoolArena.DirectArena#newByteBuf
        @Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
    // io.netty.util.internal.PlatformDependent0#newDirectBuffer
    static ByteBuffer newDirectBuffer(long address, int capacity) {
        ObjectUtil.checkPositiveOrZero(capacity, "capacity");

        try {
            return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
        } catch (Throwable cause) {
            // Not expected to ever throw!
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new Error(cause);
        }
    }

向ByteBuffer中写入数据过程, 即是向直接内存中写入数据的过程,它可能不像普通的堆对象一样简单咯。

    // io.netty.buffer.AbstractByteBuf#writeBytes(byte[])
    @Override
    public ByteBuf writeBytes(byte[] src) {
        writeBytes(src, 0, src.length);
        return this;
    }

    @Override
    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
        ensureWritable(length);
        setBytes(writerIndex, src, srcIndex, length);
        writerIndex += length;
        return this;
    }
    
    // io.netty.buffer.PooledUnsafeDirectByteBuf#setBytes(int, byte[], int, int)
    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        // addr() 将会得到一个内存地址
        UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
        return this;
    }
    // io.netty.buffer.PooledUnsafeDirectByteBuf#addr
    private long addr(int index) {
        return memoryAddress + index;
    }

    // io.netty.buffer.UnsafeByteBufUtil#setBytes(io.netty.buffer.AbstractByteBuf, long, int, byte[], int, int)
    static void setBytes(AbstractByteBuf buf, long addr, int index, byte[] src, int srcIndex, int length) {
        buf.checkIndex(index, length);
        if (length != 0) {
            // 将字节数据copy到DirectByteBuffer中
            PlatformDependent.copyMemory(src, srcIndex, addr, length);
        }
    }
    // io.netty.util.internal.PlatformDependent#copyMemory(byte[], int, long, long)
    public static void copyMemory(byte[] src, int srcIndex, long dstAddr, long length) {
        PlatformDependent0.copyMemory(src, BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddr, length);
    }
    // io.netty.util.internal.PlatformDependent0#copyMemory(java.lang.Object, long, java.lang.Object, long, long)
    static void copyMemory(Object src, long srcOffset, Object dst, long dstOffset, long length) {
        //UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, length);
        while (length > 0) {
            long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
            // 最终由jvm的本地方法,进行内存的copy, 此处dst为null, 即数据只会copy到对应的 dstOffset 中
            // 偏移基数就是: 各种基础地址 ARRAY_OBJECT_BASE_OFFSET...
            UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
            length -= size;
            srcOffset += size;
            dstOffset += size;
        }
    }

可以看到,最后直接内存的写入,是通过 Unsafe 类,对操作系统进行内存数据的写入的。

最后,来看下它如何将写数据到远端:

    // io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }
    
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
        // io.netty.channel.DefaultChannelPipeline.HeadContext#write
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }
        // io.netty.channel.AbstractChannel.AbstractUnsafe#write
        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                // 转换msg为直接内存,如有必要
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
            // 将msg放入outboundBuffer中,即相当于写完了数据
            outboundBuffer.addMessage(msg, size, promise);
        }
    // io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
    @Override
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    // io.netty.channel.ChannelOutboundBuffer#addMessage
    /**
     * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
     * the message was written.
     */
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        // 如有必要,立即触发 fireChannelWritabilityChanged 事件,从而使立即向网络写入数据
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

大概就是说,通过直接内存写好的数据,只需要再调用下内核的接入接口,将直接内存的数据放入缓冲,就可以被发送到远端了。

以上,就是netty对整个直接内存的操作方式了。看起来有点复杂,主要netty到处都是其设计哲学的体现,无论是一个写事件、读事件、或者是状态变更事件,都是一长串的流水线操作。当然了,我们此处讨论的是,其如何使用直接内存的。它通过使用一个 PooledUnsafeDirectByteBuf , 最终引用jdk的 direct = ByteBuffer.allocateDirect(1); 使用 DirectByteBuffer 实现直接内存的使用。并使用其构造方法 DirectByteBuffer(long addr, int cap) 进行直接内存对象创建。

4. 闲话多说

从整体上来说,直接内存减少了进行io时的内存复制操,但其仅为内核与用户空间的内存复制,因为用户空间的数据复制是并不可少的,因为最终它们都必须要转换为二进制流,才能被不同空间的程序读取。但创建直接内存对象的开销要高于创建普通内存对象,因为它可能需要维护更复杂的关系环境。事实上,直接内存可以做到不同进程间的内存共享,而这在普通对象内存中是无法做到的(不过java是单进程的,不care此场景)。java的直接内存的使用,仅为使用系统提供的一个便捷接口,适应更好的场景。

而netty作为一个网络通信框架,则是为了更好处理具体场景,更合理的使用了直接内存,从而成就了所谓的零拷贝,高性能的基石之一。所以,一个好的框架,一定是解决某类问题的翘楚,它不一定是功能开创者,但一定是很好的继承者。

另外,内存管理是个非常复杂的问题。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK