你好,
我運行批處理作業流程的文件。我試圖限製在每個批處理文件數量所以添加maxFilesPerTrigger選項。但它不工作。它處理所有輸入文件。
(spark.readStream.format(“δ”).load (silver_path)
.writeStream
gold_checkpoint_path .option (“checkpointLocation”)
.option (“maxFilesPerTrigger”, 200年)
.trigger(一旦= True)
.foreachBatch (foreachBatchFunction)
.start ()
.awaitTermination ()
)
請建議。
問候,
桑傑
gold_checkpoint_path @Sanjay Jain,裏麵,有幾個子文件夾。
去“提交”並檢查這是最新的文件裏麵(你可以看到文件命名為1、2、3、4,.....50歲的51。文件命名數量最高的是最新的一個。假設它是60。這意味著微批60承諾。如果沒有批承諾,你將看不到文件)。
然後檢查文件在文件夾“偏移量”。看到最新的一個文件夾。,在幾乎所有情況下,您將看到一個文件的名字=最新batchID發現提交+ 1(61按照這個例子。如果沒有文件在提交,然後你會看到一個名為“0”的文件在這個文件夾)。如果你看到這種行為,這個最新的備份文件,然後刪除它。然後重新啟動工作。這應該幫助!