Categories
程式開發

深入解讀Flink窗口的應用與實現


本文根據Apache Flink 系列直播整理而成,由Apache Flink Contributor、OPPO 大數據平台研發負責人張俊老師分享。 主要內容如下:

  1. 整體思路與學習路徑
  2. 應用場景與編程模型
  3. 工作流程與實現機制

整體思路與學習路徑

深入解讀Flink窗口的應用與實現 1

當我們碰到一項新的技術時,我們應該怎樣去學習並應用它呢? 在我個人看來,有這樣一個學習的路徑,應該把它拆成應用和實現兩塊。 首先應該從它的應用入手,然後再深入它的實現。

應用主要分為三個部分,首先應該了解它的應用場景,比如窗口的一些使用場景。 然後,進一步地我們去了解它的編程接口,最後再深入了解它的一些抽象概念。 因為一個框架或一項技術,肯定有它的編程接口和抽象概念來組成它的編程模型。 我們可以通過查看文檔的方式來熟悉它的應用。 在對應用這三個部分有了初步的了解後,我們就可以通過閱讀代碼的方式去了解它的一些實現了。

實現部分也分三個階段,首先從工作流程開始,可以通過API 層面不斷的下鑽來了解它的工作流程。 接下來是它整體的設計模式,通常對一些框架來說,如果能構建一個比較成熟的生態,一定是在設計模式上有一些獨特的地方,使其有一個比較好的擴展性。 最後是它的數據結構和算法,因為為了能夠處理海量數據並達到高性能,它的數據結構和算法一定有獨到之處。 我們可以做些深入了解。

以上大概是我們學習的一個路徑。 從實現的角度可以反哺到應用上來,通常在應用當中,剛接觸某個概念的時候會有一些疑惑。 當我們對實現有一些了解之後,應用中的這些疑惑就會迎刃而解。

為什麼要關心實現

舉個例子:

深入解讀Flink窗口的應用與實現 2

看了這個例子我們可能會有些疑惑:

  • ReduceFunction 為什麼不用計算每個key 的聚合值?
  • 當key 基數很大時,如何有效地觸發每個key 窗口計算?
  • 窗口計算的中間結果如何存儲,何時被清理?
  • 窗口計算如何容忍late data ?

當你了解了實現部分再回來看應用這部分,可能就有種醍醐灌頂的感覺。

應用場景與編程模型

實時數倉的典型架構

深入解讀Flink窗口的應用與實現 3

■ 第一種最簡單架構,ODS 層的Kafka 數據經過Flink 的ETL 處理後寫入DW 層的Kafka,再通過Flink 聚合寫入ADS 層的MySQL 中,做這樣一個實時報表展現。

缺點:由於MySQL 存儲數據有限,所以聚合的時間粒度不能太細,維度組合不能太多。

■ 第二種架構相對於第一種引入了OLAP 引擎,同時也不用Flink 來做聚合,通過Druid 的Rollup 來做聚合。

缺點:因為Druid 是一個存儲和查詢引擎,不是計算引擎。 當數據量巨大時,比如每天上百億、千億的數據量,會加劇Druid 的導入壓力。

■ 第三種架構在第二種基礎上,採用Flink 來做聚合計算寫入Kafka,最終寫入Druid。

缺點:當窗口粒度比較長時,結果輸出會有延遲。

第四種架構在第三種基礎上,結合了Flink 聚合和Druid Rollup。 Flink 可以做輕度的聚合,Druid 做Rollup 的匯總。 好處是Druid 可以實時看到Flink 的聚合結果。

Window 應用場景

深入解讀Flink窗口的應用與實現 4

■ 聚合統計:從Kafka 讀取數據,根據不同的維度做1分鐘或5分鐘的聚合計算,然後結果寫入MySQL 或Druid 中。

■ 記錄合併:對多個Kafka 數據源在一定的窗口範圍內做合併,結果寫入ES。 例如:用戶的一些行為數據,針對每個用戶,可以對其行為做一定的合併,減少寫入下游的數據量,降低ES 的寫入壓力。

