當我試著設置“pathGlobFilter”自動裝卸機工作,它似乎過濾掉一切。
桶/目錄設置
“s3a: / / my_bucket / level_1_dir / level_2_dir / < some_name > /一/二/ < the_files_i_want_to_load > '
所以我想要的是能夠提供一個列表的名稱來加載數據。這些目錄都將共享相同的子目錄結構,和所有的文件(可能有任意擴展和命名約定)將兩個目錄下。
以下是我目前最好的嚐試加載這些目錄的內容。我隻是想每個文件的全部內容加載到單個列在我dataframe——這一部分工作沒有過濾器。
' ' '
MY_S3_PATH = " s3a: / / my_bucket / level_1_dir level_2_dir /”
名稱=[“愛麗絲”、“bob”、“馬洛裏”)
模式= f”/ {{{', ' . join(名)}}}/一/二/ *”
流= (
spark.readStream.format (“cloudFiles”)
. schema (StructType ([StructField(“價值”,StringType(),真的))))
.option (“cloudFiles。格式”、“文本”)
.option (“wholeText”,真的)
.option (“cloudFiles.fetchParallelism”,
include_patterns .option (“pathGlobFilter”)
)
stream.load .writeStream (MY_S3_PATH)。選項(queryName my_loader_query) .trigger (availableNow = True) .toTable (my_table)
' ' '
當我運行,流初始化和運行,但沒有數據處理。它似乎是過濾掉一切(當我刪除過濾器,文件加載像我期望)。
我尋找一個解決,但也理解我在哪裏可以尋找什麼實際上是被發現/過濾信息。