Categories
程式開發

Go有了新的基於反射的協議緩衝區API


1byg1b4A5KSjtTzAlg9YjAw.jpeg

導言

Go 基於 I/O multiplexing 和 goroutine 構建了一個簡潔而高性能的原生網絡模型(基於 Go 的 I/O 多路復用 netpoll ),提供了 goroutine-per-connection 這樣簡單的網絡編程模式。在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯,極大地降低了開發者編寫網絡應用時的心智負擔,且借助於Go runtime scheduler 對goroutines 的高效調度,這個原生網絡模型不論從適用性還是性能上都足以滿足絕大部分的應用場景。

然而,在工程性上能做到如此高的普適性和兼容性,最終暴露給開發者提供接口/模式如此簡潔,其底層必然是基於非常複雜的封裝,做了很多取捨,也有可能放棄了一些『極致』的設計和理念。事實上 netpoll 底層就是基於 epoll/kqueue/iocp 這些系統調用來做封裝的,最終暴露出 goroutine-per-connection 這樣的極簡的開發模式給使用者。

Go netpoll 在不同的操作系統,其底層使用的 I/O 多路復用技術也不一樣,可以從 Go 源碼目錄結構和對應代碼文件了解 Go 在不同平台下的網絡 I/O 模式的實現。比如,在 Linux 系統下基於 epoll,freeBSD 系統下基於 kqueue,以及 Windows 系統下基於 iocp。

本文將基於Linux 平台來解析Go netpoll 之I/O 多路復用的底層是如何基於epoll 封裝實現的,從源碼層層推進,全面而深度地解析Go netpoll 的設計理念和實現原理,以及Go 是如何利用 netpoll 來構建它的原生網絡模型的。主要涉及到的一些概念:I/O 模式、用戶/內核空間、epoll、Linux 源碼、goroutine scheduler 等等,我會盡量簡單地講解,如果有對相關概念不熟悉的同學,還是希望能提前熟悉一下。

用戶空間與內核空間

現在操作系統都是採用虛擬存儲器,那麼對 32 位操作系統而言,它的尋址空間(虛擬存儲空間)為 4G(2 的 32 次方)。操作系統的核心是內核,獨立於普通的應用程序,可以訪問受保護的內存空間,也有訪問底層硬件設備的所有權限。為了保證用戶進程不能直接操作內核(kernel),保證內核的安全,操心系統將虛擬空間劃分為兩部分,一部分為內核空間,一部分為用戶空間。針對Linux 操作系統而言,將最高的1G 字節(從虛擬地址0xC0000000 到0xFFFFFFFF),供內核使用,稱為內核空間,而將較低的3G 字節(從虛擬地址0x00000000 到0xBFFFFFFF),供各個進程使用,稱為用戶空間。

Go有了新的基於反射的協議緩衝區API 1

I/O 多路復用

在神作《UNIX 網絡編程》裡,總結歸納了 5 種 I​​/O 模型,包括同步和異步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路復用 (I/O multiplexing)
  • 信號驅動 I/O (Signal driven I/O)
  • 異步 I/O (Asynchronous I/O)

操作系統上的 I/O 是用戶空間和內核空間的數據交互,因此 I/O 操作通常包含以下兩個步驟:

  1. 等待網絡數據到達網卡(讀就緒)/等待網卡可寫(寫就緒) –> 讀取/寫入到內核緩衝區
  2. 從內核緩衝區復制數據 –> 用戶空間(讀)/從用戶空間複製數據 -> 內核緩衝區(寫)

而判定一個I/O 模型是同步還是異步,主要看第二步:數據在用戶和內核空間之間複製的時候是不是會阻塞當前進程,如果會,則是同步I/O,否則,就是異步I/O。基於這個原則,這 5 種 I​​/O 模型中只有一種異步 I/O 模型:Asynchronous I/O,其餘都是同步 I/O 模型。

這 5 種 I​​/O 模型的對比如下:

Go有了新的基於反射的協議緩衝區API 2

所謂I/O 多路復用指的就是select/poll/epoll 這一系列的多路選擇器:支持單一線程同時監聽多個文件描述符(I/O 事件),阻塞等待,並在其中某個文件描述符可讀寫時收到通知。 I/O 復用其實復用的不是 I/O 連接,而是複用線程,讓一個 thread of control 能夠處理多個連接(I/O 事件)。

select & poll

#include 

/* According to earlier standards */
#include 
#include 
#include 

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select 是 epoll 之前 Linux 使用的 I/O 事件驅動技術。

理解 select 的關鍵在於理解 fd_set,為說明方便,取 fd_set 長度為 1 字節,fd_set 中的每一 bit 可以對應一個文件描述符 fd,則 1 字節長的 fd_set 最大可以對應 8 個 fd。 select 的調用過程如下:

  1. 執行 FD_ZERO(&set), 則 set 用位表示是 0000,0000
  2. 若 fd=5, 執行 FD_SET(fd, &set); 後 set 變為 0001,0000(第 5 位置為 1)
  3. 再加入 fd=2, fd=1,則 set 變為 0001,0011
  4. 執行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都發生可讀事件,則 select 返回,此時 set 變為 0000,0011 (注意:沒有事件發生的 fd=5 被清空)

