用戶定義的聚合函數- Scala
本文包含一個UDAF以及如何注冊它使用Apache火花SQL。看到用戶定義的聚合函數(UDAFs)為更多的細節。
實現一個UserDefinedAggregateFunction
進口org。apache。火花。sql。表達式。MutableAggregationBuffer進口org。apache。火花。sql。表達式。UserDefinedAggregateFunction進口org。apache。火花。sql。行進口org。apache。火花。sql。類型。_類GeometricMean擴展UserDefinedAggregateFunction{/ /這是聚合函數的輸入字段。覆蓋definputSchema:org。apache。火花。sql。類型。StructType=StructType(StructField(“價值”,倍增式)::零)/ /這是你保持的內部字段計算你的總。覆蓋defbufferSchema:StructType=StructType(StructField(“數”,LongType)::StructField(“產品”,倍增式)::零)/ /這是你aggregatation函數的輸出類型。覆蓋def數據類型:數據類型=倍增式覆蓋def確定的:布爾=真正的/ /這是緩衝模式的初始值。覆蓋def初始化(緩衝:MutableAggregationBuffer):單位={緩衝(0)=0 l緩衝(1)=1.0}/ /這是如何更新給定一個輸入緩衝模式。覆蓋def更新(緩衝:MutableAggregationBuffer,輸入:行):單位={緩衝(0)=緩衝。木屐(長)(0)+1緩衝(1)=緩衝。木屐(雙)(1)*輸入。木屐(雙)(0)}/ /這是如何與bufferSchema合並兩個對象類型。覆蓋def合並(buffer1:MutableAggregationBuffer,buffer2:行):單位={buffer1(0)=buffer1。木屐(長)(0)+buffer2。木屐(長)(0)buffer1(1)=buffer1。木屐(雙)(1)*buffer2。木屐(雙)(1)}/ /這是你輸出最終的價值,給你bufferSchema的最終值。覆蓋def評估(緩衝:行):任何={數學。戰俘(緩衝。用(1),1。toDouble/緩衝。getLong(0))}}
用你UDAF
/ /創建一個DataFrame並引發SQL表進口org。apache。火花。sql。功能。_瓦爾id=火花。範圍(1,20.)id。createOrReplaceTempView(“id”)瓦爾df=火花。sql(“選擇id, id % 3從ids group_id”)df。createOrReplaceTempView(“簡單”)
——使用UDAF group_by聲明和調用。選擇group_id,通用汽車(id)從簡單的集團通過group_id
/ /或者使用DataFrame語法調用聚合函數。/ /創建一個實例的UDAF GeometricMean。瓦爾通用汽車=新GeometricMean/ /顯示的列值的幾何平均數“id”。df。groupBy(“group_id”)。gg(通用汽車(上校(“id”))。作為(“GeometricMean”))。顯示()/ /調用UDAF由其指定的名字。df。groupBy(“group_id”)。gg(expr(“通用汽車(id) GeometricMean”))。顯示()