Categories
程式開發

PyFlink + 區塊鏈?揭秘行業領頭企業BTC.com 如何實現實時計算


大家好,我們是BTC.com 團隊。 2020 年,我們有幸接觸到了Flink 和PyFlink 生態,從團隊自身需求出發,完善了團隊內實時計算的任務和需求,搭建了流批一體的計算環境。

在實現實時計算的過程中,我們在實踐中收穫了一些經驗,在此分享一些這方面的心路歷程。主要分享的大綱如下:

困惑• 描述• 思考• 行動流批一體的架構架構效果Zeppelin、PyFlink on K8S 等實踐ZeppelinPyFlink on K8S區塊鏈領域實踐展望• 總結

01 困惑• 描述• 思考• 行動

作為工程師,我們每天都在不斷地了解需求,研發業務。

有一天,我們被拉到了一次團隊總結會議上,收到了以下的需求:

銷售總監A:

我們想要知道銷售的歷史和實時轉化率、銷售額,能不能統計一下實時的TOP5 的商品,還有就是大促時候,用戶實時訪問、商品實時瀏覽量TOP5 的情況呢,可以根據他歷史訪問的記錄實時推薦相關的嗎?

市場總監B:

我們想要知道市場推廣的效果,每次活動的實時數據,不然我們的市場投放無法準確評估效果,及時反饋啊。

研發總監C:

有些用戶的Bug 無法復現,日誌可以再實時一點嗎?傳統日誌分析,需要一定的梳理,可不可以直接清洗/ 處理相關的數據?

採購總監D:

這些年是不是流行數字化,採購這邊想預測採購需求,做一下實時分類和管理支出,預測未來供應來源,完善一下成本。這個有辦法做嗎?還有有些供應商不太穩定啊,能監控到他們的情況嗎?

運維總監E:

網站有時候訪問比較慢,沒有地方可以看到實時的機器情況,搞個什麼監控大屏,這個有辦法解決嗎?

部門領導F:

可以實現上面的人的需求嗎。

做以上的了解之後,才發現,大家對於數據需求的渴望程度,使用方不僅需要歷史的數據,而且還需要實時性的數據。

在電商、金融、製造等行業,數據有著迅猛的增長,諸多的企業面臨著新的挑戰,數據分析的實時處理框架,比如說做一些實時數據分析報表、實時數據處理計算等。

和大多數企業類似,在此之前,我們是沒有實時計算這方面的經驗和積累的。這時,就開始困惑了,怎樣可以更好地做上面的需求,在成本和效果之間取得平衡,如何設計相關的架構?

PyFlink + 區塊鏈?揭秘行業領頭企業BTC.com 如何實現實時計算 1

窮則思變,在有了困惑以後,我們就開始準備梳理已有的條件和我們到底需要什麼。

我們的業務範圍主要在區塊鏈瀏覽器與數據服務、區塊鏈礦池、多幣種錢包等。在區塊鏈瀏覽器的業務裡,BTC.com 目前已是全球領先的區塊鏈數據服務平台,礦池業務在業內排行第一,區塊鏈瀏覽器也是全球前三大瀏覽器之一。

首先,我們通過parser 解析區塊鏈上的數據,得到各方面的數據信息,可以分析出每個幣種的地址活躍度、地址交易情況、交易流向、參與程度等內容。目前,BTC.com 區塊鏈瀏覽器與行業內各大礦池和交易所等公司都有相關合作,可以更好地實現一些數據的統計、整理、歸納、輸出等。

面向的用戶,不僅有專業的區塊鏈開發人員,也有各樣的B 端和C 端用戶,C 端用戶可以進行區塊鏈地址的標註,智能合約的運行,查看智能合約相關內容等,以及鏈上數據的檢索和查看。 B 端用戶則有更專業的支持和指導,提供API、區塊鏈節點等一些的定制以及交易加速、鏈上的業務合作、數據定制等。

從數據量級來講,截至目前,比特幣大概有5 億筆交易,3000 多萬地址,22 億輸出(output:每筆交易的輸出),並且還在不斷增長中。以太坊的話,則更多。而BTC.com 的礦池和區塊鏈瀏覽器都支持多幣種,各幣種的總數據量級約為幾十T。

礦池是礦工購買礦機設備後連接到的服務平台,礦工可以通過連接礦池從而獲取更穩定的收益。這是一個需要保證7 * 24 小時穩定的服務,裡面有礦機不斷地提交其計算好的礦池下發的任務的解,礦池將達到網絡難度的解進行廣播。這個過程也可以認為是近乎是實時的,礦機通過提交到服務器,服務器內部再提交到Kafka 消息隊列,同時有一些組件監聽這些消息進行消費。而這些提交上來的解可以從中分析出礦機的工作狀態、算力、連接情況等。

在業務上,我們需要進行歷史數據和實時數據的計算。

