Categories
程式開發

Nexmark: 如何設計一個流計算基準測試?


如何選擇適合自己業務的流計算引擎?除了比較各自的功能矩陣外,基準測試(benchmark)便是用來評估系統性能的一個重要和常見的方法。

然而在流計算領域,目前還沒有一個行業標準的基準測試。本文將探討流計算基準測試設計上的難點,分享如何設計流計算基準測試框架——Nexmark,以及將來的規劃。

背景

隨著數據時效性對企業的精細化運營越來越重要,“實時即未來”、“實時數倉”、“數據湖” 成為了近幾年炙手可熱的詞。流計算領域的格局也在這幾年發生了巨大的變化,Apache Flink 在流批一體的方向上不斷深耕,Apache Spark 的近實時處理有著一定的受眾,Apache Kafka 也有了ksqlDB 高調地進軍流計算,而Apache Storm 卻開始逐漸地退出歷史的舞台。

每一種引擎有其優勢的地方,如何選擇適合自己業務的流計算引擎成了一個由來已久的話題。除了比較各個引擎提供的不同的功能矩陣之外,性能是一個無法繞開的評估因素。基準測試(benchmark)就是用來評估系統性能的一個重要和常見的過程。

二 現有流計算基準測試的問題

目前在流計算領域中,還沒有一個行業標準的基準測試。目前業界較為人知的流計算benchmark 是五年前雅虎Storm 團隊發布的Yahoo Streaming Benchmarks[4]。雅虎的原意是因為業界缺少反映真實場景的benchmark,模擬了一個簡單的廣告場景來比較各個流計算框架,後來被廣泛引用。具體場景是從Kafka 消費的廣告的點擊流,關聯Redis 中的廣告所屬的campaign 信息,然後做時間窗口聚合計數。

然而,正是因為雅虎團隊太過於追求還原真實的生產環境,導致這些外部系統服務(Kafka, Redis)成為了作業的瓶頸。 Ververica 曾在這篇文章[5]中做過一個擴展實驗,將數據源從Kafka 替換成了一個內置的datagen source,性能提升了37 倍!由此可見,引入的Kafka 組件導致了無法準確反映引擎真實的性能。更重要的一個問題是,Yahoo Benchmark 只包含一個非常簡單的,類似“Word Count” 的作業,它無法全面地反映當今復雜的流計算系統和業務。試想,誰會用一個簡單的“Word Count” 去衡量比較各個數據庫之間的性能差異呢?正是這些原因使得Yahoo Benchmark 無法成為一個行業標準的基準測試。這也正是我們想要解決的問題。

因此,我們認為一個行業標準的基準測試應該具備以下幾個特點:

1.可複現性

可複現性是使得benchmark 被信任的一個重要條件。許多benchmark 的結果是難以重現的。有的是因為只擺了個benchmark 結果圖,用於生成這些結果的代碼並沒有公開。有的是因為用於benchmark 的硬件不容易被別人獲取到。有的是因為benchmark 依賴的服務太多,致使測試結果不穩定。

2.能代表和覆蓋行業真實的業務場景(query 量)

例如數據庫領域非常著名的TPC-H、TPC-DS 涵蓋了大量的query 集合,來捕獲查詢引擎之間細微的差別。而且這些query 集合都立於真實業務場景之上(商品零售行業),數據規模大,因此也很受一些大數據系統的青睞。

3.能調整作業的負載(數據量、數據分佈)

在大數據領域,不同的數據規模對於引擎來說可能會是完全不同的事情。例如Yahoo Benchmark 中使用的campaign id 只有100 個,使得狀態非常小,內存都可以裝的下。這樣使得同步IO 和checkpoint 等的影響可以忽略不計。而真實的場景往往要面對大狀態,面臨的挑戰要復雜困難的多。像TPC-DS 的數據生成工具會提供scalar factor 的參數來控制數據量。其次在數據分佈上最好也能貼近真實世界的數據,如有數據傾斜,及調整傾斜比例。從而能全面、綜合地反映業務場景和引擎之間的差異。

4.有統一的性能衡量指標和採集匯總工具

基準測試的性能指標的定義需要清晰、一致,且能適用於各種計算引擎。然而流計算的性能指標要比傳統批處理的更難定義、更難採集。是流計算benchmark 最具挑戰性的一個問題,這也會在下文展開描述。

