Categories
程式開發

Presto源碼分析:Coordinator篇


Presto是一個開源的分佈式SQL查詢引擎,適用於對GB到PB量級的數據源進行交互式查詢。 Presto的服務器可以分為兩種類型:coordinator和worker。 Coordinator負責解析查詢語句,制定執行計劃,和管理worker節點。而worker負責具體任務的執行和數據處理。

本文首先簡要介紹了Presto的服務器的啟動流程,然後以一個查詢請求的處理過程為線索,分析了coordinator對查詢的處理流程。整個分析以323版本的PrestoSQL代碼為基礎.

1. 服務端啟動

1.1 相關類

Presto的服務器端啟動流程涉及到的類主要有3個:

  • PrestoServer:服務端入口
  • ServerMainModule:服務端的主要依賴注入配置
  • CoordinatorModule:Coordinator的依賴注入配置

1.2 流程

PrestoServer作為服務端的運行入口,在其run函數中,進行一系列的依賴注入配置,其中最主要的配置在ServerMainModule中。 ServerMainModule針對服務端是否被配置為Coordinator/Worker,分別使用CoordinatorModule和WorkerModule進行配置。

2. Coordinator

Coordinator負責接受用戶提交的查詢請求,對這些請求進行分析,並調度到不同的worker節點上執行。

2.1 用戶執行查詢相關Restful接口

用戶執行查詢相關的Restful接口,在類QueuedStatementResource和ExecutingStatementResource中實現。

從用戶的角度看來,一個數據庫查詢在結束前主要經歷兩個狀態,Queued(已提交,排隊中)和Executing(執行中)。對於處於不同狀態的查詢,Presto把相關的Restful接口放在兩個類中:

QueuedStatementResource類負責處理處於Queued狀態的查詢,主要接口有:

  • postStatement(POST /v1/statement):提交一個新的查詢,若成功,會返回新查詢的id,nextUri,和新查詢的狀態等。其中,nextUri可以直接用於下面的getStatus和cancelQuery接口;

  • getStatus(GET /v1/statement/queued/{queryId}/{slug}/{token}):獲取查詢當前的狀態,返回的響應中包含一個nextUri。根據查詢的當前狀態,這個nextUri的路徑以/v1/statement/queued或者/v1/statement/executing為前綴。當以/v1/statement/queued為前綴時,可以用於重複調用getStatus接口或cancelQuery接口。當以/v1/statement/executing為前綴時,可以用於調用ExecutingStatementResource的相關接口;

  • cancelQuery(DELETE /v1/statement/queued/{queryId}/{slug}/{token}):取消已經提交的查詢;

ExecutingStatementResource類負責處理處於Executing狀態的查詢,主要接口有:

  • getQueryResults(GET /v1/statement/executing/{queryId}/{slug}/{token}):獲取查詢結果,響應中,主要的元素有nextUri(用於進一步調用getQueryResults接口或cancelQuery接口),partialCancelUri(用於取消已經部分執行的查詢,即下面的cancelPartial接口),columns(結果的列信息),data(查詢結果數據);
  • cancelQuery(DELETE /v1/statement/executing/{queryId}/{slug}/{token}):用於取消已經處於Executing的查詢;
  • partialCancel(DELETE /v1/statement/partialCancel/{queryId}/{stage}/{slug}/{token}):用於取消已經部分執行的查詢;

2.2 查詢的總體執行流程

基於在2.1節中介紹的Restful接口,一個查詢的總體執行流程如圖 1所示,分為以下幾個步驟:
Presto源碼分析:Coordinator篇 1