歷史數據要關聯一些幣價,歷史交易信息,而這些交易信息需要一直保存,是一種典型的批處理任務。

每當有新區塊的確認,就有一些數據可以得到處理和分析,比如某個地址在這個區塊裡發生了一筆交易,那麼可以從其交易流向去分析是什麼樣的交易,挖掘交易相關性。或者是在這個區塊裡有一些特殊的交易,比如segwit 的交易、比如閃電網絡的交易,就是有一些這個幣種特有的東西可以進行解析分析和統計。並且在新區塊確認時的難度預測也有所變化。

還有就是大額交易的監控,通過新區塊的確認和未確認交易,鎖定一些大額交易,結合地址的一些標註,鎖定交易流向,更好地進行數據分析。

還有是一些區塊鏈方面的OLAP 方面的需求。

PyFlink + 區塊鏈?揭秘行業領頭企業BTC.com 如何實現實時計算 2

總結了在數據統計方面的需求和問題以後,我們就開始進行思考:什麼是最合適的架構,如何讓人員參與少、成本低?

解決問題,無非就是提出假設,通過度量,然後刷新認知。

PyFlink + 區塊鏈?揭秘行業領頭企業BTC.com 如何實現實時計算 3

在瀏覽了一些資料以後,我們認為,大部分的計算框架都是通過輸入,進行處理,然後得到輸出。首先,我們要獲取到數據,這里數據可以從MySQL 也可以從Kafka,然後進行計算,這裡計算可以是聚合,也可以是TOP 5 類型的,在實時的話,可能還會有窗口類型的。在計算完之後,將結果做下發,下發到消息渠道和存儲,發送到微信或者釘釘,落地到MySQL 等。

團隊一開始嘗試了Spark,搭建了Yarn,使用了Airflow 作為調度框架,通過做MySQL 的集成導入,開發了一些批處理任務,有著離線任務的特點,數據固定、量大、計算週期長,需要做一些複雜操作。

在一些批處理任務上,這種架構是穩定的,但是隨著業務的發展,有了越來越多的實時的需求,並且實時的數據並不能保證按順序到達,按時間戳排序,消息的時間字段是允許前後有差距的。在數據模型上,需求驅動式的開發,成本相對來說,Spark 的方式對於當時來說較高,對於狀態的處理不是很好,導致影響一部分的效率。

其實在2019 年的時候,就有在調研一些實時計算的事情,關注到了Flink 框架,當時還是以Java 為主,整體框架概念上和Spark 不同,認為批處理是一種特殊的流,但是因為團隊沒有Java 方面的基因和沈淀,使用Flink 作為實時計算的架構,在當時就暫告一個段落。

在2020 年初的時候,不管是 Flink 社區” 還是InfoQ,還是B 站,都有在推廣PyFlink,而且當時尤其是程鶴群[1]和孫金城[2]的視頻以及孫金城老師的博客[3]的印象深刻。於是就想嘗試PyFlink,其有著流批一體的優勢,而且還支持Python 的一些函數,支持pandas,甚至以後還可以支持Tensorflow、Keras,這對我們的吸引力是巨大的。在之後,就在構思我們的在PyFlink 上的流批一體的架構。

02 流批一體的架構

架構

首先我們要梳理數據,要清楚數據從哪裡來。在以Spark 為主的時期,數據是定期從數據源加載(增量)數據,通過一定的轉換邏輯,然後寫入目的地,由於數據量和業務需要,延遲通常在小時級別,而實時的話,需要盡可能短的延遲,因此將數據源進行了分類,整體分成了幾部分,一部分是傳統的數據我們存放在MySQL 持久化做保存,這部分之後可以直接作為批處理的計算,也可以導入Hive ,做進一步的計算。實時的部分,實際上是有很多思路,一種方式是通過MySQL 的Binlog 做解析,還有就是MySQL 的CDC 功能,在多方考量下,最後我們選擇了Kafka,不僅是因為其是優秀的分佈式流式平台,而且團隊也有對其的技術沉澱。

並且實際上在本地開發的時候,安裝Kafka 也比較方便,只需要Brew Install Kafka,而且通過Conduktor 客戶端,也可以方便的看到每個Topic 的情況。於是就對現有的Parser 進行改造,使其支持Kafka,在當收到新的區塊時,會立即向Kafka 發送一個消息,然後進行處理。

大概是在2018 年的時候,團隊將整體的業務遷移到了Kubernetes 上,在業務不斷發展的過程中,其對開發和運維上來說,減輕了很多負擔,所以建議有一定規模的業務,最好是遷移到Kubernetes,其對成本的優化,DevOps,以及高可用的支持,都是其他平台和傳統方式無法比擬的。

在開發作業的過程中,我們在盡可能的使用Flink SQL,同時結合一些Java、Python 的UDF、UDAF、UDTF。每個作業通過初始化類似於以下的語句,形成一定的模式:

