取消
顯示的結果
而不是尋找
你的意思是:

獲取文件信息在使用“觸發工作當得到新的文件”https://docs.m.eheci.com/workflows/jobs/file-arrival-triggers.html

nikhil1991
新的貢獻者二世

我目前想使用此功能的“觸發工作新文件到達時”我的一個項目。我有一個s3 bucket中,隨機文件到達天。所以我創建了一個工作,並設置觸發“文件到來”類型。在s3的筆記本我試圖讀取位置如下:

df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / < bucket_name > / <子文件夾> / "))

工作時觸發一個新文件的到來。但是當新文件到它讀取以前的文件。我隻是想讀取新文件,並將它附加到任何現有的表。

有什麼辦法文件名,這樣我可以使用下麵的代碼隻讀取新文件:

file_name = dbutils.widgets.get (“file_name”) df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / / < bucket_name > / < folder_name > / < file_1.csv >”))

或者有其他方法來解決它。嗎?

1接受解決方案

接受的解決方案

匿名
不適用

@Nikhil Kumawat:

是的,你可以讓新來的文件的名稱使用filePaths DataFrame()方法,傳遞給筆記本。這個方法返回一個對應的路徑列表文件,添加了自上次觸發器。

下麵是一個示例代碼片段展示了如何得到新文件的名稱:

#從DataFrame獲取文件路徑列表file_paths = df.input_file_name() #獲得新文件的名稱new_file_path = file_paths [1] new_file_name = new_file_path.split(“/”)[1] #新文件加載到DataFrame df_new = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option (“9”, ", ") .load (new_file_path))

在這個代碼片段,df是DataFrame傳遞到筆記本的觸發器。input_file_name()方法返回一個包含文件路徑DataFrame列的每一行。通過調用file_paths[1],你可以得到最後的路徑(新來的)文件。分割(“/”)[1]調用提取文件名的路徑。最後,您可以使用該文件名加載DataFrame新文件。

請注意,如果你想要新文件附加到一個現有的表,您可以簡單地使用

模式(“追加”)選項當加載文件:

df_new.write.format(“δ”).mode(“追加”).saveAsTable (“my_table”)

這將對現有my_table表添加新數據。

在原帖子查看解決方案

5回複5

匿名
不適用

@Nikhil Kumawat:

是的,你可以讓新來的文件的名稱使用filePaths DataFrame()方法,傳遞給筆記本。這個方法返回一個對應的路徑列表文件,添加了自上次觸發器。

下麵是一個示例代碼片段展示了如何得到新文件的名稱:

#從DataFrame獲取文件路徑列表file_paths = df.input_file_name() #獲得新文件的名稱new_file_path = file_paths [1] new_file_name = new_file_path.split(“/”)[1] #新文件加載到DataFrame df_new = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option (“9”, ", ") .load (new_file_path))

在這個代碼片段,df是DataFrame傳遞到筆記本的觸發器。input_file_name()方法返回一個包含文件路徑DataFrame列的每一行。通過調用file_paths[1],你可以得到最後的路徑(新來的)文件。分割(“/”)[1]調用提取文件名的路徑。最後,您可以使用該文件名加載DataFrame新文件。

請注意,如果你想要新文件附加到一個現有的表,您可以簡單地使用

模式(“追加”)選項當加載文件:

df_new.write.format(“δ”).mode(“追加”).saveAsTable (“my_table”)

這將對現有my_table表添加新數據。

nikhil1991
新的貢獻者二世

由於@Suteja卡努裏人的回複。

一個問題,我怎麼能把這個dataframe觸發的工作?還是我丟失的東西。

我嚐試以下方法:

df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / < bucket_name > / <子文件夾> / "))

然後我試著使用input_file_name()方法在這個dataframe像下麵但它給了我錯誤說“沒有這樣的方法”:

input_file_method讓我知道如果我遺漏了什麼東西?

我把筆記本供參考,引發的工作:

匿名
不適用

@Nikhil Kumawat:

似乎你想使用input_file_name DataFrame()函數,這是不可能的。input_file_name()函數是一個輸入文件元數據函數,可以使用隻有一個結構化流DataFrame正在處理的文件的名稱。

如果你想通過DataFrame觸發工作,你可以考慮寫DataFrame文件並把文件路徑作為參數傳遞給觸發的工作。這裏有一個例子如何編寫DataFrame CSV文件和通過文件路徑作為參數:

df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / < bucket_name > / <子文件夾> / "))#寫DataFrame csv文件output_path = " s3: / < bucket_name > / <子文件夾> /輸出。csv”df.write.format (csv)。選項(“標題”,真的).save (output_path) #文件路徑作為參數傳遞給工作引發trigger_args = {“output_path”: output_path} #觸發響應=客戶的工作參數。start_trigger (Name = < trigger_name >,參數= trigger_args)

在上麵的例子中,我們在寫DataFrame CSV文件使用DataFrame寫()方法,傳遞output_path輸出文件路徑。然後我們創建一個字典trigger_args參數名稱和值通過觸發器的工作。最後,我們與start_trigger觸發工作()方法的AWS膠客戶,通過觸發器名稱和參數。

Vidula_Khanna
主持人
主持人

嗨@Nikhil Kumawat

謝謝你發布你的問題在我們的社區!我們很高興幫助你。

幫助我們為您提供最準確的信息,請您花一些時間來回顧反應和選擇一個最好的回答了你的問題嗎?

這也將有助於其他社區成員可能也有類似的問題在未來。謝謝你的參與,讓我們知道如果你需要任何進一步的援助!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map