Netty (3): Direct Memory, Principles and Applications

source link: http://www.cnblogs.com/yougewe/p/14353111.html

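Netty is a popular application framework, and its strength comes down to two things: 1. strong performance, easily handling tens of thousands of concurrent connections; 2. a simple programming model that is easy to pick up. Together they open a door to high performance for everyone.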

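In an article on RocketMQ I explained what high performance fundamentally rests on and how it is achieved. It relies mainly on the efficient I/O models and memory control that the operating system provides. If you are interested, see my earlier article: RocketMQ (7): Exploring High Performance with MappedFile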


1. Overview of the usual memory model

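Generally, to protect its own security and robustness, an operating system logically divides memory into kernel space and user space. This is easy to understand. User programs are far too unpredictable, and exposing too much to them invites all kinds of strange usage beyond the system's control. Some languages do let you control memory directly. In C, for example, a pointer can reach data at almost any address in the process's memory (apart from some hardware addresses), and assembly works with raw addresses more directly still. But these low-level languages have drifted further and further from most of us, and for ordinary programmers they hardly matter anymore.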

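Most of the time, the code we write runs entirely in user space. Arithmetic such as Integer a = 2; Integer b = 3; Integer c = a * b; is done completely in user space, with no involvement from the kernel. Some operations, however, must go through the kernel. Reading and writing files is one example: it exchanges data between different devices, which makes it an I/O operation. Such operations require direct, privileged control of the hardware and are hard to implement, so the operating system always does the low-level work. As a result, the first-hand data always passes through kernel space. Our code, however, runs in user space, so normally the data has to be copied from kernel space into user space. That is the read path. The write path is the reverse: data is copied from user space into kernel space, and then the kernel performs the I/O.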
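To make the read path concrete, here is a minimal sketch that uses only the JDK's NIO API. ReadPathDemo, readAll and writeTemp are illustrative names that do not come from the original article. The sketch reads the same file through FileChannel into a heap ByteBuffer and into a direct ByteBuffer. In both cases the kernel performs the actual I/O and copies the file data into memory the process can access. As far as I know, OpenJDK handles the heap case by first reading into a temporary direct buffer and then copying that into the backing byte[]. A direct buffer therefore saves one copy inside the JVM.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ReadPathDemo {

    // Reads a whole (small) file into buf. FileChannel.read asks the kernel to do the I/O,
    // and the kernel copies the file data into the buffer's memory.
    static String readAll(Path file, ByteBuffer buf) {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            while (buf.hasRemaining() && ch.read(buf) != -1) {
                // keep reading until the buffer is full or end of file is reached
            }
            buf.flip(); // switch the buffer from writing mode to reading mode
            byte[] out = new byte[buf.remaining()];
            buf.get(out); // copy into an ordinary byte[] for the application
            return new String(out, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates a temporary file with the given content (deleted when the JVM exits).
    static Path writeTemp(String content) {
        try {
            Path file = Files.createTempFile("read-path-demo", ".txt");
            file.toFile().deleteOnExit();
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = writeTemp("hello direct memory");
        // Heap buffer: backed by a byte[] inside the Java heap
        System.out.println(readAll(file, ByteBuffer.allocate(64)));
        // Direct buffer: backed by native memory outside the Java heap
        System.out.println(readAll(file, ByteBuffer.allocateDirect(64)));
    }
}
```

Both calls print the same text. The difference lies in where the bytes land, which is the subject of the rest of this article.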




2. How direct memory works in Java






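    import java.nio.ByteBuffer;
    // Create a direct memory buffer (its 1600 bytes live outside the Java heap)
    ByteBuffer buffer = ByteBuffer.allocateDirect(1600);
    for (int i = 0; i < 900_000; i++) {
        for (int j = 0; j < 199; j++) {
            // Write data: 199 ints x 4 bytes = 796 bytes, within the capacity
            buffer.putInt(j);
        }
        buffer.flip(); // switch the buffer from writing to reading
        for (int j = 0; j < 199; j++) {
            // Read data
            buffer.getInt();
        }
        // Clean up: reset position and limit so the buffer can be reused
        buffer.clear();
    }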
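The loop above only exercises the buffer API. You can check where the bytes actually live with a few standard ByteBuffer methods. The sketch below is illustrative only; HeapVsDirect, describe and toArray are made-up names. A heap buffer reports hasArray() == true because it wraps a byte[] on the Java heap. A buffer from allocateDirect reports isDirect() == true. The ByteBuffer javadoc leaves its backing array unspecified, but in OpenJDK it has none, so ordinary Java code has to copy its contents out explicitly.

```java
import java.nio.ByteBuffer;

class HeapVsDirect {

    // Describes where a buffer's bytes live.
    static String describe(ByteBuffer buf) {
        return "isDirect=" + buf.isDirect() + ", hasArray=" + buf.hasArray();
    }

    // Copies the readable bytes of any buffer into a fresh byte[]. For a direct buffer
    // without a backing array this is how ordinary Java code gets at the bytes.
    static byte[] toArray(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate(); // leave the caller's position/limit untouched
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(16);
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        System.out.println("heap:   " + describe(heap));
        System.out.println("direct: " + describe(direct));

        direct.putInt(42);
        direct.flip();
        System.out.println(direct.getInt(0)); // absolute read, position unchanged
        System.out.println(toArray(direct).length);
    }
}
```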

3. Using direct memory in Netty


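There are two scenarios: 1. passing network data up to the application (the read path); 2. the application sending data to the remote peer (the write path).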
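Both scenarios can be sketched with plain JDK NIO. The example below is not Netty code. It uses java.nio.channels.Pipe as an in-process stand-in for a socket, and TwoPathsDemo and roundTrip are illustrative names. On the write path, the application encodes its message into a direct buffer before handing it to the channel. On the read path, the channel fills a direct buffer that the application then decodes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class TwoPathsDemo {

    // Sends a message through an in-process Pipe standing in for a network connection.
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                byte[] payload = message.getBytes(StandardCharsets.UTF_8);

                // Write path: encode into direct memory, then hand the bytes to the channel
                ByteBuffer out = ByteBuffer.allocateDirect(payload.length);
                out.put(payload);
                out.flip();
                while (out.hasRemaining()) {
                    sink.write(out);
                }

                // Read path: the channel fills direct memory, then the application decodes it
                ByteBuffer in = ByteBuffer.allocateDirect(payload.length);
                while (in.hasRemaining()) {
                    if (source.read(in) < 0) {
                        break; // the other side closed before all bytes arrived
                    }
                }
                in.flip();
                byte[] received = new byte[in.remaining()];
                in.get(received);
                return new String(received, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

The pipe is used from a single thread, so the sketch assumes the message is small enough to fit in the pipe's internal buffer. In a real connection the read and the write happen on different peers.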

    // Write path: convert msg into binary data stored in direct memory
    // io.netty.handler.codec.MessageToByteEncoder#write
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                I cast = (I) msg;
                // preferDirect = true by default
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // Call the subclass's implementation to encode the data, e.g. for a private protocol
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    // Pass the encoded data on toward the remote peer (next outbound handler)
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }

    // io.netty.handler.codec.MessageToByteEncoder#allocateBuffer
    /**
     * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
     * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
     */
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                               boolean preferDirect) throws Exception {
        if (preferDirect) {
            // ctx.alloc() is typically the default PooledByteBufAllocator
            return ctx.alloc().ioBuffer();
        } else {
            return ctx.alloc().heapBuffer();
        }
    }

    // io.netty.buffer.AbstractByteBufAllocator#ioBuffer()
    public ByteBuf ioBuffer() {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(DEFAULT_INITIAL_CAPACITY);
        }
        return heapBuffer(DEFAULT_INITIAL_CAPACITY);
    }

    // io.netty.buffer.AbstractByteBufAllocator#directBuffer(int)
    public ByteBuf directBuffer(int initialCapacity) {
        return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }

    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);
        return newDirectBuffer(initialCapacity, maxCapacity);
    }

    // io.netty.buffer.PooledByteBufAllocator#newDirectBuffer
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // Each thread has its own PoolThreadCache, bound to one of the direct arenas
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            // No pooled arena available: fall back to an unpooled direct buffer
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    // io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int)
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        // Obtain a (recycled) PooledByteBuf object first, then attach memory to it
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }

        // io.netty.buffer.PoolArena.DirectArena#newByteBuf
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }

    // io.netty.util.internal.PlatformDependent0#newDirectBuffer
    static ByteBuffer newDirectBuffer(long address, int capacity) {
        ObjectUtil.checkPositiveOrZero(capacity, "capacity");

        try {
            // Invoke the JDK's private DirectByteBuffer(long addr, int cap) constructor reflectively
            return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
        } catch (Throwable cause) {
            // Not expected to ever throw!
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new Error(cause);
        }
    }
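The branch taken in PoolArena#allocate depends on the normalized capacity. The following is a rough, standalone model of that classification, not Netty's code. SizeClassSketch is an illustrative name. The rounding rules and defaults (8 KiB pages and maxOrder 11, hence 16 MiB chunks) are assumptions based on the Netty 4.1 releases that still had tiny subpages. Later releases reworked the size classes.

```java
class SizeClassSketch {

    // Assumed defaults: 8 KiB pages and maxOrder 11, so a chunk is 8 KiB << 11 = 16 MiB.
    static final int PAGE_SIZE = 8192;
    static final int CHUNK_SIZE = PAGE_SIZE << 11;

    // Rounds a requested capacity up to the size that would actually be handed out.
    static int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("reqCapacity: " + reqCapacity);
        }
        if (reqCapacity >= CHUNK_SIZE) {
            return reqCapacity; // chunk-sized or larger: used as-is
        }
        if (reqCapacity >= 512) {
            // small and normal sizes: round up to the next power of two
            int highest = Integer.highestOneBit(reqCapacity);
            return highest == reqCapacity ? reqCapacity : highest << 1;
        }
        // tiny sizes (< 512): round up to a multiple of 16
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }

    // Follows the branch order of PoolArena#allocate.
    static String sizeClass(int reqCapacity) {
        int norm = normalizeCapacity(reqCapacity);
        if (norm < 512) {
            return "tiny";   // served from tinySubpagePools
        }
        if (norm < PAGE_SIZE) {
            return "small";  // served from smallSubpagePools
        }
        if (norm <= CHUNK_SIZE) {
            return "normal"; // one or more pages of a chunk
        }
        return "huge";       // allocated outside the pool (allocateHuge)
    }

    public static void main(String[] args) {
        int[] requests = {100, 600, 10_000, 20_000_000};
        for (int req : requests) {
            System.out.println(req + " -> " + normalizeCapacity(req) + " (" + sizeClass(req) + ")");
        }
    }
}
```

Under these assumptions:

- A 100-byte request becomes a 112-byte tiny buffer.
- A 600-byte request becomes a 1 KiB small buffer.
- A 10,000-byte request becomes a 16 KiB normal allocation.
- A 20,000,000-byte request exceeds the chunk size and is treated as huge.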

Writing data into the ByteBuffer means writing into direct memory, which is not quite as simple as writing to an ordinary heap object.

    // io.netty.buffer.AbstractByteBuf#writeBytes(byte[])
    public ByteBuf writeBytes(byte[] src) {
        writeBytes(src, 0, src.length);
        return this;
    }

    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
        ensureWritable(length); // grow the buffer (up to maxCapacity) if needed
        setBytes(writerIndex, src, srcIndex, length);
        writerIndex += length;
        return this;
    }

    // io.netty.buffer.PooledUnsafeDirectByteBuf#setBytes(int, byte[], int, int)
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        // addr() returns an absolute memory address
        UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
        return this;
    }

    // io.netty.buffer.PooledUnsafeDirectByteBuf#addr
    private long addr(int index) {
        return memoryAddress + index;
    }

    // io.netty.buffer.UnsafeByteBufUtil#setBytes(io.netty.buffer.AbstractByteBuf, long, int, byte[], int, int)
    static void setBytes(AbstractByteBuf buf, long addr, int index, byte[] src, int srcIndex, int length) {
        buf.checkIndex(index, length);
        if (length != 0) {
            // Copy the byte[] contents into the direct memory behind the buffer
            PlatformDependent.copyMemory(src, srcIndex, addr, length);
        }
    }

    // io.netty.util.internal.PlatformDependent#copyMemory(byte[], int, long, long)
    public static void copyMemory(byte[] src, int srcIndex, long dstAddr, long length) {
        PlatformDependent0.copyMemory(src, BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddr, length);
    }

    // io.netty.util.internal.PlatformDependent0#copyMemory(java.lang.Object, long, java.lang.Object, long, long)
    static void copyMemory(Object src, long srcOffset, Object dst, long dstOffset, long length) {
        //UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, length);
        while (length > 0) {
            long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
            // The copy itself is done by a JVM native method. dst is null here, so dstOffset is
            // treated as an absolute memory address. srcOffset is relative to the src array and
            // includes its base offset (Unsafe defines such bases, e.g. ARRAY_OBJECT_BASE_OFFSET...;
            // for byte[] the caller passes BYTE_ARRAY_BASE_OFFSET)
            UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
            length -= size;
            srcOffset += size;
            dstOffset += size;
        }
    }

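As you can see, the write into direct memory is ultimately performed through the Unsafe class. It copies the bytes straight into native memory that lives outside the Java heap.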
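PlatformDependent0#copyMemory deliberately copies in slices of at most UNSAFE_COPY_THRESHOLD bytes instead of making one large call. As I understand it, the comment next to that constant in Netty's source says the limit allows safepoint polling during a large copy. sun.misc.Unsafe is not a supported API, so the sketch below reproduces only the slicing pattern. It uses the public ByteBuffer.put(byte[], int, int) against a direct buffer. ChunkedCopySketch is an illustrative name, and the 1 MiB threshold is an assumed value chosen to match Netty's.

```java
import java.nio.ByteBuffer;

class ChunkedCopySketch {

    // Assumed per-step limit of 1 MiB, chosen to match Netty's UNSAFE_COPY_THRESHOLD.
    static final int COPY_THRESHOLD = 1024 * 1024;

    // Copies length bytes from src[srcIndex..] into dst at dstIndex, at most COPY_THRESHOLD
    // bytes per step, following the loop in PlatformDependent0#copyMemory.
    // Returns how many steps were needed.
    static int copy(byte[] src, int srcIndex, ByteBuffer dst, int dstIndex, int length) {
        ByteBuffer view = dst.duplicate();
        view.clear(); // address the whole capacity without touching dst's position/limit
        int steps = 0;
        while (length > 0) {
            int size = Math.min(length, COPY_THRESHOLD);
            view.position(dstIndex);
            view.put(src, srcIndex, size); // one bounded bulk copy into the buffer
            length -= size;
            srcIndex += size;
            dstIndex += size;
            steps++;
        }
        return steps;
    }

    public static void main(String[] args) {
        byte[] data = new byte[5 * COPY_THRESHOLD / 2]; // 2.5 MiB
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }
        ByteBuffer direct = ByteBuffer.allocateDirect(data.length);
        int steps = copy(data, 0, direct, 0, data.length);
        System.out.println("copied " + data.length + " bytes in " + steps + " steps");
    }
}
```

With this threshold, a 2.5 MiB array is copied in three steps: 1 MiB, 1 MiB and 0.5 MiB.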


    // io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // Find the previous outbound handler; the last one in the chain is HeadContext
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // Not on the handler's event loop: hand the write over as a task
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            } else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

        // io.netty.channel.DefaultChannelPipeline.HeadContext#write
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        // io.netty.channel.AbstractChannel.AbstractUnsafe#write
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                // Convert msg into direct memory if necessary
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }

            // Queue msg in outboundBuffer. From write()'s point of view the data is now written,
            // but it only reaches the socket when the channel is flushed
            outboundBuffer.addMessage(msg, size, promise);
        }

    // io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            // A heap ByteBuf is copied into a direct buffer where possible
            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }

    // io.netty.channel.ChannelOutboundBuffer#addMessage
    /**
     * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
     * the message was written.
     */
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        // If the pending bytes now exceed the write-buffer high water mark, the channel is marked
        // unwritable and channelWritabilityChanged fires right away (invokeLater = false).
        // Nothing is sent here; the data reaches the network only on flush().
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
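The write path is easy to misread, because write() does not touch the network. The following single-threaded model is a simplification, not Netty's ChannelOutboundBuffer. OutboundSketch and its methods are illustrative names, and the 32 KiB / 64 KiB water marks are assumed to match Netty's WriteBufferWaterMark defaults. The model works as follows:

- write() converts a heap message to direct memory (as filterOutboundMessage does) and queues it.
- flush() drains the queue into a channel.
- Crossing the high water mark only changes the writability flag.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, single-threaded model of the write path above (not Netty's implementation).
class OutboundSketch {
    // Assumed to match Netty's WriteBufferWaterMark defaults.
    static final int LOW_WATER_MARK = 32 * 1024;
    static final int HIGH_WATER_MARK = 64 * 1024;

    private final Deque<ByteBuffer> unflushed = new ArrayDeque<>();
    private final Deque<ByteBuffer> flushed = new ArrayDeque<>();
    private long pendingBytes;
    private boolean writable = true;

    // Like filterOutboundMessage: direct buffers pass through, heap buffers are copied to direct memory.
    static ByteBuffer toDirect(ByteBuffer msg) {
        if (msg.isDirect()) {
            return msg;
        }
        ByteBuffer direct = ByteBuffer.allocateDirect(msg.remaining());
        direct.put(msg.duplicate());
        direct.flip();
        return direct;
    }

    // Like write() + addMessage(): the message is only queued; pending bytes grow.
    void write(ByteBuffer msg) {
        ByteBuffer direct = toDirect(msg);
        unflushed.addLast(direct);
        pendingBytes += direct.remaining();
        if (pendingBytes > HIGH_WATER_MARK) {
            writable = false; // Netty would fire channelWritabilityChanged at this point
        }
    }

    // Like flush(): move queued messages to the flushed list and write them out.
    // Assumes a blocking channel, so every write() call makes progress.
    void flush(WritableByteChannel ch) {
        flushed.addAll(unflushed);
        unflushed.clear();
        try {
            while (!flushed.isEmpty()) {
                ByteBuffer head = flushed.peekFirst();
                pendingBytes -= ch.write(head);
                if (!head.hasRemaining()) {
                    flushed.removeFirst();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (pendingBytes < LOW_WATER_MARK) {
            writable = true;
        }
    }

    boolean isWritable() {
        return writable;
    }

    long pendingBytes() {
        return pendingBytes;
    }

    public static void main(String[] args) {
        OutboundSketch out = new OutboundSketch();
        out.write(ByteBuffer.wrap(new byte[40 * 1024])); // heap message, copied to direct memory
        out.write(ByteBuffer.wrap(new byte[40 * 1024]));
        System.out.println("writable after buffering 80 KiB: " + out.isWritable());
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        out.flush(Channels.newChannel(sink));
        System.out.println("bytes flushed: " + sink.size() + ", writable: " + out.isWritable());
    }
}
```

In the demo, two 40 KiB writes push the pending bytes to 80 KiB, above the high water mark. The model therefore reports the channel as unwritable until flush() drains the queue below the low water mark.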


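That is how Netty works with direct memory from end to end. It looks a bit complicated, mainly because Netty's design philosophy shows up everywhere. A write event, a read event and a state-change event are all processed as a long pipeline of steps. What we care about here, though, is how Netty uses direct memory. It works through a PooledUnsafeDirectByteBuf, which is ultimately backed by the JDK's DirectByteBuffer (Netty's PlatformDependent0 obtains that class via direct = ByteBuffer.allocateDirect(1)). Netty then creates direct memory objects through the DirectByteBuffer(long addr, int cap) constructor.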
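With pooling, Netty does not create a DirectByteBuffer for every message. As I understand the pooled arena, it allocates large chunks of direct memory once and hands out regions of them. Each PooledUnsafeDirectByteBuf then works on the chunk's address plus an offset, which is the memoryAddress that addr(index) uses. PagePoolSketch below is a hypothetical, heavily simplified page pool that ignores subpages, the allocation tree inside a chunk and thread-local caches. It shows the core idea with public JDK calls: one allocateDirect call, then slices that share its memory.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical fixed-size page pool: one large direct chunk handed out as slices.
class PagePoolSketch {
    private final ByteBuffer chunk;
    private final int pageSize;
    private final Deque<Integer> freePages = new ArrayDeque<>();

    PagePoolSketch(int pageSize, int pageCount) {
        this.pageSize = pageSize;
        this.chunk = ByteBuffer.allocateDirect(pageSize * pageCount); // a single native allocation
        for (int i = 0; i < pageCount; i++) {
            freePages.addLast(i);
        }
    }

    // Returns the index of a free page, or -1 if the chunk is exhausted.
    int allocatePage() {
        Integer page = freePages.pollFirst();
        return page == null ? -1 : page;
    }

    // A view of one page: it shares memory with the chunk, so no bytes are copied.
    ByteBuffer view(int page) {
        ByteBuffer dup = chunk.duplicate();
        dup.position(page * pageSize);
        dup.limit(page * pageSize + pageSize);
        return dup.slice();
    }

    // Returns a page to the pool; recently freed pages are reused first.
    void release(int page) {
        freePages.addFirst(page);
    }

    ByteBuffer chunk() {
        return chunk;
    }

    public static void main(String[] args) {
        PagePoolSketch pool = new PagePoolSketch(4096, 4);
        int page = pool.allocatePage();
        ByteBuffer buf = pool.view(page);
        buf.putInt(0, 12345); // written through the slice...
        System.out.println(pool.chunk().getInt(page * 4096)); // ...and visible in the chunk
        pool.release(page);
    }
}
```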

4. A few more words



