Categories
程式開發

Netty源碼解析– 事件循環機制實現原理


本文主要分享Netty中事件循環機制的實現。

源碼分析基於Netty 4.1

事件循環

前面分享服務端和客戶端啟動過程的文章中說過,Netty通過事件循環機制(EventLoop)處理IO事件和異步任務,簡單來說,就是通過一個死循環,不斷處理當前已發生的IO事件和待處理的異步任務。示例如下

while(true) {
process(selector.select());

process(getTask());
}

這種事件循環機制也是一種常用的IO事件處理機制,包括Redis,Mysql都使用了類似的機制。

關於異步任務,前面文章說過,EventLoop實現了(jvm)Executor的接口,execute方法可以提供異步任務。

register,bind,connect等操作,都會提交一個任務給EventLoop處理。如

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
public void run() {
register0(promise);
}
});
}

下面看一下Netty中事件循環機制相關的類。

EventExecutor,事件執行器,負責處理事件。

EventExecutorGroup維護了一個EventExecutor鍊錶,它繼承了ScheduledExecutorService,execute方法通過next方法選擇一個EventExecutor,並調用EventLoop#execute處理事件。

(EventExecutor繼承了EventExecutorGroup,可以看做一個特殊的EventExecutorGroup,其execute方法可以提交一個任務任務)

EventLoop,事件循環器,繼承了EventExecutor,通過循環不斷處理註冊於其上的Channel的IO事件。

EventLoopGroup接口則繼承了EventExecutorGroup,負責調度EventLoop。

SingleThreadEventExecutor實現了EventExecutor,它會創建一個新線程,並在該線程上處理事件,可以理解為單線程處理器。

MultithreadEventExecutorGroup實現EventExecutorGroup,可以理解為多線程處理器(實際上是維護了多個EventExecutor,一個EventExecutor可以理解為一個線程),newChild方法構造具體的EventExecutor。

MultithreadEventExecutorGroup可以配置EventExecutor數量,即線程數量。

EventExecutorChooserFactory.EventExecutorChooser負責選擇一個EventExecutor執行實際操作。

NioEventLoop繼承了SingleThreadEventExecutor,負責處理NIO事件。所以,一個NioEventLoop對象可以看做是一個線程。

NioEventLoop也實現了EventLoop接口,它實現了事件循環機制,是Netty核心類。

MultithreadEventLoopGroup繼承了MultithreadEventExecutorGroup,並實現了EventLoopGroup,其newChild方法構造具體的EventLoop。

NioEventLoopGroup#newChild會構建NioEventLoop。

EventLoop各實現類關係如下

Netty源碼解析-- 事件循環機制實現原理 1

啟動

SingleThreadEventExecutor關鍵字段

private final Queue taskQueue;// 待处理异步任务
private volatile Thread thread;// EventLoop执行线程,即SingleThreadEventExecutor创建的新线程
private final Executor executor;// java.util.concurrent.Executor,负责创建线程

當我們通過execute方法提交任務時,如果還沒有創建執行新線程,會通過SingleThreadEventExecutor#executor一個新線程,並在新線程中調用run方法(run方法由子類實現,負責實現事件循環機制,新線程是EventLoop真正執行線程)。

SingleThreadEventExecutor#execute

public void execute(Runnable task) {
...

boolean inEventLoop = inEventLoop();
// #1
addTask(task);
// #2
if (!inEventLoop) {
startThread();
// #3
if (isShutdown()) {
...
}
}
// #4
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

#1 添加任務到待處理列表

#2

inEventLoop方法,判斷當前線程是否為EventLoop執行線程

若當前線程非EventLoop執行線程,調用startThread方法啟動一個新的線程,執行run方法。

這裡可以理解為啟動EventLoop。

#3 如果當前EventLoop已關閉,拒絕任務

#4 若當前EventLoop線程阻塞正等待IO事件(Selector#select方法),調用wakeup方法喚醒線程執行該新增任務

循環機制

NioEventLoop#run方法負責實現NIO事件處理機制。

protected void run() {
int selectCnt = 0;
// #1
for (;;) {

int strategy;

// #2
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
// #3
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// #4
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// #5
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
...

// #6
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
// #7
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
// #8
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}

// #9
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}

}
}

為了版面整潔,這裡刪除了異常處理代碼。

#1 可以看到,這里通過一個死循環不斷處理IO事件和異步任務。

#2 如果當前存在待處理的任務,調用selector.selectNow(),這時會跳出switch語句,往下處理事件和任務,否則返回SelectStrategy.SELECT。

#3 curDeadlineNanos,計算延遲任務隊列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),沒有任務則返回-1。

更新nextWakeupNanos為阻塞時間。

由於頻繁調用(jvm)Selector.wakeup會造成性能消耗,NioEventLoop維護了一個喚醒標識nextWakeupNanos。 nextWakeupNanos有三種值

NONE — 執行線程被阻塞;

AWAKE — 執行線程未阻塞;

其他值– 執行線程被超時阻塞,在指定的時間後喚醒;

NioEventLoop#wakeup方法中,只有nextWakeupNanos.getAndSet(AWAKE) != AWAKE成功才調用selector.wakeup()方法。

#4

這時如果還沒有任務加入,則執行select,阻塞線程。 select方法返回結果作為新的strategy。

#5

lazySet方法,設置值之後其他線程在短期內還是可能讀到舊值

