插入表使用合並成三角洲湖

您可以插入數據從源表,視圖,或者DataFrame目標三角洲表使用合並SQL操作。三角洲湖支持插入、更新和刪除合並,它支持擴展語法之外的SQL標準促進先進的用例。

假設您有一個源表命名people10mupdates或一個源路徑/ tmp /δ/ people-10m-updates,包含一個目標表命名的新數據people10m或一個目標路徑/ tmp /δ/ people-10m。其中一些新記錄可能已經出現在目標數據。合並的新數據,你想更新人的行id已經存在並插入新行,不匹配id是禮物。您可以運行下麵的查詢:

合並people10m使用people10mupdatespeople10mid=people10mupdatesid匹配然後更新id=people10mupdatesid,firstName=people10mupdatesfirstName,middleName=people10mupdatesmiddleName,=people10mupdates,性別=people10mupdates性別,生日=people10mupdates生日,ssn=people10mupdatesssn,工資=people10mupdates工資匹配然後插入(id,firstName,middleName,,性別,生日,ssn,工資)(people10mupdatesid,people10mupdatesfirstName,people10mupdatesmiddleName,people10mupdates,people10mupdates性別,people10mupdates生日,people10mupdatesssn,people10mupdates工資)
delta.tables進口*deltaTablePeople=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)deltaTablePeopleUpdates=DeltaTableforPath(火花,“/ tmp /δ/ people-10m-updates”)dfUpdates=deltaTablePeopleUpdatestoDF()deltaTablePeople別名(“人”)\合並(dfUpdates別名(“更新”),”的人。id=更新年代。id')\whenMatchedUpdate(={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\whenNotMatchedInsert(={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\執行()
進口ioδ_進口orgapache火花sql功能_瓦爾deltaTablePeople=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)瓦爾deltaTablePeopleUpdates=DeltaTableforPath(火花,“tmp /δ/ people-10m-updates”)瓦爾dfUpdates=deltaTablePeopleUpdatestoDF()deltaTablePeople作為(“人”)合並(dfUpdates作為(“更新”),”的人。id=更新年代。id")whenMatchedupdateExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))whenNotMatchedinsertExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))執行()

看到三角洲湖API文檔Scala和Python語法細節。SQL語法細節,請參閱合並成

使用合並修改所有無與倫比的行

在磚SQL和磚的運行時12.1及以上,可以使用匹配通過條款更新orgydF4y2Ba刪除記錄目標表中沒有對應的源表中的記錄。磚建議添加一個可選的條件條款,以避免完全重寫目標表。

下麵的代碼示例顯示了基本語法使用這個刪除,覆蓋目標表與源表的內容和刪除目標表中的無與倫比的記錄。更可伸縮模式表,更新和刪除源是有時限的,看到的增量同步三角洲與源表

(targetDF合並(sourceDF,”源。關鍵=目標。關鍵")whenMatchedUpdateAll()whenNotMatchedInsertAll()whenNotMatchedBySourceDelete()執行())
targetDF合並(sourceDF,”源。關鍵=目標。關鍵")whenMatched()updateAll()whenNotMatched()insertAll()whenNotMatchedBySource()刪除()執行()
合並目標使用關鍵=目標關鍵匹配然後更新*匹配然後插入*匹配通過然後刪除

下麵的例子將條件添加到匹配通過條款和指定值更新在無與倫比的目標行。

(targetDF合並(sourceDF,”源。關鍵=目標。關鍵")whenMatchedUpdate(={“target.lastSeen”:“source.timestamp”})whenNotMatchedInsert(={“target.key”:“source.key”,“target.lastSeen”:“source.timestamp”,“target.status”:“活躍”})whenNotMatchedBySourceUpdate(條件=”目標。lastSeen> =(當前日期()- - - - - -時間間隔“5”一天)",={“target.status”:“不活躍”})執行())
targetDF合並(sourceDF,”源。關鍵=目標。關鍵")whenMatched()updateExpr(地圖(“target.lastSeen”- >“source.timestamp”))whenNotMatched()insertExpr(地圖(“target.key”- >“source.key”,“target.lastSeen”- >“source.timestamp”,“target.status”- >“活躍”,))whenNotMatchedBySource(”目標。lastSeen> =(當前日期()- - - - - -時間間隔“5”一天)")updateExpr(地圖(“target.status”- >“不活躍”))執行()
合並目標使用關鍵=目標關鍵匹配然後更新目標lastSeen=時間戳匹配然後插入(關鍵,lastSeen,狀態)(關鍵,時間戳,“活躍”)匹配通過目標lastSeen> =(當前日期()- - - - - -時間間隔“5”一天)然後更新目標狀態=“不活躍”

合並操作語義

下麵是詳細的描述合並編程操作語義。

  • 可以有任意數量的whenMatchedwhenNotMatched條款。

  • whenMatched條款執行時源行匹配目標表行根據匹配條件。這些條款有以下語義。

    • whenMatched條款最多隻能有一個更新和一個刪除行動。的更新行動合並僅更新(類似於指定的列更新操作)匹配的目標行。的刪除刪除匹配的行。

    • 每一個whenMatched條款可以有一個可選的條件。如果這一條款條件存在,更新orgydF4y2Ba刪除行動執行任何匹配的源-目標行隻有當條款條件為真。

    • 如果有多個whenMatched條款,然後他們評估的順序指定。所有whenMatched條款,除了最後一個,必須有條件。

    • 如果沒有一個whenMatched條件評估為true的源和目標匹配的行對合並條件,那麼目標行左不變。

    • 更新目標三角洲表的所有列的相應列源數據集,使用whenMatched (…) .updateAll ()。這相當於:

      whenMatched(…)。updateExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))

      對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。

      請注意

      這種行為變化時自動啟用模式遷移。看到自動模式演化獲取詳細信息。

  • whenNotMatched條款執行時源行不匹配任何行基於目標匹配條件。這些條款有以下語義。

    • whenNotMatched條款可以隻有插入行動。生成新行根據指定的列和相應的表達式。你不需要指定目標表中的所有列。對於未指定的目標列,被插入。

    • 每一個whenNotMatched條款可以有一個可選的條件。如果條款條件存在,源行插入隻有當條件為真行。否則,源列將被忽略。

    • 如果有多個whenNotMatched條款,然後他們評估的順序指定。所有whenNotMatched條款,除了最後一個,必須有條件。

    • 插入目標三角洲表的所有列的相應列源數據集,使用whenNotMatched (…) .insertAll ()。這相當於:

      whenNotMatched(…)。insertExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))

      對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。

      請注意

      這種行為變化時自動啟用模式遷移。看到自動模式演化獲取詳細信息。

  • whenNotMatchedBySource條款執行當目標行不匹配任何源行基於合並條件。這些條款有以下語義。

    • whenNotMatchedBySource條款可以指定刪除更新行動。

    • 每一個whenNotMatchedBySource條款可以有一個可選的條件。如果條款條件存在,目標行修改隻對這一行,如果條件為真。否則,目標行是不變的。

    • 如果有多個whenNotMatchedBySource條款,然後他們評估的順序指定。所有whenNotMatchedBySource條款,除了最後一個,必須有條件。

    • 根據定義,whenNotMatchedBySource條款沒有將列值從一個源行,所以不能引用源列。對於每一列要修改,您可以指定一個文字或目標列上執行操作,如target.deleted_count=target.deleted_count+1

