Categories
程式開發

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池


並發(並行),一直以來都是一個編程語言裡的核心主題之一,也是被開發者關注最多的話題;Go語言作為一個出道以來就自帶『高並發』光環的富二代編程語言,它的並發(並行)編程肯定是值得開發者去探究的,而Go語言中的並發(並行)編程是經由Goroutine實現的,Goroutine是Golang最重要的特性之一,具有使用成本低、消耗資源低、能效高等特點,官方宣稱原生Goroutine並發成千上萬不成問題,於是它也成為Gopher們經常使用的特性。

Goroutine是優秀的,但不是完美的,在極大規模的高並發場景下,也可能會暴露出問題,什麼問題呢?又有什麼可選的解決方案?本文將通過runtime對Goroutine的調度分析,幫助大家理解它的機理和發現一些內存和調度的原理和問題,並且基於此提出一種個人的解決方案 — 一個高性能的Goroutine Pool(協程池)。

Goroutine & Scheduler

Goroutine,Go語言基於並發(並行)編程給出的自家的解決方案。 Goroutine是什麼?通常Goroutine會被當做coroutine(協程)的Golang實現,從比較粗淺的層面來看,這種認知也算是合理,但實際上,Goroutine並非傳統意義上的協程,現在主流的線程模型分三種:內核級線程模型、用戶級線程模型和兩級線程模型(也稱混合型線程模型),傳統的協程庫屬於用戶級線程模型,而Goroutine和它的Go Scheduler在底層實現上其實是屬於兩級線程模型,因此,有時候為了方便理解可以簡單把Goroutine類比成協程,但心裡一定要有個清晰的認知 — Goroutine並不等同於協程。

線程那些事兒

互聯網時代以降,由於在線用戶數量的爆炸,單台服務器處理的連接也水漲船高,迫使編程模式由從前的串行模式升級到並發模型,而幾十年來,並發模型也是一代代地升級,有IO多路復用、多進程以及多線程,這幾種模型都各有長短,現代複雜的高並發架構大多是幾種模型協同使用,不同場景應用不同模型,揚長避短,發揮服務器的最大性能,而多線程,因為其輕量和易用,成為並發編程中使用頻率最高的並發模型,而後衍生的協程等其他子產品,也都基於它,而我們今天要分析的Goroutine 也是基於線程,因此,我們先來聊聊線程的三大模型:

線程的實現模型主要有3種:內核級線程模型、用戶級線程模型和兩級線程模型(也稱混合型線程模型),它們之間最大的差異就在於用戶線程與內核調度實體(KSE,Kernel Scheduling Entity)之間的對應關係上。而所謂的內核調度實體 KSE 就是指可以被操作系統內核調度器調度的對象實體(這說的啥玩意兒,敢不敢通俗易懂一點?)。簡單來說 KSE 就是內核級線程,是操作系統內核的最小調度單元,也就是我們寫代碼的時候通俗理解上的線程了(這麼說不就懂了嘛!裝什麼13)。

用戶級線程模型

用戶線程與內核線程KSE是多對一(N : 1)的映射模型,多個用戶線程的一般從屬於單個進程並且多線程的調度是由用戶自己的線程庫來完成,線程的創建、銷毀以及多線程之間的協調等操作都是由用戶自己的線程庫來負責而無須藉助系統調用來實現。一個進程中所有創建的線程都只和同一個KSE在運行時動態綁定,也就是說,操作系統只知道用戶進程而對其中的線程是無感知的,內核的所有調度都是基於用戶進程。許多語言實現的 協程庫 基本上都屬於這種方式(比如python的gevent)。

由於線程調度是在用戶層面完成的,也就是相較於內核調度不需要讓CPU在用戶態和內核態之間切換,這種實現方式相比內核級線程可以做的很輕量級,對系統資源的消耗會小很多,因此可以創建的線程數量與上下文切換所花費的代價也會小得多。

但該模型有個原罪:並不能做到真正意義上的並發,假設在某個用戶進程上的某個用戶線程因為一個阻塞調用(比如I/O阻塞)而被CPU給中斷(搶占式調度)了,那麼該進程內的所有線程都被阻塞(因為單個用戶進程內的線程自調度是沒有CPU時鐘中斷的,從而沒有輪轉調度),整個進程被掛起。即便是多CPU的機器,也無濟於事,因為在用戶級線程模型下,一個CPU關聯運行的是整個用戶進程,進程內的子線程綁定到CPU執行是由用戶進程調度的,內部線程對CPU是不可見的,此時可以理解為CPU的調度單位是用戶進程。所以很多的協程庫會把自己一些阻塞的操作重新封裝為完全的非阻塞形式,然後在以前要阻塞的點上,主動讓出自己,並通過某種方式通知或喚醒其他待執行的用戶線程在該KSE上運行,從而避免了內核調度器由於KSE阻塞而做上下文切換,這樣整個進程也不會被阻塞了。

內核級線程模型

用戶線程與內核線程KSE是一對一(1 : 1)的映射模型,也就是每一個用戶線程綁定一個實際的內核線程,而線程的調度則完全交付給操作系統內核去做,應用程序對線程的創建、終止以及同步都基於內核提供的系統調用來完成,大部分編程語言的線程庫(比如Java的java.lang.Thread、C++11的std::thread等等)都是對操作系統的線程(內核級線程)的一層封裝,創建出來的每個線程與一個獨立的KSE靜態綁定,因此其調度完全由操作系統內核調度器去做,也就是說,一個進程裡創建出來的多個線程每一個都綁定一個KSE。

