最近一段时间都在啃Linux内核, 也给了自己机会再度深入理解Java的NIO实现,希望能获得更多东西,尝试理解以前未能理解的,会涉及少量OpenJDK源码。
因为NIO本身的实现很多牵扯到操作系统,所以需要先稍微过一下,有理解不对的地方,请指出。
涉及的Linux知识
文件描述符
对于Linux来说,一切皆为文件,设备文件、IO文件还是普通文件,都可以通过一个叫做文件描述符(FileDescriptor)的东西来进行操作,其涉及的数据结构可以自行了解VFS。
设备阻塞与非阻塞
任意对设备的操作都是默认为阻塞的,如果没有或有不可操作的资源,会被添加到wait_queue_head_t
中进行等待,直到被semaphore
通知允许执行。此时可以通过fcntl()
函数将文件描述符设置为非阻塞,若没有或有不可操作的资源,立即返回错误信息。
JVM内存结构 & 虚拟地址空间
众所周知,Linux下的每一进程都有自己的虚拟内存地址,而JVM也是一个进程,且JVM有自己的内存结构。既然如此,两者之间必有对应关系,OracleJDK7提供了NMT,用jcmd pid VM.native_memory detail
可以查看各类区域的reserved,被committed的内存大小及其地址区间,再通过pmap -p
可以看到进程内存信息。
肉眼对比地址区间可以发现,JVM heap是通过mmap分配内存的,位于进程的映射区内,而进程堆区可以被malloc进行分配,对应关系如图。

