Categories
程式開發

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼)


1 Disruptor 簡介

Disruptor 旨在在異步事件處理體系結構中提供低延遲,高吞吐量的工作隊列。它確保任何數據僅由一個線程擁有以進行寫訪問,因此與其他結構相比,減少了寫爭用。目前,包括 Apache Storm、Camel、Log4j2 在內的很多知名項目都應用了 Disruptor 以獲取高性能。

SOFATracer 也是基於Disruptor 高性能無鎖循環隊列來提供異步打印日誌到本地磁盤能力的,SOFATracer 提供兩種類似的日誌打印類型即摘要日誌和統計日誌,摘要日誌:每一次調用均會落地磁盤的日誌;統計日誌:每隔一定時間間隔進行統計輸出的日誌;無論是哪種日誌的輸出,對於SOFATracer 來說都需要保證較高的性能,以降低對於業務整體流程耗時的影響。

關於 Disruptor 的 一些原理分析可以參考:

Disruptor:https://ifeve.com/disruptor/

A High Performance Inter-Thread Messaging Library 高性能的線程間消息傳遞庫。

2 案例

先通過 Disruptor 的一個小例子來有個直觀的認識;先看下它的構造函數:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 1

  • eventFactory : 在環形緩衝區中創建事件的 factory;
  • ringBufferSize:環形緩衝區的大小,必須是2的冪;
  • threadFactory:用於為處理器創建線程;
  • producerType:生成器類型以支持使用正確的sequencer和publisher創建RingBuffer;枚舉類型,SINGLE、MULTI兩個項。對應於 SingleProducerSequencer和MultiProducerSequencer兩種Sequencer;
  • waitStrategy : 等待策略;

如果我們想構造一個 disruptor,那麼我們就需要上面的這些組件。從 eventFactory 來看,還需要一個具體的 Event 來作為消息事件的載體。 【下面按照官方給的案例進行簡單的修改作為示例】

消息事件 LongEvent ,能夠被消費的數據載體

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 2

創建消息事件的 factory

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 3

ConsumerThreadFactory

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 4

OK ,上面的這些可以滿足創建一個 disruptor 了:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 5

現在是已經有了 disruptor 了,然後通過:start 來啟動:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 6

到這裡,已經構建了一個disruptor;但是目前怎麼使用它來發布消息和消費消息呢?

發布消息

下面在 for 循環中 發布 5 條數據:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 7

消息已經發布,下面需要設定當前 disruptor 的消費處理器。前面已經有個 LongEvent 和 EventFactory ; 在 disruptor 中是通過 EventHandler 來進行消息消費的。

編寫消費者代碼

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 8

將 eventHandler 設置到 disruptor 的處理鏈上:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 9

運行結果(這裡)

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 10

3 基本概念和原理

Disruptor

整個基於 ringBuffer 實現的生產者消費者模式的容器。主要屬性:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 11

  • ringBuffer:內部持有一個 RingBuffer 對象,Disruptor 內部的事件發布都是依賴這個 RingBuffer 對象完成的;
  • executor:消費事件的線程池;
  • consumerRepository:提供存儲庫機制,用於將 EventHandler 與 EventProcessor 關聯起來;
  • started : 用於標誌當前 Disruptor 是否已經啟動;
  • exceptionHandler : 異常處理器,用於處理 BatchEventProcessor 事件週期中 uncaught exceptions;

RingBuffer

環形隊列【實現上是一個數組】,可以類比為 BlockingQueue 之類的隊列,ringBuffer 的使用,使得內存被循環使用,減少了某些場景的內存分配回收擴容等耗時操作。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 12

  • E:在事件的交換或併行協調期間存儲用於共享的數據的實現 -> 消息事件;

Sequencer

RingBuffer 中生產者的頂級父接口,其直接實現有 SingleProducerSequencer 和 MultiProducerSequencer;對應 SINGLE、MULTI 兩個枚舉值。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 13