這種模型的優勢和劣勢同樣明顯:優勢是實現簡單,直接借助操作系統內核的線程以及調度器,所以CPU可以快速切換調度線程,於是多個線程可以同時運行,因此相較於用戶級線程模型它真正做到了並行處理;但它的劣勢是,由於直接借助了操作系統內核來創建、銷毀和以及多個線程之間的上下文切換和調度,因此資源成本大幅上漲,且對性能影響很大。

兩級線程模型

兩級線程模型是博採眾長之後的產物,充分吸收前兩種線程模型的優點且盡量規避它們的缺點。

在此模型下,用戶線程與內核KSE是多對多(N : M)的映射模型:首先,區別於用戶級線程模型,兩級線程模型中的一個進程可以與多個內核線程KSE關聯,也就是說一個進程內的多個線程可以分別綁定一個自己的KSE,這點和內核級線程模型相似;其次,又區別於內核級線程模型,它的進程裡的線程並不與KSE唯一綁定,而是可以多個用戶線程映射到同一個KSE,當某個KSE因為其綁定的線程的阻塞操作被內核調度出CPU時,其關聯的進程中其餘用戶線程可以重新與其他KSE綁定運行。

所以,兩級線程模型既不是用戶級線程模型那種完全靠自己調度的也不是內核級線程模型完全靠操作系統調度的,而是中間態(自身調度與系統調度協同工作),也就是— 『薛定諤的模型』(誤),因為這種模型的高度複雜性,操作系統內核開發者一般不會使用,所以更多時候是作為第三方庫的形式出現,而Go語言中的runtime調度器就是採用的這種實現方案,實現了Goroutine與KSE之間的動態關聯,不過Go語言的實現更加高級和優雅;該模型為何被稱為兩級?即用戶調度器實現用戶線程到KSE的『調度』,內核調度器實現KSE到CPU上的『調度』

G-P-M 模型概述

每一個OS線程都有一個固定大小的內存塊(一般會是2MB)來做棧,這個棧會用來存儲當前正在被調用或掛起(指在調用其它函數時)的函數的內部變量。這個固定大小的棧同時很大又很小。因為2MB的棧對於一個小小的Goroutine來說是很大的內存浪費,而對於一些複雜的任務(如深度嵌套的遞歸)來說又顯得太小。因此,Go語言做了它自己的『線程』。

在Go語言中,每一個Goroutine是一個獨立的執行單元,相較於每個OS線程固定分配2M內存的模式,Goroutine的棧採取了動態擴容方式, 初始時僅為2KB,隨著任務執行按需增長,最大可達1GB(64位機器最大是1G,32位機器最大是256M),且完全由Golang自己的調度器 Go Scheduler 來調度。此外,GC還會周期性地將不再使用的內存回收,收縮棧空間。因此,Go程序可以同時並發成千上萬個Goroutine是得益於它強勁的調度器和高效的內存模型。

Go的創造者大概對Goroutine的定位就是屠龍刀,因為他們不僅讓Goroutine作為Golang並發編程的最核心組件(開發者的程序都是基於Goroutine運行的)而且Golang中的許多標準庫的實現也到處能見到Goroutine的身影,比如net/http這個包,甚至語言本身的組件runtime運行時和GC垃圾回收器都是運行在Goroutine上的,作者對Goroutine的厚望可見一斑。

任何用戶線程最終肯定都是要交由OS線程來執行的,Goroutine(稱為G)也不例外,但是G並不直接綁定OS線程運行,而是由Goroutine Scheduler中的 P – Logical Processor (邏輯處理器)來作為兩者的『中介』,P可以看作是一個抽象的資源或者一個上下文,一個P綁定一個OS線程,在Golang的實現裡把OS線程抽象成一個數據結構:M ,G實際上是由M通過P來進行調度運行的,但是在G的層面來看,P提供了G運行所需的一切資源和環境,因此在G看來P就是運行它的“CPU”,由G、P、M 這三種由Go抽像出來的實現,最終形成了Go調度器的基本結構:

  • G: 表示Goroutine,每個Goroutine對應一個G結構體,G存儲Goroutine的運行堆棧、狀態以及任務函數,可重用。 G並非執行體,每個G需要綁定到P才能被調度執行。
  • P: Processor,表示邏輯處理器, 對G來說,P相當於CPU核,G只有綁定到P(在P的local runq中)才能被調度。對M來說,P提供了相關的執行環境(Context),如內存分配狀態(mcache),任務隊列(G)等,P的數量決定了系統內最大可並行的G的數量(前提:物理CPU核數>= P的數量),P的數量由用戶設置的GoMAXPROCS決定,但是不論GoMAXPROCS設置為多大,P的數量最大為256。
  • M: Machine,OS線程抽象,代表著真正執行計算的資源,在綁定有效的P後,進入schedule循環;而schedule循環的機制大致是從Global隊列、P的Local隊列以及wait隊列中獲取G,切換到G的執行棧上並執行G的函數,調用Goexit做清理工作並回到M,如此反复。 M並不保留G狀態,這是G可以跨M調度的基礎,M的數量是不定的,由Go Runtime調整,為了防止創建過多OS線程導致系統調度不過來,目前默認最大限制為10000個。

