Categories
程式開發

閒魚端側如何實現實時CEP引擎


閒魚端側如何實現實時CEP引擎

背景

用戶來閒魚,主要是為了獲得自己關心的內容。隨著閒魚的體量越來越大,內容也變得越來越豐富。閒魚基於用戶畫像,可以將用戶關心的內容推送給用戶。具體在哪些場景下才需要觸發推送?我們定義了很多觸發規則,包括停留時長、點擊路徑等。起初我們把觸發規則的邏輯放在服務端(Blink)運行。但實踐下來發現Blink存在諸多限制:

  • 服務端要對客戶端埋點進行數據清洗,考慮到閒魚的DAU已經突破2000w,這個量是非常龐大的,非常消耗服務端資源;
  • Blink的策略是實時執行的,同樣因為資源問題,現在只能同時上線十幾個策略。

如何解決這些問題呢,我們開始考慮能否將Blink的策略跑在客戶端!

CEP模型

Blink,作為是Flink的一個分支,最初是阿里巴巴內部創建的,針對Flink進行了改進,所以我們這裡還是圍繞Flink討論。 CEP(Complex Event Process)是Flink中的一個子庫,用來快速檢測無盡數據流中的複雜模式。

Flink CEP

Flink的CEP的核心是NFA(Non-determined Finite Automaton),全稱叫不確定的有限狀態機。提到NFA,就不得不提Jagrati Agrawal等撰寫的關於NFA模型的論文《Efficient Pattern Matching over Event Streams》,本篇論文中描述了NFA的匹配原理。

閒魚端側如何實現實時CEP引擎 1

上面這張圖,就是一個不確定的有限狀態機,它由狀態(State)還有之間的連線(StateTransition)組成的。

  • 狀態(State):狀態是根據flink腳本里面的代碼來決定的,最終會有一個 $end$的Final狀態
  • 轉換(StateTransition):State的轉換條件,包括 take/proceed/ignore

不同的條件,代表的含義不同:

  • take: 滿足條件,獲取當前元素,進入下一狀態
  • proceed:不論是否滿足條件,不獲取當前元素,直接進入下狀態(如optional)並進行判斷是否滿足條件。
  • ignore:不滿足條件,忽略,進入下一狀態。

我們只要在端上實現這樣一個狀態機,就可以實現一個CEP引擎。

Python CEP

對於客戶端來說,首先要解決的問題是如何構建一個CEP環境。經過調研,可以復用集團的端智能容器(Walle),作為Python容器可以執行cep的策略。

在構建NFA之前,首先要解決的一個問題是數據來源,手淘信息流團隊有一套完整的解決方案BehaviX/BehaviR,可以對UT埋點進行結構化,能很好的結合Walle容器來觸發策略。有了事件來源,還需要解決的是Python腳本如何執行。 Walle平台可以將多個Python腳本打包下載並執行,因此,我們可以將CEP封裝成一個Python的庫,然後跟策略腳本一起下發。

最終的整體架構設計如下圖所示:

閒魚端側如何實現實時CEP引擎 2

本文重點介紹下如何用Python來實現一個CEP的編譯器,這個編譯器主要用來將CEP的描述語言轉換成為NFA。

編譯器原理

在Flink中,java側會有一套完善的API來編寫一個策略腳本,《efficient Pattern Matching over Event Streams》論文中還定義了一套完備的DSL描述語言,也是會轉化成java文件去調用這些API去完成匹配。那麼接下來會重點討論,flink是如何將上述API轉化成NFA去匹配,以及Python CEP如何實現上述一套完整API接口。

Pattern

在Flink裡面,是通過 Pattern來構建這個NFA,首先用它描述這個不確定性狀態機。首先是構建一個 Pattern的一個鍊錶,得到這個鍊錶之後,會將每個Pattern映射成為 State的圖,點與點之間會通過 StateTransition來連接。以下面的Python代碼為例,看下如何API是如何工作的:

例如,需要創建這樣一個規則,描述如下:

以start事件開始,後續跟隨一個middle的事件,後面緊跟著一個end事件作為結尾

用Pattern編寫如下所示:

.followed_by('middle').where(SimpleCondition())
.next_('end').where(SimpleCondition())

這個代碼裡面聲明了3個Pattern,依次命名為 startmiddleend。 Pattern裡面保存了指向前面節點的引用 previous,整個Pattern鍊錶構建完如下圖所示:

閒魚端側如何實現實時CEP引擎 3

最終拿到的是 end節點的一個引用 Ref,Pattern中會有一個變量指向前一個節點,這樣就可以得到一個Pattern的反向鍊錶。

Pattern的對外接口定義如下:

# 静态方法,用来生成起始的pattern
@staticmethod
def begin(self, name):
pass
# 标记紧接着的事件
def followed_by(self, name):
pass
# 标记不需要紧跟的事件
def not_followed_by(self, name):
pass
# 标记紧跟的事件
def next_(self, name):
pass
# 标记事件循环次数
def times(self, times):
pass
# 标记当前事件触发的条件
def where(self, condition):
pass
# 标记当前事件的and条件
def and_(self, condition):
pass
# 标记当前事件的or条件
def or_(self, condition):
pass
# 用于聚合
def group_by(self, fields):
pass
# 用于聚合,渠道特定字段的值
def fields(self, key_by_state_name, field):
pass
# 用于聚合,统计事件具体的数量
def count(self, field, condition):
pass

