Categories
程式開發

SparkSQL在有贊大數據的實踐


一、前言

在 2019 年 1 月份的時候,我們發表過一篇博客 SparkSQL在有贊大數據的實踐,裡面講述我們在 Spark 裡所做的一些優化和任務遷移相關的內容。本文會接著上次的話題繼續講一下我們之後在 SparkSQL 上所做的一些改進,以及如何做到 SparkSQL 佔比提升到 91% 以上,最後也分享一些在 Spark 踩過的坑和經驗希望能幫助到大家。

本文主要的內容包括:

  • Thrift Server 穩定性建設
  • SparkSQL 第二次遷移
  • 一些踩坑和經驗

二、Thrift Server 穩定性建設

隨著完成了 Hive 到 SparkSQL 的第一次遷移,SparkSQL 任務佔比一下子達到了 60% 以上,而隨之而來的是各種問題(包括用戶的任務失敗和服務不可用等),服務的穩定性受到了比較大的挑戰。

首先講一下我們使用 SparkSQL 的部署方式,我們是以部署 Thrift Server 服務提供 JDBC 訪問方式,即 YARN client 的部署方式。離線計算的調度任務以 beeline 的方式使用 Thrift Server,同時其他平台應用以 JDBC 的連接接入服務,比如提供 Ad-hoc 查詢服務應用,數據質量檢驗服務應用等。

SparkSQL在有贊大數據的實踐 1

以 Thrift Server 方式的好處是由一個 Driver 來管理用於執行任務的 Executor 池,並發的執行不同的 SQL 任務可以讓 Executor 資源的利用率達到最高。而如果選擇$SPARK_HOME/bin/spark-sql 方式來執行SQL 的方式可以讓每一個SQL 任務獨享一個Driver ,不會受到其他SQL 任務影響(特別是一些SQL 可能導致Driver OOM ),對比Thrift Server 更加穩定;但是YARN 資源利用率不高,比如啟動YARN Application 的Overhead ,向YARN 申請和釋放Executor 的 Overhead,SQL 任務傾斜導致的 Executor 空閒浪費等。考慮到目前我們大部分 SQL 任務的執行時間很短,基本在 3 分鐘之內,單個 Task 執行時間很短,所以選擇 Thrift Server 相對合適一些。

基於上面所說的部署方式,我們開展了以下主要的穩定性建設工作:

  • AB 測試灰度發布功能
  • Spark Metric 指標的採集、分析以及處理
  • 基於 SQL 引擎選擇服務攔截不合適的 SQL 任務

2.1 AB 灰度測試

在 Hive 遷移到 SparkSQL 這段時間,yz-spark (基於社區的二次開發版本)的迭代和配置變更開始有些頻繁。我們希望有一套自定義的 AB 測試的解決方案來降低上線風險,特別對一些大的迭代版本和影響比較大的變更。

這套 AB 測試灰度發布不能簡單的只是基於任務流量,我們希望先灰度一些非核心低優先級的 SQL 任務並且在白天時間段(低峰)來驗證。有贊大數據離線調度任務是基於 Apache Airflow 為基礎構建,因此實現方式是通過擴展 Airflow 增加了一些路由配置來支持 SparkSQL 任務可以按 优先级时间段流量比例等配置的AB測試功能。

這套 AB 灰度測試方案在整個遷移過程還是發揮出比較大的作用。比如我們將 yz-spark 從社區版本 2.2.1 rebase 到 2.3.3 版本,又或者為節省內存資源給算法任務而將Executor 內存配置從 3G 調整到 2G 等比較重大的變更,都降低了一些坑產生的影響。

2.2 Spark Metric 採集

