取消
顯示的結果
而不是尋找
你的意思是:

的幫助!將廣州JSON轉換為三角洲造成巨大的CPU峰值和ETL的天!

卡什
貢獻者三世

你好,

我想知道如果我能得到你的建議。

我們想創建一個青銅三角洲表使用廣州JSON數據存儲在S3中但每次我們嚐試讀和寫它集群CPU峰值100%。我們沒有做任何轉換,隻是閱讀從S3,創建一個列as_of_date和寫作在三角洲S3。

目前需要超過1小時讀和寫20 gb的廣州JSON S3使用服務器122 gb的內存和16個核心是無效的。

當我們做ETL數據也在小文件6.7 mb-10mb寫道。

我們已經試過的東西:

  1. 我最初認為這個問題是由於我們的廣州JSON數據,不是可剝離所以火花很難處理這個問題,但我們沒有與其他管道。(如果這是它還不清楚)
  2. 然後我認為這是一個傾斜的問題由於.withColumn (“as_of_date坳(metadata.timestamp) .cast(日期))但即使我刪除它的問題仍然持續。
  3. 我甚至嚐試添加一個鹽提示,但沒有運氣。
  4. 我們試圖定義JSON,因為它是嵌套的模式幫助它加載更快了,但是寫了同樣長的時間。(這是不理想的,因為我們的模式隨時間變化和定義在這裏使我們失去數據)
  5. 我們嚐試.repartition (1000)
  6. 我們也試圖讓數據磚動態選擇傾斜是什麼,然後,以編程方式設置為斜提示,但沒有運氣。
  7. 我們打開Autocompact,自動優化它寫大文件但沒有做到這一點,再一次寫較小的10 mb的文件。

source_path = (s3: / /測試數據/源/ 2022/06/03 / *’。形式at(year, month, day)) test_data = spark.read.json(source_path).withColumn("as_of_date", col('metadata.timestamp').cast('date')) test_data.createOrReplaceTempView('test_data') test_data.write.format('delta') \ .partitionBy('as_of_date') \ .option("mergeSchema", "true") \ .option("path",'s3://test-data-bronze/').mode("append") \ .saveAsTable('test_data.bronze')

1接受解決方案

接受的解決方案

卡什
貢獻者三世

嗨Kaniz,

謝謝你的注意,謝謝大家的建議和幫助。@Joseph Kambourakis我的廣告你的建議我們的負載,但是我沒有看到任何改變在我們的數據加載或加載的時間數據。

我做了一些額外的研究和一個選擇那些有廣州的問題可以讀取數據文本文件使它快速閱讀,然後使用火花來推斷模式。

然而,最後我們決定使用自動裝卸機加載數據…但是我們還在等待幫助如何回填一年數據增量(每天)。

讓我知道如果你有任何建議,我增加了更多的康泰克斯上麵我的帖子來User16460565755155528764(磚)

謝謝,

K

在原帖子查看解決方案

19日回複19

杜利
價值貢獻

“服務器有122 gb的內存和16核”

這是一個節點的集群嗎?

卡什
貢獻者三世

是的在這個例子中它是單個節點集群規模但我們也嚐試這5 - 122 gb的服務器和CPU問題在每個服務器和工作沒有加速。

我們也試圖混淆為潛水員和δ實例計算實例的工人。

杜利
價值貢獻

試著用自動裝卸機和使自動最優化表的屬性。

第一次嚐試使用自動裝卸機內三角洲生活表為你管理你的ETL管道。

否則,你可以用一個筆記本把它寫出來。

下麵是一個方法讓你的很小的廣州JSON文件流有效成磚從S3 bucket &然後寫進一個緊湊的形式所以你其他的管道應該顯示性能改進。我建議你在多個節點上運行。比許多小的更大、更少的機器是最好的——見“複雜批ETL”集群配置的最佳實踐文檔。

schema_location = < file_path_defined > upload_path = < file_path_defined > checkpoint_path = < file_path_defined > write_path = < file_path_defined > #設置流開始讀取輸入文件從bronze_df = spark.readStream.format (cloudFiles) \ .option (“cloudFiles。schemaLocation ', schema_location) \ .option (“cloudFiles。maxFilesPerTrigger”, <設置為集群的核心數量>)\ .option (cloudFiles。格式,json) \ .load (upload_path) #開始流。bronze_df。writeStream \ .format(“δ”).option (checkpointLocation, checkpoint_path) \ .table (“user_data_bronze_not_compact”)

如果仍然存在性能問題,試一試MaxBytesPerTrigger而不是MaxFilesPerTrigger;可以幫助因為您使用的是廣州JSON而不是直接JSON。

現在確保所有新三角洲表將自動緊湊。

% sql設置spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;設置spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

現在小型表:

checkpoint_path2 = < file_path > bronze_df。writeStream \ .format(“δ”).option (checkpointLocation, checkpoint_path2) \ .table (“user_data_bronze_compact”)

從這裏你可以閱讀緊湊三角洲表,您應該看到一個更好的性能。

卡什
貢獻者三世

你好,

謝謝你的建議!

今天早上我設置自動裝卸機腳本在一個筆記本,它似乎轉移文件很快。我添加了.option (“cloudFiles。inferColumnTypes”、“真實”)按次序的檢測模式。

問題:

  1. 我們如何拯救user_data_bronze_not_compact s3路徑分區通過(yyyy / mm / dd) ?
  2. 我們如何設置它自動裝卸機工作隻觸發一次,停止時加載所有數據在s3文件夾中嗎?
  3. 我們要每天運行自動裝卸機一次處理前一天的數據。目前我們使用upload_path = (s3: / /測試數據/日曆/ {}/ {}/ {}”。格式(年、月、日)加載數據為一個特定的一天。有沒有更好的方法自動裝卸機嗎?回填?增量?
  4. 在這個查詢我們跑廣州JSON數據加載到三角洲和商店表(而不是優化)。因為我們不指定位置表在S3中,這個表存儲在哪裏?
  5. 當我們優化這些數據並將其存儲在S3中,本質上我們重寫一遍我們現在有3份數據對嗎?如果是這樣,我們需要跑3號嗎?或者我們可以優化步驟2嗎?
    1. JSON
    2. JSONδ(不是優化)
    3. DETLA優化δ(優化)

謝謝你的幫助!

卡什

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map