self.source_ddl=""'
CREATE TABLE SourceTable (xxx int) WITH
'''
self.sink_ddl=""'
CREATE TABLE SinkTable (xxx int) WITH
'''
self.transform_ddl=""'
INSERT INTO SinkTable
SELECT udf(xxx)
FROM SourceTable
GROUP BY FROM_UNIXTIME(`timestamp`, 'yyyyMMdd')
'''

在未來的話,會針對性地將數據進行分層,按照業界通用的ODS、DWD、DWS、ADS,分出原始層,明細層和匯總層,進一步做好數據的治理。

效果

最終我們團隊基於PyFlink 開發快速地完成了已有的任務,部分是批處理作業,處理過去幾天的數據,部分是實時作業,根據Kafka 的消息進行消費,目前還算比較穩定。

部署時選擇了Kubernetes,具體下面會進行分享。在K8S 部署了Jobmanager 和Taskmanager,並且使用Kubernetes 的job 功能作為批處理作業的部署,之後考慮接入一些監控平台,比如Prometheus 之類的。

在成本方面,由於是使用的Kubernetes 集群,因此在機器上只有擴展主機的成本,在這種方式上,成本要比傳統的Yarn 部署方式要低,並且之後Kuberntes 會支持原生部署,在擴展Jobmanager 和Taskmanager 上面會更加方便。

03 Zeppelin、PyFlink on K8S 等實踐

Zeppelin 是我們用來進行數據探索和邏輯驗證,有些數據在本地不是真實數據,利用Zeppelin 連接實際的鏈上數據,進行計算的邏輯驗證,當驗證完成後,便可轉換成生產需要的代碼進行部署。

PyFlink + 區塊鏈?揭秘行業領頭企業BTC.com 如何實現實時計算 4

一、Kubernetes 上搭建PyFlink 和Zeppelin

整理後的部署Demo 在github,可以參閱相關鏈接[4]。關於配置文件,修改以下配置的作用。

(1). 修改configmap 的flink-conf.yaml 文件的taskmanager 配置。

taskmanager.numberOfTaskSlots: 10

調整Taskmanager 可以調整運行的job 的數量。

(2). 在Zeppelin 的dockerfile 中修改zeppelin-site.xml 文件。

cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml;
sed -i 's#127.0.0.1#0.0.0.0#g' conf/zeppelin-site.xml;
sed -i 's#auto#local#g' conf/zeppelin-site.xml

修改請求來源為0.0.0.0,如果是線上環境,建議開啟白名單,加上auth 認證。修改interpreter 的啟動模式為local,auto 會導致在集群啟動時,以K8s 的模式啟動,目前K8s 模式只支持Spark,local 模式可以理解為,Zeppelin 將在本地啟動一個連接Flink 的interpreter 進程。 Zeppelin 和在本地提交Flink 作業類似,也需要PyFlink 的基礎環境,所以需要將Flink 對應版本的jar 包放入鏡像內。

Zeppelin 的ingress 中添加websocket 配置。

nginx.ingress.kubernetes.io/configuration-snippet: |
proxy_set_header Upgrade "websocket";
proxy_set_header Connection "Upgrade";

Zeppelin 在瀏覽器需要和server 端建立socket 連接,需要在ingress 添加websocket 配置。

Flink 和Zeppelin 數據持久化的作用。

volumeMounts:
- mountPath: /zeppelin/notebook/
name: data
volumes:
- name: data
persistentVolumeClaim:
claimName: zeppelin-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: zeppelin-pvc
spec:
storageClassName: efs-sc
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi

對Flink 的/opt/flink/lib 目錄做持久化的目的,是當我們需要新的jar 包時,可以直接進入Flink 的pod 進行下載,並存放到lib 目錄,保證jobmanager 和taskmanager 的jar 版本一致,同時也無需更換鏡像。 Zeppelin 的任務作業代碼會存放在/zeppelin/notebook/ 目錄下,目的是方便保存編寫好的代碼。

Flink 命令提交job 作業的方式。

(1). 本地安裝PyFlink,Python 需要3.5及以上版本。

$ pip3 install apache-flink==1.11.1

(2). 測試Demo

def word_count():
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
)
sink_ddl = """
create table Results (word VARCHAR, `count` BIGINT) with ( 'connector' = 'print')
"""
t_env.sql_update(sink_ddl)
elements = [(word, 1) for word in content.split(" ")]
# 这里也可以通过 Flink SQL
t_env.from_elements(elements, ["word", "count"])
.group_by("word")
.select("word, count(1) as count")
.insert_into("Results")
t_env.execute("word_count")

if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
word_count()

或者是實時處理的Demo:

def handle_kafka_message():
s_env = StreamExecutionEnvironment.get_execution_environment()
# s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
st_env = StreamTableEnvironment
.create(s_env, environment_settings=EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner().build())
source_ddl=""'
CREATE TABLE SourceTable (
word string
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'Topic',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.zookeeper.connect' = 'localhost:2121',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
'''
sink_ddl = """
create table Results (word VARCHAR) with ('connector' = 'print')
"""
st_env.sql_update(sink_ddl)
st_env.sql_update(source_ddl)
st_env.from_path("source").insert_into("sink")
st_env.execute("KafkaTest")

