Categories
程式開發

基於Flink+Hive構建流批一體準實時數倉


基於Hive 的離線數倉往往是企業大數據生產系統中不可缺少的一環。 Hive 數倉有很高的成熟度和穩定性,但由於它是離線的,延時很大。 在一些對延時要求比較高的場景,需要另外搭建基於Flink 的實時數倉,將鏈路延時降低到秒級。 但是一套離線數倉加一套實時數倉的架構會帶來超過兩倍的資源消耗,甚至導致重複開發。

想要搭建流式鏈路就必須得拋棄現有的Hive 數倉嗎? 並不是,借助Flink 可以實現已有的Hive 離線數倉準實時化。本文整理自Apache Flink Committer、阿里巴巴技術專家李勁松在InfoQ技術公開課的分享,文章將分析當前離線數倉實時化的難點,詳解Flink 如何解決Hive 流批一體準實時數倉的難題,實現更高效、合理的資源配置。 文章大綱如下:

  1. 離線數倉實時化的難點
  2. Flink 在流批一體的探索
  3. 構建流批一體準實時數倉應用實踐

離線數倉實時化的難點

離線數倉

基於Flink+Hive構建流批一體準實時數倉 1

上圖是一個典型的離線數倉,假設現在公司有一個需求,目前公司的數據量很大,需要每天出一個報表且輸出到業務數據庫中。 首先是剛入庫的業務數據,大致分為兩種,一種是MySQL 的binlog,另外一種是業務系統中的業務打點,這個日誌打點信息可以通過Flume等工具去採集,再離線入庫到數倉中。 然後隨著業務越來越多,業務中的各個表可以做一些抽象,抽象的好處是更好的管理和更高效的數據復用和計算復用。 所以數倉就分成了多層(明細層、中間層、服務層等等),每一層存的是數據表,數據表之間通過HiveSQL 的計算來實現ETL 轉換。

不止是HiveSQL ,Hive 只是靜態的批計算,而業務每天都要出報表,這意味著每天都要進行計算,這種情況下會依賴於調度工具和血緣管理:

  • 調度工具:按照某個策略把批計算調度起來。
  • 血緣管理:一個任務是由許多個作業組合而成,可能有非常複雜的表結構層次,整個計算是一個非常複雜的拓撲,作業間的依賴關係非常複雜(減少冗餘存儲和計算,也可以有較好的容錯),只有當一級結束後才能進行下一級的計算。

當任務十分龐大的時候,我們得出結果往往需要很長的一段時間,也就是我們常說的T+1,H+1 ,這就是離線數倉的問題。

第三方工具

基於Flink+Hive構建流批一體準實時數倉 2

上面說過,離線數倉不僅僅是簡單的Hive 計算,它還依賴了其它的第三方工具,比如:

  • 使用Flume 來入庫,但存在一定的問題,首先,它的容錯可能無法保證Exactly-Once 效果,需要下游再次進行去重操作。 其次,自定義邏輯需要通過一些手段,比如腳本來控制。 第三,離線數倉並不具備良好的擴展能力,當數據劇增時,增加原本的並發數就比較困難了。
  • 基於調度工具的作業調度會帶來級聯的計算延遲,比如凌晨1點開始計算昨天的數據,可能需要到早上6、7點才能做完,並且無法保證在設置的調度時間內數據可以完全ready 。 此外,級聯的計算還會帶來複雜的血緣管理問題,大任務的Batch 計算可能會突然打滿集群的資源,所以也要求我們對於負載管理進行考量,這些都會給業務增加負擔。

無論是離線數倉還是第三方工具,其實主要的問題還是“慢”,如何解決慢的問題,此時就該實時數倉出場了。

實時數倉

基於Flink+Hive構建流批一體準實時數倉 3

實時數倉其實是從Hive+HDFS 的組合換成了Kafka,ETL 的功能通過Flink 的流式處理解決。 此時就不存在調度和血緣管理的問題了,通過實時不斷的增量更新,最終輸出到業務的DB 中。

雖然延時降低了,但此時我們會面臨另外一些問題:

  • 歷史數據丟失,因為Kafka 只是臨時的存儲介質,數據會有一個超時的時間(比如只保存7天的數據),這會導致我們的歷史數據丟失。
  • 成本相對較高,實時計算的成本要大於離線計算。

Lambda 架構

基於Flink+Hive構建流批一體準實時數倉 4

