Categories
程式開發

Uber的Kafka實踐:踩坑5年,隨時像替換汽車引擎一樣替換Kafka


Uber從2013年開始使用Kafka。 Uber的主流應用,如打車、外賣等服務,都需要實時處理數據,所有核心的Business都是通過Kafka進行消息的傳遞。這決定了Kafka在Uber的技術平台中佔據非常核心的定位。經過7年的發展,Uber的Kafka集群已經發展成為了全球數一數二的規模,每天處理PB級別的數據、Trillion級別的消息。

從2013年到2018年,Uber主要是踩坑,修復各種Bug。到現階段,整個消息平台已經相當複雜,三分之二的代碼是自研,開源Kafka僅作為平台核心部分。但這個消息平台也不會被Kafka所綁定,當整個系統自動化、標準化工作完成後,又可以用潛在的其他更好的開源軟件,像替換汽車引擎一樣將這個核心替換掉。

近日InfoQ記者採訪了Uber Staff Engineer富羽鵬,了解Uber消息平台實踐過程,他將在QCon全球軟件開發大會(北京站)2020 分享主題為《Uber 大規模實時數據平台架構演進與實踐》的演講。

InfoQ:您認為應用開源Kafka時,它本身有哪些問題是普通使用者必須要解決的? Uber的Kafka有何特殊性?

富羽鵬:Uber部署的Kafka集群規模,是全球數一數二的,每天處理PB級別的數據、Trillion級別的消息。對於一般的小企業,一兩個集群就夠了。在Uber,Kafka的Topic數量非常多,生產者和消費者的量級也非常大,還有很多精細化的不同場景的Use Case,我們會針對不同的場景部署集群,並對容災性有比較多的考慮。

如果說只從代碼量來看,開源Kafka只是我們在整個系統裡面的最核心(Core)的一部分。從生態系統周邊,我們層層把它(Core)包圍起來,幾乎有2/3是Uber自己開發的,來解決各種具體的問題。

所以對於中小企業來說,如果沒有Uber這麼大規模和復雜性的話,建議直接使用付費的解決方案。 Kafka開源幾年了,已經是一個比較成熟的系統,但是到真正生產環境裡,要很好的正確的使用和運維這套系統的話,需要針對不同場景,解決很多的設置和參數調優問題,出了故障還要知道如何快速排查與修復。需要使用者把這套系統吃透,這需要很多年的積累和沈淀。

也就是說對於一個新的企業來說,如果想很快的能把這個開源Kafka使用進去的話,有很多坑要踩,有很多路要走。

但對Uber來說,解決的問題已經不是使用開源軟件踩坑的問題了。 Uber這種級別規模的部署,導致我們會遇到很多其他公司可能遇到不了的一些問題。在高吞吐量、高並發性出現的時候,會觸發很多在普通的流量情況下遇不到的問題。我們Kafka的架構演進,都是因為目前的規模量帶來的,需要我們不斷的對系統進行迭代。

InfoQ:在Uber裡,是否有實時備份機制 ISR機制嗎?一份Topic整體算起來會有多少份備份?

富羽鵬:Uber在備份的問題上考慮的非常多。

備份並不是越多越好。備份越多,管理起來就越麻煩。另外一方面,備份的成本也非常昂貴。 Uber作為一個上市公司,成本是我們非常關注的一點。我們需要保證每一個副本,都是從容災意義上去考慮的。在Uber內部,比如其他團隊或組織裡Kafka的Service用戶,都需要了解他們到底使用了多少副本,重要的集群用四副本,不重要的、允許數據丟失的使用三副本。

與此同時,跨數據中心的拷貝,會略微複雜一些。每個Service需要在哪些Region或者Data Center進行拷貝,需要跟我們的用戶進行協商。我們可以提供SLA,把數據鏈發到某個Cluster上,我們需要保證在一定時間內,將它複製到指定的數據中心。如果是跨數據中心讀取的話,其實這並不是一個非常經濟的選擇,因為這包括對網絡帶寬的使用,更長的延時。