重要的

  • 一個合並源數據集的操作就會失敗,如果多行匹配和合並嚐試更新相同的目標三角洲表行。根據SQL合並的語義,等更新操作是模棱兩可的尚不清楚應該使用哪個源行更新匹配的目標行。源表可以進行預處理來消除多個匹配的可能性。看到變化數據捕獲的例子——顯示了如何預處理改變數據集(即源數據集)隻保留最新的改變對於每個關鍵申請前三角洲到目標表。

  • 你可以申請一個SQL合並操作在一個SQL視圖隻有視圖已被定義為創建視圖viewName作為選擇*deltaTable

重複數據刪除在編寫到三角洲表中

常見的ETL的用例是收集日誌到三角洲表通過添加一個表。然而,通常來源可以生成重複的日誌記錄和下遊重複數據刪除步驟需要照顧他們。與合並,你可以避免插入重複的記錄。

合並日誌使用newDedupedLogs日誌uniqueId=newDedupedLogsuniqueId匹配然後插入*
deltaTable別名(“日誌”)合並(newDedupedLogs別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)\whenNotMatchedInsertAll()\執行()
deltaTable作為(“日誌”)合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)whenNotMatched()insertAll()執行()
deltaTable作為(“日誌”)合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)whenNotMatched()insertAll()執行();

請注意

包含新日誌的數據集需要刪除處理內部本身。通過SQL合並的語義,它匹配和刪除處理新數據與現有的數據表中,但是如果有新的數據集內重複數據,插入。因此,在合並之前刪除處理新的數據表。

如果你知道你可能會重複的記錄隻有幾天,您可以優化您的查詢進一步通過分區表的日期,然後指定目標表的日期範圍相匹配。

合並日誌使用newDedupedLogs日誌uniqueId=newDedupedLogsuniqueId日誌日期>當前日期()- - - - - -時間間隔7匹配newDedupedLogs日期>當前日期()- - - - - -時間間隔7然後插入*
deltaTable別名(“日誌”)合並(newDedupedLogs別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")\whenNotMatchedInsertAll(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")\執行()
deltaTable作為(“日誌”)。合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")insertAll()執行()
deltaTable作為(“日誌”)。合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")insertAll()執行();

