我想做一個流合並使用本指南-δ表之間https://docs.delta.io/latest/delta-update.html upsert-from-streaming-queries-using-foreachbatch
我們的代碼示例(Java):
數據集<行> sourceDf = sparkSession .readStream () .format .option(“δ”)(“inferSchema”,“真正的”).load(路徑);DeltaTable DeltaTable = DeltaTable。forPath (sparkSession定位路徑);sourceDf.createOrReplaceTempView (“vTempView”);StreamingQuery平方=火花()。sql (“select * from vTempView”) .writeStream () .format(“δ”).foreachBatch ((microDf, id) - > {deltaTable.alias (e) .merge (microDf.alias (“d”)、“e。.updateAll SALE_ID = d.SALE_ID ") .whenMatched () () .whenNotMatched () .insertAll () . execute ();}).outputMode .option(“更新”)(“checkpointLocation”, checkpointPath) .trigger (Trigger.Once ()) .start () .awaitTermination ();
源路徑和目標路徑已經在使用檢查點文件夾同步。約有800萬行數據總計約450 mb的鑲花的文件。
新數據時在源路徑(比方說987行),然後試圖執行BroadCastHashJoin火花,廣泛投目標表8 m行。
這裏的DAG片段合並操作(與表1 m行),
預期:
我在等更小的數據集(我。艾凡:987行)播放。如果不是那麼至少火花不應該傳播目標表,因為它是不大於spark.sql提供。autoBroadcastJoinThreshold設置,我們既不提供任何廣播提示。
我試過的東西:
我搜索了這篇文章https://learn.microsoft.com/en-us/azure/databricks/kb/sql/bchashjoin-exceeds-bcjointhreshold-oom。它提供了兩個解決方案,
我認為這是因為我們使用DeltaTable.forPath閱讀目標表和火花()方法是無法計算目標表指標。所以我也嚐試了不同的方法,
數據集<行> sourceDf = sparkSession .readStream () .format .option(“δ”)(“inferSchema”,“真正的”).load(路徑);數據集<行> targetDf = sparkSession .read () .format .option(“δ”)(“inferSchema”,“真正的”).load(定位路徑);sourceDf.createOrReplaceTempView (“vtempview”);targetDf.createOrReplaceTempView (“vtemptarget”);targetDf.cache ();StreamingQuery平方= sparkSession。sql (“select * from vtempview”) .writeStream () .format(“δ”).foreachBatch ((microDf, id) - > {microDf.createOrReplaceTempView (“vtempmicrodf”);microDf.sparkSession ()。sql(“合並成vtemptarget t使用vtempmicrodf s t。SALE_ID =。SALE_ID當匹配不匹配時更新設置*然後插入*”); }) .outputMode("update") .option("checkpointLocation", checkpointPath) .trigger(Trigger.Once()) .start().awaitTermination();
在上麵的片段我也緩存targetDf這火花可以計算指標,而不是廣泛的目標表。但它仍然沒有幫助,引發廣泛的投射。
現在我的選擇。誰能給我一些指導嗎?