Categories
程式開發

ClickHouse最佳實戰之分佈表寫入流程分析


ClickHouse最佳實戰之分佈表寫入流程分析 1

雲妹導讀:

前不久,京東智聯雲正式上線了基於Clickhouse的分析型雲數據庫JCHDB,一經推出便受到廣大用戶的極大關注。有興趣的小伙伴可以回顧上一篇文章《比MySQL快839倍!揭開分析型數據庫JCHDB的神秘面紗》“。

ClickHouse像ElasticSearch一樣具有數據分片(shard)的概念,這也是分佈式存儲的特點之一,即通過並行讀寫提高效率。 ClickHouse依靠Distributed引擎實現了Distributed(分佈式)表機制,在所有分片(本地表)上建立視圖進行分佈式查詢,使用很方便。

ClickHouse最佳實戰之分佈表寫入流程分析 2

Distributed表引擎是一種特殊的表引擎,自身不會存儲任何數據,而是通過讀取或寫入其他遠端節點上的表進行數據處理的表引擎。該表引擎需要依賴各個節點的本地表來創建,本地表的存在是Distributed表創建的依賴條件,創建語句如下:

CREATE TABLE {teble} ON CLUSTER {cluster}
AS {local_table}
ENGINE= Distributed({cluster}, {database}, {local_table},{policy})

這裡的policy一般可以使用隨機(例如rand())或哈希(例如halfMD5hash(id))。

再來看下ClickHouse集群節點配置文件,相關參數如下:

1
true

1
example01-01-1
9000

example01-01-2
9000

2
true

example01-02-1
9000

example01-02-2
9000

ClickHouse最佳實戰之分佈表寫入流程分析 3

有了上面的基礎了解,就將進入主題了,本文主要是對Distributed表如何寫入及如何分發做一下分析,略過SQL的詞法解析、語法解析等步驟,從寫入流開始,其構造方法如下:

DistributedBlockOutputStream(const Context & context_, StorageDistributed &
storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, bool
insert_sync_, UInt64 insert_timeout_);

如果insert_sync_為true,表示是同步寫入,並配合insert_timeout_參數使用(insert_timeout_為零表示沒有超時時間);如果insert_sync_為false,表示寫入是異步。

1,同步寫入還是異步寫入

同步寫入是指數據直寫入實際的表中,而異步寫入是指數據首先被寫入本地文件系統,然後發送到遠端節點。

BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context &
context)
{
......

/// Force sync insertion if it is remote() table function
bool insert_sync = settings.insert_distributed_sync || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
/// DistributedBlockOutputStream will not own cluster, but will own
ConnectionPools of the cluster
return std::make_shared(
context, *this, createInsertToRemoteTableQuery(remote_database,
remote_table, getSampleBlockNonMaterialized()), cluster,
nsert_sync, timeout);
}

是否執行同步寫入是由insert_sync決定的,最終是由是否配置insert_distributed_sync(默認為false)和owned_cluster值的或關係決定的,一般在使用MergeTree之類的普通表引擎時,通常是異步寫入,但在使用表函數時(使用owned_cluster來判斷是否是表函數),通常會使用同步寫入。這也是在設計業務邏輯時需要注意的。

owned_cluster是什麼時候賦值的呢?

StoragePtr TableFunctionRemoteexecuteImpl(const ASTPtr & astfunction, const Context &
context, const stdstring & tablename) const
{
......
StoragePtr res = remotetablefunction_ptr
? StorageDistributed::createWithOwnCluster(
table_name,
structureremotetable,
remotetablefunction_ptr,
cluster,
context)
: StorageDistributed::createWithOwnCluster(
table_name,
structureremotetable,
remote_database,
remote_table,
cluster,
context);
......
}
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & tablename,
const ColumnsDescription & columns_,
ASTPtr & remotetablefunctionptr,
ClusterPtr & ownedcluster,
const Context & context_)
{
auto res = create(String{}, tablename, columns, ConstraintsDescription{},
remotetablefunctionptr, String{}, context, ASTPtr(), String(), false);
res->ownedcluster = ownedcluster_;
return res;
}

可以發現在創建remote表時會根據remote_table_function_ptr參數對最終的owned_cluster_賦值為true。

2,異步寫入是如何實現的

