Categories
程式開發

字節跳動Flink 單點恢復功能實踐


背景

在字節跳動的實時計算場景中,我們有很多任務(數量2k+)會直接服務於線上,其輸出時延和穩定性會直接影響線上產品的用戶體驗,這類任務通常具有如下特點:

  • 流量大,並發高(最大的任務並行度超過1w)
  • 拓撲類似於多流Join,將各個數據源做整合輸出給下游,不依賴Checkpoint
  • 沒有使用Checkpoint 並且對短時間內的小部分數據丟失不敏感(如0.5%),但對數據輸出的持續性要求極高

在Flink 現有的架構設計中,多流Join 拓撲下單個Task 失敗會導致所有Task 重新部署,耗時可能會持續幾分鐘,導致作業的輸出斷流,這對於線上業務來說是不可接受的。

針對這一痛點,我們提出單點恢復的方案,通過對network 層的增強,使得在機器下線或者Task 失敗的情況下,以短時間內故障Task 的部分數據丟失為代價,達成以下目標:

  • 作業不發生全局重啟,只有故障Task 發生Failover
  • 非故障Task 不受影響,正常為線上提供服務

解決思路

當初遇到這些問題的時候,我們提出的想法是說能不能在機器故障下線的時候,只讓在這台機器上的Tasks 進行Failover,而這些Tasks 的上下游Tasks 能恰好感知到這些失敗的Tasks,並作出對應的措施:

  • 上游:將原本輸出到Failed Tasks 的數據直接丟棄,等待Failover 完成後再開始發送數據。
  • 下游:清空Failed Tasks 產生的不完整數據,等待Failover 完成後再重新建立連接並接受數據

根據這些想法我們思考得出幾個比較關鍵點在於:

  • 如何讓上下游感知Task Failed ?
  • 如何清空下游不完整的數據 ?
  • Failover 完成後如何與上下游重新建立連接 ?

基於以上考慮我們決定基於已有的Network 層線程模型,修改上下游對於Task Failed 後的處理邏輯,讓非故障的Tasks 短時間內完成對失敗Task 的感知操作,從而使得作業持續穩定地輸出。

當前架構

注:我們的實現基於Flink-1.9,1.11 後的網絡模型加入了Unaligned Checkpoint 的特性,可能會有所變化。

我們先將Flink 的上下游Task 通信模型簡單抽像一下:

字節跳動Flink 單點恢復功能實踐 1

Figure. 上下游通信模型

上下游Task 感知彼此狀態的邏輯,分三種情況考慮:

  • Task 因為邏輯錯誤或OOM 等原因Fail,Task 自身會主動釋放network resources,給上游發送channel close 信息,給下游發送Exception。
  • TaskManager 進程被Yarn Kill,TCP 連接會被操作系統正常關閉,上游Netty Server 和下游Netty Client 可以感知到連接狀態變化。
  • 機器斷電宕機,這個情況下操作系統不會正確關閉TCP 連接,所以Netty 的Server 和Client 可能互相感知不到,這個時候我們在deploy 新的Task 後需要做一些強制更新的處理。

可以看到,在大部分情況下,Task 是可以直接感知到上下游Task 的狀態變化。了解了基礎的通信模型之後,我們可以按照之前的解決思路繼續深入一下,分別在上游發送端和下游接收端可以做什麼樣改進來實現單點恢復。

優化方案

根據我們的解決思路,我們來繪製一下單個Task 掛了之後,整個Job 的通信流程:

字節跳動Flink 單點恢復功能實踐 2

Figure. 單點恢復流程

Map(1) 失敗之後:

  1. 將Map(1) 失敗的信息通知Source(1) 、Sink(1) 和JobManager。
  2. JobManager 開始申請新的資源準備Failover,同時上游Source(1) 和下游Sink(1) 切斷和Map(1) 的數據通道,但是Source(1) 和Sink(1) 和其他Task 的數據傳輸仍正常進行。
  3. Map(1)’ 被成功調度,和上游建立連接,JobManager 通知Sink(1) 和Map(1)’ 建立連接,數據傳輸通道被恢復。