關於P,我們需要再絮叨幾句,在Go 1.0發布的時候,它的調度器其實G-M模型,也就是沒有P的,調度過程全由G和M完成,這個模型暴露​​出一些問題:

  • 單一全局互斥鎖(Sched.Lock)和集中狀態存儲的存在導致所有Goroutine相關操作,比如:創建、重新調度等都要上鎖;
  • Goroutine傳遞問題:M經常在M之間傳遞『可運行』的Goroutine,這導致調度延遲增大以及額外的性能損耗;
  • 每個M做內存緩存,導致內存佔用過高,數據局部性較差;
  • 由於syscall調用而形成的劇烈的worker thread阻塞和解除阻塞,導致額外的性能損耗。

這些問題實在太扎眼了,導致Go1.0雖然號稱原生支持並發,卻在並發性能上一直飽受詬病,然後,Go語言委員會中一個核心開發大佬看不下了,親自下場重新設計和實現了Go調度器(在原有的GM模型中引入了P)並且實現了一個叫做 work-stealing 的調度算法:

  • 每個P維護一個G的本地隊列;
  • 當一個G被創建出來,或者變為可執行狀態時,就把他放到P的可執行隊列中;
  • 當一個G在M裡執行結束後,P會從隊列中把該G取出;如果此時P的隊列為空,即沒有其他G可以執行, M就隨機選擇另外一個P,從其可執行的G隊列中取走一半。

該算法避免了在Goroutine調度時使用全局鎖。

至此,Go調度器的基本模型確立:

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 31

G-P-M 模型調度

Go調度器工作時會維護兩種用來保存G的任務隊列:一種是一個Global任務隊列,一種是每個P維護的Local任務隊列。

當通過Go關鍵字創建一個新的Goroutine的時候,它會優先被放入P的本地隊列。為了運行Goroutine,M需要持有(綁定)一個P,接著M會啟動一個OS線程,循環從P的本地隊列裡取出一個Goroutine並執行。當然還有上文提及的 work-stealing調度算法:當M執行完了當前P的Local隊列裡的所有G後,P也不會就這麼在那躺屍啥都不干,它會先嘗試從Global隊列尋找G來執行,如果Global隊列為空,它會隨機挑選另外一個P,從它的隊列里中拿走一半的G到自己的隊列中執行。

如果一切正常,調度器會以上述的那種方式順暢地運行,但這個世界沒這麼美好,總有意外發生,以下分析Goroutine在兩種例外情況下的行為。

Go runtime會在下面的Goroutine被阻塞的情況下運行另外一個Goroutine:

  • blocking syscall (for example opening a file)
  • network input
  • channel operations
  • primitives in the sync package

這四種場景又可歸類為兩種類型:

用戶態阻塞/喚醒

當Goroutine因為channel操作或者network I/O而阻塞時(實際上Golang已經用netpoller實現了Goroutine網絡I/O阻塞不會導致M被阻塞,僅阻塞G,這裡僅僅是舉個栗子),對應的G會被放置到某個wait隊列(如channel的waitq),該G的狀態由_Gruning變為_Gwaitting,而M會跳過該G嘗試獲取並執行下一個G,如果此時沒有runnable的G供M運行,那麼M將解綁P,並進入sleep狀態;當阻塞的G被另一端的G2喚醒時(比如channel的可讀/寫通知),G被標記為runnable,嘗試加入G2所在P的runnext,然後再是P的Local隊列和Global隊列。

系統調用阻塞

當G被阻塞在某個系統調用上時,此時G會阻塞在_Gsyscall狀態,M也處於 block on syscall 狀態,此時的M可被搶占調度:執行該G的M會與P解綁,而P則嘗試與其它idle的M綁定,繼續執行其它G。如果沒有其它idle的M,但P的Local隊列中仍然有G需要執行,則創建一個新的M;當系統調用完成後,G會重新嘗試獲取一個idle的P進入它的Local隊列恢復執行,如果沒有idle的P,G會被標記為runnable加入到Global隊列。

以上就是從宏觀的角度對Goroutine和它的調度器進行的一些概要性的介紹。當然,Go的調度中更複雜的搶占式調度、阻塞調度的更多細節,大家可以自行去找相關資料深入理解,本文只講到Go調度器的基本調度過程,為後面自己實現一個Goroutine Pool提供理論基礎,這里便不再繼續深入上述說的那幾個調度了。事實上如果要完全講清楚Go調度器,一篇文章的篇幅也實在是捉襟見肘,所以想了解更多細節的同學可以去看看Go調度器GPM 模型的設計者Dmitry Vyukov 寫的該模型的設計文檔《Go Preemptive Scheduler Design》以及直接去看源碼,G-P-M模型的定義放在src/runtime/runtime2.Go裡面,而調度過程則放在了src/runtime/proc.Go裡。

大規模Goroutine的瓶頸

既然Go調度器已經這麼牛逼優秀了,我們為什麼還要自己去實現一個Golang的 Goroutine Pool 呢?事實上,優秀不代表完美,任何不考慮具體應用場景的編程模式都是耍流氓!有基於GPM的Go調度器背書,Go程序的並發編程中,可以任性地起大規模的Goroutine來執行任務,官方也宣稱用Golang寫並發程序的時候隨便起個成千上萬的Goroutine毫無壓力。

