Categories
程式開發

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐


TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 1

「導語」在訓練TensorFlow 模型時,為了達到最佳的訓練性能,需要一個高效的數據輸入流程,該流程可以在當前訓練步驟完成之前為下一步訓練準備好數據。 tf.data API 可以幫助我們構建這個靈活且高效的輸入流程,它包含有一系列的數據轉換操作,可以很輕鬆地對輸入數據進行各種並行化處理,本文將會對這些轉換操作進行詳細地介紹。

未優化的方法

一個訓練流程通常包括以下幾個步驟:

打開數據輸入文件的句柄。

從文件中取出一定數量(batch) 的數據。

使用取出的數據進行模型訓練。

重複步驟2-3 直至訓練完成。

在未對數據輸入流程進行優化時,訓練流程中各部分的時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 2

可以看到在這種同步的實現方式下,當從文件獲取數據時,模型訓練是處於空閒狀態的,而當對模型進行訓練時,數據輸入流程又處於空閒狀態。訓練的總體時間為各個部分所消耗的時間累加之和,這嚴重影響了訓練的效率,因此我們需要對數據輸入流程進行優化。

數據預取

數據預取操作是指模型在執行第s 步訓練時,輸入流程同時從文件中讀取第s+1 步所需訓練數據的並行化處理過程。與未優化的方法相比,數據預取可以將訓練步驟2-3 的累加時間開銷減少到二者之間的最大值。

tf.data API 提供了prefetch 轉換來完成數據預取操作,它可以將數據生成的時間與數據消耗的時間解耦。該轉換操作使用後台線程和一個內部的緩衝區在數據請求到來前從輸入數據集中預取元素,但不保證一定預取完成。

預取的數據元素數量應該等於(或大於)單個訓練步驟所需的batch 大小。該參數是可調的,既可以手動指定,也可以將其設置為tf.data.experimental.AUTOTUNE ,這時預取元素的數量將由tf.data 運行時動態地調整。

使用prefetch 轉換後的訓練流程其時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 3

可以看到數據讀取和訓練的時間出現了重合,從而減少了整體的時間開銷。

並行數據提取

在真實的訓練環境中,輸入數據可能會存儲在遠程的文件系統如HDFS 中。由於本地存儲和遠程存儲之間存在一些差異,在本地可以正常運行的數據輸入流程可能會在遠程讀取數據時無法如預期一樣正常工作,具體差異如下:

首字節讀取時間:從遠程存儲讀取文件中的第一個字節可能比從本地存儲讀取文件首字節的時間要長好幾個數量級,時間花銷較大。

數據讀取吞吐量:雖然遠程存儲通常提供較大的聚合帶寬,但是按順序讀取單個文件可能僅會利用此帶寬的一小部分,吞吐量並不高。

另外,當原始的數據加載到內存中後,還可能需要對數據進行反序列化或解密(如protobuf 格式的數據),這還會需要額外的計算資源。誠然,無論數據存儲在本地或是遠程,都會存在此開銷,但如果數據沒有被有效地預取,遠程讀取數據會使得該開銷變得更大。

為了減輕各種數據提取開銷的影響, interleave 轉換可以被用於並行化數據加載,以交錯讀取多個數據集文件的內容(如讀取TextLineDataset 等數據集)。其中,並行讀取的文件數量可以通過cycle_length 參數來控制,表示從cycle_length 個文件交錯讀取數據,而從每個文件中連續讀取的樣本個數由block_length 參數控制,也就是說從一個文件讀取了block_length 個連續樣本後開始交錯從其它文件繼續讀取,而文件讀取的並行度由num_parallel_calls 參數控制,與prefetch 轉換一樣, interleave 轉換也支持tf.data.experimental.AUTOTUNE 設置,從而把有關使用什麼級別的並行度委派給tf.data 運行時動態決定。

interleave 轉換的默認參數使得它依次對多個數據文件中的單個樣本進行交錯讀取,其時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 4

可以看到interleave 從兩個數據集交錯獲取數據樣本,但是其性能並沒有得到提升。而通過設置num_parallel_calls 參數後,其時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 5

因為可以並行加載多個數據集文件,從而減少了依次打開數據文件所需的等待時間,繼而減少了全局訓練的時間開銷。

並行數據轉換

在準備輸入數據時,可能需要對原始的數據輸入進行預處理。為此tf.data 提供了map 轉換進行數據預處理操作,該轉換可以將用戶自定義的函數應用於輸入數據集中的每個元素。由於輸入元素彼此獨立,因此該預處理操作可以在多個CPU 內核間並行執行。

