表流讀寫

三角洲湖與Spark結構化流通過readStreamwriteStream.Delta Lake克服了許多與流係統和文件相關的限製,包括:

  • 合並低延遲攝取產生的小文件

  • 使用多個流(或並發批處理作業)維護“恰好一次”的處理

  • 當使用文件作為流的源時,有效地發現哪些文件是新的

表作為源

當您將Delta表加載為流源並在流查詢中使用它時,查詢將處理表中出現的所有數據以及流啟動後到達的任何新數據。

您可以將路徑和表作為流加載。

火花readStream格式“δ”負載“/ tmp /δ/事件”進口ioδ值得一提的_火花readStreamδ“/ tmp /δ/事件”

進口ioδ值得一提的_火花readStream格式“δ”)。表格“事件”

重要的

如果Delta表的模式在對該表的流讀取開始後發生變化,則查詢失敗。對於大多數模式更改,您可以重新啟動流以解決模式不匹配並繼續處理。

不能從啟用了列映射的Delta表進行流處理,該表已經曆了非附加模式演變(如重命名或刪除列)。

限製輸入速率

以下選項可用於控製微批次:

  • maxFilesPerTrigger:每個微批處理中要考慮多少個新文件。缺省值是1000。

  • maxBytesPerTrigger:每個微批處理的數據量。該選項設置了一個“軟最大值”,這意味著批處理大約這個數據量,並且可能處理超過限製的數據量,以便在最小輸入單元大於此限製的情況下使流查詢向前移動。如果你使用觸發器。一次對於流,此選項將被忽略。默認情況下沒有設置。

如果你使用maxBytesPerTrigger連同maxFilesPerTrigger時,微批處理數據直到maxFilesPerTriggermaxBytesPerTrigger達到極限。

請注意

在源表事務被清理的情況下logRetentionDuration配置並且流處理滯後,Delta Lake處理與源表的最新可用事務曆史相對應的數據,但不會使流失敗。這可能導致數據被刪除。

流Delta Lake變更數據捕獲(CDC)饋送

三角洲湖更改數據饋送記錄對Delta表的更改,包括更新和刪除。啟用後,您可以從更改數據提要進行流處理,並將邏輯寫入下遊表中處理插入、更新和刪除。雖然更改數據提要數據輸出與它所描述的Delta表略有不同,但這提供了一種解決方案,可以將增量更改傳播到數據庫中的下遊表大獎章架構

重要的

您不能從啟用了列映射的Delta表的更改數據提要進行流處理,該Delta表經曆了非附加模式演變(如重命名或刪除列)。

忽略更新和刪除

結構化流不處理非追加的輸入,如果對用作源的表進行了任何修改,則拋出異常。有兩種主要策略用於處理不能自動向下遊傳播的更改:

  • 您可以刪除輸出和檢查點,並從頭重新啟動流。

  • 您可以設置以下兩個選項之一:

    • ignoreDeletes:忽略在分區邊界刪除數據的事務。

    • skipChangeCommits:忽略刪除或修改現有記錄的事務。skipChangeCommits包容ignoreDeletes

請注意

在Databricks Runtime 12.1及以上版本中,skipChangeCommits棄用以前的設置ignoreChanges.在Databricks Runtime 12.0及以下版本中,ignoreChanges是唯一受支持的選項。

的語義ignoreChanges差異很大skipChangeCommits.與ignoreChanges啟用後,源表中重寫的數據文件將在數據更改操作之後重新發出,例如更新合並刪除(在分區內),或覆蓋.未更改的行經常與新行一起發出,因此下遊消費者必須能夠處理重複的行。刪除不會向下傳播。ignoreChanges包容ignoreDeletes

skipChangeCommits完全忽略文件更改操作。源表中由於數據更改操作而重寫的數據文件,例如更新合並刪除,覆蓋完全被忽略。為了反映上遊源表中的更改,必須實現單獨的邏輯來傳播這些更改。

例子

例如,假設您有一個表user_events日期user_email,行動所劃分的列日期.你流出user_events表,由於GDPR,您需要刪除其中的數據。

當您在分區邊界處刪除時(即在哪裏在分區列上),這些文件已經按值分割,因此刪除隻是將這些文件從元數據中刪除。因此,如果你隻是想從某些分區刪除數據,你可以使用:

火花readStream格式“δ”選項“ignoreDeletes”“真正的”負載“/ tmp /δ/ user_events”

但是,如果要刪除的數據基於user_email,則需要使用:

火花readStream格式“δ”選項“ignoreChanges”“真正的”負載“/ tmp /δ/ user_events”

