當你處理流媒體文件自動加載器(AWS|Azure|GCP),事件記錄基於底層存儲中創建的文件。
本文向您展示如何添加每個文件名的文件路徑的新列DataFrame輸出。
一個用例是審計。當文件被吸收到分區的文件夾結構通常是有用的元數據,如時間戳,可以從審計的路徑。
例如,假設一個文件的路徑和文件名2020/2021-01-01 / file1_T191634.csv。
從這條路可以應用定製udf和使用正則表達式來提取細節(2021-01-01)日期和時間戳(T191634)。
下麵的示例代碼使用input_file_name ()得到每一行的路徑和文件名,寫一個新列命名filePath。
% scala val df = spark.readStream.format (“cloudFiles”) . schema .option (“cloudFiles(模式)。格式”、“csv”) .option (“cloudFiles.region”、“ap-south-1”) .load .withColumn(“路徑”)(“filePath input_file_name ())