一些背景

假設您有一個數據分析批處理作業, 它在專用計算機上每小時運行一次。隨著周的推移, 您會發現輸入越來越大, 運行時間越長, 就會慢慢接近一個小時標記。您擔心隨后的處決可能會開始 ‘ 相互運行 ‘, 導致您的業務管線行為不端?;蛘? 您可能在 SLA 下為給定時間約束中的一批信息提供結果, 并且隨著批次大小在生產中慢慢增加, 您正接近分配的最大時間。

這聽起來像你可能有一個流問題!但是-你說-分析管道的其他部分是由其他團隊擁有的, 讓所有在船上遷移到流式架構的人都需要時間和大量的努力。在發生的時候, 你的管道的特定部分可能會被完全堵塞。沃拉魯雖然最初是為流和事件數據設計的, 但也可以用來可靠地并行化許多通常不被認為是流式的工作負載, 幾乎沒有什么努力。

讓我們讓我們的熊貓走得更快!我們將使用特定的群集來并行化批處理作業, 并通過?在一臺計算機上減少其運行時。群集將由一臺機器上的幾個沃拉魯工人組成, 在完成作業后可以關閉。

隨著這個結構的到位, 我們可以很容易地橫向擴展到多臺機器, 如果需要的話。這意味著我們可以在自己的后院中推出一小部分流式架構, 并在時間到了將堆棧的其他部分移動到事件流的世界時準備好一個故事。

現有管線

# file: old_pipeline.py

df = pd.read_csv(infile, index_col=0, dtype=unicode, engine='python')
fancy_ml_black_box.classify_df(df)
df.to_csv(outfile, header=False)

瓶頸在于 fancy_ml_black_box.classify_df 。這個函數運行一個分類器, 由我們的數據分析員編寫, 在每行的熊貓 dataframe。由于對特定行進行分類的結果與對任何其他行的分類無關, 所以它似乎是并行化的好候選。

花式黑匣子 Classifer 的注記

如果您查看分類器源代碼, 您會發現它調用dataframe. 應用一個相當無意義的計算。我們已經選擇了一些東西, 燃燒 CPU 周期, 以模擬昂貴的機器學習分類過程, 并展示了從并行化的好處。

以下是我們如何與沃拉魯:

    ab = wallaroo.ApplicationBuilder("Parallel Pandas Classifier with Wallaroo")
    ab.new_pipeline("Classifier",
                    wallaroo.TCPSourceConfig(in_host, in_port, decode))
    ab.to_stateful(batch_rows, RowBuffer, "CSV rows + global header state")
    ab.to_parallel(classify)
    ab.to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encode))

我們的想法是使用我們的 TCP 源攝取 csv 行, 將它們分批到小 dataframes, 并并行運行分類算法。

我們將保留管線的輸入和輸出格式, 保持與上游和下游系統的兼容性, 但希望通過利用服務器上的所有內核, 可以看到顯著的速度增長。以下是不同大小的輸入文件的運行時間:

輸入大小 所采取的時間 (AWS c5.4xlarge)
1000行 3.7s
1萬行 555
10萬行 5m 53s
100萬行 58m 21s

這些數字清楚地表明, 我們正在處理線性運行時復雜度的算法-執行任務所用的時間與輸入的大小線性相關。我們可以估計, 如果數據的速率超過270行/秒, 我們的管線就會陷入麻煩。

這意味著, 如果每小時工作輸入開始接近100萬行, 新的作業可能會開始 “運行到” 尚未完成的舊作業。

熊貓與沃拉魯的并行化

讓我們來看看我們是否可以通過在這臺機器上拆分所有可用的 CPU 內核 (8) 中的所有工作來提高這些數字。首先, 我們需要一些腳手架來設置沃拉魯的輸入和輸出。

Three process architecture: send.py sends data, wallaroo processes it, and sends to data_receiver

步驟 1: 將 CSV 文件發送到沃拉魯

我們將使用 Python 腳本讀取輸入 csv 文件中的所有行, 并將它們發送到我們的沃拉魯 TCP 源。我們需要對每一行進行, 以便在沃拉魯源中正確解碼它們:

try:
   with open(filename, 'rb') as f:
     for line in f.readlines():
       line = line.strip()
       sock.sendall(struct.pack(">I",len(line))+line)

finally:
   sock.sendall(struct.pack(">I",len(EOT))+EOT)
   print('Done sending {}'.format(filename))
   sock.close()

sock.sendall(struct.pack(">I",len(line))+line)方法: 將行的長度編碼為4字節、大端字節整數 ( I ), 然后在 TCP 套接字下發送該整數和整行文本。

finally 子句中, 我們還編碼和發送一個單一的ASCII EOT字節, 信號, 這是我們的輸入結束。

此 TCP 輸入由解碼器接收:

@wallaroo.decoder(header_length=4, length_fmt=">I")
def decode(bs):
    if bs == "\x04":
        return EndOfInput()
    else:
        return bs

正如您所看到的, 如果我們的數據是 EOT 字節 ( \x04 ), 我們將創建一個對象, 使 “輸入的末尾” 含義顯式。否則, 我們將把數據看作是。

步驟 2: 對 CSV 行進行批處理

管道中的下一步是將輸入行批處理為100塊。

@wallaroo.state_computation(name='Batch rows of csv, emit DataFrames')
def batch_rows(row, row_buffer):
    return (row_buffer.update_with(row), True)

RowBuffer狀態對象將采取它所看到的第一行, 并將其保存在內部作為 header 。然后它將接受傳入行, 直到它存儲了一定數量 (我們的應用程序中有100行)。.update_with(row)如果添加了該方法, None row 但緩沖區中仍有空間, 則返回如果更新填充緩沖區, 它將在內部為零, 并發出 BatchedRows 兩個字段的對象: a headerrows

