表刪除、更新和合並<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#table-deletes-updates-and-merges" title="">

三角洲湖支持多個語句來促進三角洲表中刪除的數據和更新數據。

概述和示範三角洲湖的刪除和更新數據,看這個YouTube視頻(54分鍾)。

額外的信息捕獲變更數據從三角洲湖,看這個YouTube視頻(53分鍾)。

刪除從表<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#delete-from-a-table" title="">

您可以刪除數據匹配謂詞從三角洲表。例如,在一個表命名people10m或一個路徑/ tmp /δ/ people-10m刪除所有行對應於人的價值生日之前的列1955年,您可以運行下麵的:

刪除people10m在哪裏生日<“1955-01-01”刪除δ' /tmp/δ/- - - - - -10在哪裏生日<“1955-01-01”

請注意

Python API有磚運行時的6.1及以上。

delta.tables進口*pyspark.sql.functions進口*deltaTable=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)#聲明使用SQL-formatted字符串謂詞。deltaTable刪除(“生日< 1955-01-01”)#使用謂詞的火花SQL函數聲明。deltaTable刪除(上校(“生日”)<“1960-01-01”)

請注意

Scala API有磚運行時的6.0及以上。

進口ioδ_瓦爾deltaTable=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)/ /聲明使用SQL-formatted字符串謂詞。deltaTable刪除(“生日< 1955-01-01”)進口orgapache火花sql功能_進口火花值得一提的_/ /聲明使用火花值得一提的是SQL函數和謂詞。deltaTable刪除(上校(“生日”)<“1955-01-01”)

請注意

磚的Java API可用運行時6.0及以上。

進口io.delta.tables。*;進口org.apache.spark.sql.functions;DeltaTabledeltaTable=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”);/ /聲明使用SQL-formatted字符串謂詞。deltaTable刪除(“生日< 1955-01-01”);/ /聲明謂詞使用火花SQL函數。deltaTable刪除(功能上校(“生日”)。lt(功能點燃(“1955-01-01”)));

看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-apidoc.html">三角洲湖api獲取詳細信息。

重要的

刪除從三角洲的最新版本中刪除數據表,但不刪除它從物理存儲到舊版本明確真空的。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-utility.html">真空獲取詳細信息。

提示

在可能的情況下,提供一個分區的分區列上的謂詞三角洲表這樣謂詞可以顯著加快操作。

更新一個表<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#update-a-table" title="">

你可以更新數據相匹配的謂詞在三角洲表。例如,在一個表命名people10m或一個路徑/ tmp /δ/ people-10m,改變的縮寫性別列從orgydF4y2BaF男性orgydF4y2Ba,您可以運行下麵的:

更新people10m性別=“女”在哪裏性別=“F”;更新people10m性別=“男”在哪裏性別=“米”;更新δ' /tmp/δ/- - - - - -10性別=“女”在哪裏性別=“F”;更新δ' /tmp/δ/- - - - - -10性別=“男”在哪裏性別=“米”;

請注意

Python API有磚運行時的6.1及以上。

delta.tables進口*pyspark.sql.functions進口*deltaTable=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)#聲明使用SQL-formatted字符串謂詞。deltaTable更新(條件=“性別= ' F '”,={“性別”:“女”})#使用謂詞的火花SQL函數聲明。deltaTable更新(條件=上校(“性別”)= =“米”,={“性別”:點燃(“男”)})

請注意

Scala API有磚運行時的6.0及以上。

