Categories
程式開發

GrowingIO 響應式編程探索和實踐


作者:林生生,GrowingIO 運營產品線研發經理,主要負責GrowingIO 智能運營產品線研發管理工作。

背景

GrowingIO 是一家提供增長平台的公司。在2018 年初我們推出了基於底層數據能力的智能運營平台,結合精準的用戶分群,數據採集以及多種運營方式,幫助企業客戶用數據驅動用戶運營,隨時驗證假設,助力產品增長。產品有以下特點:

支持多種觸達用戶的渠道:站內:彈窗、資源位,站外:Push、短信、Webhook。多平台支持,彈窗支持:App、Web、H5和小程序。輕鬆建立數據運營的閉環。

下圖是運營平台站外觸達業務流程圖。用戶可以隨時發起一次站外運營活動,通常是一個站外的觸點(推送、短信、Webhook)。後台系統需要查詢底層的數據平台接口,獲取此次活動對應的人群信息,同時組裝活動數據並對外投遞任務。

GrowingIO 響應式編程探索和實踐 1

在這個業務場景,需要解決如下幾個問題:

系統外界輸入是突發的,無法提前預估量級,系統需要在不斷變化的負載中保持即時響應。依賴底層數據服務,如果外部系統無法工作,為了保證回彈性需要有熔斷和恢復機制。業務流程較長,為保證及時響應需要對任務進行異步處理。

綜上,為了最大化利用服務器資源、提高服務穩定性和優化終端用戶體驗,GrowingIO 服務端團隊在異步與反應式編程上做了一些實踐。本文將介紹在優化過程中的探索與思考,希望能為讀者帶來幫助。

異步與響應式

傳統服務端程序一般採用同步阻塞模型,通過分配更多線程來支撐更多請求,這符合常人思維模式,但在突發流量的情況下,同步模型可能會導致線程池耗盡,基於一個請求一個線程的服務模式無法做到動態伸縮。

GrowingIO 響應式編程探索和實踐 2

而異步編程的做法是基於一個共享的線程池,所有操作都是回調。如果遇到耗時的操作,線程並不會阻塞等待操作完成,而是會被釋放迴線程池中繼續接受新的請求。等到耗時操作完成後(一般都是IO操作),通過消息機制重新向線程池申請線程恢復之前的請求代碼。

GrowingIO 響應式編程探索和實踐 3

我們可以簡單寫個程序簡單實驗一下,實現相同邏輯:1. 查詢db 2. 查詢外部系統3. 組裝信息返回。唯一區別是一個是同步調用的實現,另一個是採用完全異步的方式實現。

本地使用相同的jmeter 參數模擬並發測試,得到結果如下,從左到右每列的含義分別為:請求名稱、請求數目、失敗請求數目、錯誤率(本次測試中出現錯誤的請求的數量/請求的總數)、平均響應時間、最短響應時間、最大響應時間、90%用戶響應時間、95%用戶響應時間、99%用戶響應時間、吞吐量。

總體測試結果如下:

GrowingIO 響應式編程探索和實踐 4

GrowingIO 響應式編程探索和實踐 5

同步代碼測試結果

GrowingIO 響應式編程探索和實踐 6

異步代碼測試結果

同步代碼總共完成了260 次請求,平均響應時間約5 秒,因為阻塞程序耗盡了線程池導致程序出現了拒絕服務的情況,產生了13% 的錯誤率。

異步代碼整體吞吐量有明顯提升,相同時間內完成了3000 次請求。錯誤率為0 ,並且整體沒有出現拒絕服務的情況。

可以看到基於消息驅動機制的異步系統能極大提高資源利用率,提高系統的吞吐量。而響應式系統則在消息驅動的基礎上增加了三個要求:及時響應性、回彈性和可擴展性。

簡單來說具備以下四個特點的系統可以稱為一個響應式系統:

