pyspark.sql.DataFrame.observe¶
-
DataFrame。
觀察
( 觀察:聯盟(觀察,str],*exprs:pyspark.sql.column.Column )→DataFrame¶ -
定義(命名)DataFrame指標觀察。這個方法返回一個“觀察”DataFrame返回相同的結果作為輸入,用以下保證:
-
- 它將計算定義的聚合(指標)的所有數據流經
-
數據集。
-
- 它將報告的價值定義聚合列一旦我們完成
-
點。年底完成點可以是一個查詢(批處理模式)或一個流媒體時代的終結。的價值總量隻反映了數據處理,因為前麵的完成點。
指標列必須包含一個文本(如點燃(42)),或者應該包含一個或多個聚合函數(例如sum (a)或(a + b) +和avg (c) -點燃(1))。表達式包含輸入數據集引用的列必須包裝在一個聚合函數。
用戶可以通過添加Python的觀察這些指標
StreamingQueryListener
,Scala / Javaorg.apache.spark.sql.streaming.StreamingQueryListener
或Scala / Java的org.apache.spark.sql.util.QueryExecutionListener
火花會話。筆記
當
觀察
是觀察
,這個方法隻支持批處理查詢。當觀察
是一個字符串,該方法適用於批處理和流媒體查詢。目前還不支持連續執行。例子
當
觀察
是觀察
下麵,隻有批量查詢工作。> > >從pyspark.sql.functions進口上校,數,點燃,馬克斯> > >從pyspark.sql進口觀察> > >觀察=觀察(“我的指標”)> > >observed_df=df。觀察(觀察,數(點燃(1))。別名(“數”),馬克斯(上校(“年齡”)))> > >observed_df。數()2> > >觀察。得到{“計數”:2,“馬克斯(年齡)”:5}
當
觀察
是一個字符串,也流查詢工作如下。> > >從pyspark.sql.streaming進口StreamingQueryListener> > >類MyErrorListener(StreamingQueryListener):…defonQueryStarted(自我,事件):…通過……defonQueryProgress(自我,事件):…行=事件。進步。observedMetrics。得到(“my_event”)…#觸發如果錯誤的數量超過5%…num_rows=行。鋼筋混凝土…num_error_rows=行。倫理委員會…比=num_error_rows/num_rows…如果比>0.05:…#觸發警報…通過……defonQueryTerminated(自我,事件):…通過…> > >火花。流。addListener(MyErrorListener())> > >#觀察行數(rc)和錯誤行數(erc)的流數據集…observed_ds=df。觀察(…“my_event”,…數(點燃(1))。別名(“鋼筋混凝土”),…數(上校(“錯誤”))。別名(“倫理委員會”))> > >observed_ds。writeStream。格式(“控製台”)。開始()
-