Categories
程式開發

Flink高級應用模式第二輯:應用模式的動態更新


在本系列的第一篇文章中,我們對欺詐檢測引擎的目標和所需功能給出了高層次的描述。我們還解釋瞭如何讓Apache Flink中的數據分區基於可修改的規則來定制,替代使用硬編碼的KeysExtractor實現。

我們特意略過了關於如何初始化應用的規則,以及在運行時有哪些方法來更新這些規則的細節內容。在這篇文章中我們將具體介紹這些細節。你將學習如何將第一部分中描述的數據分區方法與動態配置結合起來使用。只要共同使用這兩種模式,調整很多業務邏輯時就不用再重新編譯代碼和重新部署Flink作業了。

規則廣播

首先我們來看一下先前定義的數據處理管道:

DataStream alerts =
    transactions
        .process(new DynamicKeyFunction())
        .keyBy((keyed) - > keyed.getKey());
        .process(new DynamicAlertFunction())

DynamicKeyFunction提供動態數據分區,而DynamicAlertFunction負責執行處理事務的主要邏輯,並根據已定義的規則發送警報消息。

本系列的第一篇文章簡化了用例,並假定應用的規則集已預先初始化,可以通過DynamicKeyFunction中的List訪問。

public class DynamicKeyFunction
    extends ProcessFunction {

  /* Simplified */
  List rules = /* Rules that are initialized somehow.*/;
  ...
}

顯然,在初始化階段就可以直接在Flink作業的代碼內部向這個列表添加規則(創建一個List對象,使用它的add方法)。這樣做的主要缺點是每次修改規則後都需要重新編譯作業。在現實的欺詐檢測系統中規則會經常更改,因此從業務和運營角度來看,這種方法是不可接受的。我們需要另一種方式。

接下來是在上篇文章中引入的規則定義示例:

Flink高級應用模式第二輯:應用模式的動態更新 1

圖1:規則定義

上一篇文章提到,DynamicKeyFunction使用groupingKeyNames來提取消息鍵。該規則第二部分中的參數由DynamicAlertFunction使用:它們定義所執行操作的實際邏輯及其參數(例如警報觸發閾值)。這意味著在DynamicKeyFunction和DynamicAlertFunction中必須存在相同的規則。為了獲得這個結果,我們將使用Apache Flink的數據分發廣播機制

下圖展示了我們正在構建系統的最終作業圖:

Flink高級應用模式第二輯:應用模式的動態更新 2

圖2:欺詐檢測Flink作業的作業圖

事務處理管道的主要模塊有:

  • 事務源(Transaction Source),它並行消費來自Kafka分區的事務消息。
  • 動態鍵函數(Dynamic Key Function),使用一個動態鍵執行數據強化(enrichment)。後續的keyBy對這個動態鍵進行哈希處理,並在隨後的運算符的所有並行實例之間對數據進行分區操作。
  • 動態警報函數(Dynamic Alert Function),可生成一個數據窗口並基於該窗口創建警報。

Apache Flink內部的數據交換

上面的作業圖還指出了運算符之間的各種數據交換模式。為了解廣播模式的工作機制,我們先走一小段彎路,討論Apache Flink的分佈式運行時中存在哪些消息傳播方法。

  • 事務源之後的FORWARD連接意味著事務源運算符的一個並行實例消費的所有數據,都將精確傳輸到後續DynamicKeyFunction運算符的一個實例上。它還指出兩個連接的運算符(在上述情況下為12)並行度相同。此通信模式如圖3所示。橙色圓圈表示事務,虛線矩形表示相聯運算符的並行實例。

Flink高級應用模式第二輯:應用模式的動態更新 3

圖3:跨運算符實例傳遞的FORWARD消息

  • DynamicKeyFunction和DynamicAlertFunction之間的HASH連接意味著,對於每個消息都將計算一個​​哈希碼,並且消息將在下一個運算符的可用並行實例之間平均分配。需要使用keyBy從Flink顯式“請求”這樣的連接。

Flink高級應用模式第二輯:應用模式的動態更新 4