這是更有效的比前麵的命令作為副本看起來隻有在過去的7天的日誌,而不是整個表。此外,您可以使用此純插入合並結構化流執行連續重複數據刪除的日誌。

  • 在流媒體查詢,您可以使用合並操作foreachBatch不斷編寫任何流數據與重複數據刪除三角洲表。看到下麵的流的例子的更多信息foreachBatch

  • 在另一個流媒體查詢,你可以不斷地從δ讀取刪除處理數據表。這是有可能的,因為一個純插入合並隻有δ表添加新數據。

慢慢地改變數據(SCD) 2型操作到三角洲表中

三角洲住表原生支持跟蹤和應用SCD 2型。看到改變數據獲取與三角洲生活表

更改數據寫入一個增量表

類似的化合物,另一個常見的用例,通常被稱為變化數據捕獲(CDC),是適用於所有數據更改來自外部數據庫到三角洲表。換句話說,一組更新,刪除,插入應用到一個外部表需要應用於三角洲表。您可以使用合並如下。

瓦爾deltaTable:DeltaTable=/ / DeltaTable模式(關鍵字,值)/ / DataFrame變化後列/ /——關鍵:關鍵的變化/ / -時間:時間之間的變化順序變化(可以取代其他順序id)/ / - newValue:更新或插入的值如果鍵不刪除/ /刪除:真如果鍵被刪除,假如果鍵插入或更新瓦爾changesDF:DataFrame=/ /找到最新的改變基於時間戳的每個鍵/ /注意:嵌套結構,馬克斯在結構計算/ / max第一結構體字段,如果回到第二個字段,等等。瓦爾latestChangeForEachKey=changesDFselectExpr(“關鍵”,“結構(時間、newValue刪除)otherCols”)groupBy(“關鍵”)gg(馬克斯(“otherCols”)。作為(“最新”))selectExpr(“關鍵”,“最新*”。)deltaTable作為(“t”)合並(latestChangeForEachKey作為(“s”),”年代。關鍵= t.key”)whenMatched(“s.deleted = true”)刪除()whenMatched()updateExpr(地圖(“關鍵”- >“s.key”,“價值”- >“s.newValue”))whenNotMatched(“s.deleted = false”)insertExpr(地圖(“關鍵”- >“s.key”,“價值”- >“s.newValue”))執行()
deltaTable=# DeltaTable模式(關鍵字,值)# DataFrame變化後列#鍵:關鍵的變化#——時間:時間之間的變化順序變化(可以取代其他順序id)# - newValue:更新或插入的值如果鍵不刪除# -刪除:真如果鍵被刪除,假如果鍵插入或更新changesDF=火花(“改變”)#找到最新的改變基於時間戳的每個鍵#注意:嵌套結構,馬克斯在結構計算#馬克斯第一結構體字段,如果回到第二個字段,等等。latestChangeForEachKey=changesDF\selectExpr(“關鍵”,“結構(時間、newValue刪除)otherCols”)\groupBy(“關鍵”)\gg(馬克斯(“otherCols”)別名(“最新”))\選擇(“關鍵”,“最新*”。)\deltaTable別名(“t”)合並(latestChangeForEachKey別名(“s”),”年代。關鍵= t.key”)\whenMatchedDelete(條件=“s.deleted = true”)\whenMatchedUpdate(={“關鍵”:“s.key”,“價值”:“s.newValue”})\whenNotMatchedInsert(條件=“s.deleted = false”,={“關鍵”:“s.key”,“價值”:“s.newValue”})執行()

筆記本的例子:寫變化數據與合並

下麵的筆記本演示了使用三角洲湖合並寫修改表數據捕獲數據三角洲。

使用合並筆記本寫更改數據

在新標簽頁打開筆記本

增量同步三角洲與源表

在磚SQL和磚的運行時12.1及以上,您可以使用匹配通過創建任意條件自動刪除和替換表的一部分。這是特別有用,當你有一個源表記錄可能會改變或被刪除後幾天初始數據輸入,但最終解決最終狀態。

下麵的查詢顯示了使用這種模式從源選擇5天的記錄,更新目標匹配記錄,插入新記錄從源到目標,並刪除所有無與倫比的記錄從過去5天的目標。

合並目標作為t使用(選擇*在哪裏created_at> =(當前日期()- - - - - -時間間隔“5”一天))作為年代t關鍵=年代關鍵匹配然後更新*匹配然後插入*匹配通過created_at> =(當前日期()- - - - - -時間間隔“5”一天)然後刪除

通過提供相同的布爾濾源表和目標表,你可以動態地傳播變化從源到目標表,包括刪除。

請注意

雖然這種模式可以使用沒有任何條件條款,這將導致完全重寫目標表可以是昂貴的。