取消
顯示的結果
而不是尋找
你的意思是:

如何在磚自動裝卸機的過濾器文件流

kaslan
新的貢獻者二世

我想建立一個S3流使用磚自動加載程序。我已經設法建立流,但我的S3 bucket包含不同類型的JSON文件。我想過濾出來,最好是在流本身,而不是使用一個過濾器操作。

根據的文檔我應該能夠過濾使用一滴模式。但是,我似乎無法得到這個工作負載一切無論如何。

這就是我

df =(火花。readStream .format .option (“cloudFiles (“cloudFiles”)。格式”、“json”) .option (“cloudFiles。在ferColumnTypes", "true") .option("cloudFiles.schemaInference.samleSize.numFiles", 1000) .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/") .option("includeExistingFiles", "true") .option("multiLine", "true") .option("inferSchema", "true") # .option("cloudFiles.schemaHints", schemaHints) # .load("s3:///qualifier/**/*_INPUT") .load("s3:///qualifier") .withColumn("filePath", F.input_file_name()) .withColumn("date_ingested", F.current_timestamp()) )

我的文件構成的一個關鍵

限定符/版本/ YYYY-MM / DD / <名稱> _INPUT.json

,所以我想過濾文件,包含輸入名稱。這似乎負載一切:

.load (s3: / / <桶> /限定符”)

.load (s3: / / <桶> /限定符/ * * / * _INPUT”)

就是我想做的事,但這並不工作。是我一滴模式不正確,或者有別的我失蹤嗎?

6個回答6

Kaniz
社區經理
社區經理

你好@kaslan!我的名字叫Kaniz,我這裏的技術主持人。很高興認識你,謝謝你的問題!看看你的同行在社區中有一個回答你的問題。否則我將盡快給你回電。謝謝。

werners1
尊敬的貢獻者三世

據你鏈接的文檔,水珠過濾輸入通道隻有工作目錄,而不是文件本身。

所以如果你想在關於dirs過濾某些文件,您可以包括一個額外的過濾pathGlobFilter選項:

.option (“pathGlobFilter”、“* _INPUT”)

https://docs.m.eheci.com/spark/latest/structured-streaming/auto-loader-s3.html use-cloudfiles-sou……

kaslan
新的貢獻者二世

啊,忘了說,我試過。它仍然拿起其他文件當我這樣做

werners1
尊敬的貢獻者三世

奇怪,也許因為這個?:

“一滴模式將*附加到它filepath”()

或使用* _INPUT *文件過濾器。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map