取消
顯示的結果
而不是尋找
你的意思是:

如何限製在每一批數量的文件流批處理

桑傑
價值貢獻

你好,

我運行批處理作業流程的文件。我試圖限製在每個批處理文件數量所以添加maxFilesPerTrigger選項。但它不工作。它處理所有輸入文件。

(spark.readStream.format(“δ”).load (silver_path)

.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.option (“maxFilesPerTrigger”, 200年)

.trigger(一旦= True)

.foreachBatch (foreachBatchFunction)

.start ()

.awaitTermination ()

)

請建議。

問候,

桑傑

20個回複20.

Sandeep
貢獻者三世

gold_checkpoint_path @Sanjay Jain,裏麵,有幾個子文件夾。

去“提交”並檢查這是最新的文件裏麵(你可以看到文件命名為1、2、3、4,.....50歲的51。文件命名數量最高的是最新的一個。假設它是60。這意味著微批60承諾。如果沒有批承諾,你將看不到文件)。

然後檢查文件在文件夾“偏移量”。看到最新的一個文件夾。,在幾乎所有情況下,您將看到一個文件的名字=最新batchID發現提交+ 1(61按照這個例子。如果沒有文件在提交,然後你會看到一個名為“0”的文件在這個文件夾)。如果你看到這種行為,這個最新的備份文件,然後刪除它。然後重新啟動工作。這應該幫助!

桑傑
價值貢獻

這似乎是手動步驟,有什麼方法我可以這自動再處理該文件是否有更新了這個文件。

werners1
尊敬的貢獻者三世

這聽起來更像是三角洲湖的數據提要功能變化。

https://learn.microsoft.com/en-us/azure/databricks/delta/delta-change-data-feed

Vidula_Khanna
主持人
主持人

嗨@Sanjay耆那教徒的

希望一切進行得很順利。

隻是想檢查如果你能解決你的問題。如果是的,你會很高興的答案標記為最好,其他成員可以找到解決方案更快嗎?如果不是,請告訴我們,我們可以幫助你。

幹杯!

嗨Vidula,

上麵的解決方案是不工作。請建議其他的解決方案。

問候,

桑傑

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map