表流讀寫
三角洲湖與Spark結構化流通過readStream
和writeStream
.Delta Lake克服了許多與流係統和文件相關的限製,包括:
合並低延遲攝取產生的小文件
使用多個流(或並發批處理作業)維護“恰好一次”的處理
當使用文件作為流的源時,有效地發現哪些文件是新的
表作為源
當您將Delta表加載為流源並在流查詢中使用它時,查詢將處理表中出現的所有數據以及流啟動後到達的任何新數據。
您可以將路徑和表作為流加載。
火花.readStream.格式(“δ”).負載(“/ tmp /δ/事件”)進口io.δ.值得一提的._火花.readStream.δ(“/ tmp /δ/事件”)
或
進口io.δ.值得一提的._火花.readStream.格式(“δ”)。表格(“事件”)
重要的
如果Delta表的模式在對該表的流讀取開始後發生變化,則查詢失敗。對於大多數模式更改,您可以重新啟動流以解決模式不匹配並繼續處理。
不能從啟用了列映射的Delta表進行流處理,該表已經曆了非附加模式演變(如重命名或刪除列)。
限製輸入速率
以下選項可用於控製微批次:
maxFilesPerTrigger
:每個微批處理中要考慮多少個新文件。缺省值是1000。maxBytesPerTrigger
:每個微批處理的數據量。該選項設置了一個“軟最大值”,這意味著批處理大約這個數據量,並且可能處理超過限製的數據量,以便在最小輸入單元大於此限製的情況下使流查詢向前移動。如果你使用觸發器。一次
對於流,此選項將被忽略。默認情況下沒有設置。
如果你使用maxBytesPerTrigger
連同maxFilesPerTrigger
時,微批處理數據直到maxFilesPerTrigger
或maxBytesPerTrigger
達到極限。
請注意
在源表事務被清理的情況下logRetentionDuration
配置並且流處理滯後,Delta Lake處理與源表的最新可用事務曆史相對應的數據,但不會使流失敗。這可能導致數據被刪除。
流Delta Lake變更數據捕獲(CDC)饋送
三角洲湖更改數據饋送記錄對Delta表的更改,包括更新和刪除。啟用後,您可以從更改數據提要進行流處理,並將邏輯寫入下遊表中處理插入、更新和刪除。雖然更改數據提要數據輸出與它所描述的Delta表略有不同,但這提供了一種解決方案,可以將增量更改傳播到數據庫中的下遊表大獎章架構.
重要的
您不能從啟用了列映射的Delta表的更改數據提要進行流處理,該Delta表經曆了非附加模式演變(如重命名或刪除列)。
忽略更新和刪除
結構化流不處理非追加的輸入,如果對用作源的表進行了任何修改,則拋出異常。有兩種主要策略用於處理不能自動向下遊傳播的更改:
您可以刪除輸出和檢查點,並從頭重新啟動流。
您可以設置以下兩個選項之一:
ignoreDeletes
:忽略在分區邊界刪除數據的事務。skipChangeCommits
:忽略刪除或修改現有記錄的事務。skipChangeCommits
包容ignoreDeletes
.
請注意
在Databricks Runtime 12.1及以上版本中,skipChangeCommits
棄用以前的設置ignoreChanges
.在Databricks Runtime 12.0及以下版本中,ignoreChanges
是唯一受支持的選項。
的語義ignoreChanges
差異很大skipChangeCommits
.與ignoreChanges
啟用後,源表中重寫的數據文件將在數據更改操作之後重新發出,例如更新
,合並成
,刪除
(在分區內),或覆蓋
.未更改的行經常與新行一起發出,因此下遊消費者必須能夠處理重複的行。刪除不會向下傳播。ignoreChanges
包容ignoreDeletes
.
skipChangeCommits
完全忽略文件更改操作。源表中由於數據更改操作而重寫的數據文件,例如更新
,合並成
,刪除
,覆蓋
完全被忽略。為了反映上遊源表中的更改,必須實現單獨的邏輯來傳播這些更改。
例子
例如,假設您有一個表user_events
與日期
,user_email
,行動
所劃分的列日期
.你流出user_events
表,由於GDPR,您需要刪除其中的數據。
當您在分區邊界處刪除時(即在哪裏
在分區列上),這些文件已經按值分割,因此刪除隻是將這些文件從元數據中刪除。因此,如果你隻是想從某些分區刪除數據,你可以使用:
火花.readStream.格式(“δ”).選項(“ignoreDeletes”,“真正的”).負載(“/ tmp /δ/ user_events”)
但是,如果要刪除的數據基於user_email
,則需要使用:
火花.readStream.格式(“δ”).選項(“ignoreChanges”,“真正的”).負載(“/ tmp /δ/ user_events”)
如果你更新了user_email
與更新
語句中,包含user_email
問題是重寫。當你使用ignoreChanges
,新記錄與同一文件中所有其他未更改的記錄一起向下傳播。您的邏輯應該能夠處理這些傳入的重複記錄。
指定初始位置
您可以使用以下選項指定Delta Lake流源的起點,而無需處理整個表。
startingVersion
:從Delta Lake版本開始。從這個版本(包括)開始的所有表更改都將由流源讀取。提交版本可以從版本
的列描述曆史命令的輸出。在Databricks Runtime 7.4及以上版本中,若要僅返回最新更改,請指定
最新的
.startingTimestamp
:開始的時間戳。在時間戳或之後提交的所有表更改(包括)將被流源讀取。之一:時間戳字符串。例如,
“2019 - 01 - 01 t00:00:00.000z”
.日期字符串。例如,
“2019-01-01”
.
你不能同時設置兩個選項;你隻能使用其中一個。隻有在開始新的流查詢時才生效。如果流查詢已經啟動,並且進度已記錄在其檢查點中,則忽略這些選項。
重要的
雖然可以從指定的版本或時間戳啟動流源,但流源的模式始終是Delta表的最新模式。必須確保在指定的版本或時間戳之後,Delta表沒有不兼容的模式更改。否則,流源在讀取使用不正確模式的數據時可能返回不正確的結果。
處理初始快照,不刪除數據
請注意
該特性在Databricks Runtime 11.1及以上版本上可用。此功能已在公共預覽.
當使用Delta表作為流源時,查詢首先處理表中出現的所有數據。這個版本的Delta表稱為初始快照。默認情況下,Delta表的數據文件根據最後修改的文件進行處理。但是,最後一次修改時間並不一定代表記錄事件的時間順序。
在具有已定義水印的有狀態流查詢中,按修改時間處理文件可能導致記錄以錯誤的順序處理。這可能導致記錄通過水印作為後期事件刪除。
您可以通過啟用以下選項來避免數據丟失問題:
withEventTimeOrder:初始快照是否應該按照事件時間順序進行處理。
啟用事件時間順序後,快照初始數據的事件時間範圍被劃分為多個時間桶。每個微批通過過濾時間範圍內的數據來處理一個桶。maxFilesPerTrigger和maxBytesPerTrigger配置選項仍然適用於控製微批處理大小,但由於處理的性質,隻能以近似的方式進行控製。
下圖展示了這個過程:
關於此特性的注意事項:
隻有在按默認順序處理有狀態流查詢的初始Delta快照時,才會發生數據丟失問題。
你無法改變
withEventTimeOrder
一旦流查詢開始,而初始快照仍在處理中。重新啟動withEventTimeOrder
更改後,您需要刪除檢查點。如果您正在運行啟用theventtimeorder的流查詢,在初始快照處理完成之前,您不能將其降級為不支持此功能的DBR版本。如果需要降級,可以等待初始快照完成,或者刪除檢查點並重新啟動查詢。
以下特殊場景不支持該特性:
事件時間列是一個生成的列,並且在增量源和水印之間存在非投影轉換。
在流查詢中,有一個水印具有多個增量源。
啟用事件時間順序後,Delta初始快照處理的性能可能會變慢。
每個微批處理掃描初始快照,以過濾相應事件時間範圍內的數據。為了加快篩選操作,建議使用Delta源列作為事件時間,以便應用數據跳過(檢查數據跳躍與z順序索引三角洲湖當它適用的時候)。此外,沿著事件時間列進行表分區可以進一步加快處理速度。您可以檢查Spark UI,以查看為特定的微批處理掃描了多少增量文件。
表作為接收器
還可以使用結構化流將數據寫入Delta表。事務日誌使Delta Lake能夠保證隻進行一次處理,即使有其他流或批查詢同時對該表運行。
請注意
三角洲湖真空
函數刪除不受Delta Lake管理的所有文件,但跳過以_
.可以使用目錄結構(如。)將檢查點與Delta表的其他數據和元數據一起安全地存儲< table_name > / _checkpoints
.
指標
請注意
在Databricks Runtime 8.1及以上版本中可用。
類型中尚未處理的字節數和文件數流查詢流程隨著numBytesOutstanding
和numFilesOutstanding
指標。其他指標包括:
numNewListedFiles
:為計算此批積壓而列出的Delta Lake文件的數量。backlogEndOffset
:用於計算待辦事項的表格版本。
如果在筆記本中運行流,則可以在原始數據流查詢進度儀表板中的選項卡:
{“源”:[{“描述”:“DeltaSource(文件/道路/ /源):“,“指標”:{“numBytesOutstanding”:“3456”,“numFilesOutstanding”:“8”},}]}
Append模式
默認情況下,流以追加模式運行,即向表中添加新記錄。
你可以使用path方法:
(事件.writeStream.格式(“δ”).outputMode(“添加”).選項(“checkpointLocation”,“/ tmp /δ/ _checkpoints /”).開始(“/δ/事件”))
事件.writeStream.格式(“δ”).outputMode(“添加”).選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”).開始(“/ tmp /δ/事件”)
或者是toTable
在Spark 3.1及以上版本(Databricks Runtime 8.3及以上版本)中使用。(在3.1之前的Spark版本(Databricks Runtime 8.2及以下)中,請使用表格
方法。)
(事件.writeStream.格式(“δ”).outputMode(“添加”).選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”).toTable(“事件”))
事件.writeStream.outputMode(“添加”).選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”).toTable(“事件”)
完整的模式
您還可以使用結構化流來用每個批處理替換整個表。一個例子是使用聚合計算摘要:
(火花.readStream.格式(“δ”).負載(“/ tmp /δ/事件”).groupBy(“customerId”).數().writeStream.格式(“δ”).outputMode(“完整的”).選項(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”).開始(“/ tmp /δ/ eventsByCustomer”))
火花.readStream.格式(“δ”).負載(“/ tmp /δ/事件”).groupBy(“customerId”).數().writeStream.格式(“δ”).outputMode(“完整的”).選項(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”).開始(“/ tmp /δ/ eventsByCustomer”)
上麵的示例持續更新一個表,該表包含按客戶劃分的事件總數。
對於延遲要求較低的應用程序,可以使用一次性觸發器節省計算資源。使用這些工具更新給定計劃上的摘要聚合表,隻處理自上次更新以來到達的新數據。
冪等表寫入foreachBatch
請注意
在Databricks Runtime 8.4及以上版本中可用。
命令foreachBatch允許您指定在流查詢中的任意轉換後對每個微批的輸出執行的函數。這允許實現一個foreachBatch
函數,該函數可以將微批輸出寫入一個或多個目標Delta表目的地。然而,foreachBatch
不會使這些寫操作為冪等的,因為這些寫嚐試缺乏批處理是否正在重新執行的信息。例如,重新運行失敗的批處理可能導致重複的數據寫入。
為了解決這個問題,Delta表支持以下功能DataFrameWriter
使寫函數冪等的選項:
txnAppId
:可以傳遞給每個對象的唯一字符串DataFrame
寫。例如,您可以使用StreamingQuery ID作為txnAppId
.txnVersion
:作為事務版本的單調遞增的數字。
Delta表使用的組合txnAppId
和txnVersion
識別重複寫並忽略它們。
如果批寫操作因失敗而中斷,則使用相同的應用程序和批處理ID重新運行批處理,這將幫助運行時正確識別重複的寫操作並忽略它們。申請編號(txnAppId
)可以是任何用戶生成的唯一字符串,並且不必與流ID相關。
警告
如果刪除流檢查點並使用新的檢查點重新啟動查詢,則必須提供不同的檢查點appId
;否則,從重新啟動的查詢寫入將被忽略,因為它將包含相同的內容txnAppId
批處理ID從0開始。
相同的DataFrameWriter
選項可用於在非流作業中實現冪等寫入。有關詳細信息,啟用跨作業的冪等寫入.
例子
app_id=…#作為應用程序ID的唯一字符串。defwriteToDeltaLakeTableIdempotent(batch_df,batch_id):batch_df.寫.格式(…).選項(“txnVersion”,batch_id).選項(“txnAppId”,app_id).保存(…)#位置1batch_df.寫.格式(…).選項(“txnVersion”,batch_id).選項(“txnAppId”,app_id).保存(…)#位置2
瓦爾appId=…//作為應用程序ID的唯一字符串。streamingDF.writeStream.foreachBatch{(batchDF:DataFrame,batchId:長)=>batchDF.寫.格式(…)。選項(“txnVersion”,batchId)。選項(“txnAppId”,appId)。保存(…)// location 1batchDF.寫.格式(…)。選項(“txnVersion”,batchId)。選項(“txnAppId”,appId)。保存(…)// location 2}
執行流靜態連接
您可以依賴Delta Lake的事務保證和版本控製協議來執行stream-static連接。流靜態連接使用無狀態連接將Delta表(靜態數據)的最新有效版本連接到數據流。
當Databricks在流-靜態連接中處理微批數據時,來自靜態Delta表的最新有效數據版本將與當前微批中的記錄進行連接。因為連接是無狀態的,所以您不需要配置水印,並且可以以較低的延遲處理結果。連接中使用的靜態Delta表中的數據應該是緩慢變化的。
streamingDF=火花.readStream.表格(“訂單”)staticDF=火花.讀.表格(“beplay体育app下载地址顧客”)查詢=(streamingDF.加入(staticDF,streamingDF.customer_id==staticDF.id,“內心”).writeStream.選項(“checkpointLocation”,checkpoint_path).表格(“orders_with_customer_info”))
Upsert從流查詢使用foreachBatch
你可以使用的組合合並
和foreachBatch
將流查詢中的複雜upserts寫入Delta表。看到使用foreachBatch寫入任意數據接收器.
這個模式有很多應用,包括:
在更新模式下寫入流聚合:這比完全模式更有效率。
將數據庫更改流寫入Delta表:合並查詢,用於寫入更改數據可用於
foreachBatch
來連續地對Delta表應用更改流。使用重複數據刪除將數據流寫入Delta表:用於重複數據刪除的僅插入合並查詢可用於
foreachBatch
使用自動重複數據刪除功能,連續向Delta表寫入數據(帶重複)。
請注意
確保你的
合並
聲明內foreachBatch
是冪等的,因為重新啟動流查詢可以對同一批數據應用多次操作。當
合並
用於foreachBatch
,流查詢的輸入數據速率(通過StreamingQueryProgress
在筆記本速率圖中可見)可以報告為數據在源處生成的實際速率的倍數。這是因為合並
多次讀取輸入數據,導致輸入指標相乘。如果這是一個瓶頸,您可以在此之前緩存批DataFrame合並
然後將其解緩存合並
.
下麵的示例演示如何在foreachBatch
要完成這項任務:
//使用merge將microBatchOutputDF插入Delta表的函數defupsertToDelta(microBatchOutputDF:DataFrame,batchId:長){//設置dataframe為view namemicroBatchOutputDF.createOrReplaceTempView(“更新”)//使用視圖名來應用MERGE//注意:你必須使用SparkSession來定義' updates '數據框架microBatchOutputDF.sparkSession.sql(”“”合並成聚合體t使用更新ON s.key = t.key當匹配時,更新集合*如果不匹配,則插入*”“”)}//將流聚合查詢的輸出寫入Delta表streamingAggregatesDF.writeStream.格式(“δ”).foreachBatch(upsertToDelta_).outputMode(“更新”).開始()
使用merge將microBatchOutputDF插入Delta表defupsertToDelta(microBatchOutputDF,batchId):#設置數據幀為視圖名microBatchOutputDF.createOrReplaceTempView(“更新”)使用視圖名稱應用MERGE#注意:你必須使用SparkSession來定義“updates”數據框架在Databricks Runtime 10.5及以下版本中,您必須使用以下命令:# microBatchOutputDF._jdf.sparkSession () . sql(“””microBatchOutputDF.sparkSession.sql(”“”合並成聚合體t使用更新ON s.key = t.key當匹配時,更新集合*如果不匹配,則插入*”“”)#將流聚合查詢的輸出寫入Delta表(streamingAggregatesDF.writeStream.格式(“δ”).foreachBatch(upsertToDelta).outputMode(“更新”).開始())
你也可以選擇使用Delta Lake api來執行流upserts,如下例所示:
進口io.δ.表.*瓦爾deltaTable=DeltaTable.forPath(火花,“/數據/聚合物”)//使用merge將microBatchOutputDF插入Delta表的函數defupsertToDelta(microBatchOutputDF:DataFrame,batchId:長){deltaTable.作為(“t”).合並(microBatchOutputDF.作為(“s”),"s.key = t.key").whenMatched().updateAll().whenNotMatched().insertAll().執行()}//將流聚合查詢的輸出寫入Delta表streamingAggregatesDF.writeStream.格式(“δ”).foreachBatch(upsertToDelta_).outputMode(“更新”).開始()
從delta.tables進口*deltaTable=DeltaTable.forPath(火花,“/數據/聚合物”)使用merge將microBatchOutputDF插入Delta表defupsertToDelta(microBatchOutputDF,batchId):(deltaTable.別名(“t”).合並(microBatchOutputDF.別名(“s”),"s.key = t.key").whenMatchedUpdateAll().whenNotMatchedInsertAll().執行())#將流聚合查詢的輸出寫入Delta表(streamingAggregatesDF.writeStream.格式(“δ”).foreachBatch(upsertToDelta).outputMode(“更新”).開始())