如果你更新了user_email更新語句中,包含user_email問題是重寫。當你使用ignoreChanges,新記錄與同一文件中所有其他未更改的記錄一起向下傳播。您的邏輯應該能夠處理這些傳入的重複記錄。

指定初始位置

您可以使用以下選項指定Delta Lake流源的起點,而無需處理整個表。

  • startingVersion:從Delta Lake版本開始。從這個版本(包括)開始的所有表更改都將由流源讀取。提交版本可以從版本的列描述曆史命令的輸出。

    在Databricks Runtime 7.4及以上版本中,若要僅返回最新更改,請指定最新的

  • startingTimestamp:開始的時間戳。在時間戳或之後提交的所有表更改(包括)將被流源讀取。之一:

    • 時間戳字符串。例如,“2019 - 01 - 01 t00:00:00.000z”

    • 日期字符串。例如,“2019-01-01”

你不能同時設置兩個選項;你隻能使用其中一個。隻有在開始新的流查詢時才生效。如果流查詢已經啟動,並且進度已記錄在其檢查點中,則忽略這些選項。

重要的

雖然可以從指定的版本或時間戳啟動流源,但流源的模式始終是Delta表的最新模式。必須確保在指定的版本或時間戳之後,Delta表沒有不兼容的模式更改。否則,流源在讀取使用不正確模式的數據時可能返回不正確的結果。

例子

例如,假設您有一個表user_events.如果您想讀取版本5以來的更改,請使用:

火花readStream格式“δ”選項“startingVersion”“5”負載“/ tmp /δ/ user_events”

如果您想讀取自2018-10-18以來的更改,請使用:

火花readStream格式“δ”選項“startingTimestamp”“2018-10-18”負載“/ tmp /δ/ user_events”

處理初始快照,不刪除數據

請注意

該特性在Databricks Runtime 11.1及以上版本上可用。此功能已在公共預覽

當使用Delta表作為流源時,查詢首先處理表中出現的所有數據。這個版本的Delta表稱為初始快照。默認情況下,Delta表的數據文件根據最後修改的文件進行處理。但是,最後一次修改時間並不一定代表記錄事件的時間順序。

在具有已定義水印的有狀態流查詢中,按修改時間處理文件可能導致記錄以錯誤的順序處理。這可能導致記錄通過水印作為後期事件刪除。

您可以通過啟用以下選項來避免數據丟失問題:

  • withEventTimeOrder:初始快照是否應該按照事件時間順序進行處理。

啟用事件時間順序後,快照初始數據的事件時間範圍被劃分為多個時間桶。每個微批通過過濾時間範圍內的數據來處理一個桶。maxFilesPerTrigger和maxBytesPerTrigger配置選項仍然適用於控製微批處理大小,但由於處理的性質,隻能以近似的方式進行控製。

下圖展示了這個過程:

初始快照

關於此特性的注意事項:

  • 隻有在按默認順序處理有狀態流查詢的初始Delta快照時,才會發生數據丟失問題。

  • 你無法改變withEventTimeOrder一旦流查詢開始,而初始快照仍在處理中。重新啟動withEventTimeOrder更改後,您需要刪除檢查點。

  • 如果您正在運行啟用theventtimeorder的流查詢,在初始快照處理完成之前,您不能將其降級為不支持此功能的DBR版本。如果需要降級,可以等待初始快照完成,或者刪除檢查點並重新啟動查詢。

  • 以下特殊場景不支持該特性:

    • 事件時間列是一個生成的列,並且在增量源和水印之間存在非投影轉換。

    • 在流查詢中,有一個水印具有多個增量源。

  • 啟用事件時間順序後,Delta初始快照處理的性能可能會變慢。

  • 每個微批處理掃描初始快照,以過濾相應事件時間範圍內的數據。為了加快篩選操作,建議使用Delta源列作為事件時間,以便應用數據跳過(檢查數據跳躍與z順序索引三角洲湖當它適用的時候)。此外,沿著事件時間列進行表分區可以進一步加快處理速度。您可以檢查Spark UI,以查看為特定的微批處理掃描了多少增量文件。

例子

假設你有一張表user_events與一個event_time列。您的流查詢是一個聚合查詢。如果您想確保在初始快照處理過程中沒有數據丟失,您可以使用:

火花readStream格式“δ”選項“withEventTimeOrder”“真正的”負載“/ tmp /δ/ user_events”withWatermark“event_time”“10秒”

請注意

你也可以在集群上啟用Spark配置,它將應用於所有流查詢:spark.databricks.delta.withEventTimeOrder.enabled真正的

表作為接收器

還可以使用結構化流將數據寫入Delta表。事務日誌使Delta Lake能夠保證隻進行一次處理,即使有其他流或批查詢同時對該表運行。