對於實時和離線,處理會有很大的區別。對於離線數據會永久性的來保存。但實時數據本身是個流數據,它更像是一個Buffer,是數據的一個緩存。我們對數據有一定的保存的時間的,比如說大部分的集群的話,我們只把數據保留三天,三天之後我們會把數據刪除。大部分下游的消費者,他們更關注的是實時數據的消費。也就是說絕大多數的Service來講,他們會在一天之內讀走數據。我們留“三天”,更多的是為我們下游的消費者進行容災。如果需要有更久的時間,比如說超過三天,我們會從Data Lake裡面,再把這個數據再給讀出來,再放到Kafka  Topic裡面,再次消費。所以從副本使用的角度來講,更關鍵的看Data Lake那一邊,他們會將數據保留多久。

在一個Topic四個副本的機制下,最少也得有兩個Cluster,所以最少也會有八副本存在。再加多數據中心,副本數量就會加倍。 Uber每天產生PB級的數據,因此副本也至少有PB級。這一點,我們自己也在跟開源社區合作,做Kafka的Remote Storage功能。傳統的Kafka都是在使用Host本身的硬盤和內存來存儲數據。從Cost角度來講,SSD的價格是非常高的。同時我們發現,絕大多數數據的消費者只需要去讀過去六個小時之內的數據。所以我們決定去修改本身的Broker代碼,它會把數據在設定的時間裡直接拷貝到指定的Storage。可以讓Kafka本身理解本地數據和Remote數據之間的關係,通過使用這樣一套機制,把本地的副本量給降下來。

InfoQ:Uber有沒有必須不能丟數據的場景?以及不能容忍亂序的場景?如果有,能具體描述場景和問題是怎麼解決的嗎?

富羽鵬第一類場景是我們服務的一些最核心的業務。這種場景下,數據的可靠性、系統可靠性非常重要,數據不能有任何丟失。為了滿足數據的Lossless,我們做了很多相關的定制化開發。

從宏觀的Pipeline角度來看,相當於說是一個Topic,有上游的生產者Producer發數據,同時下游有Consumer來消費數據。但是在系統角度上來看,其實這個還有很多的Stage。比如說它從Client Service,從Producer來講,它是在一個client這邊,是在客戶端這邊開始發數據,它發的第一點,並不是直接發到我們cluster broker上面,而是發到我們叫REST Proxy這樣一個REST Server上面。然後從REST Proxy的話,它會再發給這樣一個Broker。然後這個Broker Cluster,用一個數據拷貝的Pipeline,拷到其他的一個Cluster。因為這樣一個多區域的、多集群的架構。每個地域的生產者它都往本地集群發數據,但是對於消費者來講,他們有時候可能比較關注全局的數據量。所以,我們會有一個數據拷貝的Pipeline來拷到另外一個Cluster。

針對數據流的不同的Stage,我們要真正做到Lossless,就需要對每個Stage去考慮數據容災。比如說機器壞了、數據量丟失,是不是應該重置、Retry,或者找其他的地方做一個Buffer。基於每一點,我們都做了一些相應的定制化開發。另一個是要做數據的審查。就是怎麼知道我們任何的數據丟失。針對Pipeline的每一個Stage,我們都有拿一些數據,或者拿一些Matrix來進行比較。保證整個Pipeline下沒有任何的一個數據丟失。

第二類場景是日誌收集。這一類特點是數據量特別大。 Uber有成千上萬個Service,每天產生很多的日誌,我們需要把日誌通過Kafka給聚合起來。對於我們的Cluster就有非常高的時效性和高吞吐量支持的要求。對於這一場景我們做了提高吞吐量的優化。

第三類場景是將 Kafka 作為數據庫的更改數據捕獲(change data capture, CDC)。特點是數據庫對的transaction順序性要求比較高。我們針對數據的有序性、吞吐量的要求也做了一些特別的開發工作。

第四類場景是將 Kafka 作為流處理平台數據的來源。在Uber 我們有一個開源的流處理平台運行Flink 的job,它們從Kafka 讀取數據進行實時計算,計算完成後也可以將數據發回給Kafka,再傳回到下游;我們專門設置了一個集群,跟Flink做了深度的一些整合,做了一定的優化。

