取消
顯示的結果
而不是尋找
你的意思是:

的幫助!將廣州JSON轉換為三角洲造成巨大的CPU峰值和ETL的天!

卡什
貢獻者三世

你好,

我想知道如果我能得到你的建議。

我們想創建一個青銅三角洲表使用廣州JSON數據存儲在S3中但每次我們嚐試讀和寫它集群CPU峰值100%。我們沒有做任何轉換,隻是閱讀從S3,創建一個列as_of_date和寫作在三角洲S3。

目前需要超過1小時讀和寫20 gb的廣州JSON S3使用服務器122 gb的內存和16個核心是無效的。

當我們做ETL數據也在小文件6.7 mb-10mb寫道。

我們已經試過的東西:

  1. 我最初認為這個問題是由於我們的廣州JSON數據,不是可剝離所以火花很難處理這個問題,但我們沒有與其他管道。(如果這是它還不清楚)
  2. 然後我認為這是一個傾斜的問題由於.withColumn (“as_of_date坳(metadata.timestamp) .cast(日期))但即使我刪除它的問題仍然持續。
  3. 我甚至嚐試添加一個鹽提示,但沒有運氣。
  4. 我們試圖定義JSON,因為它是嵌套的模式幫助它加載更快了,但是寫了同樣長的時間。(這是不理想的,因為我們的模式隨時間變化和定義在這裏使我們失去數據)
  5. 我們嚐試.repartition (1000)
  6. 我們也試圖讓數據磚動態選擇傾斜是什麼,然後,以編程方式設置為斜提示,但沒有運氣。
  7. 我們打開Autocompact,自動優化它寫大文件但沒有做到這一點,再一次寫較小的10 mb的文件。