圖4:在運算符實例之間傳遞的哈希消息(通過keyBy

  • REBALANCE分佈是由對rebalance()的顯式調用或併行度的更改(對於圖2中的作業圖而言,為12->1)引起的。調用rebalance()會使數據以循環方式重新分區,並且在某些情況下可以幫助減輕數據偏斜。

Flink高級應用模式第二輯:應用模式的動態更新 5

圖5:跨運算符實例傳遞的REBALANCE消息

圖2中的欺詐檢測作業圖包含一個附加數據源:規則源(Rules Source)。它還從Kafka消費。規則通過BROADCAST通道“混合”到主處理數據流中。在運算符之間傳輸數據的其他方法(例如forward、hash或rebalance),會讓每個消息只可在接收的運算符的並行實例之一中處理;相比之下,broadcast會讓每個消息在broadcast stream連接的運算符的所有並行實例的輸入上可用。這使得broadcast方法適用於多種需要影響所有消息處理的任務,而無需考慮它們的鍵或源分區。

Flink高級應用模式第二輯:應用模式的動態更新 6

圖6:跨運算符實例傳遞的BROADCAST消息

注意:實際上Flink中有一些更特殊的數據分區方案,我們在這裡沒有提到。如果你想了解更多信息,請參閱Flink有關流分區的文檔

廣播狀態模式

為了使用規則源,我們需要將其“連接”到主數據流:

// Streams setup
DataStream transactions = [...]
DataStream rulesUpdateStream = [...]

BroadcastStream rulesStream = rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR);

// Processing pipeline setup
 DataStream alerts =
     transactions
         .connect(rulesStream)
         .process(new DynamicKeyFunction())
         .keyBy((keyed) -> keyed.getKey())
         .connect(rulesStream)
         .process(new DynamicAlertFunction())

如你所見,可以調用broadcast方法並指定狀態描述符,從任何常規流中創建廣播流。 Flink假定在處理主數據流的事件時需要存儲和檢索廣播的數據,因此總是從該狀態描述符自動創建相應的廣播狀態(broadcast state)。這與其他的Apache Flink狀態類型是不一樣的,其他類型中你需要在處理函數的open()方法中對其進行初始化。另請注意,廣播狀態始終具有鍵值格式(MapState)。

public static final MapStateDescriptor RULES_STATE_DESCRIPTOR =
        new MapStateDescriptor("rules", Integer.class, Rule.class);

連接到rulesStream會導致處理函數的簽名發生某些變化。上一篇文章在這裡做了一點簡化,用的是ProcessFunction。但是,DynamicKeyFunction實際上是一個BroadcastProcessFunction。

public abstract class BroadcastProcessFunction {

    public abstract void processElement(IN1 value,
                                        ReadOnlyContext ctx,
                                        Collector out) throws Exception;

    public abstract void processBroadcastElement(IN2 value,
                                                 Context ctx,
                                                 Collector out) throws Exception;

}

這裡的區別在於增加了processBroadcastElement方法,規則流的消息將通過該方法到達。下面新版本的DynamicKeyFunction允許在運行時通過這個流,修改數據分配鍵的列表:

public class DynamicKeyFunction
    extends BroadcastProcessFunction {


  @Override
  public void processBroadcastElement(Rule rule,
                                     Context ctx,
                                     Collector out) {
    BroadcastState broadcastState = ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);
    broadcastState.put(rule.getRuleId(), rule);
  }

  @Override
  public void processElement(Transaction event,
                           ReadOnlyContext ctx,
                           Collector out){
    ReadOnlyBroadcastState rulesState =
                                  ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);
    for (Map.Entry entry : rulesState.immutableEntries()) {
        final Rule rule = entry.getValue();
        out.collect(
          new Keyed(
            event, KeysExtractor.getKey(rule.getGroupingKeyNames(), event), rule.getRuleId()));
    }
  }
}

在上面的代碼中,processElement()接收事務,而processBroadcastElement()接收規則更新。創建新規則後將按圖6所示分配,並使用processBroadcastState將其保存在運算符的所有並行實例中。我們使用規則的ID作為存儲和引用各個規則的鍵。我們不再迭代硬編碼的List,而是迭代動態更新的廣播狀態的條目。

在將規則存儲在廣播MapState中時,DynamicAlertFunction遵循相同的邏輯。如第一部分中所述,processElement輸入中的每個消息會由一個特定規則處理,並通過DynamicKeyFunction帶有相應ID的“預標記”。我們需要做的就是使用提供的ID從BroadcastState中檢索相應規則的定義,並根據該規則所需的邏輯對其進行處理。在這一階段,我們還將消息添加到內部函數狀態,以便在所需的數據時間窗口上執行計算。我們將在欺詐檢測系列的最後一篇文章中探討如何做到這一點。

小結

本文,我們繼續研究了使用Apache Flink構建的欺詐檢測系統的用例。我們研究了在並行運算符實例之間分配數據的各種方式,而最重要的是探討了廣播狀態。我們演示瞭如何通過廣播狀態模式提供的功能來配合和增強動態分區(本系列第一部分中介紹的一種模式)。在運行時發送動態更新的能力是Apache Flink的強大功能,適用於其他多種用例,例如控制狀態(清除/插入/修復)、運行A/B實驗或執行ML模型係數的更新等。

原文鏈接:https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html