進口ioδ_瓦爾deltaTable=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)/ /聲明使用SQL-formatted字符串謂詞。deltaTableupdateExpr(“性別= ' F '”,地圖(“性別”- >“女”)進口orgapache火花sql功能_進口火花值得一提的_/ /聲明使用火花值得一提的是SQL函數和謂詞。deltaTable更新(上校(“性別”)= = =“M”,地圖(“性別”- >點燃(“男性”)));

請注意

Scala API有磚運行時的6.0及以上。

進口io.delta.tables。*;進口org.apache.spark.sql.functions;進口java.util.HashMap;DeltaTabledeltaTable=DeltaTableforPath(火花,“/數據/事件/”);/ /聲明使用SQL-formatted字符串謂詞。deltaTableupdateExpr(“性別= ' F '”,HashMap<字符串,字符串>(){{(“性別”,“女”);}});/ /聲明謂詞使用火花SQL函數。deltaTable更新(功能上校(性別)。情商(“M”),HashMap<字符串,>(){{(“性別”,功能點燃(“男性”));}});

看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-apidoc.html">三角洲湖api獲取詳細信息。

提示

類似於刪除,更新操作可以顯著加速分區上的謂詞。

使用合並插入到表中<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#upsert-into-a-table-using-merge" title="">

您可以插入數據從源表,視圖,或者DataFrame目標三角洲表使用合並SQL操作。三角洲湖支持插入、更新和刪除合並,它支持擴展語法之外的SQL標準促進先進的用例。

假設您有一個源表命名people10mupdates或一個源路徑/ tmp /δ/ people-10m-updates,包含一個目標表命名的新數據people10m或一個目標路徑/ tmp /δ/ people-10m。其中一些新記錄可能已經出現在目標數據。合並的新數據,你想更新人的行id已經存在並插入新行,不匹配id是禮物。你可以運行以下:

合並people10m使用people10mupdatespeople10mid=people10mupdatesid匹配然後更新id=people10mupdatesid,firstName=people10mupdatesfirstName,middleName=people10mupdatesmiddleName,=people10mupdates,性別=people10mupdates性別,生日=people10mupdates生日,ssn=people10mupdatesssn,工資=people10mupdates工資匹配然後插入(id,firstName,middleName,,性別,生日,ssn,工資)(people10mupdatesid,people10mupdatesfirstName,people10mupdatesmiddleName,people10mupdates,people10mupdates性別,people10mupdates生日,people10mupdatesssn,people10mupdates工資)

語法細節,請參閱

  • 磚運行時7。x,上圖:<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/spark/latest/spark-sql/language-manual/delta-merge-into.html">合並成

  • 磚運行時5.5 LTS和6. x:_

delta.tables進口*deltaTablePeople=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)deltaTablePeopleUpdates=DeltaTableforPath(火花,“/ tmp /δ/ people-10m-updates”)dfUpdates=deltaTablePeopleUpdatestoDF()deltaTablePeople別名(“人”)\合並(dfUpdates別名(“更新”),”的人。id=更新s.id')\whenMatchedUpdate(={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\whenNotMatchedInsert(={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\執行()
進口ioδ_進口orgapache火花sql功能_瓦爾deltaTablePeople=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)瓦爾deltaTablePeopleUpdates=DeltaTableforPath(火花,“tmp /δ/ people-10m-updates”)瓦爾dfUpdates=deltaTablePeopleUpdatestoDF()deltaTablePeople作為(“人”)合並(dfUpdates作為(“更新”),”的人。id=更新s.id")whenMatchedupdateExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))whenNotMatchedinsertExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))執行()
進口io.delta.tables。*;進口org.apache.spark.sql.functions;進口java.util.HashMap;DeltaTabledeltaTable=DeltaTableforPath(火花,“/ tmp /δ/ people-10m”)數據集<>dfUpdates=火花(“δ”)。負載(“/ tmp /δ/ people-10m-updates”)deltaTable作為(“人”)合並(dfUpdates作為(“更新”),”的人。id=更新s.id")whenMatched()updateExpr(HashMap<字符串,字符串>(){{(“id”,“updates.id”);(“firstName”,“updates.firstName”);(“middleName”,“updates.middleName”);(“姓”,“updates.lastName”);(“性別”,“updates.gender”);(“生日”,“updates.birthDate”);(“ssn”,“updates.ssn”);(“工資”,“updates.salary”);}})whenNotMatched()insertExpr(HashMap<字符串,字符串>(){{(“id”,“updates.id”);(“firstName”,“updates.firstName”);(“middleName”,“updates.middleName”);(“姓”,“updates.lastName”);(“性別”,“updates.gender”);(“生日”,“updates.birthDate”);(“ssn”,“updates.ssn”);(“工資”,“updates.salary”);}})執行();

看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-apidoc.html">三角洲湖apiScala的Java、Python語法細節。

三角洲湖合並操作通常需要兩個通過源數據。如果你的源數據包含不確定性表達式,多個傳遞源數據可以產生不同的行導致不正確的結果。一些常見的例子包括不確定性表達式當前日期current_timestamp功能。如果你不能避免使用不確定的函數,考慮源數據保存到存儲,例如作為臨時三角洲表。緩存的源數據可能不解決這個問題,如緩存失效所導致源數據重新計算部分或完全(例如當集群失去一些執行人當縮放)。

操作語義<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#operation-semantics" title="">

這是一個詳細的描述合並編程操作。

  • 可以有任意數量的whenMatchedwhenNotMatched條款。

    請注意

    在磚運行時7.2及以下,合並可以有最多2whenMatched條款和最多1whenNotMatched條款。

  • whenMatched條款執行時源行匹配目標表行根據匹配條件。這些條款有以下語義。

    • whenMatched條款最多隻能有一個更新和一個刪除行動。的更新行動合並僅更新(類似於指定的列更新操作)匹配的目標行。的刪除刪除匹配的行。

    • 每一個whenMatched條款可以有一個可選的條件。如果這一條款條件存在,更新orgydF4y2Ba刪除行動執行任何匹配的源-目標行隻有當條款條件為真。

    • 如果有多個whenMatched條款,然後他們評估的順序指定。所有whenMatched條款,除了最後一個,必須有條件。

    • 如果沒有一個whenMatched條件評估為true的源和目標匹配的行對合並條件,那麼目標行左不變。

    • 更新目標三角洲表的所有列的相應列源數據集,使用whenMatched (…) .updateAll ()。這相當於:

      whenMatched(…)。updateExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))

      對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。

      請注意

      這種行為變化時自動啟用模式遷移。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。

  • whenNotMatched條款執行時源行不匹配任何行基於目標匹配條件。這些條款有以下語義。

    • whenNotMatched條款可以隻有插入行動。生成新行根據指定的列和相應的表達式。你不需要指定目標表中的所有列。對於未指定的目標列,被插入。

      請注意

      在磚運行時的6.5和下麵,您必須提供的目標表中所有的列插入行動。

    • 每一個whenNotMatched條款可以有一個可選的條件。如果條款條件存在,源行插入隻有當條件為真行。否則,源列將被忽略。

    • 如果有多個whenNotMatched條款,然後他們評估的順序指定。所有whenNotMatched條款,除了最後一個,必須有條件。

    • 插入目標三角洲表的所有列的相應列源數據集,使用whenNotMatched (…) .insertAll ()。這相當於:

      whenNotMatched(…)。insertExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))

      對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。

      請注意

      這種行為變化時自動啟用模式遷移。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。

