NIO的零拷贝如何实现高效数据传输? _
Java NIO 中的通道Channel就相当于操作系统的内核空间kernel space的缓冲区而缓冲区Buffer对应的相当于操作系统的用户空间user space中的用户缓冲区user buffer。通道Channel是全双工的双向传输它既可能是读缓冲区read buffer也可能是网络缓冲区socket buffer。缓冲区Buffer分为堆内存HeapBuffer和堆外内存DirectBuffer这是通过 malloc() 分配出来的用户态内存。堆外内存DirectBuffer在使用后需要应用程序手动回收而堆内存HeapBuffer的数据在 GC 时可能会被自动回收。因此在使用 HeapBuffer 读写数据时为了避免缓冲区数据因为 GC 而丢失NIO 会先把 HeapBuffer 内部的数据拷贝到一个临时的 DirectBuffer 中的本地内存native memory这个拷贝涉及到sun.misc.Unsafe.copyMemory()的调用背后的实现原理与memcpy()类似。 最后将临时生成的 DirectBuffer 内部的数据的内存地址传给 I/O 调用函数这样就避免了再去访问 Java 对象处理 I/O 读写。内存映射文件内存映射文件 I/O 是一种读和写文件数据的方法它可以比常规的基于流或者基于通道的 I/O 快得多。向内存映射文件写入可能是危险的只是改变数组的单个元素这样的简单操作就可能会直接修改磁盘上的文件。修改数据与将数据保存到磁盘是没有分开的。下面代码行将文件的前 1024 个字节映射到内存中map() 方法返回一个 MappedByteBuffer它是 ByteBuffer 的子类。因此可以像使用其他任何 ByteBuffer 一样使用新映射的缓冲区操作系统会在需要时负责执行映射。javaMappedByteBuffer mbb fc.map(FileChannel.MapMode.READ_WRITE, 0, 1024);MappedByteBufferMappedByteBuffer 是 NIO 基于内存映射mmap这种零拷贝方式的提供的一种实现可以减少一次数据拷贝的过程。它继承自 ByteBuffer。FileChannel 定义了一个 map() 方法它可以把一个文件从 position 位置开始的 size 大小的区域映射为内存映像文件。抽象方法 map() 方法在 FileChannel 中的定义如下javapublic abstract MappedByteBuffer map(MapMode mode, long position, long size) throws IOException;mode限定内存映射区域MappedByteBuffer对内存映像文件的访问模式包括只可读READ_ONLY、可读可写READ_WRITE和写时拷贝PRIVATE三种模式。position文件映射的起始地址对应内存映射区域MappedByteBuffer的首地址。size文件映射的字节长度从 position 往后的字节数对应内存映射区域MappedByteBuffer的大小。MappedByteBuffer 相比 ByteBuffer 新增了 fore()、load() 和 isLoad() 三个重要的方法fore()对于处于 READ_WRITE 模式下的缓冲区把对缓冲区内容的修改强制刷新到本地文件。load()将缓冲区的内容载入物理内存中并返回这个缓冲区的引用。isLoaded()如果缓冲区的内容在物理内存中则返回 true否则返回 false。下面给出一个利用 MappedByteBuffer 对文件进行读写的使用示例javaprivate final static String CONTENT Zero copy implemented by MappedByteBuffer; private final static String FILE_NAME /mmap.txt; private final static String CHARSET UTF-8;写文件数据打开文件通道 fileChannel 并提供读权限、写权限和数据清空权限通过 fileChannel 映射到一个可写的内存缓冲区 mappedByteBuffer将目标数据写入 mappedByteBuffer通过force()方法把缓冲区更改的内容强制写入本地文件。javaTest public void writeToFileByMappedByteBuffer() { Path path Paths.get(getClass().getResource(FILE_NAME).getPath()); byte[] bytes CONTENT.getBytes(Charset.forName(CHARSET)); try (FileChannel fileChannel FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) { MappedByteBuffer mappedByteBuffer fileChannel.map(READ_WRITE, 0, bytes.length); if (mappedByteBuffer ! null) { mappedByteBuffer.put(bytes); mappedByteBuffer.force(); } } catch (IOException e) { e.printStackTrace(); } }读文件数据打开文件通道 fileChannel 并提供只读权限通过 fileChannel 映射到一个只可读的内存缓冲区 mappedByteBuffer读取 mappedByteBuffer 中的字节数组即可得到文件数据。javaTest public void readFromFileByMappedByteBuffer() { Path path Paths.get(getClass().getResource(FILE_NAME).getPath()); int length CONTENT.getBytes(Charset.forName(CHARSET)).length; try (FileChannel fileChannel FileChannel.open(path, StandardOpenOption.READ)) { MappedByteBuffer mappedByteBuffer fileChannel.map(READ_ONLY, 0, length); if (mappedByteBuffer ! null) { byte[] bytes new byte[length]; mappedByteBuffer.get(bytes); String content new String(bytes, StandardCharsets.UTF_8); assertEquals(content, Zero copy implemented by MappedByteBuffer); } } catch (IOException e) { e.printStackTrace(); } }下面介绍map()方法的底层实现原理。map()方法是java.nio.channels.FileChannel的抽象方法由子类sun.nio.ch.FileChannelImpl.java实现下面是和内存映射相关的核心代码javapublic MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { int pagePosition (int)(position % allocationGranularity); long mapPosition position - pagePosition; long mapSize size pagePosition; try { addr map0(imode, mapPosition, mapSize); } catch (OutOfMemoryError x) { System.gc(); try { Thread.sleep(100); } catch (InterruptedException y) { Thread.currentThread().interrupt(); } try { addr map0(imode, mapPosition, mapSize); } catch (OutOfMemoryError y) { throw new IOException(Map failed, y); } } int isize (int)size; Unmapper um new Unmapper(addr, mapSize, isize, mfd); if ((!writable) || (imode MAP_RO)) { return Util.newMappedByteBufferR(isize, addr pagePosition, mfd, um); } else { return Util.newMappedByteBuffer(isize, addr pagePosition, mfd, um); } }map() 方法通过本地方法 map0() 为文件分配一块虚拟内存作为它的内存映射区域然后返回这块内存映射区域的起始地址。文件映射需要在 Java 堆中创建一个 MappedByteBuffer 的实例。如果第一次文件映射导致 OOM则手动触发垃圾回收休眠 100ms 后再尝试映射如果失败则抛出异常。通过 Util 的 newMappedByteBuffer 可读可写方法或者 newMappedByteBufferR仅读 方法方法反射创建一个 DirectByteBuffer 实例其中 DirectByteBuffer 是 MappedByteBuffer 的子类。map()方法返回的是内存映射区域的起始地址通过起始地址 偏移量就可以获取指定内存的数据。这样一定程度上替代了read()或write()方法底层直接采用sun.misc.Unsafe类的getByte()和putByte()方法对数据进行读写。javaprivate native long map0(int prot, long position, long mapSize) throws IOException;上面是本地方法native methodmap0 的定义它通过 JNIJava Native Interface调用底层 C 的实现这个 native 函数Java_sun_nio_ch_FileChannelImpl_map0的实现位于 JDK 源码包下的native/sun/nio/ch/FileChannelImpl.c这个源文件里面。cJNIEXPORT jlong JNICALL Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len) { void *mapAddress 0; jobject fdo (*env)-GetObjectField(env, this, chan_fd); jint fd fdval(env, fdo); int protections 0; int flags 0; if (prot sun_nio_ch_FileChannelImpl_MAP_RO) { protections PROT_READ; flags MAP_SHARED; } else if (prot sun_nio_ch_FileChannelImpl_MAP_RW) { protections PROT_WRITE | PROT_READ; flags MAP_SHARED; } else if (prot sun_nio_ch_FileChannelImpl_MAP_PV) { protections PROT_WRITE | PROT_READ; flags MAP_PRIVATE; } mapAddress mmap64( 0, /* Let OS decide location */ len, /* Number of bytes to map */ protections, /* File permissions */ flags, /* Changes are shared */ fd, /* File descriptor of mapped file */ off); /* Offset into file */ if (mapAddress MAP_FAILED) { if (errno ENOMEM) { JNU_ThrowOutOfMemoryError(env, Map failed); return IOS_THROWN; } return handle(env, -1, Map failed); } return ((jlong) (unsigned long) mapAddress); }可以看出 map0() 函数最终是通过mmap64()这个函数对 Linux 底层内核发出内存映射的调用mmap64()函数的原型如下c#include sys/mman.h void *mmap64(void *addr, size_t len, int prot, int flags, int fd, off64_t offset);下面详细介绍一下mmap64()函数各个参数的含义以及参数可选值addr文件在用户进程空间的内存映射区中的起始地址是一个建议的参数通常可设置为 0 或 NULL此时由内核去决定真实的起始地址。当 flags 为 MAP_FIXED 时addr 就是一个必选的参数即需要提供一个存在的地址。len文件需要进行内存映射的字节长度prot控制用户进程对内存映射区的访问权限PROT_READ读权限PROT_WRITE写权限PROT_EXEC执行权限PROT_NONE无权限flags控制内存映射区的修改是否被多个进程共享MAP_PRIVATE对内存映射区数据的修改不会反映到真正的文件数据修改发生时采用写时复制机制MAP_SHARED对内存映射区的修改会同步到真正的文件修改对共享此内存映射区的进程是可见的MAP_FIXED不建议使用这种模式下 addr 参数指定的必须的提供一个存在的 addr 参数fd文件描述符。每次 map 操作会导致文件的引用计数加 1每次 unmap 操作或者结束进程会导致引用计数减 1offset文件偏移量。进行映射的文件位置从文件起始地址向后的位移量下面总结一下 MappedByteBuffer 的特点和不足之处MappedByteBuffer 使用是堆外的虚拟内存因此分配map的内存大小不受 JVM 的 -Xmx 参数限制但是也是有大小限制的。 如果当文件超出 Integer.MAX_VALUE 字节限制时可以通过 position 参数重新 map 文件后面的内容。MappedByteBuffer 在处理大文件时性能的确很高但也存内存占用、文件关闭不确定等问题被其打开的文件只有在垃圾回收的才会被关闭而且这个时间点是不确定的。MappedByteBuffer 提供了文件映射内存的 mmap() 方法也提供了释放映射内存的 unmap() 方法。然而 unmap() 是 FileChannelImpl 中的私有方法无法直接显示调用。因此用户程序需要通过 Java 反射的调用 sun.misc.Cleaner 类的 clean() 方法手动释放映射占用的内存区域。javapublic static void clean(final Object buffer) throws Exception { AccessController.doPrivileged((PrivilegedActionVoid) () - { try { Method getCleanerMethod buffer.getClass().getMethod(cleaner, new Class[0]); getCleanerMethod.setAccessible(true); Cleaner cleaner (Cleaner) getCleanerMethod.invoke(buffer, new Object[0]); cleaner.clean(); } catch(Exception e) { e.printStackTrace(); } }); }DirectByteBuffer编辑DirectByteBuffer 的对象引用位于 Java 内存模型的堆里面JVM 可以对 DirectByteBuffer 的对象进行内存分配和回收管理一般使用 DirectByteBuffer 的静态方法 allocateDirect() 创建 DirectByteBuffer 实例并分配内存。javapublic static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); }DirectByteBuffer 内部的字节缓冲区位在于堆外的用户态直接内存它是通过 Unsafe 的本地方法 allocateMemory() 进行内存分配底层调用的是操作系统的 malloc() 函数因此DirectByteBuffer 使用的是操作系统内存。javaDirectByteBuffer(int cap) { super(-1, 0, cap, cap); boolean pa VM.isDirectMemoryPageAligned(); int ps Bits.pageSize(); long size Math.max(1L, (long)cap (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base 0; try { base unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa (base % ps ! 0)) { address base ps - (base (ps - 1)); } else { address base; } cleaner Cleaner.create(this, new Deallocator(base, size, cap)); att null; }使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用这块内存不受 jvm 垃圾回收的影响因此内存地址固定有助于 IO 读写java 中的 DirectByteBuf 对象仅维护了此内存的虚引用内存回收分成两步DirectByteBuf 对象被垃圾回收将虚引用加入引用队列通过专门线程访问引用队列根据虚引用释放堆外内存减少了一次数据拷贝用户态与内核态的切换次数没有减少除此之外初始化 DirectByteBuffer 时还会创建一个 Deallocator 线程并通过 Cleaner 的 freeMemory() 方法来对直接内存进行回收操作freeMemory() 底层调用的是操作系统的 free() 函数。javaprivate static class Deallocator implements Runnable { private static Unsafe unsafe Unsafe.getUnsafe(); private long address; private long size; private int capacity; private Deallocator(long address, long size, int capacity) { assert (address ! 0); this.address address; this.size size; this.capacity capacity; } public void run() { if (address 0) { return; } unsafe.freeMemory(address); address 0; Bits.unreserveMemory(size, capacity); } }由于使用 DirectByteBuffer 分配的是系统本地的内存不在 JVM 的管控范围之内因此直接内存的回收和堆内存的回收不同直接内存如果使用不当很容易造成 OutOfMemoryError。说了这么多那么 DirectByteBuffer 和零拷贝有什么关系前面有提到在 MappedByteBuffer 进行内存映射时它的 map() 方法会通过 Util.newMappedByteBuffer() 来创建一个缓冲区实例初始化的代码如下javastatic MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd, Runnable unmapper) { MappedByteBuffer dbb; if (directByteBufferConstructor null) initDBBConstructor(); try { dbb (MappedByteBuffer)directByteBufferConstructor.newInstance( new Object[] { new Integer(size), new Long(addr), fd, unmapper }); } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { throw new InternalError(e); } return dbb; } private static void initDBBRConstructor() { AccessController.doPrivileged(new PrivilegedActionVoid() { public Void run() { try { Class? cl Class.forName(java.nio.DirectByteBufferR); Constructor? ctor cl.getDeclaredConstructor( new Class?[] { int.class, long.class, FileDescriptor.class, Runnable.class }); ctor.setAccessible(true); directByteBufferRConstructor ctor; } catch (ClassNotFoundException | NoSuchMethodException | IllegalArgumentException | ClassCastException x) { throw new InternalError(x); } return null; }}); }DirectByteBuffer 是 MappedByteBuffer 的具体实现类也就是基于底层操作系统的 mmap 技术。实际上Util.newMappedByteBuffer() 方法通过反射机制获取 DirectByteBuffer 的构造器然后创建一个 DirectByteBuffer 的实例对应的是一个单独用于内存映射的构造方法javaprotected DirectByteBuffer(int cap, long addr, FileDescriptor fd, Runnable unmapper) { super(-1, 0, cap, cap, fd); address addr; cleaner Cleaner.create(this, unmapper); att null; }因此除了允许分配操作系统的直接内存以外DirectByteBuffer 本身也具有文件内存映射的功能这里不做过多说明。我们需要关注的是DirectByteBuffer 在 MappedByteBuffer 的基础上提供了内存映像文件的随机读取 get() 和写入 write() 的操作。内存映像文件的随机读操作javapublic byte get() { return ((unsafe.getByte(ix(nextGetIndex())))); } public byte get(int i) { return ((unsafe.getByte(ix(checkIndex(i))))); }内存映像文件的随机写操作javapublic ByteBuffer put(byte x) { unsafe.putByte(ix(nextPutIndex()), ((x))); return this; } public ByteBuffer put(int i, byte x) { unsafe.putByte(ix(checkIndex(i)), ((x))); return this; }内存映像文件的随机读写都是借助 ix() 方法实现定位的 ix() 方法通过内存映射空间的内存首地址address和给定偏移量 i 计算出指针地址然后由 unsafe 类的 get() 和 put() 方法和对指针指向的数据进行读取或写入。javaprivate long ix(int i) { return address ((long)i 0); }FileChannelFileChannel 是一个用于文件读写、映射和操作的通道同时它在并发环境下是线程安全的基于 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 getChannel() 方法可以创建并打开一个文件通道。FileChannel 定义了 transferFrom() 和 transferTo() 两个抽象方法它通过在通道和通道之间建立连接实现数据传输的。transferTo()通过 FileChannel 把文件里面的源数据写入一个 WritableByteChannel 的目的通道。javapublic abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;transferFrom()把一个源通道 ReadableByteChannel 中的数据读取到当前 FileChannel 的文件里面。javapublic abstract long transferFrom(ReadableByteChannel src, long position, long count) throws IOException;下面介绍 transferTo() 和 transferFrom() 方法的底层实现原理这两个方法也是 java.nio.channels.FileChannel 的抽象方法由子类 sun.nio.ch.FileChannelImpl.java 实现。transferTo() 和 transferFrom() 底层都是基于sendfile的方式 实现数据传输的其中 FileChannelImpl.java 定义了 3 个常量用于标示当前操作系统的内核是否支持 sendfile 以及 sendfile 的相关特性。javaprivate static volatile boolean transferSupported true; private static volatile boolean pipeSupported true; private static volatile boolean fileSupported true;transferSupported用于标记当前的系统内核是否支持 sendfile() 调用默认为 true。pipeSupported用于标记当前的系统内核是否支持文件描述符fd基于管道pipe的 sendfile() 调用默认为 true。fileSupported用于标记当前的系统内核是否支持文件描述符fd基于文件file的 sendfile() 调用默认为 true。来分析一下其中原理transferTo()方法直接将当前通道内容传输到另一个通道没有涉及到Buffer的任何操作NIO中的Buffer是JVM堆或者堆外内存但不论如何他们都是操作系统内核空间的内存。也就是说这种方式不会有内核缓冲区和用户缓冲区之间的拷贝问题。transferTo()的实现方式就是通过系统调用sendfile()当然这是Linux中的系统调用根据我们上面所写说这个过程是效率远高于从内核缓冲区到用户缓冲区的读写的。同理transferFrom()也是这种实现方式。下面以 transferTo() 的源码实现为例。FileChannelImpl 首先执行 transferToDirectly() 方法以 sendfile 的零拷贝方式尝试数据拷贝。如果系统内核不支持 sendfile进一步执行 transferToTrustedChannel() 方法以 mmap 的零拷贝方式进行内存映射这种情况下目的通道必须是 FileChannelImpl 或者 SelChImpl 类型。如果以上两步都失败了则执行 transferToArbitraryChannel() 方法基于传统的 I/O 方式完成读写具体步骤是初始化一个临时的 DirectBuffer将源通道 FileChannel 的数据读取到 DirectBuffer再写入目的通道 WritableByteChannel 里面。javapublic long transferTo(long position, long count, WritableByteChannel target) throws IOException { // 计算文件的大小 long sz size(); // 校验起始位置 if (position sz) return 0; int icount (int)Math.min(count, Integer.MAX_VALUE); // 校验偏移量 if ((sz - position) icount) icount (int)(sz - position); long n; if ((n transferToDirectly(position, icount, target)) 0) return n; if ((n transferToTrustedChannel(position, icount, target)) 0) return n; return transferToArbitraryChannel(position, icount, target); }接下来重点分析一下 transferToDirectly() 方法的实现也就是 transferTo() 通过 sendfile 实现零拷贝的精髓所在。可以看到transferToDirectlyInternal() 方法先获取到目的通道 WritableByteChannel 的文件描述符 targetFD获取同步锁然后执行 transferToDirectlyInternal() 方法。javaprivate long transferToDirectly(long position, int icount, WritableByteChannel target) throws IOException { // 省略从target获取targetFD的过程 if (nd.transferToDirectlyNeedsPositionLock()) { synchronized (positionLock) { long pos position(); try { return transferToDirectlyInternal(position, icount, target, targetFD); } finally { position(pos); } } } else { return transferToDirectlyInternal(position, icount, target, targetFD); } }最终由 transferToDirectlyInternal() 调用本地方法 transferTo0() 尝试以 sendfile 的方式进行数据传输。如果系统内核完全不支持 sendfile比如 Windows 操作系统则返回 UNSUPPORTED 并把 transferSupported 标识为 false。如果系统内核不支持 sendfile 的一些特性比如说低版本的 Linux 内核不支持 DMA gather copy 操作则返回 UNSUPPORTED_CASE 并把 pipeSupported 或者 fileSupported 标识为 false。