取消
顯示的結果
而不是尋找
你的意思是:

如何向表分區磚三角洲平行合並數據使用PySpark /火花流?

bobbysidhartha
新的因素

PySpark流管道,從卡夫卡的話題,讀取數據時數據經曆通過各種變換,最後被合並成一個磚三角洲的表。一開始我們的數據加載到三角洲表通過合並函數如下考慮。

這傳入dataframe inc_df所有分區的數據。

合並成main_db。main_delta_table main_dt main_dt.continent上使用inc_df df = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company當匹配不匹配時更新設置*然後插入*

我們在表級別上執行上麵的查詢。

我給一個非常基本的圖下圖的過程。

WbOeJ但是我的三角洲表分區在大陸和年。例如,這是我的表分區的三角洲的樣子。

6 mywv

所以我試著實現合並分區級別,並試圖運行合並活動在多個分區平行。即我創建了獨立的管道與過濾器在查詢分區的水平。下麵的圖片可以看到。

合並成main_db。main_delta_table main_dt main_dt上使用inc_df df。大陸(非洲)和main_dt。年(“202301”)和main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company當匹配不匹配時更新設置*然後插入*

但是我看到一個錯誤和並發性。

com.databricks.sql.transaction.tahoe。ConcurrentAppendException:文件添加到分區(非洲大陸=,= 2021年)並發更新。請再次嚐試操作。

我知道錯誤告訴我,不能同時更新文件。但是我有大量的數據在生產和我不想執行合並表級別上有近10億條記錄沒有適當的過濾器。

Trial2:作為另一種方法,

  1. 我救了我的增量dataframe S3存儲桶中(如分期dir)和結束我流管道。
  2. 然後我有一個分離PySpark工作從S3登台dir讀取數據,執行合並成我的主要增量表,再次在分區級別(我指定分區的工作過濾器)

但我麵臨著同樣的異常/錯誤。

Trial3:

我還做了另一個嚐試在不同的方法如前所述鏈接和“ConcurrentAppendException”部分的頁麵。

base_delta = DeltaTable.forPath(火花,s3: / / PATH_OF_BASE_DELTA_TABLE) base_delta.alias (“main_dt”)。合並(源= final_incremental_df.alias (df)、條件= " main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company大陸=非洲”).whenMatchedUpdateAll () .whenNotMatchedInsertAll () . execute ()

base_delta = DeltaTable.forPath(火花,s3: / / PATH_OF_BASE_DELTA_TABLE) base_delta.alias (“main_dt”)。合並(源= final_incremental_df.alias (df)、條件= " main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company大陸=亞洲”).whenMatchedUpdateAll () .whenNotMatchedInsertAll () . execute ()

我跑上麵的合並操作在兩個單獨的管道。但我仍然麵臨著同樣的問題。

有人能讓我知道如何設計和優化我的流管道將數據合並到三角洲表分區級別通過多個作業平行(單個分區上運行的工作)

1回複1

匿名
不適用

@bobbysidhartha:

當數據合並到一個分區並行三角洲表,重要的是要確保每個工作隻訪問和修改文件的分區,以避免並發問題。實現這一目標的一個方法是使用分區級鎖來防止多個用戶的並行更新相同的分區。

這裏是一個例子如何修改您PySpark流管道將數據合並到一個分區的三角洲表並行:

  1. 為每個分區創建一個單獨的火花工作你想更新。分區鍵上的每個工作都應該有一個過濾器,以確保它隻處理數據的分區。
  2. 使用DeltaTable.forPath()方法創建一個增量表對象為每個分區。然後,您可以使用這個對象執行合並操作。
  3. 使用增量表的分區級鎖定特性來確保隻有一份工作可以訪問一個分區。要做到這一點,您可以使用.option (“partitionBy”、“partition_key”)。選項(“replaceWhere”、“partition_key = partition_value”)方法在創建δ為每個分區表對象。這將獲得鎖定分區和防止其他工作時修改正在進行合並操作。
  4. 合並操作完成後,釋放鎖定分區使用.option (“replaceWhere”、“1 = 1”)的方法。

下麵是一個示例代碼片段來幫助你開始:

#得到δ表的分區鍵partition_keys =[“大陸”、“年”]#遍曆每個分區並創建一個火花工作為大陸更新(“非洲”、“亞洲”、“歐洲”,…):年(“2021”、“2022”、“2023”,…):#創建一個過濾器為當前分區partition_filter = f“大陸={大陸}和年={一}”#創建一個增量表對象當前分區delta_table = DeltaTable。”forPath(火花,f /道路/ / delta_table /{大陸}/{一}”)#獲得一個鎖在當前分區delta_table \ .toDF() \其中(partition_filter) \ .write \ .option (“partitionBy”、“,”. join (partition_keys)) \ .option (“replaceWhere”, partition_filter) \ .format(“δ”)\ .mode(“追加”)\ .save (delta_table._data_path) #當前分區上執行合並操作delta_table \ .alias (main_dt) \ .merge(源= inc_df.alias (df)、條件= " main_dt.continent = df。大陸和main_dt.str_id = df。str_id main_.rule_date = df.rule_date和main_.rule_id = df.rule_id main_.rule_read_start = df.rule_read_start main_.company = df.company”) \ .whenMatchedUpdateAll () \ .whenNotMatchedInsertAll () \ . execute() #釋放鎖定當前分區delta_table \ .toDF() \其中(partition_filter) \ .write \ .option (“partitionBy”、“,”. join (partition_keys)) \ .option (“replaceWhere”、“1 = 1”) \ .format(“δ”)\ .mode(“追加”)\ .save (delta_table._data_path)

請注意,上麵的代碼僅僅是一個例子,可能需要修改以適合您的特定用例。另外,一定要測試這種方法在運行前的一小部分數據在整個數據集,以確保它能夠正常工作。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map