重要的

  • 一個合並源數據集的操作就會失敗,如果多行匹配和合並嚐試更新相同的目標三角洲表行。根據SQL合並的語義,等更新操作是模棱兩可的尚不清楚應該使用哪個源行更新匹配的目標行。源表可以進行預處理來消除多個匹配的可能性。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#write-change-data-into-a-delta-table">變化數據捕獲的例子——顯示了如何預處理改變數據集(即源數據集)隻保留最新的改變對於每個關鍵申請前三角洲到目標表。

  • 一個合並操作可以產生不正確的結果如果源數據集是不確定的。這是因為合並可能執行的兩個掃描源數據集和數據兩個掃描產生的不同,最後更改表可以是不正確的。非確定性源可以出現在許多方麵。其中一些如下:

    • 閱讀從non-Delta表。例如,閱讀可以改變從一個CSV表底層文件之間的多個掃描。

    • 使用不確定的操作。例如,Dataset.filter ()使用當前時間戳來過濾數據的操作多個掃描之間可以產生不同的結果。

  • 你可以申請一個SQL合並操作在一個SQL視圖隻有視圖已被定義為創建視圖viewName作為選擇*deltaTable

請注意

在磚運行時7.3 LTS以上,無條件刪除多個匹配時允許匹配(因為無條件的刪除不是模棱兩可的即使有多個匹配)。