圖 1 查詢的總體執行流程

  1. 用戶通過postStatement接口提交一個查詢請求,QueuedStatementResource接受到該請求後,創建一個Query對象維護該請求的狀態,Query對象的創建過程中,會請求DispatchManager分配一個全局唯一的queryId;

  2. Query創建後,QueuedStatementResource立即請求它的getQueryResults方法,此時由於DispatchManager僅僅只是為該查詢分配了ID,而尚未進行分發,因此會立即返回一個nextUri(queuedUri),這個nextUri指向的其實就是QueuedStatementResource的getStatus接口;

  3. 用戶通過上一步的nextUri調用getStatus接口之後,實際上調用的是Query對象的waitForDispatched接口,該接口則請求DispatchManager新建一個查詢,並等待查詢被分發,一旦分發完成,則返回一個nextUri(executingUri),這個nextUri指向的其實就是ExecutingStatementResource的getQueryResults接口;

  4. 一旦getQueryResults接口被調用,ExecutingStatementResource將創建一個protocol.Query對像以維護查詢狀態,並異步調用該對象的waitForResults以獲取查詢結果,當查詢結果未就緒或者未全部返回,getQueryResults仍然會返回一個nextUri(executingUri) ,用戶可以通過這個nextUri循環獲取所有結果數據。

2.3 分發查詢

從2.2節我們可以看到,一個查詢最後是在DispatchManager的主導下進行分發。展示了DispatchManager進行分發的具體步驟。

Presto源碼分析:Coordinator篇 2

圖 2 查詢的分發過程

從圖 2可以看到,查詢的分發過程如下:

  1. 首先,DispatchManager請求QuerySessionSupplier,為查詢創建一個session;

  2. 請求QueryPreparer,進而請求SqlParser,對查詢語句進行語法分析,得到一個已解析的查詢語句preparedQuery;

  3. 請求InternalResourceGroupManager,為查詢分配一個資源組InternalResourceGroup,用於執行查詢;

  4. 進行必要的事務相關處理;

  5. 當上述步驟完成之後,DispatchManager將前面幾步產生的session,preparedQuery,resourceGroup(實際上是ResrouceGroup的ID)等,通通放入一個LocalDispatchQuery對像中;

  6. 將LocalDispatchQuery對象提交給InternalResourceGroupManager執行;

需要說明的是,目前Presto只有一個InternalResourceGroup,其ID為GLOBAL,所以所有查詢都會使用該資源組。

2.4 查詢的執行

從2.3節可以看到,封裝了查詢的session,preparedQuery,resourceGroup的LocalDispatchQuery,最後是被submit到了InternalResourceGroupManager。接下來,可以算是查詢真正開始被執行了。圖 3展示了這個過程。

Presto源碼分析:Coordinator篇 3

圖 3 查詢的執行

圖 3的查詢執行過程大致可以分為3個階段:

第一階段:LocalDispatchQuery創建時,其實也同時創建了另外兩個對象,一個是查詢的狀態機QueryStateMachine,另一個則是SqlQueryExecution。從名字很容易猜到,QueryStateMachine記錄當前查詢的狀態。而SqlQueryExecution則是封裝了與Sql執行相關的對象,包括SqlQueryExecution創建時被同時創建的Analyzer,以及下面第二階段創建的LogicalPlanner,DistributedExecutionPlanner和SqlQueryScheduler。

第二階段:LocalDispatchQuery被提交到InternalResourceGroupManager之後,後者實際上是調用InternalResourceGroup的run方法,讓LocalDispatchQuery在資源組上面執行。這個階段,會通過ClusterSizeMonitor等待足夠數量的worker,然後依次創建LogicalPlanner,DistributedExecutionPlanner和SqlQueryScheduler。依靠這3個類,以及PlanFragmenter,Presto生成查詢的執行計劃,並根據計劃將查詢分發到不同的Worker節點上執行。

第三階段:SqlQueryScheduler在查詢完成之後,通知QueryStateMachine,並進一步通知InternalResourceGroup,查詢已經完成。

從上面的描述,可以看到,整個過程的關鍵是第二階段。第二階段的LogicalPlanner,PlanFragmenter,DistributedExecutionPlanner和SqlQueryScheduler這幾個類,完成了查詢計劃的生成,到查詢的執行這一系列過程,下面是這幾個類的簡要介紹:

  • LogicalPlanner:負責生成邏輯執行計劃;

  • PlanFragmenter:將LogicalPlanner生成的邏輯執行計劃,拆分為多個子計劃;

  • DistributedExecutionPlanner:將PlanFragmenter拆分好的子計劃,進一步拆分成可以分配到不同Worker節點上運行的Stage;

  • SqlQueryScheduler:將Stage調度到不同的Worker節點上運行;