我們也研究了很多其他的流計算相關的基準測試,包括:StreamBench、HiBench、BigDataBench,但是它們都在上述幾個基本面有所欠缺。基準測試的行業標杆無疑是TPC 發布的一系列benchmark,如TPC-H,TPC-DS。然而這些benchmark 是面向傳統數據庫、傳統數倉而設計的,並不適用於今天的流計算系統。例如benchmark 中沒有考慮事件時間、數據的亂序、窗口等流計算中常見的場景。因此我們不得不考慮重新設計並開源一個流計算基準測試框架——Nexmark。

地址:https://github.com/nexmark/nexmark。

三 Nexmark 基準測試框架的設計

為了提供一個滿足以上幾個基本面的流計算基準測試,我們設計和開發了Nexmark 基準測試框架,並努力讓其成為流計算領域的標準benchmark 。

Nexmark 基準測試框架來源於NEXMark 研究論文[1],以及Apache Beam Nexmark Suite[6],並在其之上進行了擴展和完善。 Nexmark 基準測試框架不依賴任何第三方服務,只需要部署好引擎和Nexmark,通過腳本nexmark/bin/run_query.sh all 即可等待並獲得所有query 下的benchmark 結果。下面我們將探討Nexmark 基準測試在設計上的一些決策。

移除外部source、sink 依賴

如上所述,Yahoo Benchmark 使用了Kafka 數據源,卻使得最終結果無法準確反映引擎的真實性能。此外,我們還發現,在benchmark 快慢流雙流JOIN 的場景時,如果使用了Kafka 數據源,慢流會超前消費(快流易被反壓),導致JOIN 節點的狀態會緩存大量超前的數據。這其實不能反映真實的場景,因為在真實的場景下,慢流是無法被超前消費的(數據還未產生)。所以我們在Nexmark 中使用了datagen source,數據直接在內存中生成,數據不落地,直接向下游節點發送。多個事件流都由單一的數據生成器生成,所以當快流被反壓時,也能抑制慢流的生成,較好地反映了真實場景。

與之類似的,我們也移除了外部sink 的依賴,不再輸出到Kafka/Redis,而是輸出到一個空sink 中,即sink 會丟棄收到的所有數據。

通過這種方式,我們保證了瓶頸只會在引擎自身,從而能精確地測量出引擎之間細微的差異。

指標

批處理系統benchmark 的metric 通常採用總體耗時來衡量。然而流計算系統處理的數據是源源不斷的,無法統計query 耗時。因此,我們提出三個主要的metric:吞吐、延遲、CPU。 Nexmark 測試框架會自動幫我們採集metric,並做匯總,不需要部署任何第三方的metric 服務。

■ 吞吐

吞吐(throughput)也常被稱作TPS,描述流計算系統每秒能處理多少條數據。由於我們有多個事件流,所有事件流都由一個數據生成器生成,為了統一觀測角度,我們採用數據生成器的TPS,而非單一事件流的TPS。我們將一個query 能達到的最大吞吐,作為其吞吐指標。例如,針對Flink 引擎,我們通過Flink REST API 暴露的 .numRecordsOutPerSecond metric 來獲取當前吞吐量。

■ 延遲

延遲(Latency)描述了從數據進入流計算系統,到它的結果被輸出的時間間隔。對於窗口聚合,Yahoo Benchmark 中使用output_system_time – window_end 作為延遲指標,這其實並沒有考慮數據在窗口輸出前的等待時間,這種計算結果也會極大地受到反壓的影響,所以其計算結果是不准確的。一種更準確的計算方式應為output_system_time – max(ingest_time)。然而在非窗口聚合,或雙流JOIN 中,延遲又會有不同的計算方式。

所以延遲的定義和採集在流計算系統中有很多現實存在的問題,需要根據具體query 具體分析,這在參考文獻[2]中有詳細的討論,這也是我們目前還未在Nexmark 中實現延遲metric 的原因。

■CPU

