#從s3山讀取流
df = (
spark.readStream.format (“cloudFiles”)
.option (“cloudFiles。格式”、“csv”)
.option (“9”、“| | | |”)
. schema (transaction_schema)
.option (“maxFilesPerTrigger”, 1)
.load (“dbfs: / mnt / s3 /公共/ test_transactions”)
)
#外負載contracts_df一旦流操作
contracts_df = spark.read.table (“hive_metastore.spindl.contracts”)
#轉換應用到整個流DataFrame
df = df。withColumn (“transaction_hash F.col (" id "))
#……更多的轉換…
#定義udf
contract_info_udf = F。udf (contract_info,……
#應用udf
df = df。withColumn (contract_info contract_info_udf (F.struct (contracts_df.columns)))
#……更多的轉換…
#寫入事務表
df.write.mode(“追加”).insertInto (“hive_metastore.spindl.test_transactions”)
#寫流
查詢= df.writeStream \
.format \(“δ”)
.option (“checkpointLocation”、“/ tmp /δ/測試/ _write_stream_checkpoints /”) \
.start ()
query.awaitTermination ()
我強烈懷疑,一個文件有300萬條記錄。確認如果數據來自單個文件或多個文件,你可以添加一個新列的值作為input_file_name ()。這將有助於我們理解是否配置maxFilesPerTrigger被認為是。
文檔-https://docs.m.eheci.com/en/sql/language-manual/functions/input_file_name.html
有幾個可能的方法來提高性能的火花流攝取大量的S3的工作文件。這裏有一些建議:
默認情況下,調整分區的數量設置為200,這可能是太低的工作負載。該參數控製火花時應該使用多少個分區之間移動數據階段,和太少的分區會導致較低的並行性和緩慢的性能。你可以試試這個參數到一個更高的數量增加,基於數據的大小和核集群節點的數量,提高並行性和提高性能。
例如:
寫作三角洲湖時,火花首先將輸出寫入到臨時文件,然後將其合並到主表使用背景的工作。如果你有太多的小文件,這可能導致低劣的性能。為了幫助緩解這個問題,你可以合並輸出dataframe減少創建的文件的數量。
例如:
執行行級轉換使用udf可以是昂貴的,特別是如果函數沒有優化。你可以嚐試優化你的udf:
您可能需要考慮使用更大的實例類型。或者,你可以調整你的自動定量政策動態集群規模大小根據傳入的工作量。你可以使用磚自動定量的特性來幫助最大化集群利用率和降低整體成本。
考慮顯式打開三角洲湖的自動最優化特性。當你保持攝取數據,選擇優化/緊湊可以提高查詢性能。
這些最佳實踐改善火花流媒體應用程序的性能從S3使用自動裝卸機攝取大量的數據。
我希望會有幫助!