source_path = (s3: / /測試數據/源/ 2022/06/03 / *’。格式(年、月、日)test_data = spark.read.json(source_path).withColumn("as_of_date", col('metadata.timestamp').cast('date')) test_data.createOrReplaceTempView('test_data') test_data.write.format('delta') \ .partitionBy('as_of_date') \ .option("mergeSchema", "true") \ .option("path",'s3://test-data-bronze/').mode("append") \ .saveAsTable('test_data.bronze')

19日回複19

杜利
價值貢獻
  1. 我建議不寫小鑲花的眾多文件S3因為性能將可怕的寫作三角洲格式相比,更少和更大的文件版本的相同的數據——在我們的例子中,被稱為user_data_bronze_compact。我不會建議分區表小於1結核病和沒有分區小於1 gb由於性能的原因。你寫S3將更高效緊湊版本的表。你可以嚐試使用foreachBatch寫作()或foreach ()。
  2. 然後把這青銅dataframe使用觸發一次選項。看到觸發在這裏
  3. 自動裝卸機可以使用“cloudFiles.backfillInterval回填的增量
  4. 你可以找到的位置表的描述表擴展user_data_bronze_compact底部它說“位置”。你可以看到使用% fs文件構成表然後ls file_path_you_grabbed_from_describe_table_extended_step
  5. 你可以打開汽車優化步驟在你開始流&跳過中間的檢查點。

杜利
價值貢獻

同樣,關掉auto-compact如果延遲是一個問題

卡什
貢獻者三世

你好,

我可以建立一個筆記本,但我有困難要回填增量。

我要處理所有的數據存儲在一個2022年/月/日格式逐步減少負載。我有一個筆記本的設置,將迭代start_date_param和end_date_param它運行一個工作每天回填。

當我在自動裝卸機使用一個特定的上傳路徑為例

upload_path = (s3: / /測試數據/日曆/ 2022/01/01 / *’。格式(年、月、日)我得到這個錯誤。

. lang。IllegalStateException:發現不匹配的事件:關鍵日曆/ 2022/01/02/00 - 11 - 6 a088a39愛因斯坦場方程- 4180 - 4 - 852 d - 11 - d09e6c2eb8.json。廣州沒有前綴:日曆/ 2022/01/01 /

當我沒有指定的年/月/日自動裝卸機為2022,而不是試圖加載整個目錄增量地這樣做。我看到在SparkUI試圖加載49 k文件。

我們如何設置它所以它加載數據第一天. .寫道,分區,然後走到第二天嗎?

我看到你mentitoed我們不應該partiton,年/月/日,減慢了閱讀然後我們S3會有大量的小文件的目錄。

最後,我們如何設置它分區1 gb,而不是優化和寫的10 mb的塊現在Auto-optmize和auto-compact這是做什麼?

我也設置.option (cloudFiles。\ backfillInterval”、“一天”)

也試過.option (“cloudFiles。backfillInterval ', 1) \

任何想法嗎?

再次感謝你的幫助!

Avkash

杜利
價值貢獻

我建議離開自動裝卸機找出最好的方法回填你所有的文件,而不是試圖增加自己做自己的模式來回填。upload_path = (s3: / /測試數據/日曆/ 2022/01/01 / *’。格式(年、月、日)<--the problem I see here is that you have no place you are putting the year, month, or day. Maybe you mean this?

upload_path = (s3: / /測試數據/日曆/ {}/ {}/ {}/ *”。格式(年、月、日)

我提到你不應該重新分區,因為我之前指定的代碼,你將有一個緊湊的三角洲表是由非常大的文件。事實上你有很多小文件做自動最優化的步驟後,裝載器流讀&去年檢查站,我提到的是不尋常的。你做了一個

描述表擴展user_data_bronze_compact

和位置?然後在接下來的細胞做一個嗎

% fs ls file_path_of_delta_tabel

看到文件的大小?你看到的大小是什麼?

卡什
貢獻者三世

你好,

謝謝你回到我。

你是正確的,通常我們把這樣的上傳路徑upload_path = (s3: / /測試數據/日曆/ {}/ {}/ {}/ *”。格式(年、月、日)and each day we fill in the date programmatically in order to minimize the data read from S3.

看來自動裝卸機有更好的方法,我們做回填路徑保留為空,所以自動裝卸機可以識別所有桶但我擔心的是,自動裝卸機試圖加載2 tb的內存而不是增量地讀和寫。

我的問題是如果我們離開路徑/ *自動裝卸機可以確定最好的方法讀/寫那麼為什麼它試圖一次加載所有數據嗎?

關於重新分區。每個文件夾都是20 gb壓縮在大小和自動裝卸機讀數據並寫出來它隻寫10 mb的塊。我們有autocompact和自動優化打開,它還是這樣。

% sql設置spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;設置spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

問題可能是,當我在做優化as_of_date寫我敏感,使得自動裝卸機讀所有的數據,然後寫嗎?

這裏是我們當前的代碼

upload_path = (s3: / /測試數據/日曆/ * / * / *’).format(年、月、日)write_path = ' s3: / / test-data-bronze / not_optimized / schema_location = ' s3: / / test-autoloader / not_optimized / checkpoint_path = ' s3: / / test-autoloader / not_optimized / #設置流開始讀取輸入文件從bronze_df = spark.readStream.format (cloudFiles) \ .option (“cloudFiles。schemaLocation ', schema_location) \ .option (“cloudFiles。maxFilesPerTrigger”, 16) \ .option (“cloudFiles。inferColumnTypes”、“真實”)\ .option (cloudFiles。格式,json) \ .option (cloudFiles。backfillInterval ', 1) \。load(upload_path) # Start the stream. bronze_df.writeStream.foreachBatch(bronze_df) \ checkpoint_path2 = 's3://test-autoloader/optimized/' bronze_df.withColumn("as_of_date",col('metadata.timestamp').cast('date')) \ .writeStream \ .format('delta') \ .trigger(once=True) \ .partitionBy('as_of_date') \ .option('checkpointLocation', checkpoint_path2) \ .option("path",'s3://test-data-bronze/optimized/') \ .table('test-data.calendar_bronze')

可以讓在一個10分鍾的電話,我可以給你這個問題通過屏幕分享嗎?

這將是一個非常昂貴的ETL所以我們想做對了。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map