即時響應性,這個是響應式系統的核心目標。一個具有響應性的系統就是一個無論在什麼情況下都能快速對客戶的操作做出反饋的系統,包括事件、用戶請求、失敗場景,最終目的是保證客戶良好的體驗。回彈性,指的是系統從故障災難中恢復的能力。主要分兩部分,一個是系統需要考慮失敗的情況,二是系統要能從失敗中恢復回來。擴展性(彈性),指的是系統在不斷變化的工作負載之下依然保持即時響應性。可擴展分為單機縱向擴展和橫向線性擴展。這裡主要指的是系統可以通過分片、複製等方式進行橫向擴展,從而避免系統產生明顯的性能瓶頸。消息驅動的,這是響應式系統的基礎。從上面異步系統的優勢和原理分析可以看到,基於消息驅動的程序能最大化利用機器資源,同時鬆散耦合的設計創建了一個能讓業務邏輯保持清潔的環境,顯式的隔離失敗有利於系統自動恢復。

GrowingIO 響應式編程探索和實踐 7

對應的響應式編程是一種程序設計思想,在java 8 中首次引入了響應式流的規範,即Reactive Streams 接口。 Reactive Streams 非常類似於JPA 或JDBC,都是API 規範,實際使用時需要採用對應的具體實現。 JDK 提供的Reactive Streams 接口:

org.reactivestreams.Publisher“: 代表一個潛在的無界數據源,根據Subscriber 的需要發布新的數據。org.reactivestreams.Subscriber“: 數據源消費者,通過Subscription 向數據源請求數據。org.reactivestreams.Subscription“: 代表一次數據消費請求。org.reactivestreams.Processor“: 處理器,代表一個既能發布數據也能消費數據的組件。

GrowingIO 響應式編程探索和實踐 8

Reactive Streams API 的範圍是找到一組最小的接口,這些接口將描述必要的操作和實體,從而實現具有非阻塞背壓的異步數據流。社區對於Reactive Streams 的實現比較多,這裡做一個簡單的匯總和對比。

GrowingIO 響應式編程探索和實踐 9

總結一下,如果是移動設備使用rxjava 是比較合適的選擇。如果是在服務端使用spring 框架做開發,採用基於reactor 實現的webflux 更合適。如果是對性能要求很高,業務相對簡單的場景,選擇vertx 可以最大限度發揮機器性能。而gio 的真實場景是服務端的複雜業務系統,同時使用scala 作為開發語言並且使用play 作為web 開發框架。所以在系統構建之初很自然的選擇了akka 作為我們的響應式系統的實現基礎。

使用Actor 構建反應式系統

在最初的時候並沒有直接採用akka-stream,而是選擇更為簡單,建模能力更強的akka​​-actor 作為系統實現的基礎。 Akka-actor 是基於actor 模型構建的異步工具包, 使用akka-actor 可以很輕鬆的進行基於消息驅動的異步編程。 Actor 的基礎就是消息傳遞,一個actor 可以認為是一個基本的計算單元,它能接收消息並執行運算,它也可以發送消息給其他actor。 Actors 之間相互隔離,它們之間並不共享內存,所以Actor 不需要去關注鎖和內存原子性等一系列多線程常見的問題。

GrowingIO 響應式編程探索和實踐 10

Akka-actor 最核心的實現包含三個部分:

Mailbox:可以是一個有界或者無界的消息隊列,用於存放所有收到的消息。 Behavior:具體的消息處理邏輯。 State:actor 包含的狀態,每個actor 的狀態都是獨立的避免鎖競爭。

Actor 本身是不綁定線程的,相同進程的actor 共享一個線程池,mailbox 是一個runnable 對象,核心邏輯就是從隊列中取出消息調用behavior 進行處理。

override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() // 先处理系统级别消息
processMailbox() // 然后处理普通消息
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}

在同一個進程中,可以通過調整akka-actor 線程池大小來進行縱向負載伸縮。同時,akka-actor 支持在一個系統中綁定不同類型、數量的線程池。比如在一些耗時較長的IO 場景下可以單獨配置一個線程池起到隔離的目的。對需要橫向擴展的場景,akka 提供了基於gossip 協議的點對點去中心化集群解決方案akka-cluster。

GrowingIO 響應式編程探索和實踐 11

Akka-cluster 通過gossip 協議進行成員之間的發現和狀態同步,同時提供了更高層的集群工具:

Cluster Singleton:全局唯一實例,能保證實例的全局唯一性,同時在實例出現問題的時clsuter 能在另一個節點上重建它。 Cluster Sharding:通過sharding,集群中的actor 能跨越多個節點通過actorRef 標識進行交互,不需要關心它們在集群中的物理定位。 Distributed Data:當需要在一個cluster的節點之間共享數據時,Distributed Data 提供了k/v 存儲API。 Distributed Publish Subscribe:集群中的actor 可以發布訂閱點對點的廣播消息。

理論上使用akka-actor 和akka-cluster 可以使系統具備極強的擴展性(彈性)。但是在實際使用中我們並沒有採用akka-cluster 去擴展系統,原因也很簡單,akka-cluster 生產案例太少,功能上過於復雜,不利於大規模推廣。最終我們使用了傳統的消息中間件作為系統橫向擴展的解決方案。在單機內使用akka-actor,涉及到跨節點通信的場景使用消息中間件進行通信。

GrowingIO 響應式編程探索和實踐 12

在系統回彈性方面,akka-actor 提供了基於層級的監督機制。可以把整個actor 系統看做是一棵樹,每個actor 實例都是樹中的一個節點。監督機制指的是每個actor 都是其子actor 的監督者,需要針對子actor 制定一個錯誤處理策略。

GrowingIO 響應式編程探索和實踐 13

對應到具體的業務系統裡,我們將整個流程分割成多個actor 實現,為了實現監督與錯誤恢復,需要創建一個頂層route actor 來引用所有具體的業務actor 。如果某個業務actor 遇到問題並拋出了異常,異常會被監管者route actor 來處理。監管者可以選擇恢復出現問題的actor 或者重啟,也可能會將其停止掉,這依賴於問題的嚴重程度和恢復策略。 Akka-actor 中有以下4 種錯誤處理策略:

恢復子節點,保持子節點當前積累的內部狀態。重啟子節點,清除子節點的內部狀態。永久地停止子節點。拋出錯誤向上傳遞錯誤,由更高級的節點處理。

最終我們基於actor 實現了整個業務流程:當一個用戶發起一次站外活動請求,主應用(基於play)會將活動的元數據寫入數據庫中然後立馬返回結果到前端,達到及時響應的目的。同時將活動請求封裝成一個actor 消息,異步的投遞給route actor 進行後續的任務處理。 Route actor 會根據接收到的具體消息類型進行路由分發,分別是User Insight Actor(查詢人群信息) – Build Push Task Actor(查詢db 組裝task) – Checkpoint Actor(存儲task 信息)- Publish Task Actor(發布task到kafka)。

GrowingIO 響應式編程探索和實踐 14

採用消息驅動的方式設計系統取得了一些好處:

程序之間耦合性更低,每個actor 只需要維護好一小段邏輯。整個流程是異步處理的,用戶體驗良好。消息的生產和消費可以跨服務器,橫向擴展變得簡單。

從Actor 到Stream

上文提到我們創建了一個route actor 將所有業務actor 組織到一起,這樣既能起到一個監督的作用,也可以知道全局的邏輯視圖。但是這種實現方式也會帶來一個問題,整體編排較為複雜。對於帶有分支與合併邏輯的處理流更是難以描述,對後續新增流程也沒有約束,只能人為約定一個順序,比如在上面的比較靠前,可維護性比較差。

GrowingIO 響應式編程探索和實踐 15

又因為整個流程中User Insight Actor 部分依賴外部數據查詢系統,比較容易成為整個系統的瓶頸。在負載不斷變化的情況下,外部查詢可能會失敗,從而對系統整體可用性造成影響。針對這個問題需要設計對應的限流機制和重試機制。上面提到Actor 的mailbox 本身就是一個隊列,如果在負載過高的情況下消息是可以丟棄的,只需要指定actor 的maibox 類型為有界隊列即可。假如消息不能被丟棄,可以採用令牌桶算法實現限流功能。對於重試機制,User Insight Actor 本身是無狀態的,這裡很自然想到在失敗時重新發送試消息到User Insight Actor 本身進行重試。