不同接口會生成不同的消費策略的節點,具體細節可以參考 StateTransition。有了Pattern鍊錶,接下來就需要編譯器(Compiler)了,它主要是將Pattern鍊錶轉化成NFA圖,首先來看下NFA的2個核心組件:StateStateTransition

State

結構定義如下:


def __init__(self, name, state_type):
        self.__name = name                # 节点的名称,同Pattern的名称
        self.__state_type = state_type    # 节点的类型:Start/Normal/Stop/Final
        self.__state_transitions = []    # 到其他节点的边

State一共有4種類型:Start/Final/Normal/Stop

生成NFA的過程就是將反向解析Pattern鍊錶的過程,大概的過程如下:

  1. 創建一個 $end$的結束節點( Final
  2. 再從後往前創建每個state節點,作為中間節點( Normal/Stop
  3. 最後創建一個開始節點( Start)State的名稱就是Pattern的節點名稱,創建完成之後如下圖所示。

閒魚端側如何實現實時CEP引擎 4

Transition

State代表了當前狀態機的狀態,不同狀態之前的切換定義成 StateTransition

結構定義如下:


def __init__(self, source_state, action, target_state, condition):
        self.__source_state = source_state    # 开始的State节点
        self.__action = action                # 具体action类型:take/ignore/proceed
        self.__target_state = target_state    # 结束的State节点
        self.__condition = condition        # 节点之间的判断条件

邊的生成邏輯跟Pattern的事件消費策略相關,以下是事件消費策略:

    STRICT = 0# 严格匹配下个
    SKIP_TILL_NEXT = 1# 跳过下一个
    SKIP_TILL_ANY = 2# 跳过任意一个
    NOT_FOLLOW = 3# 非跟随模式
    NOT_NEXT = 4# 非紧邻模式

不同的消費策略,得到的狀態機如下圖所示:

閒魚端側如何實現實時CEP引擎 5

  • STRICT: 如果命中了事件了,會進到下個狀態
  • SKIP_TILL_NEXT: 如果命中了會進入下一個狀態,否則會再當前節點循環,進入ignore的邊
  • SKIP_TILL_ANY: 不管是否命中條件,都會一直在當前狀態循環
  • NOT_FOLLOW: 如果遇到了一個匹配的,就會進入Stop狀態
  • NOT_NEXT: 如果命中一條,則進入Stop狀態

在Pattern中,不同的接口會創建出不同的消費策略節點,例如 followed_by接口會創建 SKIP_TILL_NEXT的節點。

Times

如果有的規則,要求特定的事件,循環出現幾次,那現在就要用到times接口。比如瀏覽3次寶貝這個規則,規則就可以寫成:

SimpleCondition

最終就會得到一個 Times=3的Pattern,編譯器在拿到這個Pattern之後,一樣先創建一個$end$的Final節點,在處理times的時候,會創建重複的節點,只不過名稱不同,不同的點之間用take鏈接起來,如下圖所示:

閒魚端側如何實現實時CEP引擎 6

Python CEP聚合

Flink是通過InputStream將匹配的事件轉移給CEPOperator,執行聚合操作;但是在客戶端的聚合,一次執行就一個事件流,所以可以將聚合簡化到一次匹配過程中,因此我們對於Flink的聚合操作做了改造,使其更適合端上的場景。

那麼聚合的腳本寫法如下:

.followed_by('middle').where(SimpleCondition())
.next_('end').where(self.end_filter)
.group_by('group_by').fields('start', 'userId')

這裡聲明了,以 start節點中的 userId作為聚合的節點,我們就會得到如下的 Pattern鍊錶:

閒魚端側如何實現實時CEP引擎 7

在解析 group_by節點的時候,我們需要做個特殊處理,判斷如果有聚合節點,我們就需要再 $end$節點和前面節點之間插入一個聚合的節點和哨兵位節點,哨兵位節點命名為 $aggregationStartState$,最終效果如下圖所示:

閒魚端側如何實現實時CEP引擎 8

在NFA匹配的過程中,當匹配結束,就可以將匹配到的事件流,傳到聚合節點,再進一步聚合。$aggregationStartState$節點和 group_by節點之間,是通過proceed結合,不需要滿足特定條件就可以執行。

具體的實現過程如下,可見與Flink不同的是,我們創建了一個特殊的 State節點 AggregationState

def __create_aggregation_state(self, sink_state):
# 渠道聚合节点的condition
    _aggregation_condition = self.__current_pattern.get_aggregation_condition()

# 创建AggregationState
    not_next = AggregationState(
        self.__current_pattern.get_name(),
StateType.Normal,
        _aggregation_condition.get_key_by_state_name(),
        _aggregation_condition.get_field())
    self.__states.append(not_next)

# 获取take的条件
    take_condition = self.__get_take_condition(self.__current_pattern)
    not_next.add_take(sink_state, take_condition)

# 将游标指向上一个节点
    self.__following_pattern = self.__current_pattern
    self.__current_pattern = self.__current_pattern.get_previous()

return not_nex

Show me the code

講了太多原理的東西,接下來看下代碼裡面如何工作的,先來看下如何來編寫一個CEP策略。

策略腳本

現在看下如何寫一個完整的python版本的cep規則,以寶貝詳情頁為例,規則描述如下:

需要匹配用戶查看3次寶貝詳情頁

那規則的寫法如下:

_pattern = Pattern.begin('e1').where(KVCondition('scene', 'Page_xyItemDetail')).times(3)

# 2. 将需要匹配的事件流_batch_data和待匹配的Pattern
# CEP内部会先将pattern转化成NFA,然后再用NFA去匹配事件流
_cep = CEP.pattern(_batch_data['eventSeq'], _pattern)

# 用来选择的逻辑
def select_function(data):
pass

# 3. 匹配完成,通过cep的select接口查询匹配到的结果
self.result = _cep.select(select_function)

CEP.pattern()函數里面,會先創建 NFA,然後去進行匹配,可見整個匹配策略腳本非常的短小精悍。

生成NFA

如下代碼用來將 Pattern鍊錶轉化成 NFA圖:

if self.__current_pattern.get_quantifier().get_consuming_strategy() == ConsumingStrategy.NOT_FOLLOW:
raiseException('NotFollowedBy is not supported as a last part of a Pattern!')
# 校验Pattern的名称,必须唯一
self.__check_pattern_name_uniqueness()
# 校验Pattern的策略
self.__check_pattern_skip_strategy()
# 首先创建Final节点
sink_state = self.__create_ending_state()
# 判定是否有聚合节点
if self.__current_pattern.get_aggregation_condition() isnotNone:
# 首先创建聚合节点
    sink_state = self.__create_aggregation_state(sink_state)
# 然后创建聚合几点的起始节点
    sink_state = self.__create_aggregation_start_state(sink_state)
# 创建状态机中的中间节点,此函数会循环知道Start节点的Pattern
sink_state = self.__create_middle_states(sink_state)
# 最后创建Start节点
self.__create_start_state(sink_state)
# 根据state列表和window来创建NFA
return NFA(self.__states, self.__window_time, False)

效果

閒魚已經上了幾個策略,整體看來比較穩定,不過還有很多優化的空間。從實測效果來看,端側從觸發策略到執行Action用時不會超過1s,其中還包含了一次網絡請求的時間。

性能數據

  • 執行時間

閒魚端側如何實現實時CEP引擎 9

單個腳本,執行時間大概在100ms左右。

  • 內存使用

閒魚端側如何實現實時CEP引擎 10

現在內存使用峰值還是比較高,大概在15M左右。關於內存過大的問題,目前正在討論一個方案:Python CEP可以持久化當前NFA的狀態,然後再觸發策略的時候,只帶從未觸發過的事件流,避免很多重複計算。之前運行一次腳本要處理500個事件,現在可能就縮減到100之內,可以極大的減小內存消耗。同時帶來另外一個問題,就是執行腳本的都會有一個IO操作,耗時會增加。

Flink與客戶端對比

現在對於Flink和客戶端Python CEP做一個簡單的對比:

閒魚端側如何實現實時CEP引擎 11

相比Flink,端側CEP還是有它的優勢,在端側可以直接利用客戶端的埋點信息進行計算,運行時長縮減了80%,而且也支持動態發布。 Python腳本支持2端通投,在保證2端埋點一致的前提下,也極大的減少了維護成本。

未來

現在端計算還存在很多待優化的地方:

  1. 端計算是用Python實現,無法做到像Flink的狀態機常駐內存,每次都要重新創建匹配,帶來了額外的消耗
  2. 在事件流的清洗上面,現在是通過回朔拿到之前的事件流,存在大量的重複計算,後續可以藉鑑Flink的Window機制來進行優化。
  3. 目前編譯器暫時還不支持Group Pattern,後續還要對其進行擴展。
  4. Python腳本現在還是需要手動編寫,後續還可以考慮通過DSL來自動生成。

整體看來,Python腳本執行策略還是有一定的性能損耗,不管是在創建NFA或者是匹配過程,後續可以考慮將匹配引擎用C++實現,然後真正做到常駐內存,從而做到高效的執行效率。後期做到NFA持久化之後,C++也可以復用整套持久化協議,從而優化整個引擎的執行效率。除此之外,策略在執行的過程中,還可以考慮用TensorFlowLite優化參數策略參數,從而真正做到千人前面的策略。

參考文檔

  1. 對於Flink的理解
  2. CEP in Flink(1) – CEP規則解析
  3. https://flink.apache.org/
  4. 《Efficient Pattern Matching over Event Streams》
  5. https://github.com/apache/flink 1

本文轉載自公眾號閒魚技術(ID:XYtech_Alibaba)。

原文鏈接

https://mp.weixin.qq.com/s/_jPQvX3aiKp4ScnqgHgyow