Spark 在 Job 運行過程中會記錄很多 Metric 信息包括 Job 、Stage 、Task 和Executor 等幾個方面。 Spark 提供了 REST API 來查詢 Metrics 數據,需要開啟 Spark UI 的配置,不過需要注意是 Spark Driver在內存裡保存最近運行 Job 信息,所以歷史數據無法查詢。另外 Spark 支持開啟 EventLog 將相關的 Spark Event(攜帶了相關 Metrics 數據)進行持久化(比如配置 HDFS )。 REST API 和 EventLog 功能的詳細說明可以查看官方資料 (https://spark.apache.org/docs/latest/monitoring.html )。

結合了 REST APIEventLog 功能,我們搭建一個 spark-monitor 應用。這個應用主要職責是近實時的讀取 EventLog 產生的 Spark 事件,通過事件回放並結合 REST API 最終形成我們需要的 Job 數據,並寫到 Hbase 表保存。基於採集到的數據主要有以下幾個方面的用途:

  • 實時預警和乾預。通過實時收集的 Job 數據,我們可以分析出哪些正在運行的 Job 是異常的,比如耗時非常久,產生很多 Task 等等場景,然後針對異常場景的 Action 可以是 Alarm 或者 Kill Job 等。
  • 離線分析。每天從 Hbase 離線的同步到hive表做一些離線分析,比如統計存在 Failed Task 的任務、Peak Execution Memory 使用比較高的任務,或者數據傾斜的任務等。找出已發生問題或者潛在問題的任務,去優化 SQL 任務或者分析原因並反哺去調校 Thrift Server 配置。
  • 歷史 Job 數據保存,用於排查歷史任務的日誌查找和分析( Spark UI 因為內存限制無法保留歷史任務的記錄)。

2.3 基於引擎選擇的 SQL 攔截

我們開發了一套 SQL 引擎選擇服務,他的主要職責是給 Ad-hoc 服務增加了 SQL 智能選擇的能力。有讚的大數據離線計算提供了 Presto/SparkSQL/Hive 三種引擎選擇,大數據經驗比較弱的用戶在執行 Ad-hoc 的 SQL 時往往不知道該怎麼選擇。而 SQL 引擎選擇通過 SQL 解析,語法檢查,規則匹配,各個引擎的資源負載情況等等因素最終給用戶選擇合適的引擎來執行計算。

SparkSQL 正是通過添加一些自定義規則來攔截對 Spark 引擎不合適的 SQL 任務,從而提高 Spark 服務的穩定性。比如 Not in SubqueryCrossJoin 等 SQL 場景。

三、SparkSQL 第二次遷移

第一次做的遷移主要完成了 P4 和 P5 低優先級的任務。在生產上經過一段時間的充分驗證後,並且在Spark Thrift Server 的穩定性得到極大的提升之後,然後就開始了第二次大規模的Hive 到Spark 的遷移,完成了P1 ~ P3的所有適合任務。截止目前執行引擎選擇的作業數中 SparkSQL 佔比已經提升到 91% 以上。

SparkSQL在有贊大數據的實踐 2

而之所以把核心任務也遷移到SparkSQL,這樣的做的好處有兩個:

  • 節約離線集群資源成本。通過 tpcds 的性能測試,同等資源情況下 SparkSQL 是 Hive 的 2~10 倍(不同的 query 性能表現不一樣)。在生產驗證下來大部分確實有差不多 2 倍的性能。
  • 計算資源更合理的分配。由於 Spark 自身實現任務調度和資源分配,可以通過它已有的功能針對不同優先級的任務配置不同的資源配額。比如原先使用 Hive 時每一個 SQL 任務的 map 或者 reduce 並發數默認都是一樣的,而使用 SparkSQL 時可以讓 资源的比例按优先级倾斜(即 scheduler pool 的功能)。

四、踩坑和經驗

在使用 Spark 過程中,我們不可避免的踩過一些坑,也積累了一些經驗。從這些經驗積累選擇了一部分想分享一下,希望對正在使用 Spark 的同學有所幫助。

4.1 spark.sql.autoBroadcastJoinThreshold

這個配置在大家使用 SparkSQL 的時候會比較熟悉,在 join 的場景判斷相關的表是否可以使用 BroadcastJoin ,默認閥值是 10 MB。目前閥值判斷的比較邏輯會參考幾個因素:文件的大小字段选择的裁剪。比如某張 Hive 表的數據大小為 13 MB , 表 schema 為 struct,而假設當前 SQL 只使用到 name 字段,那根據字段選擇情況並對文件大小進行裁剪估算所需總字節的公式為:20/(8+20)*13约等于9.3MB(各個字段類型有不同的估算字節,比如long 是 8 個字節 ,string 是 20 個字節等),從而滿足 BroadcastJoin 的條件。但是這裡有幾種情況需要額外考慮:1、表存儲格式帶來的差異,比如使用ZLIB 壓縮的ORC 格式跟TEXT 格式就在數據存儲上的文件大小可能會差很多,即使兩張表都是ORC格式,壓縮率的差異也是存在; 2、字段字節估算本身就有一定的誤差,比如string 字段認為是20 個字節,對於一些極端情況的string 大字段,那估算誤差就會比較大; 3 、讀取Hive 表的“raw” 數據到內存然後展開成Java 對象,內存的消耗也有一定放大係數。所以 10M 的閥值,最終實際上需要的內存可能達到 1G,這個我們也是在生產環境上碰到過。

4.2 spark.blacklist.enabled

Spark 針對 Task 失敗有重試機制,但是當一個 Task 在某一台 host上的 一個 Spark Executor 實例執行失敗,下一次重試調度因為考慮 Data Locality 還是會大概率的選擇那個 host 上的 Executor。如果失敗是因為 机器坏盘引起的,那重試還是會失敗,重試次數達到最大後那最終整個 Job 失敗。而開啟 blacklist 功能可以解決此類問題,將發生失敗的 Executor 實例或者 host 添加到黑名單,那麼重試可以選擇其他實例或者 host ,從而提高任務的 容错能力。

4.3 spark.scheduler.pool

當我們的調度離線計算 SQL 任務,大部分都使用 SparkSQL 帶來的問題是有些低優先級的任務可能會消耗很多 Executor 資源,從而讓高優先級的任務一直得不到充分的資源去完成任務。我們希望資源調度的策略是讓優先級高的任務優先得到資源,所以使用 FairScheduler 策略,並配置不同 资源权重的 Pool 給不同優先級的任務。

SparkSQL在有贊大數據的實踐 3

注:Spark 2.4 以下版本的這個功能存在 SPARK-26992 的問題,在不指定 pool 的情況下可能不會使用默認 pool 。

4.4 spark.sql.adaptive.enabled

adaptive 功能支持 shuffle 下游 stage 可以根據上游 stage 產生的 shuffle 數據量自動調節下游 stage 的 task 數,這個功能我們主要是為了解決 Hive 表數據表很多 小文件的問題(Map Only 的 SQL 場景不起作用)。 adaptive 功能在 Spark 1.6 版本就已經支持,但是我們目前 yz-spark 版本合入是Intel 實現的增強版本(該版本還實現了另兩個功能:动态调整执行计划动态处理数据倾斜),目前官方版本還沒有合入(https://github.com/Intel-bigdata/spark-adaptive

SparkSQL在有贊大數據的實踐 4

4.5 SPARK-24809

這是一個 correctness 的 bug, 在 broadcast join 的情況下可能會發生數據結果不正確的情況。當用於 broadcast 的 LongToUnsafeRowMap 對像如果被多次的序列化反序列化就會觸發,導致野指針的產生,從而可能產生不正確的結果。當時這個問題導致我們一張核心倉庫表產生錯誤數據。由於這個 bug 幾週才偶現一次,復現的困難導致花費了一個月時間才定位原因。這次教訓也讓我們意識到需要經常的去關注社區版本的迭代,及早發現特別是那些比較嚴重的 bug fix,避免生產上的故障。

4.6 SPARK-26604

這是 Spark External Shuffle 的一個內存洩漏 bug ,所以在開啟該功能的情況才會觸發。它在某些場景下會導致NodeManager 的ExternalShuffleService 開始內存洩漏,這個內存洩漏比較大的危害是導致一個HashMap 對像變的越來越大,最終導致shuffle fetch 請求越來越慢(每次fetch 請求需要對這個HashMap 的values 進行sum 統計,這個邏輯變慢),從而最終導致了我們生產環境的離線任務耗時時間在某天突然多了 30% 以上。

4.7 低效且危險的 Not in Subquery

舉個例子, select * from t1 where f1 not in (select f1 from t2),對於“ not in subquery ”的場景最終都會選擇BroadcastNestedLoopJoinExec 物理執行計劃,而BroadcastNestedLoopJoinExec 是一個非常低效的物理執行計劃,內部實現將subquery broadcast 成一個 list,然後 t1 每一條記錄通過 loop 遍歷 list 去匹配是否存在。由於它的低效可能會長時間佔用 executor 資源,同時 subquery 結果數據量比較大的情況下,broadcast 可能帶來 driver 的 OOM 風險。

SparkSQL在有贊大數據的實踐 5

四、結語

至今,有贊大數據離線計算從 Hive 切換到 SparkSQL 達到了一個階段性的里程碑。雖然SparkSQL 對比Hive 的穩定性有所不如,特別是內存管理上一些不完善導致各種內存所引發的問題,但是性能上非常明顯的優勢也值得作為一種新的選擇,我們也一直努力著希望將SparkSQL 調校成具有Hive 一樣的穩定性。

本文轉載自公眾號有贊coder(ID:youzan_coder)。

原文鏈接

https://mp.weixin.qq.com/s?__biz=MzAxOTY5MDMxNA==&mid=2455760259&idx=1&sn=b7dffdbece52442c1d05883ba842e0ee&chksm=8c686ba6bb1fe2b0129a16bdbc2fc9a74629d95d3c9e02027fb97e149f61581e219f755203cb&scene=27#wechat_redirect