我有一個表“demo_table_one”我想插入以下值
data =[(11111年,“CA”,“2020-01-26”),(11111年,“CA”,“2020-02-26”),(88888年,“CA”,“2020-06-10”),(88888年,“CA”,“2020-05-10”),(88888年,“佤邦”,“2020-07-10”),(88888年,“佤邦”,“2020-07-15”),(55555年,“佤邦”,“2020-05-15”),(55555年,“CA”,“2020-03-15”),]列= [‘attom_id’,‘state_code’,‘sell_date] df =火花。createDataFrame(數據列)
每個attom_id & state_code的邏輯是,我們隻需要最新的sell_date
所以在我的表的數據
“CA”[(11111年,“2020-02-26”),(88888年,“CA”,“2020-06-10”),(88888年,“佤邦”,“2020-07-15”),(55555年,“CA”, ' 2020-03-15 '))
我有下麵的代碼
從三角洲。表導入DeltaTable DeltaTable = DeltaTable。forName(火花,“demo_table_one”) #執行插入(deltaTable.alias (“orginal_table”) .merge (df.alias (“update_table”)、“orginal_table。state_code = update_table。state_code orginal_table。attom_id = update_table.attom_id”) .whenNotMatchedInsertAll () .whenMatchedUpdateAll (“orginal_table。sell_date < update_table.sell_date”) . execute ())
但這插入表中所有的值
我不能這樣做在PySpark
deltaTable.as (“orginal_table”) .merge (df.as (“update_table”)、“orginal_table。state_code = update_table。state_code orginal_table。attom_id .whenMatched (“orginal_table = update_table.attom_id”)。.whenNotMatched sell_date < update_table.sell_date”) .updateAll () () .insertAll () . execute ()