基於上面的調用過程,可以得出 select 的特點:

  • 可監控的文件描述符個數取決於 sizeof(fd_set) 的值。假設服務器上 sizeof(fd_set)=512,每 bit 表示一個文件描述符,則服務器上支持的最大文件描述符是 512*8=4096。 fd_set 的大小調整可參考 【原創】技術系列之 網絡模型(二) 中的模型 2,可以有效突破 select 可監控的文件描述符上限
  • 將 fd 加入 select 監控集的同時,還要再使用一個數據結構 array 保存放到 select 監控集中的 fd,一是用於在 select 返回後,array 作為源數據和 fd_set 進行 FD_ISSET 判斷。二是select 返回後會把以前加入的但並無事件發生的fd 清空,則每次開始select 前都要重新從array 取得fd 逐一加入(FD_ZERO 最先),掃描array 的同時取得fd 最大值maxfd,用於select 的第一個參數
  • 可見 select 模型必須在 select 前循環 array(加 fd,取 maxfd),select 返回後循環 array(FD_ISSET 判斷是否有事件發生)

所以,select 有如下的缺點:

  1. 最大並發數限制:使用 32 個整數的 32 位,即 32*32=1024 來標識 fd,雖然可修改,但是有以下第 2, 3 點的瓶頸
  2. 每次調用 select,都需要把 fd 集合從用戶態拷貝到內核態,這個開銷在 fd 很多時會很大
  3. 性能衰減嚴重:每次 kernel 都需要線性掃描整個 fd_set,所以隨著監控的描述符 fd 數量增長,其 I/O 性能會線性下降

poll 的實現和select 非常相似,只是描述fd 集合的方式不同,poll 使用pollfd 結構而不是select 的fd_set 結構,poll 解決了最大文件描述符數量限制的問題,但是同樣需要從用戶態拷貝所有的fd 到內核態,也需要線性遍歷所有的fd 集合,所以它和select 只是實現細節上的區分,並沒有本質上的區別。

epoll

epoll 是 Linux kernel 2.6 之後引入的新 I/O 事件驅動技術,I/O 多路復用的核心設計是 1 個線程處理所有連接的 等待消息准备好 I/O 事件,這一點上 epoll 和 select&poll 是大同小異的。但 select&poll 錯誤預估了一件事,當數十萬並發連接存在時,可能每一毫秒只有數百個活躍的連接,同時其餘數十萬連接在這一毫秒是非活躍的。 select&poll 的使用方法是這樣的: 返回的活跃连接 == select(全部待监控的连接)

什麼時候會調用 select&poll 呢?在你認為需要找出有報文到達的活躍連接時,就應該調用。所以,select&poll 在高並發時是會被頻繁調用的。這樣,這個頻繁調用的方法就很有必要看看它是否有效率,因為,它的輕微效率損失都會被 高频 二字所放大。它有效率損失嗎?顯而易見,全部待監控連接是數以十萬計的,返回的只是數百個活躍連接,這本身就是無效率的表現。被放大後就會發現,處理並發上萬個連接時,select&poll 就完全力不從心了。這個時候就該 epoll 上場了,epoll 通過一些新的設計和優化,基本上解決了 select&poll 的問題。

epoll 的 API 非常簡潔,涉及到的只有 3 個系統調用:

#include   
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epoll_create 創建一個epoll 實例並返回epollfd;epoll_ctl 註冊file descriptor 等待的I/O 事件(比如EPOLLIN、EPOLLOUT 等) 到epoll 實例上;epoll_wait 則是阻塞監聽epoll 實例上所有的file descriptor 的I/O 事件,它接收一個用戶空間上的一塊內存地址(events 數組),kernel 會在有I/O 事件發生的時候把文件描述符列表複製到這塊內存地址上,然後epoll_wait 解除阻塞並返回,最後用戶空間上的程序就可以對相應的fd 進行讀寫了:

#include 
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

Go有了新的基於反射的協議緩衝區API 3

與 select&poll 相比,epoll 分清了高頻調用和低頻調用。例如,epoll_ctl 相對來說就是非頻繁調用的,而 epoll_wait 則是會被高頻調用的。所以 epoll 利用 epoll_ctl 來插入或者刪除一個 fd,實現用戶態到內核態的數據拷貝,這確保了每一個 fd 在其生命週期只需要被拷貝一次,而不是每次調用 epoll_wait 的時候都拷貝一次。 epoll_wait 則被設計成幾乎沒有入參的調用,相比 select&poll 需要把全部監聽的 fd 集合從用戶態拷貝至內核態的做法,epoll 的效率就高出了一大截。

