你好,
我想知道如果我能得到你的建議。
我們想創建一個青銅三角洲表使用廣州JSON數據存儲在S3中但每次我們嚐試讀和寫它集群CPU峰值100%。我們沒有做任何轉換,隻是閱讀從S3,創建一個列as_of_date和寫作在三角洲S3。
目前需要超過1小時讀和寫20 gb的廣州JSON S3使用服務器122 gb的內存和16個核心是無效的。
當我們做ETL數據也在小文件6.7 mb-10mb寫道。
我們已經試過的東西:
source_path = (s3: / /測試數據/源/ 2022/06/03 / *’。格式(年、月、日)test_data = spark.read.json(source_path).withColumn("as_of_date", col('metadata.timestamp').cast('date')) test_data.createOrReplaceTempView('test_data') test_data.write.format('delta') \ .partitionBy('as_of_date') \ .option("mergeSchema", "true") \ .option("path",'s3://test-data-bronze/').mode("append") \ .saveAsTable('test_data.bronze')
你好,
我可以建立一個筆記本,但我有困難要回填增量。
我要處理所有的數據存儲在一個2022年/月/日格式逐步減少負載。我有一個筆記本的設置,將迭代start_date_param和end_date_param它運行一個工作每天回填。
當我在自動裝卸機使用一個特定的上傳路徑為例
upload_path = (s3: / /測試數據/日曆/ 2022/01/01 / *’。格式(年、月、日)我得到這個錯誤。
. lang。IllegalStateException:發現不匹配的事件:關鍵日曆/ 2022/01/02/00 - 11 - 6 a088a39愛因斯坦場方程- 4180 - 4 - 852 d - 11 - d09e6c2eb8.json。廣州沒有前綴:日曆/ 2022/01/01 /
當我沒有指定的年/月/日自動裝卸機為2022,而不是試圖加載整個目錄增量地這樣做。我看到在SparkUI試圖加載49 k文件。
我們如何設置它所以它加載數據第一天. .寫道,分區,然後走到第二天嗎?
我看到你mentitoed我們不應該partiton,年/月/日,減慢了閱讀然後我們S3會有大量的小文件的目錄。
最後,我們如何設置它分區1 gb,而不是優化和寫的10 mb的塊現在Auto-optmize和auto-compact這是做什麼?
我也設置.option (cloudFiles。\ backfillInterval”、“一天”)
也試過.option (“cloudFiles。backfillInterval ', 1) \
任何想法嗎?
再次感謝你的幫助!
Avkash
我建議離開自動裝卸機找出最好的方法回填你所有的文件,而不是試圖增加自己做自己的模式來回填。upload_path = (s3: / /測試數據/日曆/ 2022/01/01 / *’。格式(年、月、日)<--the problem I see here is that you have no place you are putting the year, month, or day. Maybe you mean this?
upload_path = (s3: / /測試數據/日曆/ {}/ {}/ {}/ *”。格式(年、月、日)
我提到你不應該重新分區,因為我之前指定的代碼,你將有一個緊湊的三角洲表是由非常大的文件。事實上你有很多小文件做自動最優化的步驟後,裝載器流讀&去年檢查站,我提到的是不尋常的。你做了一個
描述表擴展user_data_bronze_compact
和位置?然後在接下來的細胞做一個嗎
% fs ls file_path_of_delta_tabel
看到文件的大小?你看到的大小是什麼?
你好,
謝謝你回到我。
你是正確的,通常我們把這樣的上傳路徑upload_path = (s3: / /測試數據/日曆/ {}/ {}/ {}/ *”。格式(年、月、日)and each day we fill in the date programmatically in order to minimize the data read from S3.
看來自動裝卸機有更好的方法,我們做回填路徑保留為空,所以自動裝卸機可以識別所有桶但我擔心的是,自動裝卸機試圖加載2 tb的內存而不是增量地讀和寫。
我的問題是如果我們離開路徑/ *自動裝卸機可以確定最好的方法讀/寫那麼為什麼它試圖一次加載所有數據嗎?
關於重新分區。每個文件夾都是20 gb壓縮在大小和自動裝卸機讀數據並寫出來它隻寫10 mb的塊。我們有autocompact和自動優化打開,它還是這樣。
% sql設置spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;設置spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
問題可能是,當我在做優化as_of_date寫我敏感,使得自動裝卸機讀所有的數據,然後寫嗎?
這裏是我們當前的代碼
upload_path = (s3: / /測試數據/日曆/ * / * / *’).format(年、月、日)write_path = ' s3: / / test-data-bronze / not_optimized / schema_location = ' s3: / / test-autoloader / not_optimized / checkpoint_path = ' s3: / / test-autoloader / not_optimized / #設置流開始讀取輸入文件從bronze_df = spark.readStream.format (cloudFiles) \ .option (“cloudFiles。schemaLocation ', schema_location) \ .option (“cloudFiles。maxFilesPerTrigger”, 16) \ .option (“cloudFiles。inferColumnTypes”、“真實”)\ .option (cloudFiles。格式,json) \ .option (cloudFiles。backfillInterval ', 1) \。load(upload_path) # Start the stream. bronze_df.writeStream.foreachBatch(bronze_df) \ checkpoint_path2 = 's3://test-autoloader/optimized/' bronze_df.withColumn("as_of_date",col('metadata.timestamp').cast('date')) \ .writeStream \ .format('delta') \ .trigger(once=True) \ .partitionBy('as_of_date') \ .option('checkpointLocation', checkpoint_path2) \ .option("path",'s3://test-data-bronze/optimized/') \ .table('test-data.calendar_bronze')
可以讓在一個10分鍾的電話,我可以給你這個問題通過屏幕分享嗎?
這將是一個非常昂貴的ETL所以我們想做對了。