所以此時很多人就會選擇一套實時一套離線的做法,互不干擾,根據任務是否需要走實時的需求來對需求進行分離。

這套架構看似解決了所有問題,但實際帶來的問題也是非常多。 首先,Lambda 架構造成了離線和實時的割裂問題,它們解決的業務問題都是一樣的,但是兩套方案讓同樣的數據源產生了不同的計算結果。 不同層級的表結構可能不一致,並且當數據產生不一致的問題時,還需要去進行比對排查。

隨著這套Lambda 架構越走越遠,開發團隊、表結構表依賴、計算模型等都可能會被割裂開,越到後面越會發現,成本越來越高,而統一的代價越來越大。

基於Flink+Hive構建流批一體準實時數倉 5

那麼問題來了,實時數倉會耗費如此大的資源,且還不能保留歷史數據,Lambda 架構存在如此多的問題,有什麼方案可以解決呢?

數據湖

基於Flink+Hive構建流批一體準實時數倉 6

數據湖擁有不少的優點,原子性可以讓我們做到準實時的批流一體,並且支持已有數據的修改操作。 但是畢竟數據湖是新一代數倉存儲架構,各方面都還不是很完美,目前已有的數據湖都強依賴於Spark(當然Flink 也正在擁抱數據湖),將數據遷移到數據湖需要團隊對遷移成本和人員學習成本進行考量。

如果沒有這麼大的決心遷移數據湖,那有沒有一個稍微緩和一些的方案加速已有的離線數倉呢?

Flink 在批流一體上的探索

統一元數據

基於Flink+Hive構建流批一體準實時數倉 7

Flink 一直持續致力於離線和實時的統一,首先是統一元數據。 簡單來說就是把Kafka表的元數據信息存儲到HiveMetaStore中,做到離線和實時的表Meta 的統一。

(目前開源的實時計算並沒有一個較為完善的持久化MetaStore,Hive MetaStore 不僅能保存離線表,也可以承擔實時計算的MetaStore 能力)。

統一計算引擎

基於Flink+Hive構建流批一體準實時數倉 8

同樣的元數據之後,實時和離線的表結構和層次可以設計成一樣,接下來就是可以共用:

  • 同一套SQL,Flink 自身提供批流一體的ANSI-SQL 語法,可以大大減小用戶SQL 開發者和運維者的負擔,讓用戶專注於業務邏輯。
  • 同一個引擎,Flink 的流和批復用一套優化和Runtime 框架,現階段的大數據引擎還遠遠達不到完全穩定的情況,所以仍然有很多時候需要我們去深入的分析和優化,一套引擎可以讓開發者專注單個技術棧,避免需要接觸多個技術棧,而只有技術廣度,沒有技術深度。

統一數據

基於Flink+Hive構建流批一體準實時數倉 9

分析了元數據和計算引擎的統一,更進一步,是否能統一實時和離線的數據,避免數據的不一致,避免數據的重複存儲和重複計算。 ETL 計算是否能統一呢? 既然實時表設計上可以和離線表一模一樣,是否可以乾脆只有實時表的ETL 計算,離線表從實時表裡獲取數據?

並且,通過實時鏈路可以加速離線鏈路的數據準備,批計算可以把調度換成流輸入。

基於Flink+Hive構建流批一體準實時數倉 10

Flink Hive/File Streaming Sink 即為解決這個問題,實時Kafka 表可以實時的同步到對於的離線表中:

  • 離線表作為實時的歷史數據,填補了實時數倉不存在歷史數據的空缺。
  • 數據批量準實時攝入為Ad-hoc 查詢離線表提供了準實時輸入。

此時離線的批計算也可以交由實時調度,在實時任務處理中某個契機(Partition Commit 見後續) 自行調度離線那塊的任務進行數據同步操作。

此時實時和離線的表已經基本統一,那麼問題來了,Kafka 中的表和Hive 中的表能否就共用一張表呢? 我的想法是之後可能會出現以下情況,在數倉中定義一張表,分別對應著Kafka 和Hive+HDFS 兩種物理存儲:

  • 用戶在進行insert 操作時,就自然插入到了Kafka 的實時table 當中,同時生成另外一條鏈路,自動同步到Hive Table 當中。 這樣這一張表就非常的完整,不僅滿足實時的需求,而且擁有歷史的數據。
  • 一個SQL讀取這樣的一個Hybrid Source ,根據你的查詢語句後面的where 條件,自動路由到Hive 的歷史數據,或者是Kafka 的實時數據。 根據一定的規則先讀Hive 歷史數據,再讀Kafka 實時數據,當然這裡有一個問題,它們之間通過什麼標識來切換呢? 一個想法是數據中或者Kafka 的Timestamp。

