你好,
我練習磚樣的筆記本發表:
https://github.com/databricks-academy/advanced-data-engineering-with-databricks
在一個筆記本(正麵3.1 -在線重複數據刪除)(URL),有一個示例代碼刪除重複的記錄而流數據。
我有一些問題,感謝你的幫助。我複製下麵的代碼的主要部分:
從pyspark。sql導入函數F json_schema =“device_id長時間的時間戳,心率雙“deduped_df =(火花。readStream .table(青銅).filter(“主題= bpm”) .select (F.from_json (F.col .cast(“價值”)(“字符串”),json_schema) .alias (“v”)) .select (“*”) .withWatermark(“時間”,“30秒”).dropDuplicates ([“device_id”、“時間”]))sql_query = " "合並成heart_rate_silver使用stream_updates b a.device_id = b.device_id a.time = b。時候不匹配插入*”“”類Upsert: def __init__(自我、sql_query update_temp =“stream_updates”):自我。sql_query = sql_query自我。update_temp = update_temp def upsert_to_delta(自我,microBatchDF,批處理):microBatchDF.createOrReplaceTempView (self.update_temp) microBatchDF._jdf.sparkSession () . sql (self.sql_query) streaming_merge = = (deduped_df Upsert (sql_query)查詢。writeStream .foreachBatch (streaming_merge.upsert_to_delta) #運行查詢每一批.outputMode .option(“更新”)(“checkpointLocation”, f“{DA.paths.checkpoints} /錄音”).trigger (availableNow = True) .start ()) query.awaitTermination ()
Q1)的原因是什麼定義類“插入”和使用方法“foreachBatch”?
Q2)如果我不使用“foreachBatch”?
方法“dropDuplicates ([“device_id”、“時間”])”刪除重複閱讀記錄。是不是足以確保沒有重複的記錄?
第三季度)的方法“upsert_to_delta”類“插入”有兩個輸入參數(microBatchDF,批處理)。但是,當我們叫它在以下行:
.foreachBatch (streaming_merge.upsert_to_delta)
,我們不通過它的參數。它是如何得到的值(microBatchDF,批處理)?
謝謝你的時間閱讀我的問題。