與prefetch 和interleave 轉換類似, map 轉換也提供了num_parallel_calls 參數來指定並行度,可以自行設置該參數的值,同時它也支持tf.data.experimental.AUTOTUNE 設置,從而把有關使用什麼級別的並行度委派給tf.data 運行時動態決定。

對於未並行化的map 轉換,其時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 6

此時訓練流程的整體時間開銷為其它各個部分的時間開銷和預處理時間開銷累加之和。而並行化map 轉換後,其時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 7

可以看到map 部分時間開銷出現了重合,從而在整體上減少了全局訓練的時間開銷。

數據緩存

cache 轉換可以在內存或者本地存儲中緩存處理後的數據集,這樣可以避免在每個epoch 都執行相同的操作(如文件打開和數據讀取)。一個基本的cache 轉換的時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 8

可以看到在第2 個epoch 時,由於緩存了數據集,因此文件打開,數據讀取以及預處理的時間開銷都被節省了。這是因為在cache 轉換之前的所有對數據集的操作只會在第1 個epoch 被執行,接下來的epoch 將會直接使用cache 轉換所緩存的數據。

如果map 轉換使用的用戶自定義函數是比較耗時的,只要預處理後的結果數據集能夠放進內存或本地存儲,就應該在map 轉換後應用cache 轉換,以減少後續每個epoch 都進行map轉換的時間開銷。如果用戶自定義的函數增加了存儲數據集所需的空間(超出緩存容量),那麼可以在緩存轉換後應用map 轉換,或考慮在訓練之前對數據進行預處理以減少資源使用。

向量化map 轉換

在map 轉換中調用用戶自定義函數會產生額外的開銷,因此最好對用戶自定義函數進行向量化處理(即讓它一次處理一批數據輸入),然後在map 轉化前應用batch 轉化,這樣map轉換就會應用於每個batch 的數據而非單一數據。

對於在map 轉換後應用batch 轉換的數據集,其時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 9

可以看到map 函數被應用於每一個數據樣本,儘管其執行時間很快,也會對整體的時間性能造成影響。而在map 轉化前應用batch 轉化的數據集,其時間開銷如下圖所示:

TensorFlow 篇| TensorFlow 數據輸入的最佳實踐 10

可以看到map 函數僅被執行一次並被應用於1 個batch 的樣本,儘管其執行時間相比而言會更長,但額外的時間開銷僅出現一次,從而改善了整體的時間性能。

總結

TensorFlow 數據輸入的最佳實踐方法包括以下幾個部分:

使用prefetch 轉換來使得數據生產和消耗的時間開銷重疊。

使用interleave 轉換來並行化讀取數據集。

通過設置num_parallel_calls 參數來並行化map 轉換。

使用cache 轉換在第1 輪訓練時將數據緩存在內存或本地存儲中。

將map 轉換的用戶自定義函數向量化。

代碼實現

根據上面的最佳實踐方法,本節使用tf.data API 來完成TensorFlow 輸入數據的構建。具體實現代碼如下所示:

def make_dataset(input_pattern, shuffle_size, batch_size):
# map 解析函数,注意这里的向量化操作
def labeler(record):
fields = tf.io.decode_csv(
record,
record_defaults=['0'] * 32,
field_delim='t',
)
data = tf.strings.to_number(fields[1:32], out_type=tf.int32)
label = tf.strings.to_number(fields[:1], out_type=tf.int32)

data = tf.transpose(data)
label = tf.transpose(label)

return data, label

filenames = tf.data.Dataset.list_files(input_pattern)
dataset = filenames.interleave(
lambda filename: tf.data.TextLineDataset(filename),
cycle_length=tf.data.experimental.AUTOTUNE,
num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
dataset = dataset.repeat().shuffle(shuffle_size).batch(batch_size)
dataset = dataset.map(
lambda ex: labeler(ex),
num_parallel_calls=tf.data.experimental.AUTOTUNE,
).cache()

dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

return dataset

注意事項

在使用tf.data 的各種轉換操作時,要注意它們之間的操作順序,通常而言,更少的內存佔用意味著更好的轉換順序。

如果map 轉換的結果太大而無法放進內存,另外一種權衡的方法是將自定義的處理函數分為兩個部分(如果可以劃分),一個為耗時的部分,另一個為消耗內存的部分,然後將耗時部分的轉換結果進行緩存,而消耗內存部分的轉換結果則不予緩存。

參考資料

數據表現