取消
顯示的結果
而不是尋找
你的意思是:

關於“foreachBatch”流數據時刪除重複的記錄

麥當娜
重視貢獻二世

你好,

我練習磚樣的筆記本發表:

https://github.com/databricks-academy/advanced-data-engineering-with-databricks

在一個筆記本(正麵3.1 -在線重複數據刪除)(URL),有一個示例代碼刪除重複的記錄而流數據。

我有一些問題,感謝你的幫助。我複製下麵的代碼的主要部分:

從pyspark。sql導入函數F json_schema =“device_id長時間的時間戳,心率雙“deduped_df =(火花。readStream .table(青銅).filter(“主題= bpm”) .select (F.from_json (F.col .cast(“價值”)(“字符串”),json_schema) .alias (“v”)) .select (“*”) .withWatermark(“時間”,“30秒”).dropDuplicates ([“device_id”、“時間”]))sql_query = " "合並成heart_rate_silver使用stream_updates b a.device_id = b.device_id a.time = b。時候不匹配插入*”“”類Upsert: def __init__(自我、sql_query update_temp =“stream_updates”):自我。sql_query = sql_query自我。update_temp = update_temp def upsert_to_delta(自我,microBatchDF,批處理):microBatchDF.createOrReplaceTempView (self.update_temp) microBatchDF._jdf.sparkSession () . sql (self.sql_query) streaming_merge = = (deduped_df Upsert (sql_query)查詢。writeStream .foreachBatch (streaming_merge.upsert_to_delta) #運行查詢每一批.outputMode .option(“更新”)(“checkpointLocation”, f“{DA.paths.checkpoints} /錄音”).trigger (availableNow = True) .start ()) query.awaitTermination ()

Q1)的原因是什麼定義類“插入”和使用方法“foreachBatch”?

Q2)如果我不使用“foreachBatch”?

方法“dropDuplicates ([“device_id”、“時間”])”刪除重複閱讀記錄。是不是足以確保沒有重複的記錄?

第三季度)的方法“upsert_to_delta”類“插入”有兩個輸入參數(microBatchDF,批處理)。但是,當我們叫它在以下行:

.foreachBatch (streaming_merge.upsert_to_delta)

,我們不通過它的參數。它是如何得到的值(microBatchDF,批處理)?

謝謝你的時間閱讀我的問題。

2回答2

Vidula_Khanna
主持人
主持人

嗨@Mohammad軍刀

很高興認識你,謝謝你的問題!

看看你的同行在社區中有一個回答你的問題。否則bricksters能早日回到你身邊。

謝謝

謝謝你的消息。我還沒有找到這個問題的答案。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map