第五類場景也是一個非常有意思的定制化開發的,Kafka的協議之上做了Dead Letter Queue(DLQ),可以允許將個別不能暫時處理的信息放到另外一個Queue裡面,之後再重新處理這些已經失敗的Message。對於Kafka的Consumer API來講,如果有不能處理的Message通常就兩個選擇,要么直接丟棄消息,不處理,繼續往前走;要么就不斷的重試。

但是Uber有一些非常重要的Topic,Kafka本身的處理方法是不符合我們的要求的。比如說在處理一些跟Money相關的事情的時候,每個Message都是非常重要的。如果說Message丟失,客戶的賬就對不上了,這是不行的。但是系統又不能卡在這裡,阻止後面的流水進來。

這種情況下我們需要有一個額外緩存的功能,如果當前處理不了,那就先把它分到另外的地方。然後過了一段時間再重新進行處理。我們在Kafka這一層之上,做另外的一套Message Platform來封裝這些額外的功能。

InfoQ:您認為Kafka的演進,在Uber大概分為幾個階段,分別解決的關鍵問題是什麼

富羽鵬:Uber裡Kafka的演進,我認為主要分為三個階段。

第一階段,最早期的時候,是2013年到2015年之間,從架構角度來講,我們主要是在使用Kafka開源版本。主要是提高穩定性,因為那個時候Kafka本身也比較早期,主要做很多Bugfix、調優、多語言支持,讓Kafka更好的匹配到Uber的各個不同的Use Case,讓整個架構可以更加穩定的運行起來。那時Uber的流量增長也很快,我們要保障在流量起來後,也不至於擊垮集群。

另外還有一個有意思的工作是“數據拷貝”。我們對跨數據中心的拷貝做了非常多的優化。在開源的項目裡面,Kafka原生提出了一個MirrorMaker的項目,用於集群之間的拷貝。但Uber遇到了很多問題,於是開源了自己的一個拷貝項目uReplicator。主要原因是在消息規模非常大的時候,MirrorMaker有一些的性能、可靠性的問題,於是我們重新對整個架構進行了大改,最終開發了我們自己這樣的一個開源項目。

第二階段,從2015年到2018年之間。我們觀察到之前的第一階段,遇到了很多的一些可靠性、擴展性方面的問題,我們更需要去打造一個更加成熟的消息平台。這個時候對Uber的業務在爆炸性的增長,幾乎就是每六個月Uber的Business會翻一番,也就意味著我們的數據量每六個月也會翻一番。但是我們的團隊的人員並不能翻倍,於是需要考慮自動化方面的開發,來應對數據量和集群數量增長帶來的挑戰。另外這個時候也做了不少多租戶管理方面的定制化工作,比如用戶配額、用戶黑名單以及重要Topic的物理隔離。此外,對於穩定性和容災機制也做了很多,逐漸形成多地多活、備份集群的架構,另外前面所講的DLQ也是那個時候開發出來的。

第三階段,是從2018年到現在。這個階段我們希望建立一個標準化、智能化與自動化的消息處理平台。因為我們的應用場景越來越多,數據查詢、數據拷貝的需求越來越多,我們的系統越做越複雜。

我們整個生態系統,過去幾年上下游有一些用戶在開發自己的Client或者開源的Client和我們的Kafka平台進行交互。發展到一定階段後,我們發現在整個Uber內部,林林總總有相當數量的不同語言、不同版本的客戶端在跟我們的平台進行交互,讓我們管理起來非常的困難。同時這些用戶也會讓我們的系統開發演進帶來額外的挑戰。比如說當我們要加一個新的功能或者棄用某些功能時,我們要考慮是不是有某些用戶的客戶端不能升級,會不會給他們的Service帶來影響。當這個公司規模發展到一定階段的時候,標準化就成為了必須要做得一件事情。

其中一個很重要的標準化工作是做Consumer端的Proxy。這項工作目前只有Uber在做,其他的公司和開源還沒有開始。在開源與Uber的Kafka架構中,生產者這邊都有一個REST Proxy。但是在消費者這端,因為邏輯會復雜很多,還沒有Proxy這樣的概念,我們在做的工作就是填補這個空白。