■ 雙流join:針對雙流join 的場景,如果全量join 的話,成本開銷會非常大。 所以就要考慮基於窗口來做join。

Window 抽象概念

深入解讀Flink窗口的應用與實現 5

■TimestampAssigner: 時間戳分配器,假如我們使用的是EventTime 時間語義,就需要通過TimestampAssigner 來告訴Flink 框架,元素的哪個字段是事件時間,用於後面的窗口計算。

■KeySelector:Key 選擇器,用來告訴Flink 框架做聚合的維度有哪些。

■WindowAssigner:窗口分配器,用來確定哪些數據被分配到哪些窗口。

■狀態:狀態,用來存儲窗口內的元素,如果有AggregateFunction,則存儲的是增量聚合的中間結果。

■ AggregateFunction(可選):增量聚合函數,主要用來做窗口的增量計算,減輕窗口內State 的存儲壓力。

■扳機:觸發器,用來確定何時觸發窗口的計算。

■ Evictor(可選):驅逐器,用於在窗口函數計算之前(後)對滿足驅逐條件的數據做過濾。

■WindowFunction:窗口函數,用來對窗口內的數據做計算。

■收藏家:收集器,用來將窗口的計算結果發送到下游。

上圖中紅色部分都是可以自定義的模塊,通過自定義這些模塊的組合,我們可以實現高級的窗口應用。 同時Flink 也提供了一些內置的實現,可以用來做一些簡單應用。

Window 編程接口

stream      
  .assignTimestampsAndWatermarks(…)     <-    TimestampAssigner
  .keyBy(...)                           <-    KeySelector       
  .window(...)                          <-    WindowAssigner        
  [.trigger(...)]                       <-    Trigger         
  [.evictor(...)]                       <-    Evictor
  .reduce/aggregate/process()           <-    Aggregate/Window function

首先我們先指定時間戳和Watermark 如何生成;然後選擇需要聚合的維度的Key;再選擇一個窗口和選擇用什麼樣的觸發器來觸發窗口計算,以及選擇驅逐器做什麼樣的過濾;最後確定窗口應該做什麼樣計算。

下面是一個示例:

深入解讀Flink窗口的應用與實現 6

接下來我們詳細看下每個模塊。

■窗口分配器

深入解讀Flink窗口的應用與實現 7

總結一下主要有3類窗口:

  • 時間窗
  • 計數窗口
  • 自訂視窗

■窗觸發

Trigger 是一個比較重要的概念,用來確定窗口什麼時候觸發計算。

Flink 內置了一些Trigger 如下圖:

深入解讀Flink窗口的應用與實現 8

■ Trigger 示例

深入解讀Flink窗口的應用與實現 9

假如我們定義一個5分鐘的基於EventTime 的滾動窗口,定義一個每2分觸發計算的Trigger,有4條數據事件時間分別是20:01、20:02、20:03、20:04,對應的值分別是1、2、3、2,我們要對值做Sum 操作。

初始時,State 和Result 中的值都為0。

深入解讀Flink窗口的應用與實現 10

當第一條數據在20:01進入窗口時,State 的值為1,此時還沒有到達Trigger 的觸發時間。

深入解讀Flink窗口的應用與實現 11

第二條數據在20:02進入窗口,State 中的值為1+2=3,此時達到2分鐘滿足Trigger 的觸發條件,所以Result 輸出結果為3。

深入解讀Flink窗口的應用與實現 12

第三條數據在20:03進入窗口,State 中的值為3+3 = 6,此時未達到Trigger 觸發條件,沒有結果輸出。

深入解讀Flink窗口的應用與實現 13

第四條數據在20:04進入窗口,State中的值更新為6+2=8,此時又到了2分鐘達到了Trigger 觸發時間,所以輸出結果為8。 如果我們把結果輸出到支持update 的存儲,比如MySQL,那麼結果值就由之前的3更新成了8。

■ 問題:如果Result 只能append?

