Categories
程式開發

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力?


眾所周知,Flink是當前最為廣泛使用的計算引擎之一,它使用checkpoint 機制進行容錯處理[1],checkpoint會將狀態快照備份到分佈式存儲系統,供後續恢復使用。在Alibaba內部,我們使用的存儲主要是HDFS,當同一個集群的Job到達一定數量後,會對HDFS造成非常大的壓力,本文將介紹一種大幅度降低HDFS壓力的方法——小文件合併。

背景

不管使用 FsStateBackend、RocksDBStateBackend 還是 NiagaraStateBackend,Flink 在進行 checkpoint 的時候,TM 會將狀態快照寫到分佈式文件系統中,然後將文件句柄發給 JM,JM 完成全局 checkpoint 快照的存儲,如下圖所示。

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 1

對於全量checkpoint 來說,TM 將每個checkpoint 內部的數據都寫到同一個文件,而對於RocksDBStateBackend/NiagaraStateBackend 的增量checkpoint [2]來說,則會將每個sst 文件寫到一個分佈式系統的文件內。當作業量很大,且作業的並發很大時,則會對底層HDFS 形成非常大的壓力:1)大量的RPC 請求會影響RPC 的響應時間(如下圖所示);2)大量文件對NameNode內存造成很大壓力。

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 2

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 3

在 Flink 中曾經嘗試使用 ByteStreamStateHandle 來解決小文件多的問題[3],將小於一定閾值的 state 直接發送到 JM,由 JM 統一寫到分佈式文件中,從而避免在 TM 端生成小文件。但是這個方案有一定的局限性,閾值設置太小,還會有很多小文件生成,閾值設置太大,則會導致 JM 內存消耗太多有 OOM 的風險。

1 小文件合併方案

針對上面的問題我們提出一種解決方案——小文件合併。

在原來的實現中,每個 sst 文件會打開一個CheckpointOutputStream,每個 CheckpointOutputStream 對應一個 FSDataOutputStream,將本地文件寫往一個分佈式文件,然後關閉 FSDataOutputStream,生成一個 StateHandle。如下圖所示:

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 4

小文件合併則會重用打開的FSDataOutputStream,直至文件大小達到預設的閾值為止,換句話說多個sst 文件會重用同一個DFS 上的文件,每個sst 文件佔用DFS 文件中的一部分,最終多個StateHandle 共用一個物理文件,如下圖所示。

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 5

在接下來的章節中我們會描述實現的細節,其中需要重點考慮的地方包括:

  1. 並發 checkpoint 的支持
    Flink 天生支持​​並發checkpoint,小文件合併方案則會將多個文件寫往同一個分佈式存儲文件中,如果考慮不當,數據會寫串或者損壞,因此我們需要有一種機制保證該方案的正確性,詳細描述參考2.1 節。

  2. 防止誤刪文件
    我們使用引用計數來記錄文件的使用情況,僅通過文件引用計數是否降為 0 進行判斷刪除,則可能誤刪文件,如何保證文件不會被錯誤刪除,我們將會在 2.2 節進行闡述。

  3. 降低空間放大
    使用小文件合併之後,只要文件中還有一個 statehandle 被使用,整個分佈式文件就不能被刪除,因此會佔用更多的空間,我們在 2.3 節描述了解決該問題的詳細方案。

  4. 異常處理
    我們將在 2.4 節闡述如何處理異常情況,包括 JM 異常和 TM 異常的情況。

2.5 節中會詳細描述在 Checkpoint 被取消或者失敗後,如何取消 TM 端的 Snapshot,如果不取消 TM 端的 Snapshot,則會導致 TM 端實際運行的 Snapshot 比正常的多。

在第 3 節中闡述了小文件合併方案與現有方案的兼容性;第 4 節則會描述小文件合併方案的優勢和不足;最後在第 5 節我們展示在生產環境下取得的效果。

2 設計實現

本節中我們會詳細描述整個小文件合併的細節,以及其中的設計要點。

這裡我們大致回憶一下 TM 端 Snapshot 的過程:

  1. TM 端 barrier 對齊
  2. TM Snapshot 同步操作
  3. TM Snapshot 異步操作

其中上傳 sst 文件到分佈式存儲系統在上面的第三步,同一個 checkpoint 內的文件順序上傳,多個 checkpoint 的文件上傳可能同時進行。

2.1 並發 checkpoint 支持

Flink 天生支持​​並發 checkpoint,因此小文件合併方案也需要能夠支持並發 checkpoint,如果不同 checkpoint 的 sst 文件同時寫往一個分佈式文件,則會導致文件內容損壞,後續無法從該文件進行 restore。

在FLINK-11937[4] 的提案中,我們會將每個checkpoint 的state 文件寫到同一個HDFS 文件,不同checkpoint 的state 寫到不同的HDFS 文件– 換句話說,HDFS 文件不跨Checkpoint 共用,從而避免了多個客戶端同時寫入同一個文件的情況。

後續我們會繼續推進跨 Checkpoint 共用文件的方案,當然在跨 Checkpoint 共用文件的方案中,並行的 Checkpoint 也會寫往不同的 HDFS 文件。

2.2 防止誤刪文件

復用底層文件之後,我們使用引用計數追踪文件的使用情況,在文件引用數降為 0 的情況下刪除文件。但是在某些情況下,文件引用數為 0 的時候,並不代表文件不會被繼續使用,可能導致文件誤刪。下面我們會詳細描述開啟並發 checkpoint 後可能導致文件誤刪的情況,以及解決方案。

