Categories
程式開發

千億級數據量的Kafka深度實踐


本文由 dbaplus 社群授權轉載。

一、消息隊列選型

當時主要考慮以下幾個維度:社區活躍度,客戶端支持,吞吐量。對比幾個系統下來,覺得Kafka比較符合我們的要求。現在有一個新的開源系統pulsar,我覺得也可以嘗試一下。

千億級數據量的Kafka深度實踐 1

1、Kafka設計上的亮點

千億級數據量的Kafka深度實踐 2

Kafka性能和吞吐都很高,通過sendfile和pagecache來實現zero copy機制,順序讀寫的特性使得用普通磁盤就可以做到很大的吞吐,相對來說性價比比較高。

Kafka通過replica和isr機制來保證數據的高可用。

Kafka集群有兩個管理角色:controller主要是做集群的管理;coordinator主要做業務級別的管理。這兩種角色都由Kafka裡面的某個broker來擔任,這樣failover就很簡單,只需要選一個broker來替代即可,從這個角度來說Kafka有一個去中心化的設計思想在裡面, 但controller本身也是一個瓶頸,可以類比於hadoop的namenode。

CAP理論相信大家都有了解過,分佈式系統實現要么是CP,要么是AP。 Kafka實現比較靈活,不同業務可以根據自身業務特點來對topic級別做偏CP或偏AP的配置。

支持業務間獨立重複消費,並且可以做回放。

千億級數據量的Kafka深度實踐 3

這個是Kafka的簡要架構,主要分為生產端,broker端,還有消費端。日誌有三個層次:

  • 第一個層次topic;
  • 第二個層次partition(每個partition是一個並行度);
  • 第三個層次replica(replica表示partition的副本數)。

二、Kafka在360商業化的現狀

千億級數據量的Kafka深度實踐 4

目前集群有千億級數據量,集群有100多台萬兆機器,單topic的最大峰值60萬QPS,集群的峰值大概在500萬QPS。

千億級數據量的Kafka深度實踐 5

我們的物理機配置24Core/10G網卡/128G內存/4T*12 HDD,值得說一下的是我們採用了萬兆網卡加普通磁盤4T*12的配置,測下來磁盤吞吐和網絡吞吐是能夠匹配上的, 再者考慮到我們的數據量比較大,SSD盤沒有特別大的且成本比較高。

磁盤的組織結構我們用的是JBOD,RAID10也是很好的方案(磁盤成本會翻倍)。我們目前的Kafka版本是1.1.1,推薦大家部署0.11以上的版本會好一些,這個版本對協議做了很多優化,對於後續的2.x版本都是兼容的。

千億級數據量的Kafka深度實踐 6

這個是我們Kafka上下游相關的組件,生產端主要是各種Kafka clients/實時服務/flume/logstash。

消費端分為實時,離線(ETL),監控三部分。實時有spark/flink/storm等主流框架, 離線部分我們基於flink自研了一個統一落地框架hamal,從Kafka消費一遍數據就可以落地到多個下游系統(hdfs、hbase、redis等),可以避免重複消費。還有部分是監控的需求,我們把ES/influxdb相關的日誌打到Kafka,然後再消費出來通過grafana展示,但目前我們已經切到prometheus上了。

三、Kafka client框架

為什麼要做這個框架呢?之前有很多的業務部門用裸API自己去實現Kafka client的邏輯,但是會有很多問題,有一些異常情況會catch不全,我們做這個框架是想把所有的細節屏蔽掉,然後暴露出足夠簡單的接口,這樣可以減少業務犯錯的可能性,我們要確保極端的情況下比如網絡或集群異常時的可用性,如果網絡或集群不可用,數據會先落到本地,等恢復的時候再從本地磁盤恢復到Kafka中。

千億級數據量的Kafka深度實踐 7

我們實現了兩個框架:LogProducer和LogConsumer。 LogProducer支持at least once;LogConsumer支持at least once和exactly once兩種語意,其中exactly once需要業務去實現rollback接口。

千億級數據量的Kafka深度實踐 8

LogProducer框架的大體思路是通過內存隊列將日誌發送到Kafka,當Kafka或網絡不可用的情況下會寫本地磁盤,同時會有一個線程去實時檢測Kafka或者網絡的可用情況,如果恢復就會加載磁盤日誌並發送到Kafka。我們還支持一種共享內存的策略來代替內存,使用共享內存是為了減少重啟過程中日誌的丟失數。

千億級數據量的Kafka深度實踐 9

LogConsumer的框架實現,通過blocking queue將consumer線程和worker線程解耦,因為現實情況是消費邏輯很簡單,但是處理邏輯會很複雜。這樣就可以對consumer線程和worker線程做不同的配置,同時通過blocking queue還可以實現反壓機制。比如worker處理不過來了,這時候blocking queue就會滿,反壓到consumer線程會停止消費。