了解了什麼時候使用同步寫入什麼時候異步寫入後,再繼續分析正式的寫入過程,同步寫入一般場景中涉及較少,這裡主要對異步寫入邏輯進行分析。 outStream的write方法主邏輯如下:

DistributedBlockOutputStream::write()

if insert_sync
| |
true false
↓ ↓
writeSync() writeAsync()

其實這個write方法是重寫了virtual void IBlockOutputStream::write(const Block & block),所以節點在接收到流並調用流的write方法就會進入該邏輯中。並且根據insert_sync來決定走同步寫還是異步寫。

3,寫入本地節點還是遠端節點

主要還是對異步寫入進行分析,其實writeAsync()最終的實現方法是writeAsyncImpl(),大致邏輯圖如下:

writeAsyncImpl()

if shard_info.hasInternalReplication()
| |
true false
↓ ↓
writeToLocal() writeToLocal()
↓ ↓
writeToShard() for(every shard){writeToShard()}
↓ ↓
end end

其中getShardsInfo()方法就是獲取config.xml配置文件中獲取集群節點信息,hasInternalReplication()就對應著配置文件中的internal_replication參數,如果為true,就會進入最外層的if邏輯,否則就會進入else邏輯。

其中writeToLocal()方法是相同的,是指如果shard包含本地節點,優先選擇本地節點進行寫入;後半部分writeToShard()就是根據internal_replication參數的取值來決定是寫入其中一個遠端節點,還是所有遠端節點都寫一次。

4,數據如何寫入本地節點

當然一般情況Distributed表還是基於ReplicatedMergeTree系列表進行創建,而不是基於表函數的,所以大多數場景還是會先寫入本地再分發到遠端節點。那寫入Distributed表的數據是如何保證原子性落盤而不會在數據正在寫入的過程中就把不完整的數據發送給遠端其他節點呢?看下writeToShard()方法大致邏輯,如下:

writeToShard()

for(every dir_names){
|
└──if first iteration
| |
false true
↓ ↓
| ├──storage.requireDirectoryMonitor()
| ├──CompressedWriteBuffer
| ├──writeStringBinary()
| ├──stream.writePrefix()
| ├──stream.write(block)
| ├──stream.writeSuffix()
↘ ↙
link(tmp_file, file)
└──}

繼續具體再看下源碼的具體實現,如下:

void DistributedBlockOutputStream::writeToShard(const Block & block, const
std::vector & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions
* and keep monitor thread out from reading incomplete data
*/
std::string first_file_tmp_path{};

auto first = true;

/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
{
const auto & path = storage.getPath() + dir_name + '/';

/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);

const auto & file_name = toString(storage.file_names_increment.get()) +
".bin";
const auto & block_file_path = path + file_name;

/** on first iteration write block to a temporary directory for
subsequent hardlinking to ensure
* the inode is not freed until we're done */
if (first)
{
first = false;

const auto & tmp_path = path + "tmp/";
Poco::File(tmp_path).createDirectory();
const auto & block_file_tmp_path = tmp_path + file_name;

first_file_tmp_path = block_file_tmp_path;

WriteBufferFromFile out{block_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress, ClickHouseRevision::get(),
block.cloneEmpty()};

writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out);
context.getSettingsRef().serialize(out);
writeStringBinary(query_string, out);

stream.writePrefix();
stream.write(block);
stream.writeSuffix();
}

if (link(first_file_tmp_path.data(), block_file_path.data()))
throwFromErrnoWithPath("Could not link " + block_file_path + " to "
+ first_file_tmp_path, block_file_path,
ErrorCodes::CANNOT_LINK);
}
......
}

首先來了解下Distributed表在目錄中的存儲方式,默認位置都是/var/lib/clickhouse/data/{database}/{table}/在該目錄下會為每個shard生成不同的目錄,其中存放需要發送給該shard的數據文件,例如:

[[email protected] test]# tree
.
├── '[email protected]:9000,[email protected]:9000'
│ ├── 25.bin
│ └── tmp
│ └── 26.bin
└── '[email protected]:9000,[email protected]:9000'
└── tmp

可以發現每個shard對應的目錄名是{darabse}@{hostname}:{tcpPort}的格式,如果多個副本會用,分隔。並且每個shard目錄中還有個tmp目錄,這個目錄的設計在writeToShard()方法中做了解釋,是為了避免數據文件在沒寫完就被發送到遠端。