在實現上 epoll 採用紅黑樹來存儲所有監聽的 fd,而紅黑樹本身插入和刪除性能比較穩定,時間複雜度 O(logN)。通過 epoll_ctl 函數添加進來的 fd 都會被放在紅黑樹的某個節點內,所以,重複添加是沒有用的。當把fd 添加進來的時候時候會完成關鍵的一步:該fd 會與相應的設備(網卡)驅動程序建立回調關係,也就是在內核中斷處理程序為它註冊一個回調函數,在fd 相應的事件觸發(中斷)之後(設備就緒了),內核就會調用這個回調函數,該回調函數在內核中被稱為: ep_poll_callback這個回調函數其實就是把這個 fd 添加到 rdllist 這個雙向鍊錶(就緒鍊錶)中。 epoll_wait 實際上就是去檢查 rdlist 雙向鍊錶中是否有就緒的 fd,當 rdlist 為空(無就緒 fd)時掛起當前進程,直到 rdlist 非空時進程才被喚醒並返回。

相比於select&poll 調用時會將全部監聽的fd 從用戶態空間拷貝至內核態空間并線性掃描一遍找出就緒的fd 再返回到用戶態,epoll_wait 則是直接返回已就緒fd,因此epoll 的I/ O 性能不會像select&poll 那樣隨著監聽的fd 數量增加而出現線性衰減,是一個非常高效的I/O 事件驅動技術。

由於使用epoll 的I/O 多路復用需要用戶進程自己負責I/O 讀寫,從用戶進程的角度看,讀寫過程是阻塞的,所以select&poll&epoll 本質上都是同步I/O 模型,而像Windows 的IOCP 這一類的異步I/O,只需要在調用WSARecv 或WSASend 方法讀寫數據的時候把用戶空間的內存buffer 提交給kernel,kernel 負責數據在用戶空間和內核空間拷貝,完成之後就會通知用戶進程,整個過程不需要用戶進程參與,所以是真正的異步I/O。

延伸

另外,我看到有些文章說 epoll 之所以性能高是因為利用了 Linux 的 mmap 內存映射讓內核和用戶進程共享了一片物理內存,用來存放就緒 fd 列表和它們的數據 buffer,所以用戶進程在 epoll_wait 返回之後用戶進程就可以直接從共享內存那裡讀取/寫入數據了,這讓我很疑惑,因為首先看 epoll_wait 的函數聲明:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二個參數:就緒事件列表,是需要在用戶空間分配內存然後再傳給 epoll_wait 的,如果內核會用 mmap 設置共享內存,直接傳遞一個指針進去就行了,根本不需要在用戶態分配內存,多此一舉。其次,內核和用戶進程通過mmap 共享內存是一件極度危險的事情,內核無法確定這塊共享內存什麼時候會被回收,而且這樣也會賦予用戶進程直接操作內核數據的權限和入口,非常容易出現大的系統漏洞,因此一般極少會這麼做。所以我很懷疑 epoll 是不是真的在 Linux kernel 裡用了 mmap,我就去看了下最新版本(5.3.9)的 Linux kernel 源碼:

/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
             int maxevents, int timeout)
{
    // ...
  
    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, timeout);
}

// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
    // ...
  
    send_events:
    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
  
    // 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
    // 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
    // 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
    // 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
    if (!res && eavail &&
            !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
        goto fetch_events;
  
    // ...
}

// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
                struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;

    esed.maxevents = maxevents;
    esed.events = events;
    // 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
    // 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
    ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
    return esed.res;
}

// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
    // ...
  
    /*
     * If the event mask intersect the caller-requested one,
     * deliver the event to userspace. Again, ep_scan_ready_list()
     * is holding ep->mtx, so no operations coming from userspace
     * can change the item.
     */
    revents = ep_item_poll(epi, &pt, 1);
    // 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
    if (!revents)
        continue;
    // 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
    // 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
    if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
        list_add(&epi->rdllink, head);
        ep_pm_stay_awake(epi);
        if (!esed->res)
            esed->res = -EFAULT;
        return 0;
    }
  
    // ...
}

do_epoll_wait 開始層層跳轉,我們可以很清楚地看到最後內核是通過 __put_user 函數把就緒 fd 列表和事件返回到用戶空間,而 __put_user 正是內核用來拷貝數據到用戶空間的標準函數。此外,我並沒有在Linux kernel 的源碼中和epoll 相關的代碼裡找到mmap 系統調用做內存映射的邏輯,所以基本可以得出結論:epoll 在Linux kernel 裡並沒有使用mmap 來做用戶空間和內核空間的內存共享,所以那些說epoll 使用了mmap 的文章都是誤解。

Non-blocking I/O

什麼叫非阻塞 I/O,顧名思義就是:所有 I/O 操作都是立刻返回而不會阻塞當前用戶進程。 I/O 多路復用通常情況下需要和非阻塞 I/O 搭配使用,否則可能會產生意想不到的問題。比如,epoll 的 ET(邊緣觸發) 模式下,如果不使用非阻塞 I/O,有極大的概率會導致阻塞 event-loop 線程,從而降低吞吐量,甚至導致 bug。

Linux 下,我們可以通過 fcntl 系統調用來設置 O_NONBLOCK 標誌位,從而把 socket 設置成 non-blocking。當對一個 non-blocking socket 執行讀操作時,流程是這個樣子:

Go有了新的基於反射的協議緩衝區API 4