EventHandler

事件處置器,改接口用於對外擴展來實現具體的消費邏輯。如上面 Demo 中的 LongEventHandler ;

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 14

  • event : RingBuffer 已經發布的事件;
  • sequence : 正在處理的事件的序列號;
  • endOfBatch : 用來標識否是來自 RingBuffer 的批次中的最後一個事件;

SequenceBarrier

消費者路障,規定了消費者如何向下走。事實上,該路障算是變向的鎖。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 15

waitStrategy 決定了消費者採用何種等待策略。

WaitStrategy

Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}.

EventProcessor 的等待策略;具體實現在 disruptor 中有 8 種:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 16

這些等待策略不同的核心體現是在如何實現 waitFor 這個方法上。

EventProcessor

事件處理器,實際上可以理解為消費者模型的框架,實現了線程 Runnable 的 run 方法,將循環判斷等操作封在了裡面。該接口有三個實現類:

1、BatchEventProcessor

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 17

  • xceptionHandler:異常處理器;
  • DataProvider:數據來源,對應 RingBuffer;
  • EventHandler:處理 Event 的回調對象;
  • SequenceBarrier:對應的序號屏障;
  • TimeoutHandler:超時處理器,默認情況為空,如果要設置,只需要要將關聯的 EventHandler 實現 TimeOutHandler 即可;

如果我們選擇使用 EventHandler 的時候,默認使用的就是 BatchEventProcessor,它與 EventHandler 是一一對應,並且是單線程執行。

如果某個 RingBuffer 有多個 BatchEventProcessor,那麼就會每個 BatchEventProcessor 對應一個線程。

2、WorkProcessor

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 18

基本和 BatchEventProcessor 類似,不同在於用於處理 Event 的回調對像是 WorkHandler。

原理圖

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 19

無消費者情況下,生產者保持生產,但是 remainingCapacity 保持不變。

在寫 Demo 的過程中,本來想通過不設定消費者來觀察 RingBuffer 可用容量變化的。但是驗證過程中,一直得不到預期的結果,(注:沒有設置消費者,只有生產者),先看結果:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 20

從結果來看,remainingCapacity 的值應該隨著 發布的數量 遞減的;但是實際上它並沒有發生任何變化。

來看下 ringBuffer.remainingCapacity() 這個方法:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 21

這裡面又使用 sequencer.remainingCapacity() 這個方法來計算的。上面的例子中使用的是 ProducerType.SINGLE,那來看 SingleProducerSequencer 這個里面 remainingCapacity 的實現。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 22

來解釋下這段代碼的含義:

假設當前ringBuffer 的bufferSize 是8 ;上次申請到的序列號是5,其實也就是說已經生產過佔用的序列號是5;假設當前已經消費到的序列號是3,那麼剩餘的容量為:8 -(5-2) = 5。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 23

因為這裡我們可以確定 bufferSize 和 produced 的值了,那麼 remainingCapacity 的結果就取決於 getMinimumSequence 的計算結果了。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 24

這個方法是從 Sequence 數組中獲取最小序列 。如果 sequences 為空,則返回 minimum。回到上一步,看下 sequences 這個數組是從哪裡過來的,它的值在哪裡設置的。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 25

gatingSequences是 SingleProducerSequencer 父類 AbstractSequencer 中的成員變量:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 26

gatingSequences 是在下面這個方法裡面來管理的。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 27

這個方法的調用棧向前追溯有這幾個地方調用了:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 28

WorkerPool 來管理多個消費者;hangdlerEventsWith 這個方法也是用來設置消費者的。但是在上面的測試案例中我們是想通過不設定消費者只設定生成者來觀察環形隊列的佔用情況,所以 gatingSequences 會一直是空的,因此在計算時會把 produced 的值作為 minimum 返回。這樣每次計算就相當於:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 29

也就驗證了為何在不設定消費者的情況下,remainingCapacity 的值會一直保持不變。

