抽樣
DataFrames之前,您將使用
RDD.groupBy ()
對數據進行分組。這種方法非常昂貴,需要一個完整的重組的所有數據,確保所有記錄相同的關鍵最終引發工人節點。
相反,您應該使用
RDD.aggregate ()
,PairRDDFunctions.groupByKey ()
,或PairRDDFunctions.reduceByKey ()
如果你分組等聚合數據的目的sum ()
或count ()
。
注意:您需要
進口org.apache.spark.rdd.PairRDDFunctions
DataFrames
目前,所有DataFrame分組操作假定你分組的目的聚合數據。
如果你想組其他原因(不常見),你需要引用底層抽樣如下:
sessionsDF.rdd.groupBy (…)
否則,DataFrame
groupBy ()
方法返回一個GroupedData實例如下:進口org.apache.spark.sql.GroupedDataval sessionsDF = Seq((“俊”、“user1”,“session1”, 100.0),(“俊”、“user1”中“session2”, 200.0),
(“day2”、“user1”、“session3”, 300.0), (“day2”、“user1”,“session4”, 400.0),(“day2”、“user1”、“session4”, 99.0)) .toDF(“日”,“用戶名”,“sessionId”,“purchaseTotal”)val groupedSessions: GroupedData = sessionsDF。groupBy(“日”,“標識”)
目前,沒什麼可以做的
GroupedData
除了添加實例聚合如下:進口org.apache.spark.sql.functions._groupedSessionsDF = groupedSessions。gg(“天”,“標識”,美元countDistinct (“sessionId”),和(“purchaseTotal”))
還有一個方便的gg()方法需要的地圖“列”“聚合型”如下:
val groupedSessionsDF = groupedSessions。gg(“天”,“標識”,美元“sessionId”- >“數”、“purchaseTotal”- >“金額”)
這種便利方法目前不支持的完整列表聚合,所以你可能需要恢複到上方列出的slightly-less-convenient方法之類的
countDistrinct ()
和sumDistrinct ()
等
SQL
SQL / HiveQL要求您指定一個聚合函數如下:
% sql SELECT天,userId, sessionId purchaseTotal test_df_group_by集團,userId錯誤的SQL語句:. lang。RuntimeException: org.apache.spark.sql。AnalysisException:表達“purchaseTotal”既不是出現在集團,也不是一個聚合函數。添加到組或包裝在第一()如果你不在乎哪個值。org.apache.spark.sql.catalyst.analysis.CheckAnalysis class.failAnalysis美元(CheckAnalysis.scala: 38) org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis (Analyzer.scala: 40)
這是一種有效的版本……
% sql SELECT天,userId,計數(sessionId)作為session_count,總和(purchaseTotal)從test_df_group_by purchase_total集團,userId
你總是想要使用DataFrames或SQL api在低級抽樣。尤其是麵向應用,你會看到一個巨大的性能提升通過使用這些高級庫將在這些層性能優化。
你好,我知道我發布到一個古老的線程,但我的問題是從來沒有那麼熱
我想總在windows和計算一些聚合(avg + stddev),希望除了也有訪問原始行基本上把我的兩個聚集到現有windows ....知道如何做到這一點嗎?
這是郵局堆棧溢出(downvoted無論什麼原因)但是隨時答案. .
@cfregly