這個問題很簡單,當你使用暴跌窗口與append模式,關閉窗口隻有當未來到來的消息(+水印邏輯)。
在當前實現中,如果你停止傳入的流數據,最後窗口不會關閉,我們失去了最後窗口數據。
我們怎麼能力最後一個窗口關閉\衝洗如果停止輸入新數據?
業務情況:
正確地工作,新消息停止傳入和下一個消息進來5小時後,客戶端將得到消息後5個小時而不是10秒延遲窗口。
火花v3.3.2代碼的問題:
kafka_stream_df =火花\ .readStream \ .format(“卡夫卡”)\ .option (“kafka.bootstrap。服務器”,KAFKA_BROKER) \ .option(“訂閱”,KAFKA_TOPIC) \ .option (“includeHeaders”,“真正的”)\ .load()選取= (kafka_stream_df。selectExpr(“鑄(主要為字符串)”,"CAST(value AS STRING)") .select(from_json(col("value").cast("string"), json_schema).alias("data")) .select("data.*") .withWatermark("dt", "1 seconds") .groupBy(window("dt", "10 seconds")) .agg(sum("price")) ) console = sel \ .writeStream \ .trigger(processingTime='10 seconds') \ .format("console") \ .outputMode("append")\ .start()
@Dev Podavan:
你麵臨的問題是與Apache火花的窗口操作的行為在流上下文中使用暴跌時窗口的附加輸出模式。默認情況下,窗口不會關閉或刷新,直到一個新的消息到來在窗口的時間,從而導致數據延遲或丟失如果有輸入數據的空白。
迫使窗口關閉或刷新,即使新數據停止輸入,您可以設置一個水印在窗口操作超時值。的水印後指定一個閾值時間窗口被認為是完整的,即使沒有新數據到來。
在你的代碼中,您已經定義了一個水印在“dt”列使用withWatermark函數1秒的超時。然而,您可能需要調整水印的超時值為更大的值允許任何潛在的延遲到達的數據。例如,您可以嚐試增加水印超時5分鍾或更長時間,這取決於預期的最大延遲您的數據。
這是一個更新的代碼片段的水印超時5分鍾:
kafka_stream_df =火花\ .readStream \ .format(“卡夫卡”)\ .option (“kafka.bootstrap。服務器”,KAFKA_BROKER) \ .option(“訂閱”,KAFKA_TOPIC) \ .option (“includeHeaders”,“真正的”)\ .load()選取= (kafka_stream_df。selectExpr(“鑄(主要為字符串)”,"CAST(value AS STRING)") .select(from_json(col("value").cast("string"), json_schema).alias("data")) .select("data.*") .withWatermark("dt", "5 minutes") # Increase watermark timeout to 5 minutes .groupBy(window("dt", "10 seconds")) .agg(sum("price")) ) console = sel \ .writeStream \ .trigger(processingTime='10 seconds') \ .format("console") \ .outputMode("append")\ .start()
這個更新的代碼,即使新數據停止輸入,水印後的窗口將關閉或刷新超時的5分鍾,確保數據不會無限期推遲或丟失。可以根據需要調整水印超時值根據你具體的用例和數據特征。