深入解讀Flink窗口的應用與實現 14

如果Result 不支持update 操作,只能append 的話,則會輸出2條記錄,在此基礎上再做計算處理就會引起錯誤。

這樣就需要PurgingTrigger 來處理上面的問題。

■ PurgingTrigger 的應用

深入解讀Flink窗口的應用與實現 15

和上面的示例一樣,唯一的不同是在ContinuousEventTimeTrigger 外麵包裝了一個PurgingTrigger,其作用是在ContinuousEventTimeTrigger 觸發窗口計算之後將窗口的State 中的數據清除。

再看下流程:

深入解讀Flink窗口的應用與實現 16

前兩條數據先後於20:01和20:02進入窗口,此時State 中的值更新為3,同時到了Trigger的觸發時間,輸出結果為3。

深入解讀Flink窗口的應用與實現 17

由於PurgingTrigger 的作用,State 中的數據會被清除。

深入解讀Flink窗口的應用與實現 18

當後兩條數據進入窗口之後,State 重新從0開始累計並更新為5,輸出結果為5。

由於結果輸出是append 模式,會輸出3和5兩條數據,然後再做Sum 也能得到正確的結果。

上面就是PurgingTrigger 的一個簡單的示例,它還支持很多有趣的玩法。

■ DeltaTrigger 的應用

有這樣一個車輛區間測試的需求,車輛每分鐘上報當前位置與車速,每行進10公里,計算區間內最高車速。

深入解讀Flink窗口的應用與實現 19

首先需要考慮的是如何來劃分窗口,它不是一個時間的窗口,也不是一個基於數量的窗口。 用傳統的窗口實現比較困難,這種情況下我們考慮使用DeltaTrigger 來實現。

下面是簡單的代碼實現:

深入解讀Flink窗口的應用與實現 20

如何提取時間戳和生成水印,以及選擇聚合維度就不贅述了。 這個場景不是傳統意義上的時間窗口或數量窗口,可以創建一個GlobalWindow,所有數據都在一個窗口中,我們通過定義一個DeltaTrigger,並設定一個閾值,這裡是10000(米)。 每個元素和上次觸發計算的元素比較是否達到設定的閾值,這裡比較的是每個元素上報的位置,如果達到了10000(米),那麼當前元素和上一個觸發計算的元素之間的所有元素落在同一個窗口裡計算,然後可以通過Max 聚合計算出最大的車速。

■ 思考點

上面這個例子中我們通過GlobalWindow 和DeltaTrigger 來實現了自定義的Window Assigner 的功能。 對於一些複雜的窗口,我們還可以自定義WindowAssigner,但實現起來不一定簡單,倒不如利用GlobalWindow 和自定義Trigger 來達到同樣的效果。

下面這個是Flink 內置的CountWindow 的實現,也是基於GlobalWindow 和Trigger 來實現的。

深入解讀Flink窗口的應用與實現 21

■開窗器

Flink 內置了一些Evictor 的實現。

深入解讀Flink窗口的應用與實現 22

■ TimeEvictor 的應用

基於上面的區間測速的場景,每行進10公里,計算區間內最近15分鐘最高車速。

深入解讀Flink窗口的應用與實現 23

實現上只是在前面基礎上增加了Evictor 的使用,過濾掉窗口最後15分鐘之前的數據。

深入解讀Flink窗口的應用與實現 24

■窗口功能

Flink 內置的WindowFunction 有兩種類型,第一種是AggregateFunction,它是高級別的抽象,主要用來做增量聚合,每來一條元素都做一次聚合,這樣狀態裡只需要存最新的聚合值。

  • 優點:增量聚合,實現簡單。
  • 缺點:輸出只有一個聚合值,使用場景比較局限。

深入解讀Flink窗口的應用與實現 25

第二種是ProcessWindowFunction,它是低級別的抽像用來做全量聚合,每來一條元素都存在狀態裡面,只有當窗口觸發計算時才會調用這個函數。