以下圖為例,maxConcurrentlyCheckpoint = 2

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 6

上圖中共有 3 個 checkpoint,其中 chk-1 已經完成,chk-2 和 chk-3 都基於 chk-1 進行,chk-2 在 chk-3 前完成,chk-3 在註冊 4.sst 的時候發現,發現 4.sst 在 chk-2 中已經註冊過,會重用 chk-2 中 4.sst 對應的 stateHandle,然後取消 chk-3 中的 4.sst 的註冊,並且刪除 stateHandle,在處理完 chk-3 中 4.sst 之後,該 stateHandle 對應的分佈式文件的引用計數為 0,如果我們這個時候刪除分佈式文件,則會同時刪除 5.sst 對應的內容,導致後續無法從 chk-3 恢復。

這裡的問題是如何在 stateHandle 對應的分佈式文件引用計數降為0 的時候正確判斷是否還會繼續引用該文件,因此在整個checkpoint 完成處理之後再判斷某個分佈式文件能否刪除,如果真個checkpoint 完成發現文件沒有被引用,則可以安全刪除,否則不進行刪除。

2.3 降低空間放大

使用小文件合併方案後,每個 sst 文件對應分佈式文件中的一個 segment,如下圖所示:

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 7

文件僅能在所有segment 都不再使用時進行刪除,上圖中有4 個segment,僅segment-4 被使用,但是整個文件都不能刪除,其中segment[1-3] 的空間被浪費掉了,從實際生產環境中的數據可知,整體的空間放大率(實際佔用的空間/ 真實有用的空間)在1.3 – 1.6 之間。

為了解決空間放大的問題,在 TM 端起異步線程對放大率超過閾值的文件進行壓縮。而且僅對已經關閉的文件進行壓縮。

整個壓縮的流程如下所示:

  1. 計算每個文件的放大率
  2. 如果放大率較小則直接跳到步驟 7
  3. 如果文件 A 的放大率超過閾值,則生成一個對應的新文件 A‘(如果這個過程中創建文件失敗,則由 TM 負責清理工作)
  4. 記錄 A 與 A’ 的映射關係
  5. 在下一次 checkpoint X 往 JM 發送落在文件 A 中的 StateHandle 時,則使用 A` 中的信息生成一個新的 StateHandle 發送給 JM
  6. checkpoint X 完成後,我們增加A' 的引用計數,減少A 的引用計數,在引用計數降為0 後將文件A 刪除(如果JM 增加了A' 的引用,然後出現異常,則會從上次成功的checkpoint 重新構建整個引用計數器)
  7. 文件壓縮完成

2.4 異常情況處理

在 checkpoint 的過程中,主要有兩種異常:JM 異常和 TM 異常,我們將分情況闡述。

2.4.1 JM 異常

JM 端主要記錄StateHandle 以及文件的引用計數,引用計數相關數據不需要持久化到外存中,因此不需要特殊的處理,也不需要考慮transaction 等相關操作,如果JM 發送failover,則可以直接從最近一次complete checkpoint 恢復,並重建引用計數即可。

2.4.2 TM 異常

TM 異常可以分為兩種:1)該文件在之前 checkpoint 中已經匯報過給 JM;2)文件尚未匯報過給 JM,我們會分情況闡述。

  1. 文件已經匯報過給 JM

    文件匯報過給 JM,因此在 JM 端有文件的引用計數,文件的刪除由 JM 控制,當文件的引用計數變為 0 之後,JM 將刪除該文件。

  2. 文件尚未匯報給 JM

    該文件暫時尚未匯報過給 JM,該文件不再被使用,也不會被 JM 感知,成為孤兒文件。這種情況暫時有外圍工具統一進行清理。

2.5 取消 TM 端 snapshot

像前面章節所說,我們需要在checkpoint 超時/失敗時,取消TM 端的snapshot,而Flink 則沒有相應的通知機制,現在FLINK-8871[5] 在追踪相應的優化,我們在內部增加了相關實現,當checkpoint 失敗時會發送RPC 數據給TM,TM 端接受到相應的RPC 消息後,會取消相應的snapshot。

3 兼容性

小文件合併功能支持從之前的版本無縫遷移過來。從之前的 checkpoint restore 的的步驟如下:

  1. 每個 TM 分到自己需要 restore 的 state handle
  2. TM 從遠程下載 state handle 對應的數據
  3. 從本地進行恢復

小文件合併主要影響的是第 2 步,從遠程下載對應數據的時候對不同的 StateHandle 進行適配,因此不影響整體的兼容性。

4 優勢和不足

  • 優勢:大幅度降低 HDFS 的壓力:包括 RPC 壓力以及 NameNode 內存的壓力
  • 不足:不支持 State 多線程上傳的功能(State 上傳暫時不是 checkpoint 的瓶頸)

5 線上環境的結果

在該方案上線後,對Namenode 的壓力大幅降低,下面的截圖來自線上生產集群,從數據來看,文件創建和關閉的RPC 有明顯下降,RPC 的響應時間也有大幅度降低,確保順利度過雙十一。

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 8

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 9

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 10

阿里巴巴Flink踩坑經驗:如何大幅降低HDFS壓力? 11

參考文獻

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html

[2] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

[3] https://www.slideshare.net/dataArtisans/stephan-ewen-experiences-running-flink-at-very-large-scale

[4] https://issues.apache.org/jira/browse/FLINK-11937

[5] https://issues.apache.org/jira/browse/FLINK-8871