Categories
程式開發

基於Flink+ClickHouse打造輕量級點擊流實時數倉


Flink 和ClickHouse 分別是實時計算和(近實時)OLAP 領域的翹楚,也是近些年非常火爆的開源框架,很多大廠都在將兩者結合使用來構建各種用途的實時平台,效果很好。關於兩者的優點就不再贅述,本文來簡單介紹筆者團隊在點擊流實時數倉方面的一點實踐經驗。

點擊流及其維度建模

所謂點擊流(click stream),就是指用戶訪問網站、App 等Web 前端時在後端留下的軌跡數據,也是流量分析(traffic analysis)和用戶行為分析(user behavior analysis)的基礎。點擊流數據一般以訪問日誌和埋點日誌的形式存儲,其特點是量大、維度豐富。以我們一個中等體量的普通電商平台為例,每天產生約200GB 左右、數十億條的原始日誌,埋點事件100+ 個,涉及50+ 個維度。

按照Kimball 的維度建模理論,點擊流數倉遵循典型的星形模型,簡圖如下。

基於Flink+ClickHouse打造輕量級點擊流實時數倉 1

點擊流數倉分層設計

點擊流實時數倉的分層設計仍然可以藉鑑傳統數倉的方案,以扁平為上策,盡量減少數據傳輸中途的延遲。簡圖如下。

基於Flink+ClickHouse打造輕量級點擊流實時數倉 2

DIM 層:維度層,MySQL 鏡像庫,存儲所有維度數據。 ODS 層:貼源層,原始數據由Flume 直接進入Kafka 的對應topic。 DWD 層:明細層,通過Flink 將Kafka 中數據進行必要的ETL 與實時維度join 操作,形成規範的明細數據,並寫回Kafka 以便下游與其他業務使用。再通過Flink 將明細數據分別寫入ClickHouse 和Hive 打成大寬表,前者作為查詢與分析的核心,後者作為備份和數據質量保證(對數、補數等)。 DWS 層:服務層,部分指標通過Flink 實時匯總至Redis,供大屏類業務使用。更多的指標則通過ClickHouse 物化視圖等機制週期性匯總,形成報表與頁面熱力圖。特別地,部分明細數據也在此層開放,方便高級BI 人員進行漏斗、留存、用戶路徑等靈活的ad-hoc 查詢,這些也是ClickHouse 遠超過其他OLAP 引擎的強大之處。

要點與註意事項

Flink 實時維度關聯

Flink 框架的異步I/O 機制為用戶在流式作業中訪問外部存儲提供了很大的便利。針對我們的情況,有以下三點需要注意:

使用異步MySQL 客戶端,如Vert.x MySQL Client。 AsyncFunction 內添加內存緩存(如Guava Cache、Caffeine 等),並設定合理的緩存驅逐機制,避免頻繁請求MySQL 庫。實時維度關聯僅適用於緩慢變化維度,如地理位置信息、商品及分類信息等。快速變化維度(如用戶信息)則不太適合打進寬表,我們採用MySQL 表引擎將快變維度表直接映射到ClickHouse 中,而ClickHouse 支持異構查詢,也能夠支撐規模較小的維表join場景。未來則考慮使用MaterializedMySQL 引擎(當前仍未正式發布)將部分維度表通過bi​​nlog 鏡像到ClickHouse。

Flink-ClickHouse Sink 設計

可以通過JDBC(flink-connector-jdbc)方式來直接寫入ClickHouse,但靈活性欠佳。好在clickhouse-jdbc 項目提供了適配ClickHouse 集群的BalancedClickhouseDataSource 組件,我們基於它設計了Flink-ClickHouse Sink,要點有三:

寫入本地表,而非分佈式表,老生常談了。按數據批次大小以及批次間隔兩個條件控制寫入頻率,在part merge 壓力和數據實時性兩方面取得平衡。目前我們採用10000 條的批次大小與15 秒的間隔,只要滿足其一則觸發寫入。 BalancedClickhouseDataSource 通過隨機路由保證了各ClickHouse 實例的負載均衡,但是只是通過週期性ping 來探活,並屏蔽掉當前不能訪問的實例,而沒有故障轉移——亦即一旦試圖寫入已經失敗的節點,就會丟失數據。為此我們設計了重試機制,重試次數和間隔均可配置,如果當重試機會耗儘後仍然無法成功寫入,就將該批次數據轉存至配置好的路徑下,並報警要求及時檢查與回填。

當前我們僅實現了DataStream API 風格的Flink-ClickHouse Sink,隨著Flink 作業SQL 化的大潮,在未來還計劃實現SQL 風格的ClickHouse Sink,打磨健壯後會適時回饋給社區。另外,除了隨機路由,我們也計劃加入輪詢和sharding key hash 等更靈活的路由方式。

還有一點就是,ClickHouse 並不支持事務,所以也不必費心考慮2PC Sink 等保證exactly once 語義的操作。如果Flink 到ClickHouse 的鏈路出現問題導致作業重啟,作業會直接從最新的位點(即Kafka 的latest offset)開始消費,丟失的數據再經由Hive 進行回填即可。

ClickHouse 數據重平衡

ClickHouse 集群擴容之後,數據的重平衡(reshard)是一件麻煩事,因為不存在類似HDFS Balancer 這種開箱即用的工具。一種比較簡單粗暴的思路是修改ClickHouse 配置文件中的shard weight,使新加入的shard 多寫入數據,直到所有節點近似平衡之後再調整回來。但是這會造成明顯的熱點問題,並且僅對直接寫入分佈式表才有效,並不可取。

因此,我們採用了一種比較曲折的方法:將原表重命名,在所有節點上建立與原表schema 相同的新表,將實時數據寫入新表,同時用clickhouse-copier 工具將歷史數據整體遷移到新表上來,再刪除原表。當然在遷移期間,被重平衡的表是無法提供服務的,仍然不那麼優雅。如果大佬們有更好的方案,歡迎交流。

結束

關於Flink 和ClickHouse 等組件的配置、調優、延遲監控、權限管理等知識,筆者在之前的博客中多少講到過,更多詳情請見作者原文鏈接:

https://www.jianshu.com/p/bedead165403