取消
顯示的結果
而不是尋找
你的意思是:

如何限製在每一批數量的文件流批處理

桑傑
價值貢獻

你好,

我運行批處理作業流程的文件。我試圖限製在每個批處理文件數量所以添加maxFilesPerTrigger選項。但它不工作。它處理所有輸入文件。

(spark.readStream.format(“δ”).load (silver_path)

.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.option (“maxFilesPerTrigger”, 200年)

.trigger(一旦= True)

.foreachBatch (foreachBatchFunction)

.start ()

.awaitTermination ()

)

請建議。

問候,

桑傑

20個回複20.

werners1
尊敬的貢獻者三世

我想我發現了問題。

maxfilespertrigger選項被設置在源,而不是在水槽(你)。

試著移動加載前的選擇語句. .

所以readstream.option () .load ()……

桑傑
價值貢獻

仍把所有1000文件。

(spark.readStream.format(“δ”)。選項(“maxFilesPerTrigger”, 100) .load (silver_path)

.writeStream

gold_checkpoint_path .option (“checkpointLocation”)

.trigger(一旦= True)

.foreachBatch (foreachBatchFunction)

.start ()

.awaitTermination ()

)

werners1
尊敬的貢獻者三世

spark.readStream.format(“δ”)

.option (“maxFilesPerTrigger”、“100”)

.load(<表>)

.writeStream

.format(“δ”)

.outputMode(“追加”)

.option (“checkpointLocation”、“…”)

.table(<表>)

桑傑
價值貢獻

對不起,沒有這方麵的專家。但是如何處理我的自定義代碼。

.foreachBatch (foreachBatchFunction)

.start ()

.awaitTermination ()

werners1
尊敬的貢獻者三世

對不起,這基本上是隻有部分負載()是很重要的。

也嚐試輸入文件的數量作為字符串而不是int。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map