同時我們在worker線程接口裡面會提供接口讓用戶提交到global offsetmap, 如上圖我們提供三個組合接口,如果在業務處理與commit中實現了業務端rollback邏輯, 那麼就是exactly once語義,默認是at least once語義。

四、數據高可用

之前講過Kafka本身提供replica+isr的機制來保證數據高可用,但我們覺得這個可能還不夠,所以我們還要支持rack aware。比如replica=3的情況,確保三個副本在不同的物理rack上,這樣我們最多能容忍兩個物理機架同時出問題而數據仍可用,我們rack aware方案是與負載均衡方案一起做掉的,具體後面會講。

千億級數據量的Kafka深度實踐 10

值得注意的是Kafka官方也支持rack aware,通過在broker端配置broker.rack參數可實現,但有一個限制,必須為每個rack分配數量相同的brokers,否則會導致replica分配傾斜,實際情況是IDC的rack是很多的,分配到的物理機分佈也可能很隨機,一個可以參考的解決思路是採用虛擬rack group的概念,比如維護3個虛擬rack group,申請到的物理機加入到這3個group中,並確保rack group間分配的物理機數量一致,當然rack group間物理機不應存在有相同物理rack的情況。

五、負載均衡

Kafka的負載均衡功能在confluent商業版本才支持,負載均衡本質上來說是replica分配均勻問題,我們一開始想通過經典一致性hash來解決如下圖:

千億級數據量的Kafka深度實踐 11

然後我們發現經典一次性hash不能滿足我們的需求,比如要加一個節點node5,只能分擔節點node2的部分負載,不能做全局節點的負載均衡

千億級數據量的Kafka深度實踐 12

於是我們基於虛擬節點的一次性hash的算法實現了一個方案,如圖所示:相同的顏色對應同一個物理機,hash環上的都是虛擬節點。這裡有四個物理節點,其中node4是我們新加的節點。通過虛擬節點可以把物理節點的負載足夠均衡地分散出去,所以當我把node4加到hash環上的時候,分擔了所有物理機的負載。

算法實現的步驟分為兩個大的步驟:

  1. 新建hash circle:通過vnode_str(比如hostname-v0)做一個MD5的hash,得到虛擬節點的vnode_key,再用ring字典來保存虛擬節點到物理節點的映射,同時將vnode_key加入到sorted_keys的list中。
  2. 在hash環中分配replica: 將(topic_name + partition_num + replica_num)作為key用相同的MD5 hash算法得到replica_key, 接著二分查找該replica_key在sorted_keys中的position, 最後用ring字典來映射到物理機node, 至此replica分配完成。

千億級數據量的Kafka深度實踐 13

我們基於這個算法解決三個問題:

1)添加物理節點只需遷移很小一部分數據;

2)對不同配置的物理機做權重設置,可以支持異構集群的部署;

3)實現replica的rack aware,物理節點上面會有rack信息,在為replica分配物理節點的時候會記錄已經分配的rack信息,如果有重複的情況,就會把vnode_key找到position的位置+1找下一個物理節點,我們會確保三個replica的物理rack一定是不一樣的(假如replica=3)。

Leader balance

這是一種快速且成本低的負載balance方法,因為Kafka只有leader提供讀寫,所以通過leader切換是可以達到負載切換的效果的,由於只是leader切換不涉及數據同步,因此這個代價是比較小的。

disk rebalance

這個feature需要Kafka1.1.0版本之後才支持,Kafka提供了一些腳本和API可以做balance操作, 其本質也是生成replica plan然後做reassign。

六、鑑權、授權和ACL方案

如果是新集群比較推薦基於SASL的SCRAM方案,實施起來比較簡單。如果老集群想中途施行鑑權授權機制會比較困難,需要推各個業務去修改配置,同時切換的過程也很容易出問題。

下面介紹下我們實現的一個白名單機制來解決老集群的問題,首先將老業務加入到白名單中,讓新業務通過工單流程來申請topics和consumers兩種資源權限並加到白名單裡,定期監測非法(沒有走工單)topics,consumers資源,同時將這些資源都deny掉,這樣就收緊了topics和consumer讀寫權限的口子,同時原有業務不會有任何影響。

千億級數據量的Kafka深度實踐 14

七、Quota機制

千億級數據量的Kafka深度實踐 15

Quota主要是為了解決多個業務間資源搶占問題。 Quota類型有兩種:一種是限製網絡帶寬,一種是限制請求速率(限制CPU)。我們對業務做了三個優先級設置:高,中,低優先級,高優先級不做限制,中優先級可容忍lag,低優先級極端情況可停掉,通過工具可以批量限制某個優先級的所有業務,可以確保高優先級業務及集群的安全。

八、跨IDC的數據同步

千億級數據量的Kafka深度實踐 16

