取消
顯示的結果
而不是尋找
你的意思是:

火花流——隻在流媒體處理新的文件路徑?

Michael_Galli
因素二世

在流媒體工作,我們目前在一個目錄上運行流(cloudFiles格式)與銷售交易每5分鍾。

在這個目錄中,事務是下令在下列格式:

< streaming-checkpoint-root > / < transaction_date > / < transaction_hour > / transaction_x_y.json

隻有今天的交易感興趣的,其他所有已經過時了。

當我開始流的工作,它將處理所有曆史交易,´我不希望的。

隻可能以某種方式來處理新文件進來後流已經開始工作?

1接受解決方案

接受的解決方案

Michael_Galli
因素二世

更新:

maxFileAge似乎不是一個好主意。以下的選項“includeExistingFiles”= False解決了我的問題:

streaming_df = (

spark.readStream.format (“cloudFiles”)

.option (“cloudFiles。格式”,擴展)

.option (“cloudFiles。maxFilesPerTrigger”, 20)

.option (“cloudFiles。在cludeExistingFiles", False)

.option(“多行”,真的)

.option (“pathGlobfilter”、“*”。+擴展)\

. schema(模式).load (streaming_path)

)

在原帖子查看解決方案

3回複3

Michael_Galli
因素二世

看來,“maxFileAge”解決問題。

streaming_df = (

spark.readStream.format .option (“cloudFiles (“cloudFiles”)。”、“json格式”)\

.option (“maxFilesPerTrigger”, 20) \

.option \(“多行”,真正的)

.option (“maxFileAge”, 1) \

. schema(模式).load (streaming_path)

)

這忽略了文件超過1周。

但如何忽略文件超過1天嗎?

Hubert_Dudek1
尊敬的貢獻者三世

是的完全cloudFiles.maxFileAge請選擇你的答案是最好的:slightly_smiling_face:

Michael_Galli
因素二世

更新:

maxFileAge似乎不是一個好主意。以下的選項“includeExistingFiles”= False解決了我的問題:

streaming_df = (

spark.readStream.format (“cloudFiles”)

.option (“cloudFiles。格式”,擴展)

.option (“cloudFiles。maxFilesPerTrigger”, 20)

.option (“cloudFiles。在cludeExistingFiles", False)

.option(“多行”,真的)

.option (“pathGlobfilter”、“*”。+擴展)\

. schema(模式).load (streaming_path)

)

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map