2.5 查詢執行過程的進一步分析

2.1到2.4節主要是以用戶向Presto提交查詢語句作為一個分析的入口,分析了查詢如何被Presto處理的大體流程。從前面的分析,我們可以知道,一個SQL查詢,首先被2.3節的SqlParser解析器處理,接下來依次經過LogicalPlanner,PlanFragmenter,DistributedExecutionPlanner,生成一個分佈式的執行計劃,最後被SqlQueryScheduler調度到不同的節點上執行。本節將依照這個處理流程,分析其每個階段的輸入和輸出。

為了分析這個流程,本節將結合一個具體的例子,假定我們的Presto連接到了一個MySQL數據庫(catalog的名字為test),MySQL數據庫上面有一個Database,其名字為db,db中有一個名為tab的表。展示了db和tab的創建腳本。

Presto源碼分析:Coordinator篇 4

圖 4測試數據庫的初始化腳本

然後我們向Presto提交一個查詢:

select col1 from test.db.tab 

2.5.1 SQL的解析

Presto使用了ANTLR4(一款開源的語法分析器生成工具)生成了Presto的SQL解析器SqlBaseParser(相關的語法定義在SqlBase.g4文件裡面)。 2.3節中提到的SqlParser,首先調用了SqlBaseParser對我們提交的查詢語句進行解析,生成ANTLR4形式的AST(抽象語法樹),然後再使用AstVisitor類,從樹根開始,遍歷這個AST,生成一個Presto用自己的類表達的語法樹。圖 5是從SqlBase.g4文件中,抽出的跟我們的示例查詢相關的語法定義。而圖 6是我們例子中的查詢語句生成的對應的語法樹。

從圖 5和圖 6可以看到,圖 6這棵語法樹跟圖 5的語法定義,基本上是對應的。這棵樹的根節點是一個Query類的對象,對應的是語法定義中的query定義。它有一個成員body,指向一個QuerySpecification對象,對應語法定義中的querySpecification。而QuerySpecification有一個select成員,指向一個Select類的對象,Select類中有selectItems成員,對應語法定義中querySpecification裡面可能出現的多個selectItem,以此類推。

Presto源碼分析:Coordinator篇 5

圖 5示例查詢相關的ANTLR4語法定義

Presto源碼分析:Coordinator篇 6

圖 6示例查詢對應的語法樹

2.5.2 邏輯執行計劃的生成

一旦圖 6的語法樹已經生成,LogicalPlanner將會據此生成邏輯執行計劃。這個階段分為兩步執行,首先,LogicalPlanner對圖 6的語法樹進行從根節點開始的遞歸遍歷,生成一個未經優化的邏輯計劃,如圖 7所示。圖7中的表遍歷節點TableScanNode是在遍歷到圖6的Table節點時生成的,4個映射節點ProjectNode,靠近TableScanNode的兩個是在遍歷QuerySpecification節點時生成的,另外兩個是在遍歷Query節點時生成的。最後的OutputNode,是遍歷完語法樹之後,再生成的輸出節點。

Presto源碼分析:Coordinator篇 7

圖 7 示例查詢對應的邏輯執行計劃

可以看到,未經優化的邏輯計劃,其實包含非常多冗餘的ProjectNode,這時候,LogicalPlanner會進行第二步:對計劃進行一系列的優化。在LogicalPlanner類中,有一個planOptimizers列表,其中的每一個元素是一個優化器接口PlanOptimizer的實現。每個PlanOptimizer的實現都帶有一個重寫器Rewriter,用於對邏輯計劃進行遞歸遍歷,重寫出新的,優化後的邏輯計劃。 LogicalPlanner循環地對上一步生成的邏輯計劃應用planOptimizers列表的每一個優化器,最終得到圖 8所示的優化過的執行計劃。

Presto源碼分析:Coordinator篇 8

圖 8 示例查詢對應的優化後的邏輯執行計劃

