取消
顯示的結果
而不是尋找
你的意思是:

我怎麼更新一個聚合表使用一個三角洲住表嗎

珍妮花
新的貢獻者三世

我已經使用三角洲住表流事件和我有一個原始表的所有事件和下遊總表。我需要添加新的聚合到下遊表總列數。但我沒有發現任何配方談論這個。

我的代碼類似於下麵這個。

@dlt.table ()

def my_raw_table ():

回報(

spark.readStream.format \ (“cloudFiles”)

.option (“cloudFiles。形式at", "parquet") \

.option (“recursiveFileLookup”,“真正的”)\

.load (dirs)

)

def my_aggregate_table ():

回報(

dlt.read (“my_raw_table”)

.groupBy (“col_id”)

.agg (max (col_a)分鍾(col_a)) .alias (col_aggr))

)

我需要做的就是添加col_aggr my_aggregate_table my_raw_table新的聚合值。

疾病預防控製中心與達美住表文檔似乎隻有更新新值,但沒有提供一種方法來增加總col_aggr新的聚合值。

我會像在三角洲地區的生活表類似於三角洲湖能做些什麼表如下:

my_aggregate_table.alias \ (“aggr”)

.merge (

my_raw_table.alias(“更新”),

“aggr。col_id = updates.col_id”

)

.whenMatchedUpdate(設置=

{

:“col_id updates.col_id”,

:“col_aggr col_aggr”+“updates.col_aggr”

}

)

.whenNotMatchedInsert(值=

{

:“col_id updates.col_id”,

:“col_aggr updates.col_aggr”

}

). execute ()

有什麼方法我可以實現這個三角洲住表嗎?還是我誤解了三角洲地區的生活方式表總表的工作嗎?

1回複1

珍妮花
新的貢獻者三世

也許我的代碼是正確的已經因為我使用dlt.read (“my_raw_table”)而不是delta.read_stream (“my_raw_table”)。所以每次更新my_raw_table col_aggr完全是重新計算。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map