4 SOFATracer 中 Disruptor 實踐

SOFATracer 中,AsyncCommonDigestAppenderManager 對 Disruptor 進行了封裝,用於處理外部組件的Tracer摘要日誌。該部分借助 AsyncCommonDigestAppenderManager 的源碼來分析下 SOFATracer 如何使用Disruptor 的。

SOFATracer 中使用了兩種不同的事件模型,一種是 SOFATracer 內部使用的 StringEvent , 一種是外部擴展使用的 SofaTacerSpanEvent。這里以 SofaTacerSpanEvent 這種事件模型來分析。 StringEvent 消息事件模型對應的是 AsyncCommonAppenderManager 類封裝的disruptor。

SofaTracerSpanEvent ( -> LongEvent)

定義消息事件模型,SofaTacerSpanEvent 和前面 Demo 中的 LongEvent 基本結構是一樣的,主要是內部持有的消息數據不同,LongEvent 中是一個 long 類型的數據,SofaTacerSpanEvent 中持有的是 SofaTracerSpan 。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 30

Consumer ( -> LongEventHandler)

Consumer 是 AsyncCommonDigestAppenderManager 的內部類;實現了 EventHandler 接口,這個 consumer 就是作為消費者存在的。

在 AsyncCommonAppenderManager 中也有一個,這個地方個人覺得可以抽出去,這樣可以使得AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager 的代碼看起來更乾淨。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 31

SofaTracerSpanEventFactory (-> LongEventFactory)

用於產生消息事件的 Factory。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 32

ConsumerThreadFactory (-> LongEventThreadFactory )

用來產生消費線程的 Factory。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 33

構建 Disruptor

Disruptor 的啟動委託給了 AsyncCommonDigestAppenderManager 的 start 方法來執行。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 34

來看下,SOFATracer 中具體是在哪裡調用這個 start 的:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 35

  • CommonTracerManager : 這個里面持有了 AsyncCommonDigestAppenderManager 類的一個單例對象,並且是 static 靜態代碼塊中調用了 start 方法;這個用來輸出普通日誌;
  • SofaTracerDigestReporterAsyncManager:這裡類裡面也是持有了AsyncCommonDigestAppenderManager 類的一個單例對像,並且提供了 getSofaTracerDigestReporterAsyncManager 方法來獲取該單例,在這個方法中調用了 start 方法;該對像用來輸出摘要日誌;

發布事件

前面的 Demo 中是通過一個 for 循環來發布事件的,在 SOFATracer 中的事件發布無非就是當有 Tracer 日誌需要輸出時會觸發發布,那麼對應的就是日誌的 append 操作,將日誌 append 到環形緩衝區。

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 36

SOFATracer 事件發布的調用邏輯:

螞蟻金服分佈式鏈路跟踪組件 SOFATracer 中 Disruptor 實踐(含源碼) 37

追溯調用的流程,可以知道當前 span 調用 finish 時或者 SOFATracer 中調用 reportSpan 時就相當於發布了一個消息事件。

5 小結

本文對 SOFATracer 中使用 Disruptor 來進行日誌輸出的代碼進行了簡單的分析,更多內部細節原理可以自行看下SOFATracer 的代碼。 SOFATracer 作為一種比較底層的中間件組件,在實際的業務開發中基本是無法感知的。但是作為技術來學習,還是有很多點可以挖一挖。

SOFATracer:https://github.com/sofastack/sofa-tracer

如果有小伙伴對中間件感興趣,歡迎加入我們團隊,歡迎來撩;對 SOFAStack 技術體係有興趣的可以關注 SOFAStack:https://www.sofastack.tech/community/

本文歸檔在 sofastack.tech。

本文轉載自公眾號金融級分佈式架構(ID:Antfin_SOFA)。

原文鏈接

https://mp.weixin.qq.com/s/Cl30-5Ywnvp2qSN1FhLtAw