取消
顯示的結果
而不是尋找
你的意思是:

火花結構化流:如何運行N查詢每個窗口

VivekBhalgat
新的貢獻者二世

我在k卡夫卡timeseries數據主題。我想這些數據讀入的窗戶長度10分鍾。對於每一個窗口,我想運行N SQL查詢和實現結果。特定的N查詢運行取決於卡夫卡主題名稱。我該如何解決上述問題?

問題:

——而不是在第一個例子中使用算作聚合函數的代碼,有辦法獲得數據集<行>為每個窗口,這樣我可以寫我的邏輯並實現數據。

——有不同的SQL查詢配置為每個卡夫卡的話題。

- - -

——RelationalGroupedDataset kafkaDfStreamingWithWindow =

kafkaDfStreaming.groupBy (

functions.window (functions.col (KafkaDataset.KAFKA_DF_COLUMN_TIMESTAMP),“60秒”、“60秒”))。

KeyValueGroupedDataset。flatMapGroups函數存在,它可能類似於我在尋找什麼,是否有辦法將RelationalGroupedDataset KeyValueGroupedDataset。端到端實現的示例代碼。

——我思考這個問題的正確方法嗎?有解決這個問題的更好的辦法嗎?

任何示例代碼或直接將是有益的。

我已經嚐試(原始代碼)

DataStreamReader kafkaDfStreaming = sparkSession .readStream () .format .option (“kafka.bootstrap(“卡夫卡”)。服務器”、“http://localhost: 9092”) .option .option (“includeTimestamp”、“true”) (“startingOffsets”,“最早”).option (“endingOffset”、“最新”).option .option (“includeHeaders”、“true”) (“subscribePattern”、“HelloWorld . *”);數據集<行> streamingWindow = kafkaDfStreaming .groupBy (functions.window (functions.col(“時間戳”)、“600秒”,“600秒”),functions.col(“主題)).count () .select(“窗口。開始”、“窗口。結束”、“主題”、“數”);streamingWindow .writeStream () .format(“控製台”).trigger(觸發。ProcessingTime(600秒)).outputMode (OutputMode.Update ()) .start () .awaitTermination ();

上麵的示例代碼和它打印的行數/主題/窗口。

我試過和沒有工作:

/ /問題代碼/ / kafkaDfStreamingWithWindow.df()不讓訪問整個df和df對每個窗口。數據集<行> kafkaDfStreaming = sparkSession .readStream () .format .option (“kafka.bootstrap(“卡夫卡”)。服務器”、“http://localhost: 9092”) .option .option (“includeTimestamp”、“true”) (“startingOffsets”,“最早”).option (“endingOffset”、“最新”).option .option (“includeHeaders”、“true”) (“subscribePattern”、“HelloWorld . *”);RelationalGroupedDataset kafkaDfStreamingWithWindow = kafkaDfStreaming。groupBy (functions.window (functions.col(“時間戳”)、“600秒”,“600秒”),functions.col(“主題”));.writeStream DataStreamWriter <行> kafkaStreaming = kafkaDfStreamingWithWindow.df () ();DataStreamWriter <行> afterProcessingSparkStream = kafkaStreaming。foreachBatch(新VoidFunction2 <數據集<行>,長> (){@Override公共空調用(數據集<行> kafkaDf,長batchId){/ / /拋出異常處理代碼來實現數據的數據庫。它可以覆蓋數據。}}); StreamingQuery query = afterProcessingSparkStream.trigger(Trigger.ProcessingTime("600 seconds")).outputMode(OutputMode.Update()).start(); query.awaitTermination();

0回答0
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map