請注意

三角洲湖真空函數刪除不受Delta Lake管理的所有文件,但跳過以_.可以使用目錄結構(如。)將檢查點與Delta表的其他數據和元數據一起安全地存儲< table_name > / _checkpoints

指標

請注意

在Databricks Runtime 8.1及以上版本中可用。

類型中尚未處理的字節數和文件數流查詢流程隨著numBytesOutstandingnumFilesOutstanding指標。其他指標包括:

  • numNewListedFiles:為計算此批積壓而列出的Delta Lake文件的數量。

    • backlogEndOffset:用於計算待辦事項的表格版本。

如果在筆記本中運行流,則可以在原始數據流查詢進度儀表板中的選項卡:

{“源”{“描述”“DeltaSource(文件/道路/ /源):““指標”{“numBytesOutstanding”“3456”“numFilesOutstanding”“8”},}}

Append模式

默認情況下,流以追加模式運行,即向表中添加新記錄。

你可以使用path方法:

事件writeStream格式“δ”outputMode“添加”選項“checkpointLocation”“/ tmp /δ/ _checkpoints /”開始“/δ/事件”
事件writeStream格式“δ”outputMode“添加”選項“checkpointLocation”“/ tmp /δ/事件/ _checkpoints /”開始“/ tmp /δ/事件”

或者是toTable在Spark 3.1及以上版本(Databricks Runtime 8.3及以上版本)中使用。(在3.1之前的Spark版本(Databricks Runtime 8.2及以下)中,請使用表格方法。)

事件writeStream格式“δ”outputMode“添加”選項“checkpointLocation”“/ tmp /δ/事件/ _checkpoints /”toTable“事件”
事件writeStreamoutputMode“添加”選項“checkpointLocation”“/ tmp /δ/事件/ _checkpoints /”toTable“事件”

完整的模式

您還可以使用結構化流來用每個批處理替換整個表。一個例子是使用聚合計算摘要:

火花readStream格式“δ”負載“/ tmp /δ/事件”groupBy“customerId”()writeStream格式“δ”outputMode“完整的”選項“checkpointLocation”“/ tmp /δ/ eventsByCustomer / _checkpoints /”開始“/ tmp /δ/ eventsByCustomer”
火花readStream格式“δ”負載“/ tmp /δ/事件”groupBy“customerId”()writeStream格式“δ”outputMode“完整的”選項“checkpointLocation”“/ tmp /δ/ eventsByCustomer / _checkpoints /”開始“/ tmp /δ/ eventsByCustomer”

上麵的示例持續更新一個表,該表包含按客戶劃分的事件總數。

對於延遲要求較低的應用程序,可以使用一次性觸發器節省計算資源。使用這些工具更新給定計劃上的摘要聚合表,隻處理自上次更新以來到達的新數據。

冪等表寫入foreachBatch

請注意

在Databricks Runtime 8.4及以上版本中可用。

命令foreachBatch允許您指定在流查詢中的任意轉換後對每個微批的輸出執行的函數。這允許實現一個foreachBatch函數,該函數可以將微批輸出寫入一個或多個目標Delta表目的地。然而,foreachBatch不會使這些寫操作為冪等的,因為這些寫嚐試缺乏批處理是否正在重新執行的信息。例如,重新運行失敗的批處理可能導致重複的數據寫入。

為了解決這個問題,Delta表支持以下功能DataFrameWriter使寫函數冪等的選項:

  • txnAppId:可以傳遞給每個對象的唯一字符串DataFrame寫。例如,您可以使用StreamingQuery ID作為txnAppId

  • txnVersion:作為事務版本的單調遞增的數字。

Delta表使用的組合txnAppIdtxnVersion識別重複寫並忽略它們。

如果批寫操作因失敗而中斷,則使用相同的應用程序和批處理ID重新運行批處理,這將幫助運行時正確識別重複的寫操作並忽略它們。申請編號(txnAppId)可以是任何用戶生成的唯一字符串,並且不必與流ID相關。

警告

如果刪除流檢查點並使用新的檢查點重新啟動查詢,則必須提供不同的檢查點appId;否則,從重新啟動的查詢寫入將被忽略,因為它將包含相同的內容txnAppId批處理ID從0開始。

相同的DataFrameWriter選項可用於在非流作業中實現冪等寫入。有關詳細信息,啟用跨作業的冪等寫入

例子

