開始
加載和管理數據
處理數據
政府
引用和資源
2023年8月3日更新
給我們反饋
磚為結構化的流媒體應用程序提供了內置的監測下的火花UI流媒體選項卡。
通過添加提供流獨特的查詢名稱.queryName(<查詢名稱>)到你的writeStream代碼很容易區分指標所屬流的火花UI。
.queryName(<查詢名稱>)
writeStream
流指標可以被推到外部服務提醒或儀表盤用例流查詢使用Apache火花的偵聽器接口。在磚運行時的11.0及以上,流查詢偵聽器可以在Python和Scala。
請注意
處理延遲與聽眾可能會影響查詢處理。磚在這些聽眾和寫作建議最小化處理邏輯低延遲沉如卡夫卡。
下麵的代碼提供了基本的語法實現偵聽器的例子:
進口org。apache。火花。sql。流媒體。StreamingQueryListener進口org。apache。火花。sql。流媒體。StreamingQueryListener。_瓦爾myListener=新StreamingQueryListener{/ * **在啟動時調用查詢。* @note這叫做同步* [[org.apache.spark.sql.streaming。DataStreamWriter DataStreamWriter.start ()]]。*“onQueryStart”號召所有聽眾*’DataStreamWriter.start()的返回相應的[[StreamingQuery]]。*不阻止這種方法,因為它會阻礙你的查詢。* /defonQueryStarted(事件:QueryStartedEvent):單位={}/ * **有狀態更新時調用(攝入率更新等)** @note這個方法是異步的。在[[StreamingQuery]]返回狀態*最新狀態,無論何時調用此方法。[[StreamingQuery]]的狀態*可能會改變之前或當你處理事件。例如,您可能會發現[[StreamingQuery]]*終止在處理“QueryProgressEvent”。* /defonQueryProgress(事件:QueryProgressEvent):單位={}/ * **當停止查詢,有或沒有錯誤。* /defonQueryTerminated(事件:QueryTerminatedEvent):單位={}}
類MyListener(StreamingQueryListener):defonQueryStarted(自我,事件):”“”啟動時調用查詢。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryStartedEvent”可用的屬性是一樣的Scala API。筆記- - - - - -這就是所謂的同步甲:“pyspark.sql.streaming.DataStreamWriter.start”,即“onQueryStart“將呼籲所有聽眾“DataStreamWriter.start() ' '返回對應的類:“pyspark.sql.streaming.StreamingQuery”。不阻塞在這個方法,因為它會阻止您的查詢。”“”通過defonQueryProgress(自我,事件):”“”當有一些狀態更新(攝入率更新等)。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryProgressEvent”可用的屬性是一樣的Scala API。筆記- - - - - -這個方法是異步的。的狀態:類:“pyspark.sql.streaming。StreamingQuery”返回最近的狀態,無論何時調用此方法。狀態:類的:“pyspark.sql.streaming.StreamingQuery”。或者當你處理事件之前可能會改變。例如,你可能會發現:類:“StreamingQuery”終止在處理“QueryProgressEvent”。”“”通過defonQueryTerminated(自我,事件):”“”當停止查詢,有或沒有錯誤。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryTerminatedEvent”可用的屬性是一樣的Scala API。”“”通過my_listener=MyListener()
可觀測的指標是名為任意聚合函數,可以定義在一個查詢(DataFrame)。一旦執行DataFrame達到完成點(即完成一批查詢或達到一個流媒體時代),命名事件包含的指標數據處理自上次完成點。
你可以觀察這些指標通過附加一個偵聽器火花會話。偵聽器依賴於執行模式:
批處理模式:使用QueryExecutionListener。
QueryExecutionListener
QueryExecutionListener查詢完成時被調用。訪問指標使用QueryExecution.observedMetrics地圖。
QueryExecution.observedMetrics
流,或者micro-batch:使用StreamingQueryListener。
StreamingQueryListener
StreamingQueryListener被稱為流查詢完成後一個時代。訪問指標使用StreamingQueryProgress.observedMetrics地圖。磚不支持連續執行流。
StreamingQueryProgress.observedMetrics
例如:
/ /觀察行數(rc)和錯誤行數(erc)的流數據集瓦爾observed_ds=ds。觀察(“my_event”,數(點燃(1))。作為(“鋼筋混凝土”),數(美元“錯誤”)。作為(“倫理委員會”))observed_ds。writeStream。格式(“…”)。開始()/ /使用偵聽器監控指標火花。流。addListener(新StreamingQueryListener(){覆蓋defonQueryProgress(事件:QueryProgressEvent):單位={事件。進步。observedMetrics。得到(“my_event”)。foreach{行= >/ /觸發如果錯誤的數量超過5%瓦爾num_rows=行。木屐(長)(“鋼筋混凝土”)瓦爾num_error_rows=行。木屐(長)(“倫理委員會”)瓦爾比=num_error_rows。toDouble/num_rows如果(比>0.05){/ /觸發警報}}}})
#觀察指標observed_df=df。觀察(“指標”,數(點燃(1))。作為(“問”),數(上校(“錯誤”))。作為(“畸形”))observed_df。writeStream。格式(“…”)。開始()#定義我的聽眾。類MyListener(StreamingQueryListener):defonQueryStarted(自我,事件):打印(f“‘{事件。的名字}”({事件。id})開始!”)defonQueryProgress(自我,事件):行=事件。進步。observedMetrics。得到(“指標”)如果行是不沒有一個:如果行。畸形的/行。問>0.5:打印(“警告!哎喲!有太多的畸形”f“記錄{行。畸形的}的{行。問}!”)其他的:打印(f”{行。問}行處理!”)defonQueryTerminated(自我,事件):打印(f”{事件。id}終止了!”)#添加我的偵聽器。火花。流。addListener(MyListener())