我已經使用三角洲住表流事件和我有一個原始表的所有事件和下遊總表。我需要添加新的聚合到下遊表總列數。但我沒有發現任何配方談論這個。
我的代碼類似於下麵這個。
@dlt.table ()
def my_raw_table ():
回報(
spark.readStream.format \ (“cloudFiles”)
.option (“cloudFiles。形式at", "parquet") \
.option (“recursiveFileLookup”,“真正的”)\
.load (dirs)
)
def my_aggregate_table ():
回報(
dlt.read (“my_raw_table”)
.groupBy (“col_id”)
.agg (max (col_a)分鍾(col_a)) .alias (col_aggr))
)
我需要做的就是添加col_aggr my_aggregate_table my_raw_table新的聚合值。
的疾病預防控製中心與達美住表文檔似乎隻有更新新值,但沒有提供一種方法來增加總col_aggr新的聚合值。
我會像在三角洲地區的生活表類似於三角洲湖能做些什麼表如下:
my_aggregate_table.alias \ (“aggr”)
.merge (
my_raw_table.alias(“更新”),
“aggr。col_id = updates.col_id”
)
.whenMatchedUpdate(設置=
{
:“col_id updates.col_id”,
:“col_aggr col_aggr”+“updates.col_aggr”
}
)
.whenNotMatchedInsert(值=
{
:“col_id updates.col_id”,
:“col_aggr updates.col_aggr”
}
). execute ()
有什麼方法我可以實現這個三角洲住表嗎?還是我誤解了三角洲地區的生活方式表總表的工作嗎?