app_id=#作為應用程序ID的唯一字符串。defwriteToDeltaLakeTableIdempotentbatch_dfbatch_id):batch_df格式選項“txnVersion”batch_id選項“txnAppId”app_id保存#位置1batch_df格式選項“txnVersion”batch_id選項“txnAppId”app_id保存#位置2
瓦爾appId=//作為應用程序ID的唯一字符串。streamingDFwriteStreamforeachBatch{batchDFDataFramebatchId=>batchDF格式(…)。選項“txnVersion”batchId)。選項“txnAppId”appId)。保存(…)// location 1batchDF格式(…)。選項“txnVersion”batchId)。選項“txnAppId”appId)。保存(…)// location 2}

執行流靜態連接

您可以依賴Delta Lake的事務保證和版本控製協議來執行stream-static連接。流靜態連接使用無狀態連接將Delta表(靜態數據)的最新有效版本連接到數據流。

當Databricks在流-靜態連接中處理微批數據時,來自靜態Delta表的最新有效數據版本將與當前微批中的記錄進行連接。因為連接是無狀態的,所以您不需要配置水印,並且可以以較低的延遲處理結果。連接中使用的靜態Delta表中的數據應該是緩慢變化的。

streamingDF=火花readStream表格“訂單”staticDF=火花表格“beplay体育app下载地址顧客”查詢=streamingDF加入staticDFstreamingDFcustomer_id==staticDFid“內心”writeStream選項“checkpointLocation”checkpoint_path表格“orders_with_customer_info”

Upsert從流查詢使用foreachBatch

你可以使用的組合合並foreachBatch將流查詢中的複雜upserts寫入Delta表。看到使用foreachBatch寫入任意數據接收器

這個模式有很多應用,包括:

請注意

  • 確保你的合並聲明內foreachBatch是冪等的,因為重新啟動流查詢可以對同一批數據應用多次操作。

  • 合並用於foreachBatch,流查詢的輸入數據速率(通過StreamingQueryProgress在筆記本速率圖中可見)可以報告為數據在源處生成的實際速率的倍數。這是因為合並多次讀取輸入數據,導致輸入指標相乘。如果這是一個瓶頸,您可以在此之前緩存批DataFrame合並然後將其解緩存合並

下麵的示例演示如何在foreachBatch要完成這項任務:

//使用merge將microBatchOutputDF插入Delta表的函數defupsertToDeltamicroBatchOutputDFDataFramebatchId{//設置dataframe為view namemicroBatchOutputDFcreateOrReplaceTempView“更新”//使用視圖名來應用MERGE//注意:你必須使用SparkSession來定義' updates '數據框架microBatchOutputDFsparkSessionsql”“”合並成聚合體t使用更新ON s.key = t.key當匹配時,更新集合*如果不匹配,則插入*”“”}//將流聚合查詢的輸出寫入Delta表streamingAggregatesDFwriteStream格式“δ”foreachBatchupsertToDelta_outputMode“更新”開始()
使用merge將microBatchOutputDF插入Delta表defupsertToDeltamicroBatchOutputDFbatchId):#設置數據幀為視圖名microBatchOutputDFcreateOrReplaceTempView“更新”使用視圖名稱應用MERGE#注意:你必須使用SparkSession來定義“updates”數據框架在Databricks Runtime 10.5及以下版本中,您必須使用以下命令:# microBatchOutputDF._jdf.sparkSession () . sql(“””microBatchOutputDFsparkSessionsql”“”合並成聚合體t使用更新ON s.key = t.key當匹配時,更新集合*如果不匹配,則插入*”“”#將流聚合查詢的輸出寫入Delta表streamingAggregatesDFwriteStream格式“δ”foreachBatchupsertToDeltaoutputMode“更新”開始()

你也可以選擇使用Delta Lake api來執行流upserts,如下例所示:

進口ioδ*瓦爾deltaTable=DeltaTableforPath火花“/數據/聚合物”//使用merge將microBatchOutputDF插入Delta表的函數defupsertToDeltamicroBatchOutputDFDataFramebatchId{deltaTable作為“t”合並microBatchOutputDF作為“s”),"s.key = t.key"whenMatched().updateAll()whenNotMatched().insertAll()執行()}//將流聚合查詢的輸出寫入Delta表streamingAggregatesDFwriteStream格式“δ”foreachBatchupsertToDelta_outputMode“更新”開始()
delta.tables進口*deltaTable=DeltaTableforPath火花“/數據/聚合物”使用merge將microBatchOutputDF插入Delta表defupsertToDeltamicroBatchOutputDFbatchId):deltaTable別名“t”合並microBatchOutputDF別名“s”),"s.key = t.key"whenMatchedUpdateAll()whenNotMatchedInsertAll()執行()#將流聚合查詢的輸出寫入Delta表streamingAggregatesDFwriteStream格式“δ”foreachBatchupsertToDeltaoutputMode“更新”開始()