我看過一個類似的問題使用flatMapGroupsWithState大型國有。有可能是a)他們不使用狀態。setTimeout正確或b)他們不調用state.remove()當存儲狀態超時,離開國家增長的規模。
def groupedStateFunc(關鍵:字符串,記錄:迭代器(一個),狀態:GroupState [B]):迭代器[C] = ={如果(state.hasTimedOut) {val結果狀態。getOption匹配{…}state.remove() / /如果這不是其他國家將泄漏結果}= {val結果狀態。{getOption匹配情況下一些(lastState) = > / /合並新記錄和做一些情況沒有= > / /第一州-創建初始StateStore記錄}state.update (processingFunc(記錄))/ /如果超時沒有設定一個明確的在未來(或如果該值太大),那麼記錄不會超時和國家能長state.setTimeoutTimestamp (timestampWhenStateWillExpire)迭代器。空}}
如果不是這樣的話,那麼它也可以隨著時間的表示一個問題。StateStore將跟蹤每個microbatch的水印指標使用時間戳(時代毫秒)可接受的界限(min / avg / max水印閾值)。這裏有可能調查可以幫助。