然而,你起1000個Goroutine沒有問題,10000也沒有問題,10w個可能也沒問題;那,100w個呢? 1000w個呢? (這裡只是舉個極端的例子,實際編程起這麼大規模的Goroutine的例子極少)這裡就會出問題,什麼問題呢?

  1. 首先,即便每個Goroutine只分配2KB的內存,但如果是恐怖如斯的數量,聚少成多,內存暴漲,就會對GC造成極大的負擔,寫過java的同學應該知道jvm GC那萬惡的STW(Stop The World)機制,也就是GC的時候會掛起用戶程序直到垃圾回收完,雖然Go1.8之後的GC已經去掉了STW以及優化成了並行GC,性能上有了不小的提升,但是,如果太過於頻繁地進行GC,依然會有性能瓶頸;
  2. 其次,還記得前面我們說的runtime和GC也都是Goroutine嗎?是的,如果Goroutine規模太大,內存吃緊,runtime調度和垃圾回收同樣會出問題,雖然G-P-M模型足夠優秀,韓信點兵,多多益善,但你不能不給士兵發口糧(內存)吧?巧婦難為無米之炊,沒有內存,Go調度器就會阻塞Goroutine,結果就是P的Local隊列積壓,又導致內存溢出,這就是個死循環…,甚至極有可能程序直接Crash掉,本來是想享受Golang並髮帶來的快感效益,結果卻得不償失。

一個HTTP標準庫引發的血案

我想,作為Golang擁躉的Gopher們一定都使用過它的net/http標準庫,很多人都說用Golang寫web server完全可以不用借助第三方的web framework,僅用net/http標準庫就能寫一個高性能的web server,的確,我也用過它寫過web server,簡潔高效,性能表現也相當不錯,除非有比較特殊的需求否則一般的確不用借助第三方web framework,但是天下沒有白吃的午餐,net/http為啥這麼快?要搞清這個問題,從源碼入手是最好的途徑。孔子曾經曰過:源碼面前,如同裸奔。所以,高清無碼是阻礙程序猿發展大大滴絆腳石啊,源碼才是我們進步階梯,切記切記!

接下來我們就來先看看net/http內部是怎麼實現的。

net/http接收請求且開始處理的源碼放在src/net/http/server.Go裡,先從入口函數ListenAndServe進去:

func (srv *Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

看到最後那個srv.Serve調用了嗎?沒錯,這個Serve方法裡面就是實際處理http請求的邏輯,我們再進入這個方法內部:

func (srv *Server) Serve(l net.Listener) error {
    defer l.Close()
    ...
    // 不断循环取出TCP连接
    for {
        // 看我看我!!!
        rw, e := l.Accept()
        ...
        // 再看我再看我!!!
        Go c.serve(ctx)
    }
}

首先,這個方法的參數(l net.Listener) ,是一個TCP監聽的封裝,負責監聽網絡端口,rw, e := l.Accept()則是一個阻塞操作,從網絡端口取出一個新的TCP連接進行處理,最後Go c.serve(ctx)就是最後真正去處理這個http請求的邏輯了,看到前面的Go關鍵字了嗎?沒錯,這裡啟動了一個新的Goroutine去執行處理邏輯,而且這是在一個無限循環體裡面,所以意味著,每來一個請求它就會開一個Goroutine去處理,相當任性粗暴啊…,不過有Go調度器背書,一般來說也沒啥壓力,然而,如果,我是說如果哈,突然一大波請求湧進來了(比方說黑客搞了成千上萬的肉雞DDOS你,沒錯!就這麼倒霉!),這時候,就很成問題了,他來10w個請求你就要開給他10w個Goroutine,來100w個你就要老老實實開給他100w個,線程調度壓力陡升,內存爆滿,再然後,你就跪了…

釜底抽薪

有問題,就一定有解決的辦法,那麼,有什麼方案可以減緩大規模Goroutine對系統的調度和內存壓力?要想解決問題,最重要的是找到造成問題的根源,這個問題根源是什麼? Goroutine的數量過多導致資源侵占,那要解決這個問題就要限制運行的Goroutine數量,合理復用,節省資源,具體就是 — Goroutine池化。

超大規模並發的場景下,不加限制的大規模的Goroutine可能造成內存暴漲,給機器帶來極大的壓力,吞吐量下降和處理速度變慢還是其次,更危險的是可能使得程序crash。所以,Goroutine池化是有其現實意義的。

首先,100w個任務,是不是真的需要100w個Goroutine來處理?未必!用1w個Goroutine也一樣可以處理,讓一個Goroutine多處理幾個任務就是了嘛,池化的核心優勢就在於對Goroutine的複用。此舉首先極大減輕了runtime調度Goroutine的壓力,其次,便是降低了對內存的消耗。

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 32

有一個商場,來了1000個顧客買東西,那麼該如何安排導購員服務這1000人呢?有兩種方案:

第一,我僱1000個導購員實行一對一服務,這種當然是最高效的,但是太浪費資源了,僱1000個人的成本極高且管理困難,這些可以先按下不表,但是每個顧客到商場買東西也不是一進來就馬上買,一般都得逛一逛,選一選,也就是得花時間挑,1000個導購員一對一盯著,效率極低;這就引出第二種方案:我只僱10個導購員,就在商場裡待命,有顧客需要諮詢的時候招呼導購員過去進行處理,導購員處理完之後就回來,等下一個顧客需要諮詢的時候再去,如此往返反复…

第二種方案有沒有覺得很眼熟?沒錯,其基本思路就是模擬一個I/O多路復用,通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。關於多路復用,不在本文的討論範圍之內,便不再贅述,詳細原理可以參考 I/O多路復用

第一種方案就是net/http標準庫採用的:來一個請求開一個Goroutine處理;第二種方案就是Goroutine Pool(I/O多路復用)。

實現一個 Goroutine Pool

因為上述陳列的一些由於Goroutine規模過大而可能引發的問題,需要有方案來解決這些問題,上文已經分析過,把Goroutine池化是一種行之有效的方案,基於此,可以實現一個Goroutine Pool,復用Goroutine,減輕runtime的調度壓力以及緩解內存壓力,依托這些優化,在大規模Goroutine並發的場景下可以極大地提高並發性能。

哎瑪!前面絮絮叨叨了這麼多,終於進入正題了,接下來就開始講解如何實現一個高性能的Goroutine Pool,秒殺原生並發的Goroutine,在執行速度和占用內存上提高並發程序的性能。好了,話不多說,開始裝逼分析。

設計思路

Goroutine Pool 的實現思路大致如下:

啟動服務之時先初始化一個Goroutine Pool 池,這個Pool維護了一個類似棧的LIFO隊列,裡面存放負責處理任務的Worker,然後在client端提交task到Pool中之後,在Pool內部,接收task之後的核心操作是:

  1. 檢查當前Worker隊列中是否有可用的Worker,如果有,取出執行當前的task;
  2. 沒有可用的Worker,判斷當前在運行的Worker是否已超過該Pool的容量:{是 —> 再判斷工作池是否為非阻塞模式:[是 ——> 直接返回 nil,否 ——> 阻塞等待直至有Worker被放回Pool],否 —> 新開一個Worker(Goroutine)處理};
  3. 每個Worker執行完任務之後,放回Pool的隊列中等待。

核心調度流程如下:

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 33

按照這個設計思路,​​我實現了一個高性能的Goroutine Pool,較好地解決了上述的大規模調度和資源佔用的問題,在執行速度和內存佔用方面相較於原生Goroutine並發佔有明顯的優勢,尤其是內存佔用,因為複用,所以規避了無腦啟動大規模Goroutine的弊端,可以節省大量的內存。

此外,該調度系統還有一個清理過期Worker的定時任務,該任務在初始化一個Pool之時啟動,每隔一定的時間間隔去檢查空閒Worker隊列中是否有已經過期的Worker,有則清理掉,通過定時清理過期worker,進一步節省系統資源。

完整的項目代碼可以在我的github上獲取:傳送門,也歡迎提意見和交流。

實現細節

Goroutine Pool的設計原理前面已經講過了,整個調度過程相信大家應該可以理解了,但是有一句老話說得好,空談誤國,實幹興邦,設計思路有了,具體實現的時候肯定會有很多細節、難點,接下來我們通過分析這個Goroutine Pool的幾個核心實現以及它們的聯動來引導大家過一遍Goroutine Pool的原理。

首先是Pool struct

type sig struct{}

type f func() error

// Pool accept the tasks from client,it limits the total
// of Goroutines to a given number by recycling Goroutines.
type Pool struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running Goroutines.
    running int32

    // expiryDuration set the expired time (second) of every worker.
    expiryDuration time.Duration

    // workers is a slice that store the available workers.
    workers []*Worker

    // release is used to notice the pool to closed itself.
    release chan sig

    // lock for synchronous operation.
    lock sync.Mutex

    once sync.Once
}