當用戶進程發出 read 操作時,如果 kernel 中的數據還沒有準備好,那麼它並不會 block 用戶進程,而是立刻返回一個 EAGAIN error。從用戶進程角度講 ,它發起一個 read 操作後,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個 error 時,它就知道數據還沒有準備好,於是它可以再次發送 read 操作。一旦 kernel 中的數據準備好了,並且又再次收到了用戶進程的 system call,那麼它馬上就將數據拷貝到了用戶內存,然後返回。

所以,non-blocking I/O 的特點是用戶進程需要不斷的主動詢問 kernel 數據好了沒有。

Go netpoll

一個典型的 Go TCP server:

package main

import (
    "fmt"
    "net"
)

func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    }

    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        }

        // start a new goroutine to handle the new connection
        go HandleConn(conn)
    }
}
func HandleConn(conn net.Conn) {
    defer conn.Close()
    packet := make([]byte, 1024)
    for {
        // 如果没有可读数据,也就是读 buffer 为空,则阻塞
        _, _ = conn.Read(packet)
        // 同理,不可写则阻塞
        _, _ = conn.Write(packet)
    }
}

上面是一個基於 Go 原生網絡模型(基於 netpoll)編寫的一個 TCP server,模式是 goroutine-per-connection ,在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯而且對於開發者來說I/O 是否阻塞是無感知的,也就是說開發者無需考慮goroutines 甚至更底層的線程、進程的調度和上下文切換。而Go netpoll 最底層的事件驅動技術肯定是基於epoll/kqueue/iocp 這一類的I/O 事件驅動技術,只不過是把這些調度和上下文切換的工作轉移到了runtime 的Go scheduler,讓它來負責調度goroutines,從而極大地降低了程序員的心智負擔!

Go netpoll 核心

Go netpoll 通過在底層對 epoll/kqueue/iocp 的封裝,從而實現了使用同步編程模式達到異步執行的效果。總結來說,所有的網絡操作都以網絡描述符 netFD 為中心實現。 netFD 與底層PollDesc 結構綁定,當在一個netFD 上讀寫遇到EAGAIN 錯誤時,就將當前goroutine 存儲到這個netFD 對應的PollDesc 中,同時調用gopark 把當前goroutine 給park 住,直到這個netFD 上再次發生讀寫事件,才將此goroutine 給ready 激活重新運行。顯然,在底層通知 goroutine 再次發生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅動機制。

接下來我們通過分析最新的 Go 源碼(v1.13.4),解讀一下整個 netpoll 的運行流程。

上面的示例代碼中相關的在源碼裡的幾個數據結構和方法:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
    fd *netFD
    lc ListenConfig
}

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
    conn
}

// Conn
type conn struct {
    fd *netFD
}

type conn struct {
    fd *netFD
}

func (c *conn) ok() bool { return c != nil && c.fd != nil }

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Write(b)
    if err != nil {
        err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

netFD

net.Listen("tcp", ":8888") 方法返回了一個 *TCPListener,它是一個實現了 net.Listener 接口的 struct,而通過 listener.Accept() 接收的新連接 *TCPConn 則是一個實現了 net.Conn 接口的 struct,它內嵌了 net.conn struct。仔細閱讀上面的源碼可以發現,不管是 Listener 的 Accept 還是 Conn 的 Read/Write 方法,都是基於一個 netFD 的數據結構的操作, netFD 是一個網絡描述符,類似於Linux 的文件描述符的概念,netFD 中包含一個poll.FD 數據結構,而poll.FD 中包含兩個重要的數據結構Sysfd 和pollDesc,前者是真正的系統文件描述符,後者對是底層事件驅動的封裝,所有的讀寫超時等操作都是通過調用後者的對應方法實現的。

netFDpoll.FD 的源碼:

// Network file descriptor.
type netFD struct {
    pfd poll.FD

    // immutable until Close
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex

    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Writev cache.
    iovecs *[]syscall.Iovec

    // Semaphore signaled when file is closed.
    csema uint32

    // Non-zero if this file has been set to blocking mode.
    isBlocking uint32

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket. Immutable.
    IsStream bool

    // Whether a zero byte read indicates EOF. This is false for a
    // message based socket connection.
    ZeroReadIsEOF bool

    // Whether this is a file rather than a network socket.
    isFile bool
}

pollDesc

前面提到了 pollDesc 是底層事件驅動的封裝,netFD 通過它來完成各種 I/O 相關的操作,它的定義如下:

type pollDesc struct {
    runtimeCtx uintptr
}

這裡的 struct 只包含了一個指針,而通過 pollDesc 的 init 方法,我們可以找到它具體的定義是在 runtime.pollDesc 這裡:

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return syscall.Errno(errno)
    }
    pd.runtimeCtx = ctx
    return nil
}

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool    // marks event scanning error happened
    user    uint32  // user settable cookie
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
}

runtime.pollDesc 包含自身類型的一個指針,用來保存下一個 runtime.pollDesc 的地址,以此來實現鍊錶,可以減少數據結構的大小,所有的 runtime.pollDesc 保存在 runtime.pollCache 結構中,定義如下:

type pollCache struct {
   lock  mutex
   first *pollDesc
   // PollDesc objects must be type-stable,
   // because we can get ready notification from epoll/kqueue
   // after the descriptor is closed/reused.
   // Stale notifications are detected using seq variable,
   // seq is incremented when deadlines are changed or descriptor is reused.
}

net.Listen

調用 net.Listen 之後,底層會通過 Linux 的系統調用 socket 方法創建一個 fd 分配給 listener,並用以來初始化 listener 的 netFD ,接著調用 netFD 的 listenStream 方法完成對 socket 的 bind&listen 操作以及對 netFD 的初始化(主要是對 netFD 裡的 pollDesc 的初始化),相關源碼如下:

// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O    
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.

socketFunc        func(int, int, int) (int, error)  = syscall.Socket

// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
    poll.CloseFunc(s)
    return nil, err
}

// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    // ...
  
    // 完成绑定操作
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
  
    // 完成监听操作
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
  
    // 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once

// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
    // runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
    serverInit.Do(runtime_pollServerInit)
  
    // runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到 
    // epoll 实例中,另外,它会初始化一个 pollDesc 并返回
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return syscall.Errno(errno)
    }
    // 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
    // 后续使用直接通过该指针操作
    pd.runtimeCtx = ctx
    return nil
}

var (
    // 全局唯一的 epoll fd,只在 listener fd 初始化之时被指定一次
    epfd int32 = -1 // epoll descriptor
)

// netpollinit 会创建一个 epoll 实例,然后把 epoll fd 赋值给 epfd,
// 后续 listener 以及它 accept 的所有 sockets 有关 epoll 的操作都是基于这个全局的 epfd
func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    epfd = epollcreate(1024)
    if epfd >= 0 {
        closeonexec(epfd)
        return
    }
    println("runtime: epollcreate failed with", -epfd)
    throw("runtime: netpollinit failed")
}

// netpollopen 会被 runtime_pollOpen 调用,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

我們前面提到的 epoll 的三個基本調用,Go 在源碼裡實現了對那三個調用的封裝:

#include   
int epoll_create(int size);  
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

netFD 就是通過這三個封裝來對 epoll 進行創建實例、註冊 fd 和等待事件操作的。

Listener.Accept()

netpoll accept socket 的工作流程如下:

  1. 服務端的 netFD 在 listen 時會創建 epoll 的實例,並將 listenerFD 加入 epoll 的事件隊列
  2. netFD 在 accept 時將返回的 connFD 也加入 epoll 的事件隊列
  3. netFD 在讀寫時出現 syscall.EAGAIN 錯誤,通過 pollDesc 的 waitRead 方法將當前的 goroutine park 住,直到 ready,從 pollDesc 的 waitRead 中返回

Listener.Accept() 接收來自客戶端的新連接,具體還是調用 netFD.accept 方法來完成這個功能:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 调用 poll.FD 的 Accept 方法接受新的 socket 连接,返回 socket 的 fd
    d, rsa, errcall, err := fd.pfd.Accept()
    if err != nil {
        if errcall != "" {
            err = wrapSyscallError(errcall, err)
        }
        return nil, err
    }
    // 以 socket fd 构造一个新的 netFD,代表这个新的 socket
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // 调用 netFD 的 init 方法完成初始化
    if err = netfd.init(); err != nil {
        fd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

netFD.accept 方法裡會再調用 poll.FD.Accept ,最後會使用 Linux 的系統調用 accept 來完成新連接的接收,並且會把 accept 的 socket 設置成非阻塞 I/O 模式:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()

    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    for {
        // 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        // 因为 listener fd 在创建的时候已经设置成非阻塞的了,
        // 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
        if err == nil {
            return s, rsa, "", err
        }
        // 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}

// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.

// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead 方法主要負責檢測當前這個pollDesc 的上層netFD 對應的fd 是否有『期待的』I/O 事件發生,如果有就直接返回,否則就park 住當前的goroutine 並持續等待直至對應的fd 上發生可讀/可寫或者其他『期待的』I/O 事件為止,然後它就會返回到外層的for 循環,讓goroutine 繼續執行邏輯。

poll.FD.Accept() 返回之後,會構造一個對應這個新socket 的netFD,然後調用init() 方法完成初始化,這個init 過程和前面net.Listen() 是一樣的,調用鏈:netFD.init( ) –> poll.FD.Init() –> poll.pollDesc.init(),最終又會走到這裡:

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return syscall.Errno(errno)
    }
    pd.runtimeCtx = ctx
    return nil
}

然後把這個 socket fd 註冊到 listener 的 epoll 實例的事件隊列中去,等待 I/O 事件。

Conn.Read/Conn.Write

我們先來看看 Conn.Read 方法是如何實現的,原理其實和 Listener.Accept 是一樣的,具體調用鏈還是首先調用 conn 的 netFD.Read ,然後內部再調用 poll.FD.Read ,最後使用 Linux 的系統調用 read: syscall.Read 完成數據讀取:

