監控流查詢結構化數據磚

磚為結構化的流媒體應用程序提供了內置的監測下的火花UI流媒體選項卡。

區分結構化流查詢在UI的火花

通過添加提供流獨特的查詢名稱.queryName(<查詢名稱>)到你的writeStream代碼很容易區分指標所屬流的火花UI。

推動結構化流度量外部服務

流指標可以被推到外部服務提醒或儀表盤用例流查詢使用Apache火花的偵聽器接口。在磚運行時的11.0及以上,流查詢偵聽器可以在Python和Scala。

重要的

憑證和對象由目錄不能用於統一管理StreamingQueryListener邏輯。

請注意

處理延遲與聽眾可能會影響查詢處理。磚在這些聽眾和寫作建議最小化處理邏輯低延遲沉如卡夫卡。

下麵的代碼提供了基本的語法實現偵聽器的例子:

進口orgapache火花sql流媒體StreamingQueryListener進口orgapache火花sql流媒體StreamingQueryListener_瓦爾myListener=StreamingQueryListener{/ * **在啟動時調用查詢。* @note這叫做同步* [[org.apache.spark.sql.streaming。DataStreamWriter DataStreamWriter.start ()]]。*“onQueryStart”號召所有聽眾*’DataStreamWriter.start()的返回相應的[[StreamingQuery]]。*不阻止這種方法,因為它會阻礙你的查詢。* /defonQueryStarted(事件:QueryStartedEvent):單位={}/ * **有狀態更新時調用(攝入率更新等)** @note這個方法是異步的。在[[StreamingQuery]]返回狀態*最新狀態,無論何時調用此方法。[[StreamingQuery]]的狀態*可能會改變之前或當你處理事件。例如,您可能會發現[[StreamingQuery]]*終止在處理“QueryProgressEvent”。* /defonQueryProgress(事件:QueryProgressEvent):單位={}/ * **當停止查詢,有或沒有錯誤。* /defonQueryTerminated(事件:QueryTerminatedEvent):單位={}}
MyListener(StreamingQueryListener):defonQueryStarted(自我,事件):”“”啟動時調用查詢。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryStartedEvent”可用的屬性是一樣的Scala API。筆記- - - - - -這就是所謂的同步甲:“pyspark.sql.streaming.DataStreamWriter.start”,即“onQueryStart“將呼籲所有聽眾“DataStreamWriter.start() ' '返回對應的類:“pyspark.sql.streaming.StreamingQuery”。不阻塞在這個方法,因為它會阻止您的查詢。”“”通過defonQueryProgress(自我,事件):”“”當有一些狀態更新(攝入率更新等)。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryProgressEvent”可用的屬性是一樣的Scala API。筆記- - - - - -這個方法是異步的。的狀態:類:“pyspark.sql.streaming。StreamingQuery”返回最近的狀態,無論何時調用此方法。狀態:類的:“pyspark.sql.streaming.StreamingQuery”。或者當你處理事件之前可能會改變。例如,你可能會發現:類:“StreamingQuery”終止在處理“QueryProgressEvent”。”“”通過defonQueryTerminated(自我,事件):”“”當停止查詢,有或沒有錯誤。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryTerminatedEvent”可用的屬性是一樣的Scala API。”“”通過my_listener=MyListener()

在結構化流定義可觀測的指標

可觀測的指標是名為任意聚合函數,可以定義在一個查詢(DataFrame)。一旦執行DataFrame達到完成點(即完成一批查詢或達到一個流媒體時代),命名事件包含的指標數據處理自上次完成點。

你可以觀察這些指標通過附加一個偵聽器火花會話。偵聽器依賴於執行模式:

  • 批處理模式:使用QueryExecutionListener

    QueryExecutionListener查詢完成時被調用。訪問指標使用QueryExecution.observedMetrics地圖。

  • 流,或者micro-batch:使用StreamingQueryListener

    StreamingQueryListener被稱為流查詢完成後一個時代。訪問指標使用StreamingQueryProgress.observedMetrics地圖。磚不支持連續執行流。

例如:

/ /觀察行數(rc)和錯誤行數(erc)的流數據集瓦爾observed_ds=ds觀察(“my_event”,(點燃(1))。作為(“鋼筋混凝土”),(美元“錯誤”)。作為(“倫理委員會”))observed_dswriteStream格式(“…”)。開始()/ /使用偵聽器監控指標火花addListener(StreamingQueryListener(){覆蓋defonQueryProgress(事件:QueryProgressEvent):單位={事件進步observedMetrics得到(“my_event”)。foreach{= >/ /觸發如果錯誤的數量超過5%瓦爾num_rows=木屐()(“鋼筋混凝土”)瓦爾num_error_rows=木屐()(“倫理委員會”)瓦爾=num_error_rowstoDouble/num_rows如果(>0.05){/ /觸發警報}}}})
#觀察指標observed_df=df觀察(“指標”,(點燃(1))作為(“問”),(上校(“錯誤”))作為(“畸形”))observed_dfwriteStream格式(“…”)開始()#定義我的聽眾。MyListener(StreamingQueryListener):defonQueryStarted(自我,事件):打印(f“‘{事件的名字}”({事件id})開始!”)defonQueryProgress(自我,事件):=事件進步observedMetrics得到(“指標”)如果沒有一個:如果畸形的/>0.5:打印(“警告!哎喲!有太多的畸形”f“記錄{畸形的}{}!”)其他的:打印(f{}行處理!”)defonQueryTerminated(自我,事件):打印(f{事件id}終止了!”)#添加我的偵聽器。火花addListener(MyListener())