Pool是一個通用的協程池,支持不同類型的任務,亦即每一個任務綁定一個函數提交到池中,批量執行不同類型任務,是一種廣義的協程池;本項目中還實現了另一種協程池— 批量執行同類任務的協程池PoolWithFunc,每一個PoolWithFunc只會綁定一個任務函數pf,這種Pool適用於大批量相同任務的場景,因為每個Pool只綁定一個任務函數,因此PoolWithFunc相較於Pool會更加節省內存,但通用性就不如前者了,為了讓大家更好地理解協程池的原理,這裡我們用通用的Pool來分析。

capacity是該Pool的容量,也就是開啟worker數量的上限,每一個worker綁定一個Goroutine;running是當前正在執行任務的worker數量;expiryDuration是worker的過期時長,在空閒隊列中的worker的最新一次運行時間與當前時間之差如果大於這個值則表示已過期,定時清理任務會清理掉這個worker;workers是一個slice,用來存放空閒worker,請求進入Pool之後會首先檢查workers中是否有空閒worker,若有則取出綁定任務執行,否則判斷當前運行的worker是否已經達到容量上限,是—阻塞等待​​,否—新開一個worker執行任務;release是當關閉該Pool支持通知所有worker退出運行以防Goroutine洩露;lock是一個鎖,用以支持Pool的同步操作;once用在確保Pool關閉操作只會執行一次。

初始化Pool並啟動定期清理過期worker任務

// NewPool generates a instance of ants pool
func NewPool(size int) (*Pool, error) {
    return NewTimingPool(size, DefaultCleanIntervalTime)
}

// NewTimingPool generates a instance of ants pool with a custom timed task
func NewTimingPool(size, expiry int) (*Pool, error) {
    if size <= 0 {
        return nil, ErrInvalidPoolSize
    }
    if expiry <= 0 {
        return nil, ErrInvalidPoolExpiry
    }
    p := &Pool{
        capacity:       int32(size),
        freeSignal:     make(chan sig, math.MaxInt32),
        release:        make(chan sig, 1),
        expiryDuration: time.Duration(expiry) * time.Second,
    }
    // 启动定期清理过期worker任务,独立Goroutine运行,
    // 进一步节省系统资源
    p.monitorAndClear()
    return p, nil
}

提交任務到Pool

p.Submit(task f)如下:

// Submit submit a task to pool
func (p *Pool) Submit(task f) error {
    if len(p.release) > 0 {
        return ErrPoolClosed
    }
    w := p.getWorker()
    w.task <- task
    return nil
}

第一個if判斷當前Pool是否已被關閉,若是則不再接受新任務,否則獲取一個Pool中可用的worker,綁定該task執行。