// Implementation of the Conn interface.

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    runtime.KeepAlive(fd)
    return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if len(p) == 0 {
        // If the caller wanted a zero byte read, return immediately
        // without trying (but after acquiring the readLock).
        // Otherwise syscall.Read returns 0, nil which looks like
        // io.EOF.
        // TODO(bradfitz): make it wait for readability? (Issue 15735)
        return 0, nil
    }
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        // 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
        // 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            // err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
            if err == syscall.EAGAIN && fd.pd.pollable() {
                // 如果当前没有发生期待的 I/O 事件,那么 waitRead 
                // 会通过 park goroutine 让逻辑 block 在这里
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }

            // On MacOS we can see EINTR here if the user
            // pressed ^Z.  See issue #22838.
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

conn.Writeconn.Read 的原理是一致的,它也是通過類似 pollDesc.waitReadpollDesc.waitWrite 來 park 住 goroutine 直至期待的 I/O 事件發生才返回,而 pollDesc.waitWrite 的內部實現原理和 pollDesc.waitRead 是一樣的,都是基於 runtime_pollWait ,這裡就不再贅述。

pollDesc.waitRead

pollDesc.waitRead 內部調用了 runtime_pollWait 來達成無 I/O 事件時 park 住 goroutine 的目的:

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    // 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
    // 这里的 for 循环是为了一直等到 io ready
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return 0
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
    // 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to WAIT
    // 这个 for 循环是为了等待 io ready 或者 io wait
    for {
        old := *gpp
        // gpp == pdReady 表示此时已有期待的 I/O 事件发生,
        // 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // need to recheck error states after setting gpp to WAIT
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
  
    // waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
    // 通常来说  netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark 
    // 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
    // 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
    // gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
    // 把 g 添加到链表里返回,接着重新调度 goroutine
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent READY notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
  // gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
    // 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
    mcall(park_m)
}

// park continuation on g0.
func park_m(gp *g) {
    _g_ := getg()

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()

    if fn := _g_.m.waitunlockf; fn != nil {
        // 调用 netpollblockcommit,把当前的 goroutine,
        // 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}

// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    // 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
    // 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}

netpoll

前面已經從源碼的角度分析完了netpoll 是如何通過park goroutine 從而達到阻塞Accept/Read/Write 的效果,而通過調用gopark,goroutine 會被放置在某個等待隊列中(如channel 的waitq ,此時G 的狀態由 _Grunning_Gwaitting ),因此 G 必須被手動喚醒(通過 goready ),否則會丟失任務,應用層阻塞通常使用這種方式。

所以,最後還有一個非常關鍵的問題是:當 I/O 事件發生之後,netpoll 是通過什麼方式喚醒那些在 I/O wait 的 goroutine 的?答案是通過 epoll_wait ,在 Go 源碼中的 src/runtime/netpoll_epoll.go 文件中有一個 func netpoll(block bool) gList 方法,它會內部調用 epoll_wait 獲取就緒的 fd 列表,並將每個 fd 對應的 goroutine 添加到鍊錶返回

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
    if epfd == -1 {
        return gList{}
    }
    waitms := int32(-1)
    // 是否以阻塞模式调用 epoll_wait
    if !block {
        waitms = 0
    }
    var events [128]epollevent
retry:
    // 获取就绪的 fd 列表
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        goto retry
    }
    // toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        // 判断发生的事件类型,读类型或者写类型
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // 取出保存在 epollevent 里的 pollDesc
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
            netpollready(&toRun, pd, mode)
        }
    }
    if block && toRun.empty() {
        goto retry
    }
    return toRun
}

// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    // mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        // 取出 gpp 存储的 g
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set READY for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // 重置 pollDesc 的 rg 或者 wg
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            // 通过万能指针还原成 g 并返回
            return (*g)(unsafe.Pointer(old))
        }
    }
}

而 Go 在多種場景下都可能會調用 netpoll 檢查文件描述符狀態。尋找到 I/O 就緒的 socket fd,並找到這些 socket fd 對應的輪詢器中附帶的信息,根據這些信息將之前等待這些 socket fd 就緒的 goroutine 狀態修改為 _Grunnable 。執行完 netpoll 之後,會返回一個就緒 fd 列表對應的 goroutine 列表,接下來將就緒的 goroutine 加入到調度隊列中,等待調度運行。

首先,在 Go runtime scheduler 正常調度 goroutine 之時就有可能會調用 netpoll 獲取到已就緒的 fd 對應的 goroutine 來調度執行:

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    // ...
  
  if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }
  
    // ...
}

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
  // ...
  
  // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(false); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
  
  // ...
}

Go scheduler 的核心方法 schedule 裡會調用一個叫 findrunable() 的方法獲取可運行的 goroutine 來執行,而在 findrunable() 方法裡就調用了 netpoll 獲取已就緒的 fd 列表對應的 goroutine 列表。

另外, sysmon 監控線程也可能會調用到 netpoll

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
        // ...
        now := nanotime()
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000  0 && lasttrace+int64(debug.schedtrace)*1000000  0)
        }
    }
}

