表刪除、更新和合並<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#table-deletes-updates-and-merges" title="">
三角洲湖支持多個語句來促進三角洲表中刪除的數據和更新數據。
概述和示範三角洲湖的刪除和更新數據,看這個YouTube視頻(54分鍾)。
額外的信息捕獲變更數據從三角洲湖,看這個YouTube視頻(53分鍾)。
刪除從表<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#delete-from-a-table" title="">
您可以刪除數據匹配謂詞從三角洲表。例如,在一個表命名people10m
或一個路徑/ tmp /δ/ people-10m
刪除所有行對應於人的價值生日
之前的列1955年
,您可以運行下麵的:
刪除從people10m在哪裏生日<“1955-01-01”刪除從δ。' /tmp/δ/人- - - - - -10米”在哪裏生日<“1955-01-01”
請注意
Python API有磚運行時的6.1及以上。
從delta.tables進口*從pyspark.sql.functions進口*deltaTable=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)#聲明使用SQL-formatted字符串謂詞。deltaTable。刪除(“生日< 1955-01-01”)#使用謂詞的火花SQL函數聲明。deltaTable。刪除(上校(“生日”)<“1960-01-01”)
請注意
Scala API有磚運行時的6.0及以上。
進口io。δ。表。_瓦爾deltaTable=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)/ /聲明使用SQL-formatted字符串謂詞。deltaTable。刪除(“生日< 1955-01-01”)進口org。apache。火花。sql。功能。_進口火花。值得一提的。_/ /聲明使用火花值得一提的是SQL函數和謂詞。deltaTable。刪除(上校(“生日”)<“1955-01-01”)
請注意
磚的Java API可用運行時6.0及以上。
進口io.delta.tables。*;進口org.apache.spark.sql.functions;DeltaTabledeltaTable=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”);/ /聲明使用SQL-formatted字符串謂詞。deltaTable。刪除(“生日< 1955-01-01”);/ /聲明謂詞使用火花SQL函數。deltaTable。刪除(功能。上校(“生日”)。lt(功能。點燃(“1955-01-01”)));
看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-apidoc.html">三角洲湖api獲取詳細信息。
重要的
刪除
從三角洲的最新版本中刪除數據表,但不刪除它從物理存儲到舊版本明確真空的。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-utility.html">真空獲取詳細信息。
提示
在可能的情況下,提供一個分區的分區列上的謂詞三角洲表這樣謂詞可以顯著加快操作。
更新一個表<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#update-a-table" title="">
你可以更新數據相匹配的謂詞在三角洲表。例如,在一個表命名people10m
或一個路徑/ tmp /δ/ people-10m
,改變的縮寫性別
列從米
orgydF4y2BaF
來男性
orgydF4y2Ba女
,您可以運行下麵的:
更新people10m集性別=“女”在哪裏性別=“F”;更新people10m集性別=“男”在哪裏性別=“米”;更新δ。' /tmp/δ/人- - - - - -10米”集性別=“女”在哪裏性別=“F”;更新δ。' /tmp/δ/人- - - - - -10米”集性別=“男”在哪裏性別=“米”;
請注意
Python API有磚運行時的6.1及以上。
從delta.tables進口*從pyspark.sql.functions進口*deltaTable=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)#聲明使用SQL-formatted字符串謂詞。deltaTable。更新(條件=“性別= ' F '”,集={“性別”:“女”})#使用謂詞的火花SQL函數聲明。deltaTable。更新(條件=上校(“性別”)= =“米”,集={“性別”:點燃(“男”)})
請注意
Scala API有磚運行時的6.0及以上。
進口io。δ。表。_瓦爾deltaTable=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)/ /聲明使用SQL-formatted字符串謂詞。deltaTable。updateExpr(“性別= ' F '”,地圖(“性別”- >“女”)進口org。apache。火花。sql。功能。_進口火花。值得一提的。_/ /聲明使用火花值得一提的是SQL函數和謂詞。deltaTable。更新(上校(“性別”)= = =“M”,地圖(“性別”- >點燃(“男性”)));
請注意
Scala API有磚運行時的6.0及以上。
進口io.delta.tables。*;進口org.apache.spark.sql.functions;進口java.util.HashMap;DeltaTabledeltaTable=DeltaTable。forPath(火花,“/數據/事件/”);/ /聲明使用SQL-formatted字符串謂詞。deltaTable。updateExpr(“性別= ' F '”,新HashMap<字符串,字符串>(){{把(“性別”,“女”);}});/ /聲明謂詞使用火花SQL函數。deltaTable。更新(功能。上校(性別)。情商(“M”),新HashMap<字符串,列>(){{把(“性別”,功能。點燃(“男性”));}});
看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-apidoc.html">三角洲湖api獲取詳細信息。
提示
類似於刪除,更新操作可以顯著加速分區上的謂詞。
使用合並插入到表中<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#upsert-into-a-table-using-merge" title="">
您可以插入數據從源表,視圖,或者DataFrame目標三角洲表使用合並
SQL操作。三角洲湖支持插入、更新和刪除合並
,它支持擴展語法之外的SQL標準促進先進的用例。
假設您有一個源表命名people10mupdates
或一個源路徑/ tmp /δ/ people-10m-updates
,包含一個目標表命名的新數據people10m
或一個目標路徑/ tmp /δ/ people-10m
。其中一些新記錄可能已經出現在目標數據。合並的新數據,你想更新人的行id
已經存在並插入新行,不匹配id
是禮物。你可以運行以下:
合並成people10m使用people10mupdates在people10m。id=people10mupdates。id當匹配然後更新集id=people10mupdates。id,firstName=people10mupdates。firstName,middleName=people10mupdates。middleName,姓=people10mupdates。姓,性別=people10mupdates。性別,生日=people10mupdates。生日,ssn=people10mupdates。ssn,工資=people10mupdates。工資當不匹配然後插入(id,firstName,middleName,姓,性別,生日,ssn,工資)值(people10mupdates。id,people10mupdates。firstName,people10mupdates。middleName,people10mupdates。姓,people10mupdates。性別,people10mupdates。生日,people10mupdates。ssn,people10mupdates。工資)
語法細節,請參閱
磚運行時7。x,上圖:<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/spark/latest/spark-sql/language-manual/delta-merge-into.html">合並成
磚運行時5.5 LTS和6. x:_
從delta.tables進口*deltaTablePeople=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)deltaTablePeopleUpdates=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m-updates”)dfUpdates=deltaTablePeopleUpdates。toDF()deltaTablePeople。別名(“人”)\。合並(dfUpdates。別名(“更新”),”的人。id=更新s.id')\。whenMatchedUpdate(集={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\。whenNotMatchedInsert(值={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\。執行()
進口io。δ。表。_進口org。apache。火花。sql。功能。_瓦爾deltaTablePeople=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)瓦爾deltaTablePeopleUpdates=DeltaTable。forPath(火花,“tmp /δ/ people-10m-updates”)瓦爾dfUpdates=deltaTablePeopleUpdates。toDF()deltaTablePeople。作為(“人”)。合並(dfUpdates。作為(“更新”),”的人。id=更新s.id")。whenMatched。updateExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))。whenNotMatched。insertExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))。執行()
進口io.delta.tables。*;進口org.apache.spark.sql.functions;進口java.util.HashMap;DeltaTabledeltaTable=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)數據集<行>dfUpdates=火花。讀(“δ”)。負載(“/ tmp /δ/ people-10m-updates”)deltaTable。作為(“人”)。合並(dfUpdates。作為(“更新”),”的人。id=更新s.id")。whenMatched()。updateExpr(新HashMap<字符串,字符串>(){{把(“id”,“updates.id”);把(“firstName”,“updates.firstName”);把(“middleName”,“updates.middleName”);把(“姓”,“updates.lastName”);把(“性別”,“updates.gender”);把(“生日”,“updates.birthDate”);把(“ssn”,“updates.ssn”);把(“工資”,“updates.salary”);}})。whenNotMatched()。insertExpr(新HashMap<字符串,字符串>(){{把(“id”,“updates.id”);把(“firstName”,“updates.firstName”);把(“middleName”,“updates.middleName”);把(“姓”,“updates.lastName”);把(“性別”,“updates.gender”);把(“生日”,“updates.birthDate”);把(“ssn”,“updates.ssn”);把(“工資”,“updates.salary”);}})。執行();
看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/delta-apidoc.html">三角洲湖apiScala的Java、Python語法細節。
三角洲湖合並操作通常需要兩個通過源數據。如果你的源數據包含不確定性表達式,多個傳遞源數據可以產生不同的行導致不正確的結果。一些常見的例子包括不確定性表達式當前日期
和current_timestamp
功能。如果你不能避免使用不確定的函數,考慮源數據保存到存儲,例如作為臨時三角洲表。緩存的源數據可能不解決這個問題,如緩存失效所導致源數據重新計算部分或完全(例如當集群失去一些執行人當縮放)。
操作語義<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#operation-semantics" title="">
這是一個詳細的描述合並
編程操作。
可以有任意數量的
whenMatched
和whenNotMatched
條款。請注意
在磚運行時7.2及以下,
合並
可以有最多2whenMatched
條款和最多1whenNotMatched
條款。whenMatched
條款執行時源行匹配目標表行根據匹配條件。這些條款有以下語義。whenMatched
條款最多隻能有一個更新
和一個刪除
行動。的更新
行動合並
僅更新(類似於指定的列更新
操作)匹配的目標行。的刪除
刪除匹配的行。每一個
whenMatched
條款可以有一個可選的條件。如果這一條款條件存在,更新
orgydF4y2Ba刪除
行動執行任何匹配的源-目標行隻有當條款條件為真。如果有多個
whenMatched
條款,然後他們評估的順序指定。所有whenMatched
條款,除了最後一個,必須有條件。如果沒有一個
whenMatched
條件評估為true的源和目標匹配的行對合並條件,那麼目標行左不變。更新目標三角洲表的所有列的相應列源數據集,使用
whenMatched (…) .updateAll ()
。這相當於:whenMatched(…)。updateExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))
對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。
請注意
這種行為變化時自動啟用模式遷移。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。
whenNotMatched
條款執行時源行不匹配任何行基於目標匹配條件。這些條款有以下語義。whenNotMatched
條款可以隻有插入
行動。生成新行根據指定的列和相應的表達式。你不需要指定目標表中的所有列。對於未指定的目標列,零
被插入。請注意
在磚運行時的6.5和下麵,您必須提供的目標表中所有的列
插入
行動。每一個
whenNotMatched
條款可以有一個可選的條件。如果條款條件存在,源行插入隻有當條件為真行。否則,源列將被忽略。如果有多個
whenNotMatched
條款,然後他們評估的順序指定。所有whenNotMatched
條款,除了最後一個,必須有條件。插入目標三角洲表的所有列的相應列源數據集,使用
whenNotMatched (…) .insertAll ()
。這相當於:whenNotMatched(…)。insertExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))
對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。
請注意
這種行為變化時自動啟用模式遷移。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-schema-evolution">自動模式演化獲取詳細信息。
重要的
一個
合並
源數據集的操作就會失敗,如果多行匹配和合並嚐試更新相同的目標三角洲表行。根據SQL合並的語義,等更新操作是模棱兩可的尚不清楚應該使用哪個源行更新匹配的目標行。源表可以進行預處理來消除多個匹配的可能性。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#write-change-data-into-a-delta-table">變化數據捕獲的例子——顯示了如何預處理改變數據集(即源數據集)隻保留最新的改變對於每個關鍵申請前三角洲到目標表。一個
合並
操作可以產生不正確的結果如果源數據集是不確定的。這是因為合並
可能執行的兩個掃描源數據集和數據兩個掃描產生的不同,最後更改表可以是不正確的。非確定性源可以出現在許多方麵。其中一些如下:閱讀從non-Delta表。例如,閱讀可以改變從一個CSV表底層文件之間的多個掃描。
使用不確定的操作。例如,
Dataset.filter ()
使用當前時間戳來過濾數據的操作多個掃描之間可以產生不同的結果。
你可以申請一個SQL
合並
操作在一個SQL視圖隻有視圖已被定義為創建視圖viewName作為選擇*從deltaTable
。
請注意
在磚運行時7.3 LTS以上,無條件刪除多個匹配時允許匹配(因為無條件的刪除不是模棱兩可的即使有多個匹配)。
模式驗證<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#schema-validation" title="">
合並
自動驗證的模式由插入和更新表達式生成的數據表的兼容模式。它使用以下規則來確定合並
操作兼容:
為
更新
和插入
操作,指定的目標列必須存在於目標三角洲表。為
updateAll
和insertAll
行動,源數據集必須有目標三角洲表的所有列。源數據集可以有額外的列,他們將被忽略。為所有操作,如果數據類型表達式生成的生產目標列不同於目標三角洲表中相應的列,
合並
試圖把他們的類型表。
自動模式演化<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#automatic-schema-evolution" title="">
請注意
模式演化的合並
有磚運行時6.6及以上。
默認情況下,updateAll
和insertAll
指定目標三角洲表中的所有列和列名稱相同的源數據集。任何列在源數據集不匹配目標表中的列將被忽略。但是,在某些用例,需要自動添加源列到目標差值表。自動更新在表模式合並
操作updateAll
和insertAll
(至少一個),您可以設置引發會話配置spark.databricks.delta.schema.autoMerge.enabled
來真正的
在運行之前合並
操作。
請注意
模式演化發生隻有當有一個
updateAll
(更新集*
)或一個insertAll
(插入*
)行動,或兩者兼而有之。更新
和插入
行動不能顯式地引用目標列不已經存在於目標表(即使它有updateAll
orgydF4y2BainsertAll
的條款)。請參見下麵的示例。
請注意
在磚運行時7.4及以下,合並
隻支持模式演化的頂級列,而不是嵌套的列。
這裏有一些例子的影響合並
操作,沒有模式演化。
列 |
查詢(在Scala中) |
行為沒有模式演化(默認) |
行為模式演化 |
---|---|---|---|
目標列: 源列: |
targetDeltaTable。別名(“t”)。合並(sourceDataFrame。別名(“s”),“t。關鍵= s.key”)。whenMatched()。updateAll()。whenNotMatched()。insertAll()。執行()
|
表模式保持不變;隻列 |
表模式改變 |
目標列: 源列: |
targetDeltaTable。別名(“t”)。合並(sourceDataFrame。別名(“s”),“t。關鍵= s.key”)。whenMatched()。updateAll()。whenNotMatched()。insertAll()。執行()
|
|
表模式改變 |
目標列: 源列: |
targetDeltaTable。別名(“t”)。合並(sourceDataFrame。別名(“s”),“t。關鍵= s.key”)。whenMatched()。更新(地圖(“newValue”- >上校(“s.newValue”)))。whenNotMatched()。insertAll()。執行()
|
|
|
目標列: 源列: |
targetDeltaTable。別名(“t”)。合並(sourceDataFrame。別名(“s”),“t。關鍵= s.key”)。whenMatched()。updateAll()。whenNotMatched()。插入(地圖(“關鍵”- >上校(“s.key”),“newValue”- >上校(“s.newValue”)))。執行()
|
|
|
特別注意事項包含陣列結構模式<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#special-considerations-for-schemas-that-contain-arrays-of-structs" title="">
δ合並成
支持解決結構體字段為結構體數組的名字和發展模式。啟用模式演化後,目標表模式將為陣列結構的進化,也適用於任何嵌套的結構體數組的內部。
請注意
這個特性可以在磚運行時9.1及以上。磚運行時的9.0及以下,隱式火花鑄造用於數組結構來解決結構體字段的位置,和合並操作的影響,沒有模式演化的結構體數組與以外的結構體數組的行為不一致。
這裏有一些例子的合並操作的影響,沒有模式演化的結構體數組。
源模式 |
目標模式 |
行為沒有模式演化(默認) |
行為模式演化 |
---|---|---|---|
數組< struct < b:字符串,答:string > > |
<結構體數組< int, b: int > > |
表模式保持不變。列將解決名稱和更新或插入。 |
表模式保持不變。列將解決名稱和更新或插入。 |
數組< struct < int, c:字符串,d: string > > |
<結構體數組<字符串,b: string > > |
|
表模式更改為數組< struct <字符串,b:字符串,c:字符串,d: string > >。 |
數組< struct <字符串,b: struct < c:字符串,d: string > > > |
數組< struct <字符串,b: struct < c: string > > > |
|
目標表模式更改為數組< struct <字符串,b: struct < c:字符串,d: string > > >。 |
性能調優<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#performance-tuning" title="">
你可以減少花費的時間合並使用以下方法:
減少匹配的搜索空間:默認情況下,
合並
操作搜索整個三角洲表來找到匹配的源表。加速的一種方式合並
是減少搜索空間在比賽中通過添加已知約束條件。例如,假設您有一個表分區的國家
和日期
你想使用合並
為最後一天更新信息,一個特定的國家。添加條件事件。日期=當前日期()和事件。國家=“美國”
將使查詢速度因為它查找匹配隻有在相關的分區。此外,它還將減少衝突的機會與其他並發操作。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/concurrency-control.html">並發控製為更多的細節。
緊湊的文件:如果數據存儲在許多小文件,讀取的數據搜索匹配會變得緩慢。你可以緊湊的小文件到大文件來提高閱讀的吞吐量。看到<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/best-practices.html">緊湊的文件獲取詳細信息。
控製調整分區寫道:
合並
多次操作打亂數據更新的數據計算和寫作。任務的數量用於洗牌由火花控製會話配置spark.sql.shuffle.partitions
。設置這個參數不僅控製並行性,也決定了輸出文件的數量。增加價值增加並行,也產生大量的小的數據文件。
啟用優化寫道:對於分區表,
合並
可以產生大量小文件的數量比洗牌分區。這是因為每一個洗牌的任務在多個分區,可以寫多個文件,可以成為一個性能瓶頸。你可以減少文件通過啟用<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/auto-optimize.html">優化的寫。
請注意
在磚運行時的7.4及以上,<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/auto-optimize.html">優化的寫是自動啟用合並
分區表的操作。
優化文件大小在表:在磚運行時的8.2及以上,磚可以自動檢測如果δ表頻繁
合並
重寫的操作文件,可以選擇重寫文件的大小減少的預期在未來進一步的文件改寫。上看到的部分<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/file-mgmt.html">優化文件大小獲取詳細信息。低洗牌合並:在磚運行時的9.0及以上,<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/low-shuffle-merge.html">低洗牌合並提供了一個優化的實現
合並
為最常見的工作負載提供更好的性能。此外,它保留了現有的數據布局優化等<一個class="reference internal" href="//m.eheci.com/docs.gcp/docs.gcp/delta/optimizations/file-mgmt.html">z值修改的數據。
合並的例子<一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#merge-examples" title="">
這裏有一些關於如何使用示例合並
在不同的場景中。
重複數據刪除在編寫到三角洲表中一個><一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#data-deduplication-when-writing-into-delta-tables" title="">
常見的ETL的用例是收集日誌到三角洲表通過添加一個表。然而,通常來源可以生成重複的日誌記錄和下遊重複數據刪除步驟需要照顧他們。與合並
,你可以避免插入重複的記錄。
合並成日誌使用newDedupedLogs在日誌。uniqueId=newDedupedLogs。uniqueId當不匹配然後插入*
deltaTable。別名(“日誌”)。合並(newDedupedLogs。別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)\。whenNotMatchedInsertAll()\。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)。whenNotMatched()。insertAll()。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)。whenNotMatched()。insertAll()。執行();
請注意
包含新日誌的數據集需要刪除處理內部本身。通過SQL合並的語義,它匹配和刪除處理新數據與現有的數據表中,但是如果有新的數據集內重複數據,插入。因此,在合並之前刪除處理新的數據表。
如果你知道你可能會重複的記錄隻有幾天,您可以優化您的查詢進一步通過分區表的日期,然後指定目標表的日期範圍相匹配。
合並成日誌使用newDedupedLogs在日誌。uniqueId=newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天當不匹配和newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天然後插入*
deltaTable。別名(“日誌”)。合並(newDedupedLogs。別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")\。whenNotMatchedInsertAll(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")\。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")。whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")。insertAll()。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")。whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")。insertAll()。執行();
這是更有效的比前麵的命令作為副本看起來隻有在過去的7天的日誌,而不是整個表。此外,您可以使用此純插入合並結構化流執行連續重複數據刪除的日誌。
在流媒體查詢,您可以使用合並操作
foreachBatch
不斷編寫任何流數據與重複數據刪除三角洲表。看到下麵的<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-in-streaming">流的例子的更多信息foreachBatch
。在另一個流媒體查詢,你可以不斷地從δ讀取刪除處理數據表。這是有可能的,因為一個純插入合並隻有δ表添加新數據。
請注意
純插入合並優化隻能在磚運行時6.2及以上的附加數據。寫道,在磚6.1運行時,下麵從純插入合並操作不能讀取流。
慢慢地改變數據(SCD) 2型操作到三角洲表中一個><一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#slowly-changing-data-scd-type-2-operation-into-delta-tables" title="">
另一個常見的操作是SCD 2型,維護所有更改的曆史維度表中每一個關鍵。這樣的操作需要更新現有的行之前的鍵值標記為老,和插入新行最新值。給定一個源表與目標表的更新和維度數據,SCD 2型可以表達合並
。
下麵是一個具體的例子維護地址客戶的曆史以及每個地址的有效日期範圍。當客戶的地址需要更新時,你必須馬克前麵的地址不是當前,更新其活躍的日期範圍,並添加新地址。
更改數據寫入一個增量表一個><一個class="headerlink" href="//m.eheci.com/docs.gcp/delta/#write-change-data-into-a-delta-table" title="">
類似的化合物,另一個常見的用例,通常被稱為變化數據捕獲(CDC),是適用於所有數據更改來自外部數據庫到三角洲表。換句話說,一組更新,刪除,插入應用到一個外部表需要應用於三角洲表。您可以使用合並
如下。
插入從流媒體查詢使用foreachBatch
您可以使用的組合合並
和foreachBatch
(見<一個class="reference external" href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html">foreachbatch一個>對於更多信息)寫複雜的從流媒體查詢插入到三角洲表。例如:
寫流聚集在更新模式:這是更有效的比完整的模式。
數據庫更改流寫入一個增量表:<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-in-cdc">合並查詢寫更改數據可以用在
foreachBatch
不斷應用流變化δ表。寫一個數據流到三角洲與重複數據刪除表:<一個class="reference internal" href="//m.eheci.com/docs.gcp/delta/#merge-in-dedup">純插入合並查詢重複數據刪除可以用在
foreachBatch
不斷寫入數據(副本)三角洲表自動重複數據刪除。
請注意
確保你的
合並
聲明內foreachBatch
是冪等的重啟流查詢可以應用相同的一批數據上的操作很多次了。當
合並
被用在foreachBatch
,輸入數據流查詢(通過報道StreamingQueryProgress
和可見的筆記本率圖)可能被報道為多個實際的速率生成的數據來源。這是因為合並
讀取輸入數據多次導致輸入指標成倍增加。如果這是一個瓶頸,您可以緩存批DataFrame之前合並
然後uncache之後合並
。