Hive Streaming Sink 的實現

基於Flink+Hive構建流批一體準實時數倉 11

Flink 1.11 前已經有了StreamingFileSink,在1.11 中不但把它集成到SQL 中,讓這個Hive Streaming Sink 可以像離線的Hive SQL 那樣,所有的業務邏輯都由SQL 去處理,而且帶來了進一步的增量。

接下來介紹下Hive/File Streaming Sink,分為兩個組件,FileWriter 和PartitionCommitter:

  • FileWriter 組件可以做到分區感知,通過checkpoint 機制可以保證Exactly-Once(分佈式場景是不可靠的,需要通過兩階段提交+ 文件Rename 的冪等性),FileWriter 也提供了Rolling 相關的參數,這個Rolling指的是我們的流式處理過程,它可以通過兩個參數來控制執行頻率,file-size 就是每個數據流的大小,rollover-interval 就是時長間隔。 但是需要注意,checkpoint 不宜設置太頻繁,以免產生過多的小文件。
  • Partition Committer,通過一系列的業務邏輯處理後得到的Finished Flies就直接可用了嗎? 因為我們典型的Hive 表都是分區表,當一個分區就緒後,還需要通知下游,Partition 已經處理完成,可以同步到Hive metastore 中了。 我們需要在合適的時機來有效的trigger 特定的Partition commit。 Partition committer 總的來說,就是完成了Hive 分區表的數據及元數據的寫入,甚至可以完成通知調度系統開始執行之後的Batch 作業。

基於Flink+Hive構建流批一體準實時數倉 12

因為流式作業是不間斷的在運行的,如何設置分區提交的時間,某個分區什麼時候提交它呢?

  • 第一種是默認策略Process time ,也就是我們所說的事件被處理時的當前系統時間,但是缺點也比較明顯,可能出現各種各樣的數據不完整。
  • 推薦策略就是partition-time,這種策略可以做到提交時的語義明確且數據完整,partition 字段就是由event time ,也就是事件產生的時間所得到的。

如果當前時間Current time > 分區產生的時間+ commitDelay 延時,即是可以開始進行分區提交的時間。 一個簡單的例子是小時分區,比如當前已經12點過1分了,已經過了11點的分區+ 一個小時,所以我們可以說不會再有11點分區的數據過來了,就可以提交11點的分區。 (要是有LateEvent 怎麼辦?所以也要求分區的提交是冪等的。)

基於Flink+Hive構建流批一體準實時數倉 13

接下來介紹分區的提交具體作用,最直接的就是寫SuccessFile 和Add partition 到Hive metastore。

Flink 內置支持了Hive-MetaStore和SuccessFile,只要配置”sink.partition-commit.policy.kind” 為“metastore,success-file”,即可做到在commit 分區的時候自動add 分區到Hive 中,而且寫SuccessFile,當add 操作完成的時候,這個partition 才真正的對Hive 可見。

Custom 機制允許自定義一個Partition Commit Policy 的類,實現這個類可以做到在這個分區的任務處理完成後:比如觸發下游的調度、Statistic Analysis、又或者觸發Hive 的小文件合併。 (當然觸發Hive 的小文件合併不但需要啟動另一個作業,而且做不到一致性保證,後續Flink 也會有進一步的探索,在Flink 作業中,主動完成小文件的合併)。

實時消費

基於Flink+Hive構建流批一體準實時數倉 14

不止是準實時的數據攝入,Flink 也帶來了維表關聯Hive 表和流實時消費Hive 表。

我們知道Flink 是支持維表關聯查詢MySQL 和HBase 的,在計算中維護一個LRU 的緩存,未命中查詢MySQL 或HBase。 但是沒有Lookup 的能力怎麼辦呢? 數據一般是放在離線數倉中的,所以業務上我們一般採用Hive Table 定期同步到HBase 或者MySQL。 Flink 也可以允許直接維表關聯Hive 表,目前的實現很簡單,需要在每個並發中全量Load Hive 表的所有數據,只能針對小表的關聯。

傳統的Hive Table 只支持按照批的方式進行讀取計算,但是我們現在可以使用流的方式來監控Hive 裡面的分區/文件生成,也就是每一條數據過來,都可以實時的進行消費計算,它也是完全復用Flink Streaming SQL 的方式,可以和HBase、MySQL、Hive Table 進行Join 操作,最後再通過FileWriter 實時寫入到Hive Table 中。