這裡將nextWakeupNanos設置為AWAKE,主要是減少wakeup方法中不必要的wakeup操作。

所以使用lazySet方法也沒有問題。

#6 selectCnt增加

舊版本的Java NIO在Linux Epoll實現上存在bug,(jvm)Selector.select方法可能在沒有任何就緒事件的情況下返回,導致CPU空轉,利用率飆升到100%。

於是,Netty計算select方法重複調用次數selectCnt,並在selectCnt大於SELECTORAUTOREBUILD_THRESHOLD配置(默認512)時,重建selector,從而規避該問題。

幸好在JDK66u4,JDK7b12已修復該Bug。

#7 processSelectedKeys方法處理IO事件,runAllTask​​s方法處理任務。

ioRatio表示執行IO事件所佔CPU時間百分比,默認50,

ioTime * (100 – ioRatio) / ioRatio,通過ioTime,ioRatio計算處理任務的CPU時間。

#8 如果執行了任務或者select方法返回有效值,直接重置selectCnt。

unexpectedSelectorWakeup方法中會在selectCnt大於SELECTORAUTOREBUILD_THRESHOLD時重建selector。

#9 如果是正在關閉狀態,則要關閉所有的Channel

IO事件

下面看一下Eventloop中如何處理IO事件。

NioEventLoop關鍵字段

Selector unwrappedSelector;// JVM中的Selector
Selector selector;// 优化后的SelectedSelectionKeySetSelector
SelectedSelectionKeySet selectedKeys;// 对(jvm)Selector#selectedKeys进行优化

SelectedSelectionKeySetSelector每次調用select前都清除SelectedSelectionKeySet

SelectedSelectionKeySet使用數組代替原Selector的中的HashSet,提高性能。數組默認大小為1024,不夠用時擴展為原大小的2倍。

NioEventLoop#構造方法-> NioEventLoop#openSelector

private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// #1
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

...

final Class selectorImplClass = (Class) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {

public Object run() {
try {
// #2
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

...

selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} ...
}
});

...
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// #3
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

#1 通过(jvm)SelectorProvider打开一个Selector

#2 构造了selectedKeySet,并通过反射将该对象设置到Selector的selectedKeys,publicSelectedKeys属性中,这样Selector监听到的事件就会存储到selectedKeySet。

#3 构造了SelectedSelectionKeySetSelector对象

NioEventLoop#select负责阻塞线程,等待IO事件

private int select(long deadlineNanos) throws IOException {
// #1
if (deadlineNanos == NONE) {
return selector.select();
}

// #2
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }

#1 一直阻塞,知道發生IO事件或加入了新任務

#2 計算阻塞時間,在原阻塞時間加上995微秒後轉化為毫秒。

如果原阻塞時間在5微秒內,就不阻塞了。

IO事件的處理流程為

NioEventLoop#processSelectedKeys -> (沒有禁用SelectedSelectionKeySet)processSelectedKeysOptimized -> processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...

try {
int readyOps = k.readyOps();
// #1
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// #2
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// #3
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

#1 處理OP_CONNECT

移除關注事件OP_CONNECT,否則Selector.select(..)將不斷返回

前面分享客戶端啟動過程的文章說過了,這裡會調用AbstractNioUnsafe#finishConnect,完成客戶端Connect操作,可回顧《客戶端啟動過程解析》。

#2 先處理OP_WRITE事件,能夠儘早寫入數據釋放內存,這裡涉及flush操作,後面有文章解析。

#3 處理OPREAD或OPACCEPT事件。

對於ServerChannel,這裡會調用NioMessageUnsafe#read,處理OP_ACCEPT事件,可回顧《客戶端啟動過程解析》。

對於SocketChannel,這裡會調用NioByteUnsafe#read,進行讀寫操作,後面有文章解析。

異步任務

下面看一下Eventloop中如何處理異步任務。

run方法#4步驟-> SingleThreadEventExecutor#runAllTask​​s

protected boolean runAllTasks(long timeoutNanos) {
// #1
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}

final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// #2
safeExecute(task);

runTasks ++;

// #3
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// #4
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// #5
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}

#1 AbstractScheduledEventExecutor#scheduledTaskQueue中存放的是定時任務,

SingleThreadEventExecutor#taskQueue中存放的是待處理的任務。

fetchFromScheduledTaskQueue方法會獲取已到期的定時任務,移動到SingleThreadEventExecutor#taskQueue。

#2 執行獲取的任務

#3 每個64個任務檢查一次是否超時,因為nanoTime()方法也是一個相對昂貴的操作。

#4 取下一個任務,繼續處理

#5 預留的擴展方法。

NioEventLoop在Netty 4.1版本被優化,代碼做了較大改動,刪除了原來的wakeup標誌,改用nextWakeupNanos,代碼更清晰。

請參考-- 清理NioEventLoop

Netty是由事件驅動的,服務端register,bind,客戶端connect等操作都是提交異步任務給EventLoop處理的

,而Accept,Read/Writ,Connect等IO事件都都需要EventLoop的處理。

大家可以結合前面分析服務端和客戶端啟動過程的文章,理解EventLoop是如何驅動Netty工作的。

如果您覺得本文不錯,歡迎關注我的微信公眾號,您的關注是我堅持的動力!

Netty源碼解析-- 事件循環機制實現原理 2