優化狀態結構化流查詢
管理有狀態的中間狀態信息結構化流查詢可以幫助防止意想不到的延遲和生產問題。
使用多個狀態運營商結構化流
在磚運行時的13.1及以上,磚提供先進的支持結構化流狀態運營商工作負載。現在可以把多個狀態操作串在一起,這意味著你可以養活一個操作的輸出作為另一個窗口的聚合狀態連接等操作。
您可以使用以下的示例將演示幾個模式。
重要的
存在以下局限性在處理多個狀態運營商:
FlatMapGroupWithState
不支持。隻支持附加的輸出模式。
鏈接時間窗聚合
單詞=…#流DataFrame模式{時間戳:時間戳,詞:字符串}#組數據窗口和詞,計算每組的數量windowedCounts=單詞。groupBy(窗口(單詞。時間戳,“十分鍾”,“5分鍾”),單詞。詞)。數()#組由另一個窗口,窗口的數據的話,計算每組的數量anotherWindowedCounts=windowedCounts。groupBy(窗口(window_time(windowedCounts。窗口),“1小時”),windowedCounts。詞)。數()
進口火花。值得一提的。_瓦爾單詞=…/ /流DataFrame模式{時間戳:時間戳,詞:字符串}/ /組數據窗口和詞,計算每組的數量瓦爾windowedCounts=單詞。groupBy(窗口(美元“時間戳”,“十分鍾”,“5分鍾”),美元“單詞”)。數()/ /組由另一個窗口,窗口的數據的話,計算每組的數量瓦爾anotherWindowedCounts=windowedCounts。groupBy(窗口(美元“窗口”,“1小時”),美元“單詞”)。數()
時間窗口聚集在兩個不同的流緊隨其後加入stream-stream窗口
clicksWindow=clicksWithWatermark。groupBy(clicksWithWatermark。clickAdId,窗口(clicksWithWatermark。clickTime,“1小時”))。數()impressionsWindow=impressionsWithWatermark。groupBy(impressionsWithWatermark。impressionAdId,窗口(impressionsWithWatermark。impressionTime,“1小時”))。數()clicksWindow。加入(impressionsWindow,“窗口”,“內心”)
瓦爾clicksWindow=clicksWithWatermark。groupBy(窗口(“clickTime”,“1小時”))。數()瓦爾impressionsWindow=impressionsWithWatermark。groupBy(窗口(“impressionTime”,“1小時”))。數()clicksWindow。加入(impressionsWindow,“窗口”,“內心”)
Stream-stream時間間隔加入時間窗聚合緊隨其後
加入=impressionsWithWatermark。加入(clicksWithWatermark,expr(”“”clickAdId = impressionAdId和clickTime > = impressionTime和clickTime < = impressionTime +間隔1小時”“”),“leftOuter”#可以“內心”、“leftOuter”、“rightOuter”,“fullOuter”、“leftSemi”)加入。groupBy(加入。clickAdId,窗口(加入。clickTime,“1小時”))。數()
瓦爾加入=impressionsWithWatermark。加入(clicksWithWatermark,expr(”“”clickAdId = impressionAdId和clickTime > = impressionTime和clickTime < = impressionTime +間隔1小時”“”),joinType=“leftOuter”/ /可以“內心”、“leftOuter”、“rightOuter”,“fullOuter”、“leftSemi”)加入。groupBy(美元“clickAdId”,窗口(美元“clickTime”,“1小時”))。數()
阻止減緩垃圾收集(GC)暫停狀態流
如果你有狀態操作流查詢(如流聚合)和你想維護數以百萬計的鍵的狀態,那麼你可能會麵臨相關問題大型JVM的垃圾收集(GC)暫停。這將導致高micro-batch處理時間的變化。這是因為你的JVM的內存維護默認狀態數據。有大量的狀態對象會壓迫你的JVM內存,導致高GC暫停。
在這種情況下,您可以選擇使用一個更優化的基於狀態管理解決方案RocksDB。這個解決方案可以在磚運行時。JVM內存而不是保持狀態,這個解決方案使用RocksDB本機內存中有效地管理國家和當地的SSD(例如類型與當地SSD)。此外,任何更改到這個狀態是由結構化流到檢查點位置自動保存你已經提供,因此提供完整的容錯擔保(默認狀態管理)一樣。為指令配置RocksDB國家商店,看到的配置RocksDB狀態存儲在磚。
推薦配置有狀態結構化流磚
磚的建議:
使用compute-optimized實例作為工人。例如,穀歌雲n1-highcpu-32實例。
調整分區的數量設置為1 - 2倍數量的集群中的核心。
設置
spark.sql.streaming.noDataMicroBatches.enabled
配置假
SparkSession。這可以防止流micro-batch引擎處理micro-batches不包含數據。還請注意,設置此配置假
可能導致有狀態操作,利用水印或處理時間超時沒有數據輸出到新數據到達,而不是立即。
關於性能優勢,RocksDB-based狀態管理可以保持100倍比默認狀態鍵。例如,在一個火花集群與穀歌雲n1-highcpu-32實例作為工人,默認狀態管理可以保持1 - 2百萬狀態鍵/執行器之後,JVM GC開始顯著地影響性能。相比之下,RocksDB-based狀態管理可以很容易地保持1億狀態鍵/ GC執行人沒有任何問題。
請注意
狀態管理方案無法改變之間查詢重啟。也就是說,如果一個查詢已經開始使用默認管理,那麼它不能改變沒有從頭開始查詢新的檢查點位置。