構建流批一體準實時數倉應用實踐

基於Flink+Hive構建流批一體準實時數倉 15

案例如下:通過Flume 採集日誌打點Logs,計算各年齡層的PV,此時我們存在兩條鏈路:

  • 一條是實時鏈路,通過輸入訪問日誌,關聯Hive 的User 表來計算出所需要的結果到業務DB 中。
  • 而另一條則是離線鏈路,我們需要Hive 提供小時分區表,來實現對歷史數據的Ad-hoc 查詢。

基於Flink+Hive構建流批一體準實時數倉 16

這裡就是我們剛剛提到的,雖然是對應兩個database:realtime_db 和offline_db,但是它們共用一份元數據。

對於Hive 表我們可以通過Flink SQL 提供的Hive dialect 語法,然後通過Hive 的DDL 語法來在Flink 中創建Hive 表,這裡設置PARTITION BY 天和小時,是與實時鏈路的不同之處,因為實時鏈路是沒有分區概念的。

如何在表結構裡避免分區引起的Schema 差異? 一個可以解決的方案是考慮引入Hidden Partition 的定義,Partition 的字段可以是某個字段的Computed Column,這也可以與實際常見的情況做對比,如天或小時是由時間字段計算出的,之後是下面的三個參數:

  • sink.partition-commit.trigger,指定什麼時候進行partition 的commit,這裡設置了partition-time,用於保證exactly-once;

  • partition.time-extractor.timestamp-pattern,怎樣從partition 中提取時間,相當於設置了一個提取格式;

  • sink.partition-commit.policy.kind,既partition commit 所要進行的操作,也就是剛剛提到的metastore,success-file。

之後設置回默認的Flink dialect,創建Kafka 的實時表,通過insert into 將Kafka 中的數據同步到Hive 之中。

基於Flink+Hive構建流批一體準實時數倉 17

這部分是關於Kafka 中的表如何通過Dim join 的方式,拿到User 表的年齡字段。 圖中需要關心的是lookup.join.cache.ttl 這個參數,我們會將user 這張表用類似於broadcast 的方式,廣播到每一個task 中,但是這個過程中可能出現Hive 中的table 存在更新操作,這裡的1h 就說明,數據有效期僅為1 小時。 創建view 的目的是將Dim join 所需要的process time 加上(Dim Join 需要定義Process time 是個不太自然的過程,後續也在考慮如何在不破壞SQL 語義的同時,簡化DimJoin 的語法。)

基於Flink+Hive構建流批一體準實時數倉 18

通過實時Pipeline 的手段消費Hive Table,而不是通過調度或者以往手動觸發的batch 作業,第一個參數streaming-source.enable,打開流處理機制,然後使用start-offset 參數指定從哪個分區/文件開始消費。 此時,整個流批一體準實時數倉應用基本算是完成啦。

未來規劃

Hive 作為分區級別管理的Table Format 在一些方便有比較大的限制,如果是新型的Table Format 比如Iceberg 會有更好的支持,未來Flink 會在下面幾個方面加強:

  • Flink Hive/File Streaming Sink 的Auto Compaction(Merging) 能力,小文件是實時的最大阻礙之一。
  • Flink 擁抱Iceberg,目前在社區中已經開發完畢Iceberg Sink,Iceberg Source 正在推進中,可以看見在不遠的將來,可以直接將Iceberg 當做一個消息隊列,且,它保存了所有的歷史數據,達到真正的流批統一。
  • 增強Flink Batch 的Shuffle,目前完全的Hash Shuffle 帶來了很多問題,比如小文件、隨機IO、Buffer 管理帶來的OOM,後續開源Flink (1.12) 會加強力量引入SortedShuffle 以及ShuffleService。
  • Flink Batch BoundedStream 支持,舊的Dataset API 已經不能滿足流批統一的架構,社區(1.12) 會在DataStream 上提供Batch 計算的能力。

更多細節,可以查看InfoQ 公開課的完整視頻回放:

https://live.infoq.cn/room/390

講師介紹:

李勁松,花名之信,阿里巴巴技術專家,Apache Flink Committer。 2014 年起專注於阿里內部Galaxy 流計算框架;2017 年起開始Flink 研發,主要專注於Batch 計算、數據結構與類型。