獲取可用worker(核心)

p.getWorker()源碼:

// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
    var w *Worker
    // 标志变量,判断当前正在运行的worker数量是否已到达Pool的容量上限
    waiting := false
    // 加锁,检测队列中是否有可用worker,并进行相应操作
    p.lock.Lock()
    idleWorkers := p.workers
    n := len(idleWorkers) - 1
    // 当前队列中无可用worker
    if n = p.Cap()
        
    // 当前队列有可用worker,从队列尾部取出一个使用
    } else {
        w = idleWorkers[n]
        idleWorkers[n] = nil
        p.workers = idleWorkers[:n]
    }
    // 检测完成,解锁
    p.lock.Unlock()
    // Pool容量已满,新请求等待
    if waiting {
        // 利用锁阻塞等待直到有空闲worker
        for {
            p.lock.Lock()
            idleWorkers = p.workers
            l := len(idleWorkers) - 1
            if l < 0 {
                p.lock.Unlock()
                continue
            }
            w = idleWorkers[l]
            idleWorkers[l] = nil
            p.workers = idleWorkers[:l]
            p.lock.Unlock()
            break
        }
    // 当前无空闲worker但是Pool还没有满,
    // 则可以直接新开一个worker执行任务
    } else if w == nil {
        w = &Worker{
            pool: p,
            task: make(chan f, 1),
        }
        w.run()
        // 运行worker数加一
        p.incRunning()
    }
    return w
}

上面的源碼中加了較為詳細的註釋,結合前面的設計思路,​​相信大家應該能理解獲取可用worker綁定任務執行這個協程池的核心操作,主要就是實現一個LIFO隊列用來存取可用worker達到資源復用的效果。

之所以採用LIFO後進先出隊列是因為後進先出可以保證空閒worker隊列是按照每個worker的最後運行時間從遠到近的順序排列,方便在後續定期清理過期worker時排序以及清理完之後重新分配空閒worker隊列。

這裡還要關註一個地方:達到Pool容量限制之後,額外的任務請求需要阻塞等待idle worker,這裡是為了防止無節制地創建Goroutine,事實上Go調度器有一個複用機制,每次使用Go關鍵字的時候它會檢查當前結構體M中的P中,是否有可用的結構體G。如果有,則直接從中取一個,否則,需要分配一個新的結構體G。如果分配了新的G,需要將它掛到runtime的相關隊列中,但是調度器卻沒有限制Goroutine的數量,這在瞬時性Goroutine爆發的場景下就可能來不及復用G而依然創建了大量的Goroutine ,所以ants除了復用還做了限制Goroutine數量。

其他部分可以依照註釋理解,這裡不再贅述。

任務執行

// Worker is the actual executor who runs the tasks,
// it starts a Goroutine that accepts tasks and
// performs function calls.
type Worker struct {
    // pool who owns this worker.
    pool *Pool

    // task is a job should be done.
    task chan f

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

// run starts a Goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {
    Go func() {
        // 循环监听任务列表,一旦有任务立马取出运行
        for f := range w.task {
            if f == nil {
                // 退出Goroutine,运行worker数减一
                w.pool.decRunning()
                return
            }
            f()
            // worker回收复用
            w.pool.putWorker(w)
        }
    }()
}

結合前面的p.Submit(task f)p.getWorker(),提交任務到Pool之後,獲取一個可用worker,每新建一個worker實例之時都需要調用w.run()啟動一個Goroutine監聽worker的任務列表task,一有任務提交進來就執行;所以,當調用worker的sendTask(task f)方法提交任務到worker的任務隊列之後,馬上就可以被接收並執行,當任務執行完之後,會調用w.pool.putWorker(w *Worker)方法將這個已經執行完任務的worker從當前任務解綁放回Pool中,以供下個任務可以使用,至此,一個任務從提交到完成的過程就此結束,Pool調度將進入下一個循環。

Worker回收(Goroutine復用)

// putWorker puts a worker back into free pool, recycling the Goroutines.
func (p *Pool) putWorker(worker *Worker) {
    // 写入回收时间,亦即该worker的最后一次结束运行的时间
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.workers = append(p.workers, worker)
    p.lock.Unlock()
}

動態擴容或者縮小池容量

// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
    if size == p.Cap() {
        return
    }
    atomic.StoreInt32(&p.capacity, int32(size))
    diff := p.Running() - size
    if diff > 0 {
        for i := 0; i < diff; i++ {
            p.getWorker().task <- nil
        }
    }
}

定期清理過期Worker

// clear expired workers periodically.
func (p *Pool) periodicallyPurge() {
    heartbeat := time.NewTicker(p.expiryDuration)
    for range heartbeat.C {
        currentTime := time.Now()
        p.lock.Lock()
        idleWorkers := p.workers
        if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 {
            p.lock.Unlock()
            return
        }
        n := 0
        for i, w := range idleWorkers {
            if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                break
            }
            n = i
            w.task = len(idleWorkers) {
            p.workers = idleWorkers[:0]
        } else {
            p.workers = idleWorkers[n:]
        }
        p.lock.Unlock()
    }
}

定期檢查空閒worker隊列中是否有已過期的worker並清理:因為採用了LIFO後進先出隊列存放空閒worker,所以該隊列默認已經是按照worker的最後運行時間由遠及近排序,可以方便地按順序取出空閒隊列中的每個worker並判斷它們的最後運行時間與當前時間之差是否超過設置的過期時長,若是,則清理掉該Goroutine,釋放該worker,並且將剩下的未過期worker重新分配到當前Pool的空閒worker隊列中,進一步節省系統資源。