對於我們的示例查詢在第一步生成的邏輯執行計劃,真正生效的優化器只有兩個:一個是IterativeOptimizer,另外一個是AddExchanges。 IterativeOptimizer將邏輯計劃中冗餘的ProjectNode全部去掉了,這是IterativeOptimizer對RemoveRedundantIdentityProjections規則的應用。而AddExchanges優化器在OutputNode和TableScanNode之間,加上了一個ExchangeNode,用於在不同節點之間交換查詢數據。

去掉冗餘的ProjectNode的好處是顯而易見的:去掉多餘的ProjectNode可以提高查詢的執行效率。而之所以需要增加ExchangeNode,是因為我們的最終輸出OutputNode需要在Coordinator上執行,而TableScanNode則一般需要調度到Worker上執行,所以兩者之間,需要加上一個ExchangeNode以交換數據。

2.5.3 執行計劃的拆分

Presto接下來會通過PlanFragmenter對優化後的邏輯執行計劃進行拆分,分為若干個子計劃SubPlan。並不例外,這也是對優化後的邏輯執行計劃進行自頂向下的再一次遞歸遍歷完成的。

Presto源碼分析:Coordinator篇 9

圖 9 拆分後的子計劃

圖 9是拆分好的子計劃,可以看到,圖 8中的邏輯執行計劃被拆分為兩個子計劃。對於圖 8的邏輯計劃,Presto的拆分的邏輯是,將ExchangeNode轉換為RemoteSourceNode,然後為ExchangeNode的sources中的每個元素,新建一個子計劃SubPlan。在2.5.4節中我們將會看到,這麼拆分可以使DistributedExecutionPlanner將ExchangeNode的sources對應的每一個SubPlan轉換為一個Stage,然後分發到不同的Worker上執行。

2.5.4 分佈式執行計劃的生成

接下來,DistributedExecutionPlanner將上一小節拆分好的子計劃,轉換為分佈式執行計劃。 DistributedExecutionPlanner的轉換邏輯是:將每一個SubPlan轉換為一個StageExecutionPlan。所以從圖 10可以看到,分佈式執行計劃與圖 9的拆分後的子計劃是非常相似的。區別在於,對於那些fragment裡面存在TableScanNode的StageExecutionPlan,它會額外維護一個splitSources。 SplitSource定義了一個表如何被劃分成若干個Batch,以便於後續並行處理。

Presto源碼分析:Coordinator篇 10

圖 10 分佈式執行計劃

2.5.5 執行計劃的調度

接下來進入的是執行計劃的實際調度階段,流程如圖 11所示。
Presto源碼分析:Coordinator篇 11

圖 11 執行計劃的調度

SqlQueryScheduler在創建的時候,會為圖10的分佈式執行計劃中的每一個StateExecutionPlan創建一個對應的SqlStageExecution對象和對應的StageScheduler(為了保持簡潔,圖11僅展示了一個SqlStageExecution和一個StageScheduler,但實際上,對應我們的示例查詢,SqlStageExecution和StageScheduler應該各有兩個,分別對應圖10的兩個StateExecutionPlan。並且SqlQueryScheduler創建的是StageScheduler子類的實例,分別是FixedCountScheduler和SourcePartitionedScheduler)。

此後,SqlQueryScheduler通過AllAtOnceExecutionPolicy,創建AllAtOnceExecutionSchedule。 AllAtOnceExecutionSchedule在SqlQueryScheduler調用其getStagesToSchedule時,會一次性返回全部未調度的SqlStageExecution的集合。 SqlQueryScheduler接下來會遍歷這個集合,並調用集合中每個SqlStageExecution對應的StageScheduler的schedule方法,這個方法最終會調用到SqlStageExecution的scheduleTask。 scheduleTask將會創建HttpRemoteTask,並通過HttpRemoteTask,以Restful的方式,將Stage發送到worker節點。此後的執行,將會在worker上處理。

3. 總結

在Presto中,coordinator負責接受查詢請求,解析請求,生成執行計劃並將計劃拆分和調度到worker上執行。本文結合代碼,分析了這個流程。