數據文件在本地寫入的過程中會先寫入tmp路徑中,寫完後通過硬鏈接link到shard目錄,保證只要在shard目錄中出現的數據文件都是完整寫入的數據文件。

數據文件的命名是通過全局遞增的數字加.bin命名,是為了在後續分發到遠端節點保持順序性。

5,數據如何分發到各個節點

細心的你可能已經發現在writeToShard()方法中有個requireDirectoryMonitor(),這個方法就是將shard目錄註冊監聽,並通過專用類StorageDistributedDirectoryMonitor來實現數據文件的分發,根據不同配置可以實現逐一分發或批量分發。並且包含對壞文件的容錯處理。

ClickHouse最佳實戰之分佈表寫入流程分析 4

分析到這,可能還有人會覺得云裡霧裡,覺得整個流程串不起來,其實這樣寫是為了先不影響Distributed表寫入的主流程,明白了這個再附加上sharding_key拆分和權重拆分就很好理解了。

ClickHouse最佳實戰之分佈表寫入流程分析 5

上面提到過writeAsync()的最終實現方法是writeAsyncImpl,這個說法是沒問題的,但是中間還有段關鍵邏輯,如下:

writeAsync()

if storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1
| |
true false
↓ ↓
writeAsyncImpl(block) writeSplitAsync(block)

splitBlock(block)

writeAsyncImpl(splitted_blocks,shard_idx)

getShardingKeyExpr()方法就是去獲取sharding_key生成的表達式指針,該表達式是在創建表時就生成的,如下:

sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context,
getColumns().getAllPhysical(), false);

那sharding_key和sharding_key_expr是什麼關係呢?如下:

const ExpressionActionsPtr & getShardingKeyExpr() const { return
sharding_key_expr; }

所以說sharding_key_expr最終主要就是由sharding_key決定的。

一般情況下getShardingKeyExpr()方法都為true,如果再滿足shard數量大於1,就會對block進行拆分,由splitBlock()方法主要邏輯就是創建selector並使用selector進行切割,大致邏輯如下:

splitBlock()

createSelector(block)

for(every shard){column->scatter(num_shards, selector);}

對於如何創建selector以及selector中都做了什麼事兒,來具體看下源碼截取,如下:

IColumn::Selector DistributedBlockOutputStream::createSelector(const Block &
source_block)
{
Block current_block_with_sharding_key_expr = source_block;
storage.getShardingKeyExpr()-
>execute(current_block_with_sharding_key_expr);

const auto & key_column =
current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName
());
const auto & slot_to_shard = cluster->getSlotToShard();
......
throw Exception{"Sharding key expression does not evaluate to an integer
type", ErrorCodes::TYPE_MISMATCH};
}

ClickHouse最佳實戰之分佈表寫入流程分析 6

看splitBlock()方法,ClickHouse是利用createSelector()方法構造selector來進行後續的處理。在createSelector()方法中最重要的就是key_column和slot_to_shard。

key_column是通過sharding_key間接獲得的,是為了根據主鍵列進行切割;slot_to_shard是shard插槽,這裡就是為了處理權重,在後續向插槽中插入數據時就會結合config.xml中的weight進行按比例處理。

細節比較複雜這裡不做太細緻的分析,有興趣可以自行看下(如template IColumn::Selector createBlockSelector())。

到此,對於Distributed表的寫入流程的關鍵點就大致分析完了。篇幅有限有些細節沒有做過多說明,有興趣的可以自行再了解下。

ClickHouse最佳實戰之分佈表寫入流程分析 7

通過對Distributed表寫入流程的分析,了解了該類型表的實際工作原理,所以在實際應用中有幾個點還需要關註一下:

Distributed表在寫入時會在本地節點生成臨時數據,會產生寫放大,所以會對CPU及內存造成一些額外消耗,建議盡量少使用Distributed表進行寫操作;Distributed表寫的臨時block會把原始block根據sharding_key和weight進行再次拆分,會產生更多的block分發到遠端節點,也增加了merge的負擔;Distributed表如果是基於表函數創建的,一般是同步寫,需要注意。

了解原理才能更好的使用,遇到問題才能更好的優化。

點擊【閱讀原文“】即可前往京東智聯雲控制台開通試用JCHDB。