Go runtime 在程序啟動的時候會創建一個獨立的 M 作為監控線程,叫 sysmon ,這個線程為系統級的 daemon 線程,無需 P 即可運行, sysmon 每 20us~10ms 運行一次。 sysmon 中以輪詢的方式執行以下操作(如上面的代碼所示):

  1. 以非阻塞的方式調用 runtime.netpoll ,從中找出能從網絡 I/O 中喚醒的 G,並調用 injectglist ,將其插入調度器的 runnable 列表中(全局),調度觸發時,有可能從這個全局 runnable 列表獲取 G。然後再循環調用 startm ,直到所有 P 都不處於 _Pidle 狀態。
  2. 調用 retake ,搶占長時間處於 _Psyscall 狀態的 P。

綜上,Go 借助於 epoll/kqueue/iocp 和 runtime scheduler 等的幫助,設計出了自己的 I/O 多路復用 netpoll,成功地讓 Listener.Accept / conn.Read / conn.Write 等方法從開發者的角度看來是同步模式。

Go netpoll 的價值

通過前面對源碼的分析,我們現在知道Go netpoll 依託於runtime scheduler,為開發者提供了一種強大的同步網絡編程模式;然而,Go netpoll 存在的意義卻遠不止於此,Go netpoll I/O多路復用搭配Non-blocking I/O 而打造出來的這個原生網絡模型,它最大的價值是把網絡I/O 的控制權牢牢掌握在Go 自己的runtime 裡,關於這一點我們需要從Go的runtime scheduler 說起,Go 的GPM 調度模型如下:

Go有了新的基於反射的協議緩衝區API 5

G 在運行過程中如果被阻塞在某個 system call 操作上,那麼不光 G 會阻塞,執行該 G 的 M 也會解綁 P(實質是被 sysmon 搶走了),與 G 一起進入 sleep 狀態。如果此時有 idle 的 M,則 P 與其綁定繼續執行其他 G;如果沒有 idle M,但仍然有其他 G 要去執行,那麼就會創建一個新的 M。當阻塞在 system call 上的 G 完成 syscall 調用後,G 會去嘗試獲取一個可用的 P,如果沒有可用的 P,那麼 G 會被標記為 _Grunnable 並把它放入全局的 runqueue 中等待調度,之前的那個 sleep 的 M 將再次進入 sleep。

現在清楚為什麼 netpoll 為什麼一定要使用非阻塞 I/O 了吧?就是為了避免讓操作網絡I/O 的goroutine 陷入到系統調用從而進入內核態,因為一旦進入內核態,整個程序的控制權就會發生轉移(到內核),不再屬於用戶進程了,那麼也就無法借助於Go 強大的runtime scheduler 來調度業務程序的並發了;而有了netpoll 之後,借助於非阻塞I/O ,G 就再也不會因為系統調用的讀寫而陷入內核態,當G 被阻塞在某個network I/O 操作上時,實際上它不是因為陷入內核態被阻塞住了,而是被Go runtime 調用gopark 給park 住了,此時G 會被放置到某個wait queue 中,而M 會嘗試運行下一個 _Grunnable 的 G,如果此時沒有 _Grunnable 的 G 供 M 運行,那麼 M 將解綁 P,並進入 sleep 狀態。當 I/O available,在 wait queue 中的 G 會被喚醒,標記為 _Grunnable ,放入某個可用的 P 的 local 隊列中,綁定一個 M 恢復執行。

Go netpoll 的問題

Go netpoll 的設計不可謂不精巧、性能也不可謂不高,配合 goroutine 開發網絡應用的時候就一個字:爽。因此 Go 的網絡編程模式是及其簡潔高效的。然而,沒有任何一種設計和架構是完美的, goroutine-per-connection 這種模式雖然簡單高效,但是在某些極端的場景下也會暴露出問題:goroutine 雖然非常輕量,它的自定義棧內存初始值僅為2KB,後面按需擴容;海量連接的業務場景下, goroutine-per-connection ,此時goroutine 數量以及消耗的資源就會呈線性趨勢暴漲,首先給Go runtime scheduler 造成極大的壓力和侵占系統資源,然後資源被侵占又反過來影響runtime 的調度,導致性能大幅下降;此外,我們通過源碼可以知道,Go netpoll 會通過 sync.Once 確保只初始化一個epoll 實例,也就是說它是single event-loop 模式,接受新連接和處理I/O 事件是全部放在一個thread 裡的,所以在海量連接同時又高頻創建和銷毀連接的業務場景下有可能會導致性能瓶頸。

Reactor 模式

目前在 Linux 平台下構建的高性能網絡程序中,大都使用 Reactor 模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。

Reactor 模式本質上指的是使用 I/O 多路复用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O) 的模式。

通常設置一個主線程負責做 event-loop 事件循環和 I/O 讀寫,通過 select/poll/epoll_wait 等系統調用監聽 I/O 事件,業務邏輯提交給其他工作線程去做。而所謂『非阻塞I/O』的核心思想是指避免阻塞在read() 或者write() 或者其他的I/O 系統調用上,這樣可以最大限度的複用event-loop 線程,讓一個線程能服務於多個sockets。在 Reactor 模式中,I/O 線程只能阻塞在 I/O multiplexing 函數上(select/poll/epoll_wait)。

