取消
顯示的結果
而不是尋找
你的意思是:

如何提供PySpark UPSERT條件

康斯坦丁
貢獻者三世

我有一個表“demo_table_one”我想插入以下值

data =[(11111年,“CA”,“2020-01-26”),(11111年,“CA”,“2020-02-26”),(88888年,“CA”,“2020-06-10”),(88888年,“CA”,“2020-05-10”),(88888年,“佤邦”,“2020-07-10”),(88888年,“佤邦”,“2020-07-15”),(55555年,“佤邦”,“2020-05-15”),(55555年,“CA”,“2020-03-15”),]列= [‘attom_id’,‘state_code’,‘sell_date] df =火花。createDataFrame(數據列)

每個attom_id & state_code的邏輯是,我們隻需要最新的sell_date

所以在我的表的數據

“CA”[(11111年,“2020-02-26”),(88888年,“CA”,“2020-06-10”),(88888年,“佤邦”,“2020-07-15”),(55555年,“CA”, ' 2020-03-15 '))

我有下麵的代碼

從三角洲。表導入DeltaTable DeltaTable = DeltaTable。forName(火花,“demo_table_one”) #執行插入(deltaTable.alias (“orginal_table”) .merge (df.alias (“update_table”)、“orginal_table。state_code = update_table。state_code orginal_table。attom_id = update_table.attom_id”) .whenNotMatchedInsertAll () .whenMatchedUpdateAll (“orginal_table。sell_date < update_table.sell_date”) . execute ())

但這插入表中所有的值

1接受解決方案

接受的解決方案

werners1
尊敬的貢獻者三世

@John康斯坦丁,根據文檔,whenMatched可以有一個可選的條件。

所以我不立即看到這裏的問題。也許whenMatched條件永遠不會真的因為某些原因?

在原帖子查看解決方案

5回複5

Hubert_Dudek1
尊敬的貢獻者三世

它不會有目的地在第一次插入的數據,因此,它將執行.whenNotMatchedInsertAll()為每一個記錄。同樣,當兩個新記錄到達一次(使用相同的id和狀態)在接下來的插入,插入兩個。當然,你需要的是聚合數據之前插入(attom_id,‘state_code’,馬克斯(“sell_date”)。

我不能這樣做在PySpark

deltaTable.as (“orginal_table”) .merge (df.as (“update_table”)、“orginal_table。state_code = update_table。state_code orginal_table。attom_id .whenMatched (“orginal_table = update_table.attom_id”)。.whenNotMatched sell_date < update_table.sell_date”) .updateAll () () .insertAll () . execute ()

werners1
尊敬的貢獻者三世

@John康斯坦丁,根據文檔,whenMatched可以有一個可選的條件。

所以我不立即看到這裏的問題。也許whenMatched條件永遠不會真的因為某些原因?

Hubert_Dudek1
尊敬的貢獻者三世

另外@John康斯坦丁,你能分享demo_table_one數據是什麼?我們隻有df(別名update_table)的例子

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map