概括起來,ants Goroutine Pool的調度過程圖示如下:

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 34
Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 35
Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 36
Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 37
Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 38

彩蛋

還記得前面我說除了通用的Pool struct之外,本項目還實現了一個PoolWithFunc struct—一個執行批量同類任務的協程池,PoolWithFunc相較於Pool,因為一個池只綁定一個任務函數,省去了每一次task都需要傳送一個任務函數的代價,因此其性能優勢比起Pool更明顯,這裡我們稍微講一下一個協程池只綁定一個任務函數的細節:

上碼!

type pf func(interface{}) error

// PoolWithFunc accept the tasks from client,it limits the total
// of Goroutines to a given number by recycling Goroutines.
type PoolWithFunc struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running Goroutines.
    running int32

    // expiryDuration set the expired time (second) of every worker.
    expiryDuration time.Duration

    // workers is a slice that store the available workers.
    workers []*WorkerWithFunc

    // release is used to notice the pool to closed itself.
    release chan sig

    // lock for synchronous operation.
    lock sync.Mutex

    // pf is the function for processing tasks.
    poolFunc pf

    once sync.Once
}

PoolWithFunc struct中的大部分字段和Pool struct基本一致,重點關注poolFunc pf,這是一個函數類型,也就是該Pool綁定的指定任務函數,而client提交到這種類型的Pool的數據就不再是一個任務函數task f了,而是poolFunc pf任務函數的形參,然後交由WorkerWithFunc處理:

// WorkerWithFunc is the actual executor who runs the tasks,
// it starts a Goroutine that accepts tasks and
// performs function calls.
type WorkerWithFunc struct {
    // pool who owns this worker.
    pool *PoolWithFunc

    // args is a job should be done.
    args chan interface{}

    // recycleTime will be update when putting a worker back into queue.
    recycleTime time.Time
}

// run starts a Goroutine to repeat the process
// that performs the function calls.
func (w *WorkerWithFunc) run() {
    Go func() {
        for args := range w.args {
            if args == nil {
                w.pool.decRunning()
                return
            }
            w.pool.poolFunc(args)
            w.pool.putWorker(w)
        }
    }()
}

上面的源碼可以看到WorkerWithFunc是一個類似Worker的結構,只不過監聽的是函數的參數隊列,每接收到一個參數包,就直接調用PoolWithFunc綁定好的任務函數poolFunc pf任務函數執行任務,接下來的流程就和Worker是一致的了,執行完任務後就把worker放回協程池,等待下次使用。

至於其他邏輯如提交task、獲取Worker綁定任務等基本複用自Pool struct,具體細節有細微差別,但原理一致,萬變不離其宗,有興趣的同學可以看我在github上的源碼:Goroutine Pool協程池 ants

Benchmarks

吹了這麼久的Goroutine Pool,那都是虛的,理論上池化可以復用Goroutine,提升性能節省內存,沒有benchmark數據之前,好像也不能服眾哈!所以,本章就來進行一次實測,驗證一下再大規模Goroutine並發的場景下,Goroutine Pool的表現是不是真的比原生Goroutine並發更好!

測試機器參數:

Pool測試

測試代碼傳送門

測試結果:

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 39

這里為了模擬大規模Goroutine的場景,兩次測試的並發次數分別是100w和1000w,前兩個測試分別是執行100w個並發任務不使用Pool和使用了ants的Goroutine Pool的性能,後兩個則是1000w個任務下的表現,可以直觀的看出在執行速度和內存使用上,ants的Pool都佔有明顯的優勢。 100w的任務量,使用ants,執行速度與原生Goroutine相當甚至略快,但只實際使用了不到5w個Goroutine完成了全部任務,且內存消耗僅為原生並發的40%;而當任務量達到1000w,優勢則更加明顯了:用了70w左右的Goroutine完成全部任務,執行速度比原生Goroutine提高了100%,且內存消耗依舊保持在不使用Pool的40%左右。

PoolWithFunc測試

測試代碼傳送門

測試結果:

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 40

  • Benchmarkxxx-4 格式為基准测试函数名-GoMAXPROCS,後面的-4代表測試函數運行時對應的CPU核數
  • 1 表示執行的次數
  • xx ns/op 表示每次的執行時間
  • xx B/op 表示每次執行分配的總字節數(內存消耗)
  • xx allocs/op 表示每次執行發生了多少次內存分配

因為PoolWithFunc這個Pool只綁定一個任務函數,也即所有任務都是運行同一個函數,所以相較於Pool對原生Goroutine在執行速度和內存消耗的優勢更大,上面的結果可以看出,執行速度可以達到原生Goroutine的300%,而內存消耗的優勢已經達到了兩位數的差距,原生Goroutine的內存消耗達到了ants的35倍且原生Goroutine的每次執行的內存分配次數也達到了ants45倍,1000w的任務量,ants的初始分配容量是5w,因此它完成了所有的任務依舊只使用了5w個Goroutine!事實上,ants的Goroutine Pool的容量是可以自定義的,也就是說使用者可以根據不同場景對這個參數進行調優直至達到最高性能。

吞吐量測試

上面的benchmarks出來以後,我當時的內心是這樣的:

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 41