首先我們為什麼要做跨IDC的數據同步?沒做這個同步之前業務可能對數據的讀寫沒有一個IDC的概念,所以很容易就會有跨IDC的讀寫,多個業務還可能有重複consume和produce,這就造成跨IDC網絡的極大浪費, 加上跨IDC的網絡並不穩定,有時候會有一些異常,業務也不一定能很好處理。

千億級數據量的Kafka深度實踐 17

為了解決以上問題,我們統一做了跨IDC的數據同步服務,首先我們約定業務只能做本IDC的讀寫,不允許做跨IDC的讀寫,如果有跨IDC的數據需求,要向我們申請,通過mirrormaker去同步一份過來。這樣做有兩個好處:一是屏蔽了異常對業務的影響,二是節省了IDC之間的帶寬(我們通過同步機制能保證這份數據只傳輸一份),我們還基於marathon/mesos對這個服務做了pass化,提高了服務的SLA。

千億級數據量的Kafka深度實踐 18

九、監控告警

千億級數據量的Kafka深度實踐 19

千億級數據量的Kafka深度實踐 20

  • 基於jmx exporter+promehteus+grafana來做圖表展示,在每個broker上面部署jmx exporter, prometheus會去pull這些數據,最後通過grafana來展示。
  • 基於Kafka manager做瞬態指標的監控。
  • 基於burrow做consumer lag的監控。
  • 基於wonder來做告警,這個是360內部實現的一個組件,類似zabbix。

千億級數據量的Kafka深度實踐 21

十、線上問題及解決方案

千億級數據量的Kafka深度實踐 22

磁盤故障:

我們通過smartctl來監測,首先狀態是要passed的,其次我們會判斷197 Current_Pending_Sector這個屬性值不能大於100, 如果大於100這個磁盤可能有讀寫性能問題。

bootstrap.servers性能瓶頸:

該參數可以配置多台broker,這些broker作為proxy的角色為Kafka clients提供lookup服務,如果集群規模很大,clients很多的情況下,這些proxy角色的broker的負載會很大,為了解決這個問題,我們對bootstrap.servers參數做了vip配置。每個VIP可以綁定任意多的brokers,這樣在客戶端不需要修改配置的情況下可以對proxy動態擴縮容。

consumer重啟不消費:

業務反饋消費停止,重啟也不能夠解決問題,後來定位發現是早於0.11之前版本的bug, https://issues.apache.org/jira/browse/KAFKA-5413

原因是log cleaner線程掛了導致compact停止,__consumer_offsets這個topic的量非常大,broker reload時間特別長,這段時間是停止服務的。

解決方法有兩個:一是升級到Kafka 0.11+版本,二是將offset遷移到新的consumer group來解決(規避掉有問題的coordinator)。

Q&A

Q1:hamal落地系統是消費一次落地到多個組件還是消費了多次?

A:消費一次落地到多個下游組件。

Q2:在LogProducer實現中將數據存在共享內存裡,這樣不會丟數據,我想詳細聽一下。

A:不是說不丟數據,而是盡可能少丟數據,當選用共享內存策略,業務進程掛掉不會影響共享內存中的數據,重啟的時候直接從共享內存恢復。

Q3:這邊是通過白名單機製做了一個權限控制嗎?

A:通過白名單機制我們對topic, consumer資源做了粗粒度的控制,這樣可以在不影響老業務的情況下收緊口子。如果是新集群從頭搭建的話推薦用SASL的SCRAM方案。

Q4:你剛才說的quota優先級別,具體實現是怎麼做的?

A:在業務接入過程當中我們會給業務定級,比如這個業務是計費的,那麼就是高優先級,如果只是一些track日誌那麼就是低優先級, 在設置quota的時候我們會根據業務當前峰值再加上一定比例buffer來設置業務的quota值。

Q5:如果集群有100個節點,客戶端要配100個地址嗎?

A:不需要,只需配置bootstrap.servers(proxy),proxy可以拿到所有broker的信息,它的主要工作是lookup,接收client請求返回broker地址列表,然後client再直連broker。

Q6:如果出現磁盤掛載不上這種情況下,broker節點可以正常拉起來嗎?能恢復到從前那個狀態嗎?

A:broker可以將這塊磁盤對應的目錄exclude掉,然後重啟就可以了,對於replica=1的topic數據會有丟失,對於replica>1的topic數據不會有丟失,因為我們做了rack aware,那麼其他rack上會有副本。

作者介紹

嚴鎖鵬,奇虎360大數據架構運維專家,具有10年基礎架構與大數據開發經驗。 2013年加入360商業化團隊,負責消息中間件開發與運維,同時涉及大數據架構、微服務架構、實時計算平台、機器學習平台、監控系統等基礎設施建設,致力於為商業化團隊提供穩定高效的基礎服務。

原文鏈接

https://mp.weixin.qq.com/s/5p1IgayVXvCSLLc0Zvoqew