模式驗證<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#schema-validation" title="">

合並自動驗證的模式由插入和更新表達式生成的數據表的兼容模式。它使用以下規則來確定合並操作兼容:

  • 更新插入操作,指定的目標列必須存在於目標三角洲表。

  • updateAllinsertAll行動,源數據集必須有目標三角洲表的所有列。源數據集可以有額外的列,他們將被忽略。

  • 為所有操作,如果數據類型表達式生成的生產目標列不同於目標三角洲表中相應的列,合並試圖把他們的類型表。

自動模式演化<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#automatic-schema-evolution" title="">

請注意

模式演化的合並有磚運行時6.6及以上。

默認情況下,updateAllinsertAll指定目標三角洲表中的所有列和列名稱相同的源數據集。任何列在源數據集不匹配目標表中的列將被忽略。但是,在某些用例,需要自動添加源列到目標差值表。自動更新在表模式合並操作updateAllinsertAll(至少一個),您可以設置引發會話配置spark.databricks.delta.schema.autoMerge.enabled真正的在運行之前合並操作。

請注意

  • 模式演化發生隻有當有一個updateAll(更新*)或一個insertAll(插入*)行動,或兩者兼而有之。

  • 更新插入行動不能顯式地引用目標列不已經存在於目標表(即使它有updateAllorgydF4y2BainsertAll的條款)。請參見下麵的示例。

請注意

在磚運行時7.4及以下,合並隻支持模式演化的頂級列,而不是嵌套的列。

這裏有一些例子的影響合並操作,沒有模式演化。

查詢(在Scala中)

行為沒有模式演化(默認)

行為模式演化

目標列:鍵,價值

源列:鍵,值,newValue

targetDeltaTable別名(“t”)合並(sourceDataFrame別名(“s”),“t。關鍵= s.key”)whenMatched()。updateAll()whenNotMatched()。insertAll()執行()

表模式保持不變;隻列關鍵,價值更新/插入。

表模式改變(關鍵值,newValue)updateAll更新列價值newValue,insertAll插入行(關鍵值,newValue)

目標列:鍵,oldValue

源列:鍵,newValue

targetDeltaTable別名(“t”)合並(sourceDataFrame別名(“s”),“t。關鍵= s.key”)whenMatched()。updateAll()whenNotMatched()。insertAll()執行()

updateAllinsertAll因為目標列行動拋出一個錯誤oldValue不是在源。

表模式改變(關鍵oldValue,newValue)updateAll更新列關鍵newValue離開oldValue保持不變,insertAll插入行(關鍵空,newValue)(即,oldValue是插入)。

目標列:鍵,oldValue

源列:鍵,newValue

targetDeltaTable別名(“t”)合並(sourceDataFrame別名(“s”),“t。關鍵= s.key”)whenMatched()。更新(地圖(“newValue”- >上校(“s.newValue”)))whenNotMatched()。insertAll()執行()

更新拋出一個錯誤,因為列newValue目標表中不存在。

更新因為列仍拋出一個錯誤newValue目標表中不存在。

目標列:鍵,oldValue

源列:鍵,newValue

targetDeltaTable別名(“t”)合並(sourceDataFrame別名(“s”),“t。關鍵= s.key”)whenMatched()。updateAll()whenNotMatched()。插入(地圖(“關鍵”- >上校(“s.key”),“newValue”- >上校(“s.newValue”)))執行()

插入拋出一個錯誤,因為列newValue目標表中不存在。

插入列仍然拋出一個錯誤newValue目標表中不存在。

特別注意事項包含陣列結構模式<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#special-considerations-for-schemas-that-contain-arrays-of-structs" title="">

δ合並支持解決結構體字段為結構體數組的名字和發展模式。啟用模式演化後,目標表模式將為陣列結構的進化,也適用於任何嵌套的結構體數組的內部。

請注意

這個特性可以在磚運行時9.1及以上。磚運行時的9.0及以下,隱式火花鑄造用於數組結構來解決結構體字段的位置,和合並操作的影響,沒有模式演化的結構體數組與以外的結構體數組的行為不一致。

