我在k卡夫卡timeseries數據主題。我想這些數據讀入的窗戶長度10分鍾。對於每一個窗口,我想運行N SQL查詢和實現結果。特定的N查詢運行取決於卡夫卡主題名稱。我該如何解決上述問題?
問題:
——而不是在第一個例子中使用算作聚合函數的代碼,有辦法獲得數據集<行>為每個窗口,這樣我可以寫我的邏輯並實現數據。
——有不同的SQL查詢配置為每個卡夫卡的話題。
- - -
——RelationalGroupedDataset kafkaDfStreamingWithWindow =
kafkaDfStreaming.groupBy (
functions.window (functions.col (KafkaDataset.KAFKA_DF_COLUMN_TIMESTAMP),“60秒”、“60秒”))。
KeyValueGroupedDataset。flatMapGroups函數存在,它可能類似於我在尋找什麼,是否有辦法將RelationalGroupedDataset KeyValueGroupedDataset。端到端實現的示例代碼。
——我思考這個問題的正確方法嗎?有解決這個問題的更好的辦法嗎?
任何示例代碼或直接將是有益的。
我已經嚐試(原始代碼)
DataStreamReader kafkaDfStreaming = sparkSession .readStream () .format .option (“kafka.bootstrap(“卡夫卡”)。服務器”、“http://localhost: 9092”) .option .option (“includeTimestamp”、“true”) (“startingOffsets”,“最早”).option (“endingOffset”、“最新”).option .option (“includeHeaders”、“true”) (“subscribePattern”、“HelloWorld . *”);數據集<行> streamingWindow = kafkaDfStreaming .groupBy (functions.window (functions.col(“時間戳”)、“600秒”,“600秒”),functions.col(“主題)).count () .select(“窗口。開始”、“窗口。結束”、“主題”、“數”);streamingWindow .writeStream () .format(“控製台”).trigger(觸發。ProcessingTime(600秒)).outputMode (OutputMode.Update ()) .start () .awaitTermination ();
上麵的示例代碼和它打印的行數/主題/窗口。
我試過和沒有工作:
/ /問題代碼/ / kafkaDfStreamingWithWindow.df()不讓訪問整個df和df對每個窗口。數據集<行> kafkaDfStreaming = sparkSession .readStream () .format .option (“kafka.bootstrap(“卡夫卡”)。服務器”、“http://localhost: 9092”) .option .option (“includeTimestamp”、“true”) (“startingOffsets”,“最早”).option (“endingOffset”、“最新”).option .option (“includeHeaders”、“true”) (“subscribePattern”、“HelloWorld . *”);RelationalGroupedDataset kafkaDfStreamingWithWindow = kafkaDfStreaming。groupBy (functions.window (functions.col(“時間戳”)、“600秒”,“600秒”),functions.col(“主題”));.writeStream DataStreamWriter <行> kafkaStreaming = kafkaDfStreamingWithWindow.df () ();DataStreamWriter <行> afterProcessingSparkStream = kafkaStreaming。foreachBatch(新VoidFunction2 <數據集<行>,長> (){@Override公共空調用(數據集<行> kafkaDf,長batchId){/ / /拋出異常處理代碼來實現數據的數據庫。它可以覆蓋數據。}}); StreamingQuery query = afterProcessingSparkStream.trigger(Trigger.ProcessingTime("600 seconds")).outputMode(OutputMode.Update()).start(); query.awaitTermination();