資源使用率是很多流計算benchmark 中忽視的一個指標。由於在真實生產環境,我們並不會限制流計算引擎所能使用的核數,從而給系統更大的彈性。所以我們引入了CPU 使用率,作為輔助指標,即作業一共消耗了多少核。通過吞吐/cores,可以計算出平均每個核對於吞吐的貢獻。對於進程的CPU 使用率的採集,我們沒有使用JVM CPU load,而是藉鑑了YARN 中的實現,通過採樣 /proc//stat 併計算獲得,該方式可以獲得較為真實的進程CPU 使用率。因此我們的Nexmark 測試框架需要在測試開始前,先在每台機器上部署CPU 採集進程。

Query 與Schema

Nexmark 的業務模型基於一個真實的在線拍賣系統。所有的query 都基於相同的三個數據流,三個數據流會有一個數據生成器生成,來控制他們之間的比例、數據偏斜、關聯關係等等。這三個數據流分別是:

用戶(Person):代表一個提交拍賣,或參與競標的用戶。拍賣(Auction):代表一個拍賣品。競標(Bid):代表一個對拍賣品的出價。

我們一共定義了16 個query,所有的query 都使用ANSI SQL 標準語法。基於SQL ,我們可以更容易地擴展query 測試集,支持更多的引擎。然而,由於Spark 在流計算功能上的限制,大部分的query 都無法通過Structured Streaming 來實現。因此我們目前只支持測試Flink SQL 引擎。

Nexmark: 如何設計一個流計算基準測試? 1

作業負載的配置化

我們也支持配置調整作業的負載,包括數據生成器的吞吐量以及吞吐曲線、各個數據流之間的數據量比例、每個數據流的數據平均大小以及數據傾斜比例等等。具體的可以參考Source DDL 參數。

四 實驗結果

我們在阿里雲的三台機器上進行了Nexmark 針對Flink 的基準測試。每台機器均為ecs.i2g.2xlarge 規格,配有Xeon 2.5 GHz CPU (8 vCores) 以及32 GB 內存,800 GB SSD 本地磁盤。機器之間的帶寬為2 Gbps。

測試了Flink-1.11 版本,我們在這3 台機器上部署了Flink standalone 集群,由1 個JobManager,8 個TaskManager (每個只有1 slot)組成,都是4 GB內存。集群默認並行度為8。開啟checkpoint 以及exactly once 模式,checkpoint 間隔3 分鐘。使用RocksDB 狀態後端。測試發現,對於有狀態的query,每次checkpoint 的大小在GB 級以上,所以有效地測試的大狀態的場景。

Datagen source 保持1000 萬每秒的速率生成數據,三個數據流的數據比例分別是Bid: 92%,Auction: 6%,Person: 2%。每個query 都先運行3 分鐘熱身,之後3 分鐘採集性能指標。

運行nexmark/bin/run_query.sh all 後,打印測試結果如下:

Nexmark: 如何設計一個流計算基準測試? 2

五 總結

我們開發和設計Nexmark 的初衷是為了推出一套標準的流計算benchmark 測試集,以及測試流程。雖然目前僅支持了Flink 引擎,但在當前也具有一定的意義,例如:

推動流計算benchmark 的發展和標準化。作為Flink 引擎版本迭代之間的性能測試工具,甚至是日常回歸工具,及時發現性能回退的問題。在開發Flink 性能優化的功能時,可以用來驗證性能優化的效果。部分公司可能會有Flink 的內部版本,可以用作內部版本與開源版本之間的性能對比工具。

當然,我們也計劃持續改進和完善Nexmark 測試框架,例如支持Latency metric,支持更多的引擎,如Spark Structured Streaming, Spark Streaming, ksqlDB, Flink DataStream 等等。也歡迎有誌之士一起加入貢獻和擴展。

參考及引用:

[1]皮特·塔克(Pete Tucker)和克里斯汀·塔夫特(Kristin Tufte)。 “ NEXMark –數據流查詢基準”。 2010年6月。

[2]Jeyhun Karimov和Tilmann Rabl。 “基準測試分佈式流數據處理系統”。 arXiv:1802.08496v2 [cs.DB] 2019年6月

[3]王養軍 “流處理系統基準:StreamBench”。 2016年5月。

[4]https://github.com/yahoo/streaming-benchmarks

[5]https://www.ververica.com/blog/extending-the-yahoo-streaming-benchmark

[6]https://beam.apache.org/documentation/sdks/java/testing/nexmark/