你好,
我運行批處理作業流程的文件。我試圖限製在每個批處理文件數量所以添加maxFilesPerTrigger選項。但它不工作。它處理所有輸入文件。
(spark.readStream.format(“δ”).load (silver_path)
.writeStream
gold_checkpoint_path .option (“checkpointLocation”)
.option (“maxFilesPerTrigger”, 200年)
.trigger(一旦= True)
.foreachBatch (foreachBatchFunction)
.start ()
.awaitTermination ()
)
請建議。
問候,
桑傑
@Sanjay Jain抱歉錯過了一件事。.trigger(一旦= True)不支持速率限製器。您可以使用.trigger (availableNow = True)。
裁判:https://docs.m.eheci.com/structured-streaming/triggers.html configuring-incremental-batch-process……
spark.readStream.format(“δ”)
.option (“maxFilesPerTrigger”, 200年)
.load (silver_path)
.writeStream
gold_checkpoint_path .option (“checkpointLocation”)
.trigger (availableNow = True)
.foreachBatch (foreachBatchFunction)
.start ()