但是太順利反而讓我疑惑,因為結合我過去這20幾年的坎坷人生來看,事情應該不會這麼美好才對,果不其然,細細一想,雖然ants Groutine Pool能在大規模並發下執行速度和內存消耗都對原生Goroutine佔有明顯優勢,但前面的測試demo相信大家注意到了,裡面使用了WaitGroup,也就是用來對Goroutine同步的工具,所以上面的benchmarks中主進程會等待所有子Goroutine完成任務後才算完成一次性能測試,然而又有多少場景是單台機器需要扛100w甚至1000w同步任務的?基本沒有啊!結果就是造出了屠龍刀,可是世界上沒有龍啊!也是無情…

彼時,我內心變成了這樣:

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 42

幸好,ants在同步批量任務方面有點曲高和寡,但是如果是異步批量任務的場景下,就有用武之地了,也就是說,在大批量的任務無須同步等待完成的情況下,可以再測一下ants和原生Goroutine並發的性能對比,這個時候的性能對比也即是吞吐量對比了,就是在相同大規模數量的請求湧進來的時候,ants和原生Goroutine誰能用更快的速度、更少的內存『吞』完這些請求。

測試代碼傳送門

測試結果:

10w 吞吐量

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 43

100w 吞吐量

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 44

1000w 吞吐量

Goroutine並發調度模型深度解析之手擼一個高性能Goroutine池 45

因為在我的電腦上測試1000w吞吐量的時候原生Goroutine已經到了極限,因此程序直接把電腦拖垮了,無法正常測試了,所以1000w吞吐的測試數據只有antsPool的。

從該demo測試吞吐性能對比可以看出,使用ants的吞吐性能相較於原生Goroutine可以保持在2~6倍的性能壓制,而內存消耗則可以達到10~20倍的節省優勢。

總結

至此,一個高性能的 Goroutine Pool 開發就完成了,事實上,原理不難理解,總結起來就是一個『復用』,具體落實到代碼細節就是鎖同步、原子操作、channel通信等這些技巧的使用,ant這整個項目沒有借助任何第三方的庫,用Golang的標準庫就完成了所有功能,因為本身Golang的語言原生庫已經足夠優秀,很多時候開發Golang項目的時候是可以保持輕量且高性能的,未必事事需要藉助第三方庫。

關於ants的價值,其實前文也提及過了,ants在大規模的異步&同步批量任務處理都有著明顯的性能優勢(特別是異步批量任務),而單機上百萬上千萬的同步批量任務處理現實意義不大,但是在異步批量任務處理方面有很大的應用價值,所以我個人覺得,Goroutine Pool真正的價值還是在:

  1. 限制並發的Goroutine數量;
  2. 復用Goroutine,減輕runtime調度壓力,提升程序性能;
  3. 規避過多的Goroutine侵占系統資源(CPU&內存)。

後記

Go語言的三位最初的締造者— Rob Pike、Robert Griesemer 和Ken Thompson 中,Robert Griesemer 參與設計了Java的HotSpot虛擬機和Chrome瀏覽器的JavaScript V8引擎,Rob Pike 在大名鼎鼎的bell lab侵淫多年,參與了Plan9操作系統、C編譯器以及多種語言編譯器的設計和實現,Ken Thompson 更是圖靈獎得主、Unix之父、C語言之父。這三人在計算機史上可是元老級別的人物,特別是Ken Thompson ,是一手締造了Unix和C語言計算機領域的上古大神,所以Go語言的設計哲學有著深深的Unix烙印:簡單、模塊化、正交、組合、pipe、功能短小且聚焦等;而令許多開發者青睞於Go的簡潔、高效編程模式的原因,也正在於此。

Go語言的三個爸爸

本文從三大線程模型到Go並發調度器再到自定制的 Goroutine Pool,算是較為完整的窺探了整個Go語言並發模型的前世今生。我們也可以看到,Go的設計當然不完美,比如一直被詬病的error處理模式、不支持泛型、差強人意的包管理以及面向對像模式的過度抽象化等等,實際上沒有任何一門編程語言敢說自己是完美的。

還是那句話,任何不考慮應用場景和語言定位的爭執都毫無意義,而Go的定位從出道開始就是系統編程語言&雲計算編程語言(這個有點模糊),而Go的作者們也一直堅持的是用最簡單抽象的工程化設計完成最複雜的功能,所以如果從這個層面去看Go的並發模型,就可以看出其實除了GPM模型中引入的P ,並沒有太多革新的原創理論,兩級線程模型是早已成熟的理論,搶占式調度更不是什麼新鮮的調度模式。

Go的偉大之處是在於它誕生之初就是依照Go在谷歌:以軟件工程為目的的語言設計而設計的,Go其實就是將這些經典的理論和技術以一種優雅高效的工程化方式組合了起來,並用簡單抽象的API或語法糖開放給使用者。 Go一直致力於找尋一個高性能&開發效率的雙贏點,目前為止,它做得遠不夠完美,但足夠優秀。另外Go通過引入channel與Goroutine協同工作,將一種區別於鎖&原子操作的並發編程模式 — CSP 帶入了Go語言,對開發人員在並發編程模式上的思考有很大的啟發。

從本文中對Go調度器的分析以及antsGoroutine Pool 的設計與實現過程,對Go的並發模型做了一次解構和優化思考,在ants中的代碼實現對鎖同步、原子操作、channel通信的使用也做了一次較為全面的實踐,希望對Gopher們在Go語言並發模型與並發編程的理解上能有所裨益。

感謝閱讀。

參考

作者介紹:

潘建鋒,就職於Amazon,專注後端開發、網絡編程、高並發、微服務等。

原文鏈接:
https://taohuawu.club/high-performance-implementation-of-goroutine-pool