取消
顯示的結果
而不是尋找
你的意思是:

火花總是執行廣泛的投射無論spark.sql。autoBroadcastJoinThreshold期間與DeltaTable流合並操作。

gauthamchettiar
新的貢獻者二世

我想做一個流合並使用本指南-δ表之間https://docs.delta.io/latest/delta-update.html upsert-from-streaming-queries-using-foreachbatch

我們的代碼示例(Java):

數據集<行> sourceDf = sparkSession .readStream () .format .option(“δ”)(“inferSchema”,“真正的”).load(路徑);DeltaTable DeltaTable = DeltaTable。forPath (sparkSession定位路徑);sourceDf.createOrReplaceTempView (“vTempView”);StreamingQuery平方=火花()。sql (“select * from vTempView”) .writeStream () .format(“δ”).foreachBatch ((microDf, id) - > {deltaTable.alias (e) .merge (microDf.alias (“d”)、“e。.updateAll SALE_ID = d.SALE_ID ") .whenMatched () () .whenNotMatched () .insertAll () . execute ();}).outputMode .option(“更新”)(“checkpointLocation”, checkpointPath) .trigger (Trigger.Once ()) .start () .awaitTermination ();

源路徑和目標路徑已經在使用檢查點文件夾同步。約有800萬行數據總計約450 mb的鑲花的文件。

新數據時在源路徑(比方說987行),然後試圖執行BroadCastHashJoin火花,廣泛投目標表8 m行。

這裏的DAG片段合並操作(與表1 m行),

BroadCastJoin 1米

預期:

我在等更小的數據集(我。艾凡:987行)播放。如果不是那麼至少火花不應該傳播目標表,因為它是不大於spark.sql提供。autoBroadcastJoinThreshold設置,我們既不提供任何廣播提示。

我試過的東西:

我搜索了這篇文章https://learn.microsoft.com/en-us/azure/databricks/kb/sql/bchashjoin-exceeds-bcjointhreshold-oom。它提供了兩個解決方案,

  1. 運行“分析表……”(but since we are reading target table from path and not from a table this is not possible)
  2. 緩存表你廣播,DeltaTable緩存表沒有任何規定,所以不能這麼做。

我認為這是因為我們使用DeltaTable.forPath閱讀目標表和火花()方法是無法計算目標表指標。所以我也嚐試了不同的方法,

數據集<行> sourceDf = sparkSession .readStream () .format .option(“δ”)(“inferSchema”,“真正的”).load(路徑);數據集<行> targetDf = sparkSession .read () .format .option(“δ”)(“inferSchema”,“真正的”).load(定位路徑);sourceDf.createOrReplaceTempView (“vtempview”);targetDf.createOrReplaceTempView (“vtemptarget”);targetDf.cache ();StreamingQuery平方= sparkSession。sql (“select * from vtempview”) .writeStream () .format(“δ”).foreachBatch ((microDf, id) - > {microDf.createOrReplaceTempView (“vtempmicrodf”);microDf.sparkSession ()。sql(“合並成vtemptarget t使用vtempmicrodf s t。SALE_ID =。SALE_ID當匹配不匹配時更新設置*然後插入*”); }) .outputMode("update") .option("checkpointLocation", checkpointPath) .trigger(Trigger.Once()) .start().awaitTermination();

在上麵的片段我也緩存targetDf這火花可以計算指標,而不是廣泛的目標表。但它仍然沒有幫助,引發廣泛的投射。

現在我的選擇。誰能給我一些指導嗎?

0回答0
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map