表流讀取和寫入
三角洲湖深入結合火花結構化流通過readStream
和writeStream
。三角洲湖克服了許多限製通常與流媒體係統和相關文件,包括:
合並小文件產生的低延遲攝取
維護“隻有一次”的處理與不止一個流(或並發批處理作業)
有效地發現哪些文件是新當使用作為一個流的源文件
另請參閱生產磚上考慮結構的流媒體應用程序。
δ表來源
當你加載一個三角洲表作為流源和流查詢中使用它,查詢流程表中所有的數據以及任何新到達的數據流後開始。
你可以加載路徑和表作為一個流。
火花。readStream。格式(“δ”)。負載(“/ tmp /δ/事件”)進口io。δ。值得一提的。_火花。readStream。δ(“/ tmp /δ/事件”)
或
進口io。δ。值得一提的。_火花。readStream。格式(“δ”)。表(“事件”)
在本節中:
限製輸入的速度
以下選項可以控製micro-batches:
maxFilesPerTrigger
:有多少新文件被認為是在每個micro-batch。默認值是1000。maxBytesPerTrigger
在每個micro-batch:多少數據被處理。這個選項設置一個“軟馬克斯”,這意味著一個批處理過程大約這個過程的數據量,可能超過極限為了使流查詢前進情況下的最小輸入單位超過這個極限。如果你使用Trigger.Once
為你流,此選項將被忽略。這不是默認設置。
如果你使用maxBytesPerTrigger
結合maxFilesPerTrigger
,micro-batch過程數據,直到maxFilesPerTrigger
或maxBytesPerTrigger
達到極限。
請注意
當源表事務是清理由於logRetentionDuration
配置在處理流滯後,三角洲湖處理數據對應的最新交易曆史源表流但不失敗。這可能導致數據被刪除。
忽略更新和刪除
結構化流不處理不是一個附加的輸入,將拋出一個異常如果發生任何修改在桌子上被用作源。有兩個主要的策略來處理變化,不能自動下遊傳播:
您可以刪除輸出從一開始就和檢查點和重啟流。
你可以設置這兩個選擇:
ignoreDeletes
:忽略事務刪除數據分區的邊界。ignoreChanges
:如果文件必須被重寫處理文檔更新源表中的數據修改操作,比如更新
,合並成
,刪除
(分區),或覆蓋
。不變行可能仍在釋放,因此你的下遊消費者應該能夠處理重複。刪除不是下遊傳播。ignoreChanges
包容ignoreDeletes
。因此如果你使用ignoreChanges
,你流將不會被刪除或更新源表。
例子
例如,假設您有一個表user_events
與日期
,user_email
,行動
分區的列日期
。你流的user_events
你需要刪除數據表由於GDPR。
當你在分區邊界(即刪除在哪裏
分區列),文件已經分割的價值所以刪除掉這些文件的元數據。因此,如果你隻是想一些分區的刪除數據,您可以使用:
火花。readStream。格式(“δ”)。選項(“ignoreDeletes”,“真正的”)。負載(“/ tmp /δ/ user_events”)
但是,如果你必須刪除數據的基礎上user_email
,那麼您將需要使用:
火花。readStream。格式(“δ”)。選項(“ignoreChanges”,“真正的”)。負載(“/ tmp /δ/ user_events”)
如果你更新user_email
與更新
聲明中包含的文件user_email
在的問題是重寫。當你使用ignoreChanges
,新記錄與所有其他不變向下遊傳播記錄在同一個文件中。你的邏輯應該能夠處理這些輸入重複的記錄。
指定初始位置
請注意
這個特性可以在磚運行時7.3 LTS及以上。
您可以使用以下選項指定的起點三角洲湖流源沒有處理整個表。
startingVersion
:從三角洲湖版本。所有表變化從這個版本(包容)將讀取流源。你可以獲取提交的版本版本
列的描述曆史命令的輸出。在磚運行時的7.4及以上,隻返回最新的變化,指定
最新的
。startingTimestamp
:從時間戳。所有表更改後承諾或時間戳(包容)將讀取流源。之一:時間戳字符串。例如,
“2019 - 01 - 01 t00:00:00.000z”
。一個日期字符串。例如,
“2019-01-01”
。
你不能同時設置兩個選項;你可以隻使用其中的一個。他們隻有當開始一個新的流媒體查詢生效。如果一個流媒體查詢已經開始和檢查點的進展記錄,這些選項將被忽略。
重要的
雖然您可以啟動流源從一個指定的版本或時間戳、流媒體來源的模式總是最新的三角洲表的模式。你必須確保沒有不兼容模式改變δ表後指定的版本或時間戳。否則,流源讀取數據時可能會返回不正確的結果,錯誤的模式。
沒有數據被刪除過程初始快照
請注意
這個特性可以在磚運行時11.1及以上。這個特性是在公共預覽。參與預覽,請聯係您的磚的代表。
使用增量表作為流源時,數據的查詢過程首先出現在桌子上。三角洲表在這個版本被稱為初始快照。默認情況下,三角洲表的數據文件處理基於文件最後修改。然而,最後修改時間並不一定代表記錄事件的時間順序。
在有狀態流查詢定義水印,處理文件的修改時間會導致記錄處理錯誤的訂單。這可能導致晚期事件的記錄刪除水印。
你可以避免數據下降問題通過支持下列選項:
withEventTimeOrder:初始快照是否應該處理事件的時間順序。
啟用事件時間順序後,初始快照數據的事件時間範圍分為桶。每個微一桶的批處理流程過濾數據的時間範圍內。maxFilesPerTrigger和maxBytesPerTrigger配置選項仍適用於控製microbatch大小,但隻在一個近似的方法由於處理的本質。
下圖顯示了這個過程:
明顯的這一特性的信息:
數據下降問題隻有當初始增量快照的狀態在默認順序流查詢處理。
你不能改變
withEventTimeOrder
一旦流查詢開始時初始快照還是正在處理。重新啟動,withEventTimeOrder
改變,你需要刪除檢查點。如果您正在運行一個啟用了withEventTimeOrder流查詢,你不能降級到DBR版本不支持這個功能,直到初始快照處理完成。如果你需要降級,你可以等待初始快照完成,查詢或刪除檢查點和重啟。
不支持這個功能在以下常見場景:
事件時間列是一個生成的列有non-projectionδ源和水印之間的轉換。
有一個水印,不止一個δ源在流查詢。
啟用事件時間順序後,三角洲初始快照處理的性能可能較慢。
每一批微掃描初始快照來過濾數據對應的事件時間範圍內。加快過濾操作,建議使用一個δ源列事件時間,以便數據不可以應用(檢查數據不在適當的時候)。另外,表分區沿著事件時間列可以進一步加快處理。您可以檢查火花UI,看看有多少δ特定微批處理文件掃描。
三角洲表作為一個水槽
您還可以使用結構化數據寫入三角洲表流。事務日誌可以讓三角洲湖保證隻有一次處理,即使還有其他流對表或批量查詢並發運行的情況。
請注意
三角洲湖真空
函數刪除所有文件不是由三角洲湖但跳過任何目錄開始_
。您可以安全地存儲檢查點與其他數據和元數據表使用一個目錄結構如δ< table_name > / _checkpoints
。
指標
請注意
在磚運行時8.1及以上。
你可以找到的字節數和文件數量有待處理流媒體查詢流程隨著numBytesOutstanding
和numFilesOutstanding
指標。如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據選項卡中流儀表板查詢進展:
{“源”:({“描述”:“DeltaSource(文件/道路/ /源):“,“指標”:{“numBytesOutstanding”:“3456”,“numFilesOutstanding”:“8”},}]}
Append模式
默認情況下,流在附加模式下運行,向表添加新記錄。
您可以使用路徑的方法:
事件。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/ _checkpoints /”)。開始(“/δ/事件”)
事件。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。開始(“/ tmp /δ/事件”)進口io。δ。值得一提的。_事件。writeStream。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。δ(“/ tmp /δ/事件”)
或者是toTable
方法在火花3.1和更高版本(磚運行時的8.3及以上),如下所示。(在火花之前版本3.1(磚運行時8.2及以下),使用表
方法。)
事件。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。toTable(“事件”)
事件。writeStream。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。toTable(“事件”)
完整的模式
您還可以使用結構化流與每一批替換整個表。用例的一個例子是使用聚合計算一個總結:
(火花。readStream。格式(“δ”)。負載(“/ tmp /δ/事件”)。groupBy(“customerId”)。數()。writeStream。格式(“δ”)。outputMode(“完整的”)。選項(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”)。開始(“/ tmp /δ/ eventsByCustomer”))
火花。readStream。格式(“δ”)。負載(“/ tmp /δ/事件”)。groupBy(“customerId”)。數()。writeStream。格式(“δ”)。outputMode(“完整的”)。選項(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”)。開始(“/ tmp /δ/ eventsByCustomer”)
前麵的例子不斷更新一個表,其中包含客戶總數量的事件。
更寬鬆的延遲需求的應用程序,你可以節省計算資源與一次性觸發器。使用這些更新總結聚合表在給定的時間表,隻處理新數據到達自上次更新。
冪等表中寫道foreachBatch
請注意
在磚運行時8.4及以上。
命令foreachBatch允許您指定一個函數執行後的輸出每個micro-batch流查詢任意轉換。這使得作為補充foreachBatch
函數可以寫micro-batch輸出到一個或多個目標三角洲表目的地。然而,foreachBatch
不會讓那些寫冪等作為寫嚐試缺乏信息是否重新執行批處理。例如,重新運行失敗的批處理可能導致重複數據寫道。
為了解決這個問題,δ表支持以下DataFrameWriter
的選項,使得冪等寫道:
txnAppId
:一個獨一無二的字符串,您可以通過在每個DataFrame
寫。例如,您可以使用StreamingQuery IDtxnAppId
。txnVersion
:一個單調遞增數字作為事務的版本。
δ表使用的組合txnAppId
和txnVersion
識別重複的寫和忽略它們。
如果一批寫中斷故障,運行批處理使用相同的應用程序和批處理ID,這將有助於正確運行時識別重複的寫和忽略它們。應用程序ID (txnAppId
)可以是任何用戶生成唯一的字符串,不需要流相關ID。
警告
如果你刪除流檢查點和重啟查詢新的關卡,你必須提供一個不同的appId
;否則,寫的重新啟動查詢將被忽略,因為它將包含相同的txnAppId
和批處理ID將從0開始。
相同的DataFrameWriter
選項可以用來實現在非冪等寫工作。有關詳細信息,冪等寫道。
例子
app_id=…#一個獨一無二的字符串用作應用程序ID。defwriteToDeltaLakeTableIdempotent(batch_df,batch_id):batch_df。寫。格式(…)。選項(“txnVersion”,batch_id)。選項(“txnAppId”,app_id)。保存(…)#位置1batch_df。寫。格式(…)。選項(“txnVersion”,batch_id)。選項(“txnAppId”,app_id)。保存(…)#位置2
瓦爾appId=…/ /一個獨一無二的字符串用作應用程序ID。streamingDF。writeStream。foreachBatch{(batchDF:DataFrame,batchId:長)= >batchDF。寫。格式(…)。選項(“txnVersion”,batchId)。選項(“txnAppId”,appId)。保存(…)/ /位置1batchDF。寫。格式(…)。選項(“txnVersion”,batchId)。選項(“txnAppId”,appId)。保存(…)/ /位置2}
執行stream-static連接
你可以依賴於事務擔保和三角洲湖執行版本控製協議stream-static連接。stream-static加入加入最新的有效版本增量表(靜態數據)的數據流使用無狀態連接。
當磚過程數據的micro-batch stream-static加入,從靜態三角洲的最新有效版本數據表連接的記錄出現在當前micro-batch。因為連接是無狀態的,你不需要配置水印和可以處理結果與低延遲。靜態三角洲表中的數據使用的加入應該不常更改。
streamingDF=火花。readStream。表(“訂單”)staticDF=火花。讀。表(“beplay体育app下载地址顧客”)查詢=(streamingDF。加入(staticDF,streamingDF。customer_id= =staticDF。id,“內心”)。writeStream。選項(“checkpointLocation”,checkpoint_path)。表(“orders_with_customer_info”))