關于序列化效率的一個注記

為什么要進行批處理, 當我們可以簡單地將 CSV 文件中的每個條目作為單行 dataframe 發送到我們的分類器?答案是: 速度。在沃拉魯中計算步驟之間的每次數據傳輸都可能需要對導線上的數據進行編碼和解碼, 而 dataframe 對象的創建也不是沒有成本的。

步驟 3: 并行分類小型 Dataframes

這是管道的一部分, 我們可以將沃拉魯的內置分配機制歸結于我們的問題:

@wallaroo.computation(name="Classify")
def classify(batched_rows):
    df = build_dataframe(batched_rows)
    fancy_ml_black_box.classify_df(df)
    return df

在將對象轉換為 dataframe 的過程中, 還有一些按摩 BatchedRows :

def build_dataframe(br):
    buf = StringIO(br.header + "\n" + ("\n".join(br.rows)))
    return pd.read_csv(buf, index_col=0, dtype=unicode, engine='python')

本質上, 我們粘附 BatchedRows.headerBatchedRows.rows 模擬一個獨立的 csv 文件, 然后我們傳遞到 pandas.read_csv StringIO緩沖區的形式?,F在, 我們可以將結果豐富的 dataframe 傳遞給 fancy_ml_black_box.classify_df() 函數。

上述所有工作 (包括將數據封送到 dataframe 中) 都是并行發生的, 而群集中的每個沃拉魯工作者都得到了不同的實例 BufferedRows 。

步驟 4: 將編碼回文件

dataframe 輸出 classify() , 上面, 得到序列化和框架的 encode 步驟?,F在, 您應該有點熟悉整個項目中使用的簡單 TCP 框架:

def encode(df):
    s = dataframe_to_csv(df)
    return struct.pack('>I',len(s)) + s

將幫助器函數 dataframe_to_csv 定義為:

def dataframe_to_csv(df):
    buf = StringIO()
    df.to_csv(buf, header=False)
    s = buf.getvalue().strip()
    buf.close()
    return s

此表示法由沃拉魯工具讀取 data_receiver , 它被告知偵聽 --framed 數據:

nohup data_receiver  \
      --framed --listen "$LEADER":"$SINK_PORT" \
      --ponynopin \
      > "$OUTPUT" 2>&1 &

這是很好的, 因為這是它會得到的。輸出將被寫入到由環境變量指定的文件中 OUTPUT 。

對運行時的影響

首先, 讓我們驗證新代碼是否與舊代碼生成了相同的輸出:

$ /usr/bin/time make run-old INPUT=input/1000.csv
./old_pipeline.py input/1000.csv "output/old_1000.csv"
3.85user 0.47system 0:03.70elapsed 116%CPU (0avgtext+0avgdata 54260maxresident)k
176inputs+288outputs (0major+17423minor)pagefaults 0swaps

$ /usr/bin/time make run-new N_WORKERS=1 INPUT=input/1000.csv
INPUT=input/1000.csv OUTPUT="output/new_1000.csv" N_WORKERS=1 ./run_machida.sh
(..)
4.48user 0.90system 0:04.13elapsed 130%CPU (0avgtext+0avgdata 63808maxresident)k
0inputs+352outputs (0major+989180minor)pagefaults 0swaps

$ diff output/new_1000.csv output/old_1000.csv
$ echo $?
0

耶!結果是匹配的, 運行時只會慢1秒, 這并不那么糟糕, 考慮到我們正在啟動3個單獨的進程 (發件人、沃拉魯和接收器), 并通過網絡發送所有數據兩次。首先, 1萬行文件:

原始代碼 1 工作者 4 名工人 8 名工人
555 595 405 11s

現在, 使用10萬行文件:

原始代碼 1 工作者 4 名工人 8 名工人
5m48s 6m28s 3m16s 1m41s

并與百萬行文件:

原始代碼 1 工作者 4 名工人 8 名工人
58m21s 1h03m46s 32m12s 16m33s

你為什么不測試兩個工人?

由于 Python 執行模型的單線程約束, 沃拉魯群集中的初始值設定項通常會在將工作發送到群集的其余部分之前, 積極地承擔其在并行工作負載中的份額。

這意味著在兩個工人上運行并行作業不會產生速度效益。我們建議運行至少四名員工集群, 以利用沃拉魯的擴展能力。

正如您所看到的 (通過克隆這個示例項目來驗證自己), 我們能夠將百萬行的處理時間減少到十六分鐘。此外, 如果輸入數據集對于我們的單機八工作者群集來說太大, 我們可以很容易地添加更多的機器并利用額外的并行性, 而不會在我們的沃拉魯應用程序中更改單行代碼。

這給了我們相當大的能力來抵御日益增加的負載的風暴, 而我們為整個系統設計了一個更成熟的流體系結構。

接下來呢?

希望, 我已經提出了以上的情況下, 沃拉魯可以作為一種特殊的方法, 以適應您現有的基于熊貓的分析管道, 以處理增加負荷。下一次, 我將向您展示如何沃拉魯群集按需進行旋轉, 以處理那些在一臺機器上無法容納的真正巨大的工作。

將分析管道放在流式框架中不僅可以擴展數據科學的規模, 還能為實時的洞察力提供機會。一旦你準備好進入一個真正的 evented 模型, 你所要做的就是把你的數據直接發送到沃拉魯, 完全繞過 CSV 階段。實際的沃拉魯管線不需要改變!有了一點點的預先投資, 你已經解開了廣泛的可能性, productionize 你的 Python 分析代碼。

Comments are closed.