我想建立一個S3流使用磚自動加載程序。我已經設法建立流,但我的S3 bucket包含不同類型的JSON文件。我想過濾出來,最好是在流本身,而不是使用一個過濾器操作。
根據的文檔我應該能夠過濾使用一滴模式。但是,我似乎無法得到這個工作負載一切無論如何。
這就是我
df =(火花。readStream .format .option (“cloudFiles (“cloudFiles”)。格式”、“json”) .option (“cloudFiles。在ferColumnTypes", "true") .option("cloudFiles.schemaInference.samleSize.numFiles", 1000) .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/") .option("includeExistingFiles", "true") .option("multiLine", "true") .option("inferSchema", "true") # .option("cloudFiles.schemaHints", schemaHints) # .load("s3:///qualifier/**/*_INPUT") .load("s3:///qualifier") .withColumn("filePath", F.input_file_name()) .withColumn("date_ingested", F.current_timestamp()) )
我的文件構成的一個關鍵
限定符/版本/ YYYY-MM / DD / <名稱> _INPUT.json
,所以我想過濾文件,包含輸入名稱。這似乎負載一切:
.load (s3: / / <桶> /限定符”)
和
.load (s3: / / <桶> /限定符/ * * / * _INPUT”)
就是我想做的事,但這並不工作。是我一滴模式不正確,或者有別的我失蹤嗎?
據你鏈接的文檔,水珠過濾輸入通道隻有工作目錄,而不是文件本身。
所以如果你想在關於dirs過濾某些文件,您可以包括一個額外的過濾pathGlobFilter選項:
.option (“pathGlobFilter”、“* _INPUT”)
https://docs.m.eheci.com/spark/latest/structured-streaming/auto-loader-s3.html use-cloudfiles-sou……