嘿!
我想知道如果有任何聲明一個三角洲的生活方式表,我們使用foreachBatch處理查詢的輸出流。
這裏有一個簡化我的代碼:
def join_data (df_1 df_2): df_joined = (df_1 .withWatermark (“timestamp_1”、“30秒”). join (df_2 .withWatermark (“timestamp_2”,“10秒”)= f。expr (“”“df_1。id = df_2。id和timestamp_2 > = timestamp_1 -間隔24小時和timestamp_2 < = timestamp_1”“”),如何= "左")返回df_joined def foreachbatch_func (df_micro_batch batchId): (df_micro_batch .withColumn (rn, f.row_number () .over(窗口.partitionBy (partition_by_cols) .orderBy (order_by_cols))) .filter (f.col (rn) = = 1) .drop (rn)) #隻有δ表中插入如果不是已經(DeltaTable .forPath(火花,mypath) .alias(“表”).merge (df_micro_batch.alias (“current_batch”), f.expr (“myexpr”)) .whenNotMatchedInsertAll () . execute ()) df = (join_data (df_1 df_2) .writeStream .format(“δ”).foreachBatch (foreachbatch_func) .outputMode(“追加”).start ())
因為多個聚合是不允許在流媒體查詢,我需要foreachBatch調用來執行重複數據刪除在我微批並找出哪些記錄已經被寫入δ表,這樣我不重新插入。
這種方法的問題在於foreachBatch DataStreamWriter的一種方法
對象,所以我認為我不能叫它不叫writeStream第一,但與此同時,我認為我不能叫writeStream當定義一個DLT,所以會很感激一些有助於理解如果有一種方法在這裏!
謝謝提前