從這個流程,我們可以將優化分為三個模塊,分別為上游發送端、下游接收端和JobManager。

上游發送端的優化

我們再細化一下上游發送端的相關細節,

字節跳動Flink 單點恢復功能實踐 3

Figure. 上游數據發送流程

(1) Netty Server 收到Client 發送的Partition Request 後,在對應的Subpartition 註冊讀取數據的SubpartitionView 和Reader。

(2) RecordWriter 發送數據到不同的Subpartitions,每個Subpartition 內部維護一個buffer 隊列,並將讀取數據的Reader 放入到Readers Queue 中。 (Task 線程)

(3) Netty 線程讀取Readers Queue,取出對應的Reader 並讀取對應Subpartition 中的buffer 數據,發送給下游。 (Netty 線程)

我們的期望是上游發送端在感知到下游Task 失敗之後,直接將發送到對應Task 的數據丟棄。那麼我們的改動邏輯,在這個示意圖中,就是Subpartition 通過Netty Server 收到下游Task Fail 的消息後,將自己設置為Unavailable,然後RecordWriter 在發送數據到指定Subpartition 時,判斷是否可用,如果不可用則直接將數據丟棄。而當Task Failover 完成後重新與上游建立連接後,再將該Subpartition 置為Available,則數據可以重新被消費。

發送端的改動比較簡單,得益於Flink 內部對Subpartition 的邏輯做了很好的抽象,並且可以很容易的通過參數來切換Subpartition 初始化的類型,我們在這裡參考PipelinedSubpartition 的實現,根據上述的邏輯,實現了我們自己的Subpartition 和對應的View。

下游接收端的優化

同樣,我們來細化一下下游接收端的細節:

字節跳動Flink 單點恢復功能實踐 4

Figure. 下游數據接收流程

仔細來看,其實和上游的線程模型頗有類似之處:

(1) InputGate 初始化所有的Channel 並通過Netty Client 和上游Server 建立連接。

(2) InputChannel 接收到數據後,緩存到buffer 隊列中並將自己的引用放入到Channels Queue 裡。 (Netty 線程)

(3) InputGate 通過InputProcessor 的調用,從Queue 里拉取Channel 並讀取Channel 中緩存的buffer 數據,如果buffer 不完整(比如只有半條record),那麼則會將不完整的buffer 暫存到InputProcessor 中。 (Task 線程)

這裡我們期望下游接收端感知到上游Task 失敗之後,能將對應InputChannel 的接收到的不完整的buffer 直接清除。不完整的buffer 存儲在InputProcessor 中,那麼我們如何讓InputProcessor 知道哪個Channel 出現了問題?

簡單的方案是說,我們在InputChannel 中直接調用InputGate 或者InputProcessor,做buffer 清空的操作,但是這樣引入一個問題,由於InputChannel 收到Error 是在Netty 線程,而InputProcessor 的操作是在Task 線程,這樣跨線程的調用打破了已有的線程模型,必然會引入鎖和調用時間的不確定性,增加架構設計的複雜度,並且因為InputProcessor 會對每一條record 都有調用,稍有不慎就會帶來性能的下降。

我們沿用已有的線程模型,Client 感知到上游Task 失敗的消息之後告知對應的Channel,Channel 向自己維護的receivedBuffers 的末尾插入一個UnavailableEvent,並等待InputProcessor 拉取並清空對應Channel 的buffer 數據。示意圖如下所示,紅色的模塊是我們新增的部分:

字節跳動Flink 單點恢復功能實踐 5

Figure. 下游改動示意圖

JobManager 重啟策略的優化