這個Consumer Proxy不但可以簡化很多客戶端的API,同時也可以打破很多Kafka對於Consumer的一些限制。比如說Kafka有個限制是Consumer數量不能比Partition數量多。但是當我們做了Consumer Proxy以後,我們就可以把這個限制給去掉。於是一個八個Partition的Topic的message可以把數據發到成百上千個Client上面去。這樣能大大降低了一個Cluster中Partition的的總數,降低Partition數量帶來的物理資源開銷。此外DLQ這個功能我們也做到了Consumer Proxy的API裡面去,作為一個原生的功能。

還有一個方面標準化是,在做跨集群跨地域的元數據的整合與管理Cluster Topology。也就是說把一整套的Ecosystem、Service都給整合起來。讓所有的Service之間更有序,更加多連接起來。這樣可以解決我們之前的這樣一個有很多很多小Service但無序的問題,也同時可以快速的幫我們找到Topology中錯誤與缺失的問題,比如某個Topic的數據沒有被備份到某個Cluster上。

我們的想法是:整套Ecosystem最核心的中間部分是一個Kafka的Topic,這個Topic跟開源Kafka不一樣,它可以在多個Cluster之間存在,而這套系統理解它的Topology。這樣當用戶創建一個Topic的時候,可以根據他的需求自動在多個Cluster上面把這個Topic創建出來,同時在Cluster之間進行數據拷貝等。

有了Cluster Topology後,我們的系統還會做一些非常智能的事情,比如說當它發現這個Cluster有問題的時候,會自動的把Producer導到這個其他對應的Cluster等等。它可以大大降低管理人員的災難發現與恢復工作,帶來非常好的容災效果,我們將這個項目叫做Cluster Federation,是我們之前備份Cluster工作的延伸。這個項目的核心思想是認為一個Topic可以在多個Cluster之間存在。當一個Cluster出問題,數據可以自動遷移到另外一個Cluster。這個系統本身的可以很智能的把存在多個Cluster上的Topic發送給下游的消費者,也就是說Cluster之間互為備份。

最後,我們還需要追求自動化和可伸縮性,來更加的有效的使用機器資源。在這段時間裡,我們將所有的集群做“容器化”,這也是做自動化的一個先決條件。這給我們的運維帶來了極大的便利,完成機器的添加置換等。我們的運維人數並沒有增加,但是我們的機器的數量確實在成倍的增長,這都得益於“自動化”。

總體來說,我們最關注的是高可靠性。就當你數據量增長,或者說是數據規模增長很大的時候,怎麼能保證這個系統的可靠性和魯棒性。其次是可用性,當你這個數據量特別大的時候,如何有效、快速的來對整個平台進行管理,包括對數據本身的管理,包括上游,上下游客戶的管理。

我們也在很關註一些新興的開源軟件,比如說像Pulsar,這個開源軟件解決了一些Kafka最開始架構上的一些缺陷,比如說存儲計算分離。但它還需要被時間檢驗。但從這個角度來看,對Uber來說,通過我們在做的標準化工作,把兩端的Proxy都給建立好了之後,我們的流處理平台的交互的銜接點,已經不是Kafka的協議了。在這個時候,我們可以像換汽車引擎一樣,換成任何各種各樣其他潛在的開源的解決方案,而不是說被某一個開源的項目綁定。

嘉賓介紹

富羽鵬,Uber Staff Engineer。在 Uber 負責實時數據與分析平台的架構與運營,包括Kafka 及其周邊生態系統。在加入 Uber 之前,是大數據存儲平台 Alluxio 的創始成員與 PMC,再之前在 Palantir 從事大數據平台的研發與管理。本科與碩士畢業於清華大學,並在 University of California San Diego 進行了數據庫方向的博士研究。

QCon北京2020的分享中,富羽鵬老師將分享 Kafka 及其生態系統在 Uber 的架構演化以及探討在實踐中遇到的經驗與教訓,點擊了解詳情。