Reactor 模式通常的工作流程如下:

  • Server 端完成在 bind&listen 之後,將 listenfd 註冊到 epollfd 中,最後進入 event-loop 事件循環。循環過程中會調用 select/poll/epoll_wait 阻塞等待,若有在 listenfd 上的新連接事件則解除阻塞返回,並調用 socket.accept 接收新連接 connfd,並將 connfd 加入到 epollfd 的 I/O 復用(監聽)隊列。
  • 當 connfd 上發生可讀/可寫事件也會解除 select/poll/epoll_wait 的阻塞等待,然後進行 I/O 讀寫操作,這裡讀寫 I/O 都是非阻塞 I/O,這樣才不會阻塞 event-loop 的下一個循環。然而,這樣容易割裂業務邏輯,不易理解和維護。
  • 調用 read 讀取數據之後進行解碼並放入隊列中,等待工作線程處理。
  • 工作線程處理完數據之後,返回到 event-loop 線程,由這個線程負責調用 write 把數據寫回 client。

accept 連接以及 conn 上的讀寫操作若是在主線程完成,則要求是非阻塞 I/O,因為 Reactor 模式一條最重要的原則就是:I/O 操作不能阻塞 event-loop 事件循環。實際上 event loop 可能也可以是多線程的,只是一個線程裡只有一個 select/poll/epoll_wait

上面提到了Go netpoll 在某些場景下可能因為創建太多的goroutine 而過多地消耗系統資源,而在現實世界的網絡業務中,服務器持有的海量連接中在極短的時間窗口內只有極少數是active 而大多數則是idle,就像這樣(非真實數據,僅僅是為了比喻):

Go有了新的基於反射的協議緩衝區API 6

那麼為每一個連接指派一個goroutine 就顯得太過奢侈了,而Reactor 模式這種利用I/O 多路復用進而只需要使用少量線程即可管理海量連接的設計就可以在這樣網絡業務中大顯身手了:

Go有了新的基於反射的協議緩衝區API 7

在絕大部分應用場景下,我推薦大家還是遵循 Go 的 best practices,以這種 netpoll 模式來構建自己的網絡應用。然而,在某些極度追求性能、壓榨系統資源以及技術棧必須是原生 Go (不考慮 C/C++ 寫中間層而 Go 寫業務層)的業務場景下,我們可以考慮自己構建 Reactor 網絡模型。

gnet

gnet 是一個基於事件驅動的高性能和輕量級網絡框架,支持多種協議:TCP/UDP/Unix-Socket。它直接使用 epollkqueue 系統調用而非標準 Golang 網絡包:net 來構建網絡應用,它的工作原理類似兩個開源的網絡庫:nettylibuv

gnet 的亮點在於它是一個高性能、輕量級、非阻塞的純 Go 實現的傳輸層(TCP/UDP/Unix-Socket)網絡框架,開發者可以使用 gnet 來實現自己的應用層網絡協議,從而構建出自己的應用層網絡應用:比如在 gnet 上實現 HTTP 協議就可以創建出一個 HTTP 服務器 或者 Web 開發框架,實現 Redis 協議就可以創建出自己的 Redis 服務器等等。

gnet,在某些極端的網絡業務場景,比如海量連接、高頻創建銷毀連接等等場景,gnet 在性能和資源佔用上都遠超 Go 原生的 net 包(基於 netpoll)。

gnet 已經實現了 Multi-ReactorsMulti-Reactors + Goroutine Pool 兩種網絡模型,也得益於這些網絡模型,使得 gnet 成為一個高性能和低損耗的 Go 網絡框架:

Go有了新的基於反射的協議緩衝區API 8

Go有了新的基於反射的協議緩衝區API 9

🚀 功能

  • [X] 高性能 的基於多線程/Go 程網絡模型的 event-loop 事件驅動
  • [X] 內置 Round-Robin 輪詢負載均衡算法
  • [X] 內置 goroutine 池,由開源庫 ants 提供支持
  • [X] 內置 bytes 內存池,由開源庫 pool 提供支持
  • [X] 簡潔的 APIs
  • [X] 基於 Ring-Buffer 的高效內存利用
  • [X] 支持多種網絡協議:TCP、UDP、Unix Sockets
  • [X] 支持兩種事件驅動機制:Linux 裡的 epoll 以及 FreeBSD 裡的 kqueue
  • [X] 支持異步寫操作
  • [X] 靈活的事件定時器
  • [X] SO_REUSEPORT 端口重用
  • [X] 內置多種編解碼器,支持對 TCP 數據流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,參考自 netty codec,而且支持自定制編解碼器
  • [X] 支持 Windows 平台,基於 IOCP 事件驅動機制 Go 標準網絡庫
  • [ ] 加入更多的負載均衡算法:隨機、最少連接、一致性哈希等等
  • [ ] 支持 TLS
  • [ ] 實現 gnet 客戶端

參考