深入解讀Flink窗口的應用與實現 26

  • 優點:可以獲取到窗口內所有數據的迭代器,實現起來比較靈活;可以獲取到聚合的Key 以及可以從上下文Context 中獲取窗口的相關信息。
  • 缺點:需要存儲窗口內的全量數據,State 的壓力較大。

同時我們可以把這兩種方式結合起來使用,通過AggregateFunction 做增量聚合,減少中間狀態的壓力。 通過ProcessWindowFunction 來輸出我們想要的信息,比如聚合的Key 以及窗口的信息。

工作流程和實現機制

上一節我們介紹了窗口的一些抽象的概念,包括它的編程接口,通過一些簡單的示例介紹了每個抽象概念的的用法。

這一節我們深入的研究以下窗口底層是怎麼實現的。

WindowOperator 工作流程

首先看下WindowOperator 的工作流程,代碼做了一些簡化,只保留了核心步驟。

深入解讀Flink窗口的應用與實現 27

主要包括以下8個步驟:

  1. 獲取element 歸屬的windows
  2. 獲取element 對應的key
  3. 如果late data,跳過
  4. 將element 存入window state
  5. 判斷element 是否觸發trigger
  6. 獲取window state,注入window function
  7. 清除window state
  8. 註冊timer,到窗口結束時間清理window

窗口狀態

前面提到的增量聚合計算和全量聚合計算,這兩個場景所應用的State 是不一樣的。

如果是全量聚合,元素會添加到ListState 當中,當觸發窗口計算時,再把ListState 中所有元素傳遞給窗口函數。

深入解讀Flink窗口的應用與實現 28

如果是增量計算,使用的是AggregatingState,每條元素進來會觸發AggregateTransformation 的計算。

深入解讀Flink窗口的應用與實現 29

看下AggregateTransformation 的實現,它會調用我們定義的AgregateFunction 中的createAccumulator 方法和add 方法並將add 的結果返回,所以State 中存儲的就是accumulator 的值,所以比較輕量級。

深入解讀Flink窗口的應用與實現 30

視窗功能

在觸發窗口計算時會將窗口中的狀態傳遞給emitWindowContents 方法。 這裡會調用我們定義的窗口函數中的process 方法,將當前的Key、Window、上下文Context、窗口的內容作為參數傳給它。 在此之前和之後會分別調用evictBefore 和evictAfter 方法把一些元素過濾掉。 最終會調用windowState 的clear 方法,再把過濾之後的記錄存到windowState 中去。 從而達到evictor 過濾元素的效果。

深入解讀Flink窗口的應用與實現 31

窗口觸發

最後看下Trigger 的實現原理。 當我們有大量的Key,同時每個Key 又屬於多個窗口時,我們如何有效的觸發窗口的計算呢?

Flink 利用定時器來保證窗口的觸發,通過優先級隊列來存儲定時器。 隊列頭的定時器表示離當前時間最近的一個,如果當前定時器比隊列頭的定時器時間還要早,則取消掉隊列頭的定時器,把當前的時間註冊進去。

深入解讀Flink窗口的應用與實現 32深入解讀Flink窗口的應用與實現 33

當這次定時器觸發之後,再從優先級隊列中取下一個Timer,去調用trigger 處理的函數,再把下一個Timer 的時間註冊為定時器。 這樣就可以循環迭代下去。

深入解讀Flink窗口的應用與實現 34

總結

本文主要分享了Flink 窗口的應用與實現。 首先介紹了學習一項新技術的整體思路與學習路徑,從應用入手慢慢深入它的實現。 然後介紹了實時數倉的典型架構發展歷程,之後從窗口的應用場景、抽象概念、編程結構詳細說明了窗口的各個組成部分。 並通過一些示例詳細展示了各個概念之間配合使用可以滿足什麼樣的使用場景。 最後深入窗口的實現,從源碼層面說明了窗口各模塊的工作流程。

推薦閱讀:

  • Apache Flink運維和實戰系列文章

  • Apache Flink 零基礎入門到進階系列文章