我有一個計劃工作(運行在連續模式)使用以下代碼
' ' '
(
火花
.readStream
databricks_checkpoint_location .option (“checkpointLocation”)
.option (“readChangeFeed”,“真正的”)
.option (“startingVersion”版本+ 1)
.table (databricks_source_table_raw_postgres_nft)
.writeStream
.foreachBatch (process_batch)
.outputMode(“追加”)
.start ()
)
' ' '
我設置了“版本”當我最初的工作。然而,我發現,當我重新啟動工作,這份工作開始在同一版本的檢查點。它看起來像檢查點未被使用。
檢查點處理變化的數據提要?如果沒有,我怎麼能確保工作開始,它不禁停了下來,在失敗的情況下,工作?
我想讓“連續”計劃失敗後立即重新啟動工作流,而不是重新啟動版本手動設置。
謝謝
嗨@Kit山藥證交所,謝謝你伸出你的問題使用檢查點和改變你結構化數據輸入流的工作。檢查點確實支持改變數據提要,它應該幫助你恢複工作從停止的失敗。
你經曆的問題可能是由於startingVersion選項的使用。當你指定這個選項,它優先於檢查站,流媒體的工作從給定的版本,而不是從檢查點恢複。
以確保你的工作簡曆從檢查點位置失敗後,您可以刪除
startingVersion選擇從您的代碼。這將讓流的工作依賴於檢查點信息來確定起點。這裏有一個例子的代碼應該類似於:
(火花.readStream .option (“checkpointLocation”, databricks_checkpoint_location) .option (“readChangeFeed”、“true”) .table (databricks_source_table_raw_postgres_nft) .writeStream .foreachBatch (process_batch) .outputMode(“追加”).start ())
在剛開始工作時,它將從最早開始改變版本中可用的數據提要。
如果你需要開始工作第一次從一個特定的版本,您可以使用startingVersion選項隻對後續運行的初始運行和刪除它。
在工作失敗的情況下,連續模式應該自動啟動工作,簡曆處理最後一個檢查點,確保沒有數據丟失。
如果你有任何進一步的問題或需要額外的幫助,請隨時與我們聯係。