pyspark.sql.DataFrame.observe

DataFrame。 觀察 ( 觀察:聯盟(觀察,str],*exprs:pyspark.sql.column.Column )→DataFrame

定義(命名)DataFrame指標觀察。這個方法返回一個“觀察”DataFrame返回相同的結果作為輸入,用以下保證:

  • 它將計算定義的聚合(指標)的所有數據流經

    數據集。

  • 它將報告的價值定義聚合列一旦我們完成

    點。年底完成點可以是一個查詢(批處理模式)或一個流媒體時代的終結。的價值總量隻反映了數據處理,因為前麵的完成點。

指標列必須包含一個文本(如點燃(42)),或者應該包含一個或多個聚合函數(例如sum (a)或(a + b) +和avg (c) -點燃(1))。表達式包含輸入數據集引用的列必須包裝在一個聚合函數。

用戶可以通過添加Python的觀察這些指標StreamingQueryListener,Scala / Javaorg.apache.spark.sql.streaming.StreamingQueryListener或Scala / Java的org.apache.spark.sql.util.QueryExecutionListener火花會話。

參數
觀察 觀察或str

str指定的名字,或一個觀察實例獲取指標。

添加支持str在這個參數。

exprs

列表達式()。

返回
DataFrame

觀察到的DataFrame

筆記

觀察觀察,這個方法隻支持批處理查詢。當觀察是一個字符串,該方法適用於批處理和流媒體查詢。目前還不支持連續執行。

例子

觀察觀察下麵,隻有批量查詢工作。

> > >pyspark.sql.functions進口上校,,點燃,馬克斯> > >pyspark.sql進口觀察> > >觀察=觀察(“我的指標”)> > >observed_df=df觀察(觀察,(點燃(1))別名(“數”),馬克斯(上校(“年齡”)))> > >observed_df()2> > >觀察得到{“計數”:2,“馬克斯(年齡)”:5}

觀察是一個字符串,也流查詢工作如下。

> > >pyspark.sql.streaming進口StreamingQueryListener> > >MyErrorListener(StreamingQueryListener):defonQueryStarted(自我,事件):通過defonQueryProgress(自我,事件):=事件進步observedMetrics得到(“my_event”)#觸發如果錯誤的數量超過5%num_rows=鋼筋混凝土num_error_rows=倫理委員會=num_error_rows/num_rows如果>0.05:#觸發警報通過defonQueryTerminated(自我,事件):通過> > >火花addListener(MyErrorListener())> > >#觀察行數(rc)和錯誤行數(erc)的流數據集observed_ds=df觀察(“my_event”,(點燃(1))別名(“鋼筋混凝土”),(上校(“錯誤”))別名(“倫理委員會”))> > >observed_dswriteStream格式(“控製台”)開始()