GrowingIO 響應式編程探索和實踐 16

這個方案比較簡單,如果要滿足一些特殊場景下的需求,比如設定重試次數,延遲執行重試請求,指定重試失敗後的降級策略,只能通過定制一些邏輯實現,但是要做到代碼靈活復用需要花費大量時間進行設計。

上述方案都能滿足業務需求,總體來講通過actor 模型可以快速實現輕量業務異步封裝,但面對相對複雜業務邏輯時還是存在一些局限:

難以簡單優雅實現多異步任務編排,路由方案過於復雜,不直觀。重試機制、限流機制等和業務無關的功能複用性不高。

這也是為什麼後來採用了akka-stream 來對處理流程進行重構。 Akka-stream 是基於akka-actor 的Reactive Streams 規範實現,具備以下特點:

具有處理無限數量的元素的能力異步地按序處理元素實現了非阻塞的背壓

並在上層提供了更加抽象靈活的DSL 封裝,即source、sink、flow 組件。

Source 即響應流的源頭,源頭具有一個數據出口。我們可以通過各種數據來創建一個Source:

val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))

Sink 就是流的最終目的地,包含一個數據入口,我們可以如下來創建Sink:

val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)

Flow 就是流的中間組件,包含一個數據入口和數據出口。我們可以這樣來創建Flow:

val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)

而整個業務流可以通過基礎組件構成的圖和網絡來表示:

GrowingIO 響應式編程探索和實踐 17

流式操作可以類比成流水線,每個算子都是一道處理工序,數據源就是加工原材料,經過多道工序處理後最後輸出一個成品。

上文提到為了實現對系統速率的控制,引入了限流的邏輯,比如基於令牌桶算法的實現,只有程序拿到了令牌才能進入下一段處理邏輯,本質上這種實現方式是同步阻塞的,而且真實情況下下游節點可能完全能承載更多的請求。為了解決數據源和下游節點處理速度不一致的問題,在Reactive Streams 的規範裡引入了背壓機制,本質上是一種由處理者向數據源發起數據請求,從而進行速度調整的一種方式。 Akka-Stream 提供了一套開箱即用的背壓功能,其實現方式和Reactive Streams 一致,下游subscriber 通過發送subscription 到上游的publisher 主動請求需要處理的元素數量。這樣就能從整個數據流的源頭進行速率控制,採用pull 而不是push 的模式能讓系統按需保持最大的處理能力,同時又不會崩潰。

GrowingIO 響應式編程探索和實踐 18

下面是基於akka stream 重構後的處理流,簡單對比akka actor 的實現方式,基於操作符的組合代碼更加清晰易讀,可以輕鬆實現複雜任務編排。

GrowingIO 響應式編程探索和實踐 19

從底層實現來講,akka-stream 底層還是基於akka-actor 進行工作的,只是在上層提供了更高視角的DSL 封裝。這種靈活的編程方式能極大提高代碼復用性和可維護性。

總結

本文記錄了GrowingIO 服務團隊在針對具體業務場景進行反應式系統設計的實踐總結,從異步編程到使用actor 模型構建基於消息驅動的系統,為了降低系統複雜度提高可維護性又引入了akka-stram 作為反應式流的編排框架。最後,希望能與對反應式技術感興趣的同學多多交流,打個小廣告:我們的工程團隊持續在招聘中~ 服務端、前端、大數據各種攻城獅都缺,感興趣朋友歡迎砸簡歷 https://www.growingio.com/joinus“。

參考資料:

https://info.lightbend.com/rs/558-NCX-702/images/COLL-ebook-Reactive-Microservices-Architecture.pdf

https://learning.oreilly.com/library/view/applied-akka-patterns

https://freecontent.manning.com/akka-in-action-why-use-clustering/

https://doc.akka.io/

關於GrowingIO

GrowingIO 是國內領先的一站式數字化增長整體方案服務商。為產品、運營、市場、數據團隊及管理者提供客戶數據平台、廣告分析、產品分析、智能運營等產品和諮詢服務,幫助企業在數字化轉型的路上,提升數據驅動能力,實現更好的增長。