JobManager 重啟策略可以參考社區已有的RestartIndividualStrategy,比較重要的區別是,在重新deploy 這個失敗的Task 後,我們需要通過ExecutionGraph 中的拓撲信息,找到該Task 的下游Tasks,並通過Rpc 調用讓下游Tasks 和這個新的上游Tasks 重新建立連接。

這裡實現有一個難點是如果JobManager 去update 下游的Channel 信息時,舊的Channel 對應的buffer 數據還沒有被清除怎麼辦?我們這里通過新增CachedChannelProvider 來處理這一邏輯:

字節跳動Flink 單點恢復功能實踐 6

Figure. 更新Partition

如圖所示,以Channel – 1 為例,如果JobManager 更新Channel 的Rpc 請求到來時Channel 處於不可用狀態,那麼我們直接利用Rpc 請求中攜帶的Channel 信息來重新初始化Channel。以Channel – 3 為例,如果Rpc 請求到來時Channel 仍然可用,那麼我們將Channel 信息暫時緩存起來,等Channel – 3 中所有數據消費完畢後,通知CachedChannelProvider,然後再通過CachedChannelProvider 去更新Channel。

這裡還需要特別提到一點,在字節跳動內部我們實現了預留TaskManager 的功能,當Task 出現Failover 時,能夠直接使用TaskManager 的資源,大大節約了Failover 過程數據丟失的損耗。

實現中的關鍵點

整個解決的思路其實是比較清晰的,相信大家也比較容易理解,但是在實現中仍然有很多需要注意的地方,舉例如下:

  • 上面提到JobManager 發送的Rpc 請求如果過早,那麼會暫時緩存下來等待Channel 數據消費完成。而此時作業的狀態是未知的,可能一直處於僵死的狀態(比如卡在了網絡IO 或者磁盤IO 上),那麼Channel 中的Unavailable Event 就無法被InputProcessor 消費。這個時候我們通過設置一個定時器來做兜底策略,如果沒有在定時器設置的時間內完成Channel 的重新初始化,那麼該Task 就會自動下線,走單點恢復的Failover 流程​​。
  • 網絡層作為Flink 內線程模型最複雜的一個模塊,我們為了減少改動的複雜度和改動的風險,在設計上沒有新增或修改Netty 線程和Task 線程之間通信的模型,而是藉助於已有的線程模型來實現單點恢復的功能。但在實現過程中因為給Subpartition 和Channel 增加了類似isAvailable 的狀態位,所以在這些狀態的修改上需要特別注意線程可見性的處理,避免多線程讀取狀態不一致的情況發生。

收益

目前在字節跳動內部,單點恢復功能已經上線了1000+ 作業,在機器下線、網絡抖動的情況下,下游在上游作業做Failover 的過程幾乎沒有感知。

接下來我們以下面這個作業拓撲為例,在作業正常運行時我們手動Kill 一個Container,來看看不同並行度作業開啟單點恢復的效果:

字節跳動Flink 單點恢復功能實踐 7

Figure. 測試作業拓撲

我們在1000 和4000 並行度的作業上進行測試,每個slot 中有2 個Source 和1 個Joiner 共3 個Task,手動Kill 一個Container 後,從故障恢復時間和斷流影響兩個維度進行收益計算:

字節跳動Flink 單點恢復功能實踐 8

結論: 可以看到,在4000 個Slot 的作業裡,如果不開啟單點恢復,作業整體的Failover 時間為81s,同時 對於下游服務來說 ,上游服務斷流81s,這在實時服務線上的場景中明顯是不可接受的。而開啟了單點恢復和預留資源後,Kill 1 個Container 只會影響4 個Slot,且Failover 的時間只有5s,同時 對於下游服務來說,上游服務產生的數據減少4/4000=千分之一,持續5s ,效果是非常顯而易見的。

本文轉載自公眾號字節跳動技術團隊(ID:toutiaotechblog)。

原文鏈接

字節跳動Flink 單點恢復功能實踐