socket编程
先回顾一下几个相关函数,JVM相关实现可以看Net.c源码,这里不做赘述。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
int socket(int domain, int type, int protocol);
int bind(int sockfd, struct sockaddr* addr, int addrlen);
int listen(int sockfd, int backlog);
int accept(int sockfd, struct sockaddr addr, socklen_t addrlen); int connect(int sd, struct sockaddr *server, int addr_len);
|
另,socketIO可以使用read & write
,和recv & send
两种函数,后者多了一个参数flags。
注,阻塞非阻塞模式,以下函数返回值有所区别。
1 2 3 4 5 6 7 8 9
| int write(int fd, void *buf, size_t nbytes); int read(int fd, void *buf, size_t nbytes);
int recv(int sockfd,void *buf,int len,int flags); int send(int sockfd,void *buf,int len,int flags);
|
IO多路复用
NIO在不同操作系统提供了不同实现,win-select,linux-epoll以及mac-kqueue,本文忽略windows平台,只说linux & mac下的实现。
epoll
不太想讲epoll跟select的区别,网上多的是,不过唯一要说epoll本身是fd,很多功能都基于此,也不需要select一样重复实例化,下面的kqueue也是一样。
首先是epoll是个文件,所以有可能被其他epoll/select/poll监听,所以可能会出现循环或反向路径,内核实现极其复杂冗长,有兴趣可以啃下ep_loop_check
和reverse_path_check
,我图论学得不好,看不下去。
需要说明fd、event、epfd的关系,epfd <n/n> fd <n/n> event,均是多对多的关系。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
struct epoll_event { __uint32_t events; epoll_data_t data; };
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
|
值得注意的是,epoll的边沿模式(EL)和水平模式(TL),
EL
只在中断信号来临时反馈,所以buffer cache
的数据未处理完,没有新数据到来是不会通知就绪的。
TL
则是会查看buffer cache
是否还有数据,只要没有被处理完,会继续通知就绪。
一个关于这两种模式的问题,就EL模式是否必须把fd设置为O_NONBLOCK。我不是很理解Linux手册中对EL的描述,为什么要和EL扯上关系,若是因为读写阻塞导致后续任务饥饿,那在TL是一样的后果。要我说,既然用了epoll,那就直接把fd设置为O_NONBLOCK得了,就没那么多事。
对此我强烈建议写过一次linux下的网络编程,加强理解,这里不写示例了。
kqueue
全网关于kqueue的文章少之又少,特别是中文,描述得比较详细的只有这篇《FreeBSD Kqueue的实现原理》,外文的就是发明者的论文和FreeBSD手册了。kqueue的数据结构我并没有完全搞懂,懒得啃FreeBSD的实现(解压出来的源码有1.05g 手动微笑)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| int kqueue(void);
int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout); struct kevent { uintpt_t ident; short filter; u_short flags; u_int fflags; intptr_t data; void *udata; }
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
|
NIO源码
先来一个NIO网络通讯的示例
Server,IOException
是要做处理的,我懒得写。示例代码
Client,read()
同 Server。示例代码
多路复用们的包装类
我很想按照demo的代码顺序讲,但感觉NIO的实现几乎围绕着SelectorImpl
写的,所以还是先来讲讲起子类与多路复用的包装类们。
EPollSelectorImpl
& EPollSelectorWapper
后者就是Linux中epoll编程的包装类,在对应的EPollArrayWrapper.c
中可以看出调用的都是上面说到的函数,实现类特意注册了一个管道用于唤醒epoll_wait
。
每种实现都是通过selector.select();
进行轮询,其实现的终极入口在SelectorImpl.doSelect(timeout)
,对于epoll来说,究极实现在EPollArrayWrapper.poll(timeout)
,最后调用的则是epoll_wait
,下面代码都是围绕着轮询实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| class EPollSelectorImpl extends SelectorImpl { protected int fd0; protected int fd1; EPollArrayWrapper pollWrapper; private Map<Integer,SelectionKeyImpl> fdToKey; private volatile boolean closed = false;
private Object interruptLock = new Object(); private boolean interruptTriggered = false;
EPollSelectorImpl(SelectorProvider sp) { super(sp); }
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); pollWrapper.poll(timeout); } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.interrupted()) { pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; } }
class EPollArrayWrapper { private final int epfd; private final AllocatedNativeObject pollArray; private final long pollArrayAddress; private int outgoingInterruptFD; private int incomingInterruptFD; private int interruptedIndex;
EPollArrayWrapper() throws IOException { epfd = epollCreate(); }
int poll(long timeout) throws IOException { updateRegistrations(); updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; }
public void interrupt() { interrupt(outgoingInterruptFD); } private static native void interrupt(int fd); }
|
EPollArrayWrapper的JNI代码,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| #define RESTARTABLE(_cmd, _result) do { \ do { \ _result = _cmd; \ } while((_result == -1) && (errno == EINTR)); \ } while(0)
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) { struct epoll_event *events = jlong_to_ptr(address); int res;
if (timeout <= 0) { RESTARTABLE((*epoll_wait_func)(epfd, events, numfds, timeout), res); } else { res = iepoll(epfd, events, numfds, timeout); } return res; }
static int iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout) { jlong start, now; int remaining = timeout; struct timeval t; int diff;
gettimeofday(&t, NULL); start = t.tv_sec * 1000 + t.tv_usec / 1000;
for (;;) { int res = epoll_wait(epfd, events, numfds, timeout); if (res < 0 && errno == EINTR) { if (remaining >= 0) { gettimeofday(&t, NULL); now = t.tv_sec * 1000 + t.tv_usec / 1000; diff = now - start; remaining -= diff; if (diff < 0 || remaining <= 0) { return 0; } start = now; } } else { return res; } } }
|
KqueueSelectorImpl
& KqueueSelectorWapper
我挺纠结是否要说kqueue,毕竟除了本身的声明过程,其他几乎与上述的epoll一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| class KQueueSelectorImpl extends SelectorImpl { protected int doSelect(long timeout) throws IOException { int entries = 0; if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); entries = kqueueWrapper.poll(timeout); } finally { end(); } processDeregisterQueue(); return updateSelectedKeys(entries); } }
class KQueueArrayWrapper { int poll(long timeout) { updateRegistrations(); int updated = kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout); return updated; } private native int kevent0(int kq, long keventAddress, int keventCount, long timeout); }
|
要说不同,也就最后kevent0
的轮询,不像epoll收到中断后会继续轮询,这里是直接return 0,由用户代码继续下一次轮询。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| JNIEXPORT jint JNICALL Java_sun_nio_ch_KQueueArrayWrapper_kevent0(JNIEnv *env, jobject this, jint kq, jlong kevAddr, jint kevCount, jlong timeout) { if (timeout >= 0) { ts.tv_sec = timeout / 1000; ts.tv_nsec = (timeout % 1000) * 1000000; tsp = &ts; } else { tsp = NULL; } result = kevent(kq, NULL, 0, kevs, kevCount, tsp);
if (result < 0) { if (errno == EINTR) { result = 0; } else { JNU_ThrowIOExceptionWithLastError(env, "KQueueArrayWrapper: kqueue failed"); } } return result; }
|
由此,多路复用在JVM的实现到这为止。
Channels
讲道理,这个图看起来复杂,其实功能接口很分明,阅读难度并不大。