這裏有一些例子的合並操作的影響,沒有模式演化的結構體數組。

源模式

目標模式

行為沒有模式演化(默認)

行為模式演化

數組< struct < b:字符串,答:string > >

<結構體數組< int, b: int > >

表模式保持不變。列將解決名稱和更新或插入。

表模式保持不變。列將解決名稱和更新或插入。

數組< struct < int, c:字符串,d: string > >

<結構體數組<字符串,b: string > >

更新插入把錯誤是因為cd目標表中不存在。

表模式更改為數組< struct <字符串,b:字符串,c:字符串,d: string > >。cd是插入現有條目的目標表。更新插入源表中的條目填充一個字符串和轉化b作為

數組< struct <字符串,b: struct < c:字符串,d: string > > >

數組< struct <字符串,b: struct < c: string > > >

更新插入把錯誤是因為d目標表中不存在。

目標表模式更改為數組< struct <字符串,b: struct < c:字符串,d: string > > >。d是插入現有條目的目標表。

性能調優<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#performance-tuning" title="">

你可以減少花費的時間合並使用以下方法:

  • 減少匹配的搜索空間:默認情況下,合並操作搜索整個三角洲表來找到匹配的源表。加速的一種方式合並是減少搜索空間在比賽中通過添加已知約束條件。例如,假設您有一個表分區的國家日期你想使用合並為最後一天更新信息,一個特定的國家。添加條件

    事件日期=當前日期()事件國家=“美國”

    將使查詢速度因為它查找匹配隻有在相關的分區。此外,它還將減少衝突的機會與其他並發操作。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/concurrency-control.html">並發控製為更多的細節。

  • 緊湊的文件:如果數據存儲在許多小文件,讀取的數據搜索匹配會變得緩慢。你可以緊湊的小文件到大文件來提高閱讀的吞吐量。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/best-practices.html">緊湊的文件獲取詳細信息。

  • 控製調整分區寫道:合並多次操作打亂數據更新的數據計算和寫作。任務的數量用於洗牌由火花控製會話配置spark.sql.shuffle.partitions。設置這個參數不僅控製並行性,也決定了輸出文件的數量。增加價值增加並行,也產生大量的小的數據文件。

  • 啟用優化寫道:對於分區表,合並可以產生大量小文件的數量比洗牌分區。這是因為每一個洗牌的任務在多個分區,可以寫多個文件,可以成為一個性能瓶頸。你可以減少文件通過啟用<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/auto-optimize.html">優化的寫

請注意

在磚運行時的7.4及以上,<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/auto-optimize.html">優化的寫是自動啟用合並分區表的操作。

  • 優化文件大小在表:在磚運行時的8.2及以上,磚可以自動檢測如果δ表頻繁合並重寫的操作文件,可以選擇重寫文件的大小減少的預期在未來進一步的文件改寫。上看到的部分<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/file-mgmt.html">優化文件大小獲取詳細信息。

  • 低洗牌合並:在磚運行時的9.0及以上,<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/low-shuffle-merge.html">低洗牌合並提供了一個優化的實現合並為最常見的工作負載提供更好的性能。此外,它保留了現有的數據布局優化等<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/file-mgmt.html">z值修改的數據。

合並的例子<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#merge-examples" title="">

這裏有一些關於如何使用示例合並在不同的場景中。

重複數據刪除在編寫到三角洲表中<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#data-deduplication-when-writing-into-delta-tables" title="">

常見的ETL的用例是收集日誌到三角洲表通過添加一個表。然而,通常來源可以生成重複的日誌記錄和下遊重複數據刪除步驟需要照顧他們。與合並,你可以避免插入重複的記錄。

合並日誌使用newDedupedLogs日誌uniqueId=newDedupedLogsuniqueId匹配然後插入*
deltaTable別名(“日誌”)合並(newDedupedLogs別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)\whenNotMatchedInsertAll()\執行()
deltaTable作為(“日誌”)合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)whenNotMatched()insertAll()執行()
deltaTable作為(“日誌”)合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)whenNotMatched()insertAll()執行();

請注意

