Categories
程式開發

Flink高級應用模式第一輯:欺詐檢測系統案例研究


在這個博客文章系列中,你將學習到三種用來構建流應用程序的強大的Flink模式:

  • 動態更新應用程序邏輯
  • 動態數據分區(混排),在運行時控制
  • 基於自定義窗口邏輯的低延遲警報(不使用窗口API)

這些模式帶來了更多使用靜態定義的數據流實現的功能,並提供了滿足複雜業務需求的構建塊。

應用程序邏輯的動態更新允許Flink作業在運行時更改,而不會因停止和代碼重新提交而導致停機。

動態數據分區提供了在運行時更改Flink分配事件和分組方式的能力。當使用可動態重新配置的應用程序邏輯構建作業時,往往會自然而然需求這樣的能力。

自定義窗口管理展示了當原生窗口API與你的需求不完全匹配時,如何使用底層進程函數API。具體來說,你將學習如何在Windows上實現低延遲警報以及如何使用計時器限制狀態增長。

這些模式都是建立在Flink核心功能的基礎上的,但框架的文檔可能不夠一目了然,因為不用具體的場景舉例的話,往往很難解釋和展示這些模式背後的機制。所以我們將通過一個實際示例來展示這些模式,這個示例為Apache Flink提供了一個真實的使用場景,那就是一個欺詐檢測引擎。我們希望本系列文章能幫助你將這些功能強大的方法放到自己的工具箱中,從而執行一些激動人心的新任務。

在該系列的第一篇博文中,我們將介紹這個演示應用程序的高級架構,描述它的各個組件及其交互。然後,我們將深入研究該系列中第一個模式的實現細節——動態數據分區

你可以在本地完整運行這個欺詐檢測演示應用程序,並通過隨附的GitHub存儲庫查看實現的細節。

欺詐檢測演示

我們的欺詐檢測演示的完整源代碼都是開源的,可以在線獲取。要在本地運行它,請訪問以下存儲庫並按照自述文件中的步驟操作:

https://github.com/afedulov/fraud-detection-demo

你會看到該演示是一個自包含的應用程序——它只需要從源構建docker和docker-compose,並且包含以下組件:

  • 帶有ZooKeeper的Apache Kafka(消息代理)
  • Apache Flink(應用程序集群
  • 欺詐檢測Web應用

這款欺詐檢測引擎的高級目標是消費一個金融交易流,並根據一組規則對其進行評估。這些規則會經常更改和調整。在實際的生產系統中,我們需要在運行時添加和刪除它們,而不會因停止和重新啟動作業而帶來高昂的代價。

在瀏覽器中轉到演示URL時,將顯示以下UI:

Flink高級應用模式第一輯:欺詐檢測系統案例研究 1

圖1:欺詐檢測演示UI

單擊“Start”按鈕後,你可以在左側看到在系統中流動的財務交易的直觀表示。可以使用頂部的滑塊控制每秒生成的交易數。中間部分用來管理Flink評估的規則。在這裡,你可以創建新規則以及發出控制命令,例如清除Flink的狀態。

演示自帶一組預定義的示例規則。你可以單擊Start按鈕,一段時間後就能觀察到UI右側部分中顯示的警報。這些警報是Flink根據預定義規則針對生成交易流的評估結果。

我們的欺詐檢測示例係統包含三大組件:

  • 前端(React)
  • 後端(SpringBoot)
  • 欺詐檢測應用程序(Apache Flink)

主要元素之間的交互如圖2所示。

Flink高級應用模式第一輯:欺詐檢測系統案例研究 2

圖2:欺詐檢測演示組件

後端向前端公開了一個REST API,用於創建/刪除規則以及發布控制命令來管理演示應用的執行。然後通過一個“Control”Kafka主題將這些前端動作轉發給Flink。後端還包括一個Transaction Generator(交易生成器)組件,該組件通過單獨的“Transactions”主題將模擬的匯款事件流發送到Flink。由Flink生成的警報由後端的“Alerts”主題的消費,並通過WebSockets轉發到UI。

現在你已經熟悉了我們這款欺詐檢測引擎的總體佈局和目標,現在我們來詳細介紹實現這種系統所需的條件。

動態數據分區

我們要研究的第一個模式是動態數據分區。

如果你過去曾經使用過Flink的DataStream API,那麼你無疑會熟悉keyBy方法。 Keying一個流會重排所有記錄,以便將具有相同key的元素分配給同一分區。這意味著所有具有相同key的記錄將由下一個運算符的同一個物理實例處理。

在典型的流應用程序中,key的選擇是固定的,由元素內的某些靜態字段確定。例如,當構建一個簡單的基於窗口的交易流聚合時,我們可能總是按交易賬戶ID進行分組。

DataStream input = // [...]
DataStream windowed = input
  .keyBy(Transaction::getAccountId)
  .window(/*window specification*/);

這種方法是在眾多用例中實現水平可擴展性的主要構建塊。但如果應用程序試圖在運行時提供業務邏輯的靈活性,這種方法還不夠用。為了理解為什麼會發生這種情況,我們首先以一個功能需求的形式為這款欺詐檢測系統制定一個現實的示例規則定義:

“只要在一周內從同一付款人向同一收款人累計付款總額超過1,000,000美元,就會發出警報。 ”

在這個公式中,我們可以發現許多能夠在新提交的規則中指定的參數,甚至可能稍後在運行時修改或調整它們:

  • 匯總字段(付款金額)
  • 分組字段(付款人+收款人)
  • 匯總函數(總和)
  • 窗口持續時間(1週)
  • 限制(1000000)
  • 限制運算符(更大)

因此,我們將使用下面這樣簡單的JSON格式來定義上述參數:

{
  "ruleId": 1,
  "ruleState": "ACTIVE",
  "groupingKeyNames": ["beneficiaryId", "payeeId"],
  "aggregateFieldName": "paymentAmount",
  "aggregatorFunctionType": "SUM",
  "limitOperatorType": "GREATER",
  "limit": 1000000,
  "windowMinutes": 10080
}

在這裡,重要的是要了解groupingKeyNames確定的是事件的實際物理分組——必須將具有相同指定參數值(例如,25號付款人->12號收款人)的所有交易匯總到評估運算符的同一個物理實例中。自然,在Flink API中以這種方式分發數據的過程是通過一個keyBy()函數實現的。
Flink的keyBy()文檔)中的大多數示例都使用硬編碼的KeySelector,其會提取特定固定事件的字段。但是,為了支持所需的靈活性,我們必鬚根據規則的定義以更加動態的方式提取它們。為此,我們將不得不使用一個額外的運算符,該運算符為每個事件做準備以將其分發到正確的聚合實例。