接口类型及其作用
Channel
顶级接口,实际只提供一个close()
。
InterruptibleChannel
注释写了用于异步关闭or中断,大概说的是AbstractInterruptibleChannel.begin()
的回调,中断后调用implCloseChannel()
。
SelectableChannel
这个就是多路复用提供的部分实现API。
NetworkChannel
网络IO,绑定、设置socket选项等。
ScatteringByteChannel
& GatheringByteChannel
就是BufferByte读写了。
SeekableByteChannel
知道lseek()
就明白是跟文件IO相关的了。
网络IO相关实现及其分析
这里我需要先说明一下configureBlocking(boolean)
方法,这实际是调用了上述说到fcntl()
,可以看下IOUtil.configureBlocking(FileDescriptor fd, boolean blocking);
的JNI源码,所以下述socket fd都是非阻塞的,有空循环
很正常。
1 2 3 4 5 6 7 8 9 10 11
|
static int configureBlocking(int fd, jboolean blocking) { int flags = fcntl(fd, F_GETFL); int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags); }
|
不过有必要说明该方法的使用注意事项,一旦fd(即channel)被注册后,是不能重新设置为阻塞的。如果在注册前或不需要注册,是可以使用阻塞模式的fd进行读写操作的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public abstract class AbstractSelectableChannel extends SelectableChannel { public final SelectableChannel configureBlocking(boolean block) throws IOException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if (blocking == block) return this; if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; } }
|
在上面已经提及AbstractSelectableChannel.configureBlocking
这么小而重要的方法,有一个与其息息相关的方法就是register了。需要说的是,epfd(或kq)和被监听的fd是可以多对多的,所以每个channel都需要被维护一个selectionKey[]记录被哪些epfd监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
|
来看看ServerSocketChannel.open()
发射出去的实例SocketChannelImpl
,示例中ssc.bind(new InetSocketAddress(16767))
已经包含了bind
&listen
两个函数,这里也把accpet()
给说了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl { private static final int ST_UNINITIALIZED = -1; private static final int ST_INUSE = 0; private static final int ST_KILLED = 1; ServerSocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.serverSocket(true); this.fdVal = IOUtil.fdVal(fd); this.state = ST_INUSE; }
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { synchronized (lock) { InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) : Net.checkAddress(local); SecurityManager sm = System.getSecurityManager(); NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); Net.bind(fd, isa.getAddress(), isa.getPort()); Net.listen(fd, backlog < 1 ? 50 : backlog); synchronized (stateLock) { localAddress = Net.localAddress(fd); } } return this; }
public SocketChannel accept() throws IOException { synchronized (lock) { SocketChannel sc = null;
int n = 0; FileDescriptor newfd = new FileDescriptor(); InetSocketAddress[] isaa = new InetSocketAddress[1];
try { begin(); if (!isOpen()) return null; thread = NativeThread.current(); for (;;) { n = accept(this.fd, newfd, isaa); if ((n == IOStatus.INTERRUPTED) && isOpen()) continue; break; } } finally { thread = 0; end(n > 0); assert IOStatus.check(n); }
if (n < 1) return null; IOUtil.configureBlocking(newfd, true); InetSocketAddress isa = isaa[0]; sc = new SocketChannelImpl(provider(), newfd, isa); SecurityManager sm = System.getSecurityManager(); if (sm != null) { } return sc; } } }
|
Net.c算是把Linux的socket编程都写了一遍了,部分是ipv6&udp的设置,我个人不是很了解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
JNIEXPORT jint JNICALL Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6, jboolean stream, jboolean reuse, jboolean ignored) { int type = (stream ? SOCK_STREAM : SOCK_DGRAM); fd = socket(domain, type, 0); if (fd < 0) { return handleSocketError(env, errno); } setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg, sizeof(int));
setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)); setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg, sizeof(arg));
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, sizeof(arg)); }
JNIEXPORT jint JNICALL Java_sun_nio_ch_ServerSocketChannelImpl_accept0(JNIEnv *env, jobject this, jobject ssfdo, jobject newfdo, jobjectArray isaa) { for (;;) { socklen_t sa_len = alloc_len; newfd = accept(ssfd, sa, &sa_len); if (newfd >= 0) { break; } if (errno != ECONNABORTED) { break; } }
if (newfd < 0) { free((void *)sa); if (errno == EAGAIN) return IOS_UNAVAILABLE; if (errno == EINTR) return IOS_INTERRUPTED; JNU_ThrowIOExceptionWithLastError(env, "Accept failed"); return IOS_THROWN; } return 1; }
|
客户端的连接就很简单了,并不难,方法看似很长重点也就那么几行,JNI的代码就不贴了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public boolean connect(SocketAddress sa) throws IOException { for (;;) { InetAddress ia = isa.getAddress(); if (ia.isAnyLocalAddress()) ia = InetAddress.getLocalHost(); n = Net.connect(fd, ia, isa.getPort()); if ((n == IOStatus.INTERRUPTED) && isOpen()) continue; break; } }
|
文件IO
留个位置
ByteBuffer体系
从继承关系来看,其实并不复杂,数据结构也很简单,但对于malloc
和allocateDirect
分配的空间在进程虚拟内存所处的位置却很值得拿出来探讨一番,因为涉及NIO是否真实现了零拷贝
。