包含新日誌的數據集需要刪除處理內部本身。通過SQL合並的語義,它匹配和刪除處理新數據與現有的數據表中,但是如果有新的數據集內重複數據,插入。因此,在合並之前刪除處理新的數據表。

如果你知道你可能會重複的記錄隻有幾天,您可以優化您的查詢進一步通過分區表的日期,然後指定目標表的日期範圍相匹配。

合並日誌使用newDedupedLogs日誌uniqueId=newDedupedLogsuniqueId日誌日期>當前日期()- - - - - -時間間隔7匹配newDedupedLogs日期>當前日期()- - - - - -時間間隔7然後插入*
deltaTable別名(“日誌”)合並(newDedupedLogs別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")\whenNotMatchedInsertAll(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")\執行()
deltaTable作為(“日誌”)。合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")insertAll()執行()
deltaTable作為(“日誌”)。合並(newDedupedLogs作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")insertAll()執行();

這是更有效的比前麵的命令作為副本看起來隻有在過去的7天的日誌,而不是整個表。此外,您可以使用此純插入合並結構化流執行連續重複數據刪除的日誌。

  • 在流媒體查詢,您可以使用合並操作foreachBatch不斷編寫任何流數據與重複數據刪除三角洲表。看到下麵的<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-in-streaming">流的例子的更多信息foreachBatch

  • 在另一個流媒體查詢,你可以不斷地從δ讀取刪除處理數據表。這是有可能的,因為一個純插入合並隻有δ表添加新數據。

請注意

純插入合並優化隻能在磚運行時6.2及以上的附加數據。寫道,在磚6.1運行時,下麵從純插入合並操作不能讀取流。

慢慢地改變數據(SCD) 2型操作到三角洲表中<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#slowly-changing-data-scd-type-2-operation-into-delta-tables" title="">

另一個常見的操作是SCD 2型,維護所有更改的曆史維度表中每一個關鍵。這樣的操作需要更新現有的行之前的鍵值標記為老,和插入新行最新值。給定一個源表與目標表的更新和維度數據,SCD 2型可以表達合並

下麵是一個具體的例子維護地址客戶的曆史以及每個地址的有效日期範圍。當客戶的地址需要更新時,你必須馬克前麵的地址不是當前,更新其活躍的日期範圍,並添加新地址。

使用合並筆記本SCD 2型<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#scd-type-2-using-merge-notebook" title="">

更改數據寫入一個增量表<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#write-change-data-into-a-delta-table" title="">

類似的化合物,另一個常見的用例,通常被稱為變化數據捕獲(CDC),是適用於所有數據更改來自外部數據庫到三角洲表。換句話說,一組更新,刪除,插入應用到一個外部表需要應用於三角洲表。您可以使用合並如下。

使用合並筆記本寫更改數據<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#write-change-data-using-merge-notebook" title="">

插入從流媒體查詢使用foreachBatch

您可以使用的組合合並foreachBatch(見<一個class="reference external" href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html">foreachbatch對於更多信息)寫複雜的從流媒體查詢插入到三角洲表。例如:

  • 寫流聚集在更新模式:這是更有效的比完整的模式。

  • 數據庫更改流寫入一個增量表:<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-in-cdc">合並查詢寫更改數據可以用在foreachBatch不斷應用流變化δ表。

  • 寫一個數據流到三角洲與重複數據刪除表:<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-in-dedup">純插入合並查詢重複數據刪除可以用在foreachBatch不斷寫入數據(副本)三角洲表自動重複數據刪除。

請注意

  • 確保你的合並聲明內foreachBatch是冪等的重啟流查詢可以應用相同的一批數據上的操作很多次了。

  • 合並被用在foreachBatch,輸入數據流查詢(通過報道StreamingQueryProgress和可見的筆記本率圖)可能被報道為多個實際的速率生成的數據來源。這是因為合並讀取輸入數據多次導致輸入指標成倍增加。如果這是一個瓶頸,您可以緩存批DataFrame之前合並然後uncache之後合並

寫流聚集在更新模式中使用合並和foreachBatch筆記本<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#write-streaming-aggregates-in-update-mode-using-merge-and-foreachbatch-notebook" title="">