你好,
我想知道如果我能得到你的建議。
我們想創建一個青銅三角洲表使用廣州JSON數據存儲在S3中但每次我們嚐試讀和寫它集群CPU峰值100%。我們沒有做任何轉換,隻是閱讀從S3,創建一個列as_of_date和寫作在三角洲S3。
目前需要超過1小時讀和寫20 gb的廣州JSON S3使用服務器122 gb的內存和16個核心是無效的。
當我們做ETL數據也在小文件6.7 mb-10mb寫道。
我們已經試過的東西:
source_path = (s3: / /測試數據/源/ 2022/06/03 / *’。形式at(year, month, day)) test_data = spark.read.json(source_path).withColumn("as_of_date", col('metadata.timestamp').cast('date')) test_data.createOrReplaceTempView('test_data') test_data.write.format('delta') \ .partitionBy('as_of_date') \ .option("mergeSchema", "true") \ .option("path",'s3://test-data-bronze/').mode("append") \ .saveAsTable('test_data.bronze')
嗨Kaniz,
謝謝你的注意,謝謝大家的建議和幫助。@Joseph Kambourakis我的廣告你的建議我們的負載,但是我沒有看到任何改變在我們的數據加載或加載的時間數據。
我做了一些額外的研究和一個選擇那些有廣州的問題可以讀取數據文本文件使它快速閱讀,然後使用火花來推斷模式。
然而,最後我們決定使用自動裝卸機加載數據…但是我們還在等待幫助如何回填一年數據增量(每天)。
讓我知道如果你有任何建議,我增加了更多的康泰克斯上麵我的帖子來User16460565755155528764(磚)
謝謝,
K
第一次嚐試使用自動裝卸機內三角洲生活表為你管理你的ETL管道。
否則,你可以用一個筆記本把它寫出來。
下麵是一個方法讓你的很小的廣州JSON文件流有效成磚從S3 bucket &然後寫進一個緊湊的形式所以你其他的管道應該顯示性能改進。我建議你在多個節點上運行。比許多小的更大、更少的機器是最好的——見“複雜批ETL”這集群配置的最佳實踐文檔。
schema_location = < file_path_defined > upload_path = < file_path_defined > checkpoint_path = < file_path_defined > write_path = < file_path_defined > #設置流開始讀取輸入文件從bronze_df = spark.readStream.format (cloudFiles) \ .option (“cloudFiles。schemaLocation ', schema_location) \ .option (“cloudFiles。maxFilesPerTrigger”, <設置為集群的核心數量>)\ .option (cloudFiles。格式,json) \ .load (upload_path) #開始流。bronze_df。writeStream \ .format(“δ”).option (checkpointLocation, checkpoint_path) \ .table (“user_data_bronze_not_compact”)
如果仍然存在性能問題,試一試MaxBytesPerTrigger而不是MaxFilesPerTrigger;可以幫助因為您使用的是廣州JSON而不是直接JSON。
現在確保所有新三角洲表將自動緊湊。
% sql設置spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;設置spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
現在小型表:
checkpoint_path2 = < file_path > bronze_df。writeStream \ .format(“δ”).option (checkpointLocation, checkpoint_path2) \ .table (“user_data_bronze_compact”)
從這裏你可以閱讀緊湊三角洲表,您應該看到一個更好的性能。
你好,
謝謝你的建議!
今天早上我設置自動裝卸機腳本在一個筆記本,它似乎轉移文件很快。我添加了.option (“cloudFiles。inferColumnTypes”、“真實”)按次序的檢測模式。
問題:
謝謝你的幫助!
卡什