Buffer的指针
就是个对数组操作的容器,内部的指针也很容易理解,直接上图上源码,不多做解释。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public abstract class Buffer { private int mark = -1; private int position = 0; private int limit; private int capacity; public final Buffer position(int newPosition) { if ((newPosition > limit) || (newPosition < 0)) throw new IllegalArgumentException(); position = newPosition; if (mark > position) mark = -1; return this; } public final Buffer limit(int newLimit) { if ((newLimit > capacity) || (newLimit < 0)) throw new IllegalArgumentException(); limit = newLimit; if (position > limit) position = limit; if (mark > limit) mark = -1; return this; } }
|
ByteBuffer
都是一些读读写写的操作,不做讲述了。
HeapByteBuffer
其实就是个byte[],这个确实没什么好讲的。
1 2 3 4 5
| class HeapByteBuffer extends ByteBuffer { HeapByteBuffer(int cap, int lim) { super(-1, 0, lim, cap, new byte[cap], 0); } }
|
DirectByteBuffer
正如类名所示direct,分配了java heap以外的「直接」内存,空间大小由JVM参数-XX:MaxDirectMemorySize
控制,默认64m。我一开始认为jvm应该会根据这个参数在进程里面分配相对于的vm_area_struct,与heap相似的管理方式。直到我看到下面DirectByteBuffer
的构造方法,吃了一鲸,并不是我想象中那样,而是DirectMemory分配的控制是交给java控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer { super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap);
long base = 0; try { base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; } }
|
这个地方我一直很让我纠结,这个「直接」内存是分配在进程的堆区还是在映射区(忽略malloc()大于128k使用mmap()),自己又实在不想浪费心力过分研读JVM源码。如果正用了malloc,DirectByteBuffer并非所谓实现Linux零拷贝。
如果是在进程堆区,最后还是要拷贝至内核空间,参考FileChannel的map(),JNI毫不吝啬直接调用mmap(),所以看到os::malloc我很疑惑是否仅仅直接交给glibc进行内存分配。
1 2 3 4 5 6 7 8 9 10 11 12 13
| UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size)) UnsafeWrapper("Unsafe_AllocateMemory"); size_t sz = (size_t)size; if (sz == 0) { return 0; } sz = round_to(sz, HeapWordSize); void* x = os::malloc(sz, mtInternal); return addr_to_java(x); UNSAFE_END
|