取消
顯示的結果
而不是尋找
你的意思是:

使用foreachBatch三角洲住表框架內

diguid
新的貢獻者三世

嘿!

我想知道如果有任何聲明一個三角洲的生活方式表,我們使用foreachBatch處理查詢的輸出流。

這裏有一個簡化我的代碼:

def join_data (df_1 df_2): df_joined = (df_1 .withWatermark (“timestamp_1”、“30秒”). join (df_2 .withWatermark (“timestamp_2”,“10秒”)= f。expr (“”“df_1。id = df_2。id和timestamp_2 > = timestamp_1 -間隔24小時和timestamp_2 < = timestamp_1”“”),如何= "左")返回df_joined def foreachbatch_func (df_micro_batch batchId): (df_micro_batch .withColumn (rn, f.row_number () .over(窗口.partitionBy (partition_by_cols) .orderBy (order_by_cols))) .filter (f.col (rn) = = 1) .drop (rn)) #隻有δ表中插入如果不是已經(DeltaTable .forPath(火花,mypath) .alias(“表”).merge (df_micro_batch.alias (“current_batch”), f.expr (“myexpr”)) .whenNotMatchedInsertAll () . execute ()) df = (join_data (df_1 df_2) .writeStream .format(“δ”).foreachBatch (foreachbatch_func) .outputMode(“追加”).start ())

因為多個聚合是不允許在流媒體查詢,我需要foreachBatch調用來執行重複數據刪除在我微批並找出哪些記錄已經被寫入δ表,這樣我不重新插入。

這種方法的問題在於foreachBatch DataStreamWriter的一種方法

對象,所以我認為我不能叫它不叫writeStream第一,但與此同時,我認為我不能叫writeStream當定義一個DLT,所以會很感激一些有助於理解如果有一種方法在這裏!

謝謝提前:slightly_smiling_face:

1回複1

JJ_LVS1
新的貢獻者三世

我隻是經曆這也需要micro-batch操作。不能看到這將與DLT現在我轉回結構化流。我希望他們添加此功能否則限製DLT更基本的流。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map