if __name__ == '__main__':
handle_kafka_message()

(3). 本地測試Flink 命令提交job 作業。

$ flink run -m localhost:8081 -py word_count.py
python/table/batch/word_count.py
Job has been submitted with JobID 0a31b61c2f974bcc3f344f57829fc5d5
Program execution finished
Job with JobID 0a31b61c2f974bcc3f344f57829fc5d5 has finished.
Job Runtime: 741 ms

(4). 如果存在多個Python 文件,可以先zip 打包後再進行提交作業。

$ zip -r flinkdemo.zip ./*
$ flink run -m localhost:8081 -pyfs flinkdemo.zip -pym main

(5). Kubernetes 通過集群的CronJob 定時調度來提交Job,之後會做自研一些UI 後台界面做作業管理與監控。

04 在區塊鏈領域實踐

隨著區塊鏈技術的越來越成熟,應用越來越多,行業標準化、規範化的趨勢也開始顯現,也越來越依賴於雲計算、大數據,畢竟是數字經濟的產物。 BTC.com 也在紮根於區塊鏈技術基礎設施,為各類公司各類應用提供數據和業務上的支持。

近些年,有個詞火遍了IT 業界,中台,不管是大公司還是創業公司,都喜歡扯上這個概念,號稱自己業務中台,數據中台等。我們的理解中,中台是一種整合各方面資源的能力,從傳統的單兵作戰,到提升武器裝備後勤保障,提升作戰能力。在數據上打破數據孤島,在需求快速變化的前台和日趨穩定的後台中取得平衡。而中台更重要的是服務,最終還是要回饋到客戶,回饋到合作夥伴。

在區塊鏈領域,BTC.com 有著深厚的行業技術積累,可以提供各方面數據化的能力。比如在利用機器學習進行鏈上數據的預估,預估eth 的gas price,還有最佳手續費等,利用keras 深度學習的能力,進行一些回歸計算,在之後也會將Flink、機器學習和區塊鏈結合起來,對外提供更多預測類和規範化分類的數據樣本,之前是在用定時任務不斷訓練模型,與Flink 結合之後,會更加實時。在這方面,以後也會提供更多的課題,比如幣價與Defi,輿情,市場等的關係,區塊鏈地址與交易的標註和分類。甚至於將機器學習訓練的模型,放於IPFS 網絡中,通過去中心化的代幣進行訓練,提供方便調用樣本和模型的能力。

在目前,BTC.com 推出了一些通過數據挖掘實現的能力,包括交易推送、OLAP 鏈上分析報表等,改善和提升相關行業和開發者實際的體驗。我們在各種鏈上都有監控節點,監控各區塊鍊網絡的可用性、去中心化程度,監控智能合約。在接入一些聯盟鏈、隱私加密貨幣,可以為聯盟鏈、隱私加密貨幣提供這方面的數據能力。

BTC.com 將為區塊鏈產業生態發展做出更多努力,以科技公司的本質,以技術發展為第一驅動力,以市場和客戶為導向,開發創新和融合應用,做好基礎設施。

05 展望與總結

從實時計算的趨勢,到流批一體的架構,通過對PyFlink 和Flink 的學習,穩定在線上運行了多種作業任務,對接了實際業務需求。並且搭建了Zeppelin 平台,使得業務開發上更加方便。在計算上盡可能地依賴SQL,方便各方面的集成與調試。

在社區方面,PyFlink 也是沒有令我們失望的,較快的響應能力,不斷完善的文檔。在Confluence[5]上也可以看到一些Flink Improvement Proposals,其中也有一些是PyFlink 相關的,在不遠的將來,還會支持Pandas UDAF,DataStream API,ML API,也期望在之後可以支持Joblistener,總之,在這裡也非常感謝相關團隊。

未來的展望,總結起來就是,通過業務實現數據的價值化。而數據中台的終局,是將數據變現。

更多 Pylink 詳情了解,請參考 PyFlink 社區扶持計劃“。

PyFlink + 區塊鏈?揭秘行業領頭企業BTC.com 如何實現實時計算 5

參考鏈接:

[1]https://www.bilibili.com/video/BV1yt4y127sL

[2]https://www.bilibili.com/video/BV1W7411o7Tj

[3]https://enjoyment.cool

[4]https://github.com/liuyangovo/Flink-Zeppelin-Demo

[5]https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+提案