如果你正在尋找一個優雅的停止(不停止在5但停止後在進步的micro-batch 5點而不是突然停止流),你可以試試下麵的。缺點是如果微批時間高,流停止將推遲。
進口java.time。LocalTime val queryStopListner = new StreamingQueryListener(){覆蓋def onQueryStarted (queryStarted: StreamingQueryListener.QueryStartedEvent):單位={}覆蓋def onQueryTerminated (queryTerminated: StreamingQueryListener.QueryTerminatedEvent):單位={}覆蓋def onQueryProgress (queryProgress: StreamingQueryListener.QueryProgressEvent):單位= {val id = queryProgress.progress。id如果(LocalTime.now () .isAfter (LocalTime.parse (“17:00:00”))) {val currentStreamingQuery = spark.streams.get currentStreamingQuery (id)。停止}}}/ /這個查詢listner添加到會話spark.streams.addListener (queryStopListner)