在高級層面上,我們的主要處理管道如下所示:

DataStream alerts =
    transactions
        .process(new DynamicKeyFunction())
        .keyBy(/* some key selector */);
        .process(/* actual calculations and alerting */)

先前我們已經確定,每個規則都定義一個groupingKeyNames參數,該參數用來指定將哪些字段組合用於傳入事件的分組。每個規則都可以使用這些字段的任意組合。同時,每個傳入事件都可能需要根據多個規則進行評估。這意味著這些事件可能需要同時出現在與不同規則相對應的評估運算符的多個並行實例上,因此需要進行分叉。用DynamicKeyFunction()來確保此類事件的分派。

Flink高級應用模式第一輯:欺詐檢測系統案例研究 3

圖3:具有動態Key函數的分叉事件

DynamicKeyFunction迭代一組已定義的規則,並提取所需的分組key來為每個要由keyBy()函數處理的事件作準備:

public class DynamicKeyFunction
    extends ProcessFunction {
   ...
  /* Simplified */
  List rules = /* 初始化的规则.
                        细节会在未来的文章中讨论. */;

  @Override
  public void processElement(
      Transaction event,
      Context ctx,
      Collector out) {

      for (Rule rule :rules) {
       out.collect(
           new Keyed(
               event,
               KeysExtractor.getKey(rule.getGroupingKeyNames(), event),
               rule.getRuleId()));
      }
  }
  ...
}

KeysExtractor.getKey()使用反射來從事件中提取groupingKeyNames字段的必需值,並將它們組合為單個串聯的字符串key,例如“ {beneficiaryId = 25; payeeId = 12}”。 Flink將計算該key的哈希值,並將此特定組合的處理分配給集群中的特定服務器。這將跟踪25號付款人和12號收款人之間的所有交易,並在期望的時間窗口內評估定義的規則。
注意,這裡引入了具有以下簽名的包裝類Keyed,作為DynamicKeyFunction的輸出類型

public class Keyed {
  private IN wrapped;
  private KEY key;
  private ID id;

  ...
  public KEY getKey(){
      return key;
  }
}

該POJO的字段包含以下信息:wrapped是原始交易事件,key是使用KeysExtractor的結果,id是導致事件分配的Rule的ID(根據特定於規則的分組邏輯)。
這種類型的事件將成為主處理管道中keyBy()函數的輸入,並允許在實現動態數據混排的最後步驟中使用一個簡單的lambda表達式作為一個(KeySelector)。

DataStream alerts =
    transactions
        .process(new DynamicKeyFunction())
        .keyBy((keyed) -> keyed.getKey());
        .process(new DynamicAlertFunction())

有了DynamicKeyFunction,我們可以隱式複制事件,以便在Flink集群中並行執行各條規則評估。這樣一來,我們獲得了一個重要的屬性——規則處理的水平可擴展性。我們的系統將能夠通過向集群添加更多服務器來處理更多規則,也就是提高並行度。實現此屬性的代價是重複數據,這可能會成為一個問題,具體取決於特定的參數集,例如傳入數據速率、可用網絡帶寬和事件負載大小等。在實際場景中可以應用其他優化,例如合併具有相同groupingKeyNames的規則評估,或添加一個過濾層,以在處理特定規則時剝離所有字段中不需要的事件。

結束語

在這篇博文中,我們用一個示例用例(欺詐檢測引擎)討論了對Flink應用程序提供動態運行時更改能力的原因。我們描述了整體架構及其組件之間的交互,並提供了在dockerized設置中構建和運行示例欺詐檢測應用程序的指引。然後,我們展示了將動態數據分區模式實現為第一個基礎構建塊以實現靈活的運行時配置的細節操作。

為了將重心放在描述模式的核心機制上,我們將DSL和基礎規則引擎的複雜性降到了最低。走下去的話,不難想像我們會添加一些擴展,例如允許使用更複雜的規則定義,包括某些事件的過濾、邏輯規則鏈接以及其他更高級的功能。

在本系列的第二部分中,我們將描述規則如何進入正在運行的欺詐檢測引擎。此外,我們將詳細介紹管道的主要處理函數——DynamicAlertFunction()的實現細節。

Flink高級應用模式第一輯:欺詐檢測系統案例研究 4

圖4:端到端管道

在下一篇文章中,我們將看到如何在運行時利用Flink的廣播流來幫助指導欺詐檢測引擎中的處理(動態應用程序更新模式)。

原文鏈接

https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html