取消
顯示的結果
而不是尋找
你的意思是:

如何讓自動裝卸機加載成功後刪除文件

附庸風雅的
新的貢獻者二世

嗨,所有

你能請告訴我如何安排裝載文件刪除從Azure存儲在其成功通過自動裝卸機負載嗎?我理解,火花自動裝卸機流“cleanSource”選項不可用,所以我試圖找到最好的辦法實現著陸區清洗。

1接受解決方案

接受的解決方案

匿名
不適用

@Artem Sachuk:

是的,跟蹤哪些文件被自動裝卸機加載,您可以添加input_file_name()函數作為一個新列在加載過程中數據幀。通過這種方式,您可以確切地知道哪些文件每一行的數據來源。

另外,您還可以使用自動裝卸機的檢查點機製來跟蹤哪些文件已經成功加載。檢查點機製存儲文件元數據(如文件名、大小和修改時間)在一個檢查點文件中。您可以查詢該檢查點文件,看看哪些文件已經處理,加載成功。

使檢查點,您需要指定一個檢查點目錄當您創建自動裝卸機流。例如:

checkpoint_dir自動裝彈機= =“/ mnt /檢查站”火花。readStream \ .format (cloudFiles) \ .option (“cloudFiles。checkpointLocation”, checkpoint_dir) \ .load(路徑)

檢查點目錄可以是任何在支持寫訪問的文件係統路徑。確保檢查點持久文件係統上的目錄(例如HDFS或Azure Blob存儲),而不是本地磁盤,檢查點信息需要生存驅動程序或執行器故障。

一旦目錄設置檢查站,自動裝卸機流將在該目錄創建檢查點文件,跟蹤其進展。您可以查詢使用StreamingQuery檢查點文件。recentProgress方法來獲取最新的批處理文件的信息。

例如:

查詢=自動裝卸機。writeStream \ .outputMode(“追加”)\ .format(“控製台”)\ .start()而不是查詢。isActive: time . sleep(1),查詢。isActive:進展查詢。recentProgress:如果“numInputRows”進展:打印(“{}{}行從文件加載”.format(進步(“numInputRows”),進步[" inputFiles "])) time . sleep (1) query.awaitTermination ()

這段代碼將打印的行數加載和每個批處理文件的文件名。

在原帖子查看解決方案

5回複5

匿名
不適用

@Artem Sachuk:

實現著陸區清洗的方法之一是使用一個腳本中的存儲Azure SDK或工作成功後通過自動裝卸機加載的文件。

首先,您可以使用磚dbutils.fs.ls()命令獲取著陸區目錄中的文件列表。然後,使用Azure存儲SDK,可以刪除的文件已經被加載。

下麵是一個示例Python代碼片段演示了這種方法:

從azure.storage進口操作係統。blob進口BlobServiceClient #設置Azure存儲賬戶細節account_name =“your_account_name account_key”=“your_account_key”#設置你的著陸區目錄路徑landing_zone_path =“/ mnt / landing_zone”#創建BlobServiceClient對象連接到存儲賬戶connect_str =“DefaultEndpointsProtocol = https;帳號名稱= {};AccountKey = {}; EndpointSuffix = core.windows.net”.format (account_name, account_key) blob_service_client = BlobServiceClient.from_connection_string (connect_str) #的著陸區目錄中的文件列表landing_zone_files = dbutils.fs.ls (landing_zone_path) #循環的著陸區文件中的每個文件landing_zone_files: file_path =文件。路徑file_name = os.path.basename (file_path) #檢查文件是否已經被加載(您可能需要修改該邏輯根據你具體的用例)如果file_name.startswith (“loaded_”): #刪除文件從Azure存儲container_name =“your_container_name”blob_client = blob_service_client。get_blob_client(容器= container_name blob = file_name) blob_client.delete_blob從著陸區()#刪除文件目錄dbutils.fs.rm (file_path)

注意,在這個例子中,已經加載的著陸區文件被認為有前綴“loaded_”。你可能需要修改以適合您的特定用例的邏輯。確保取代account_name, account_key landing_zone_path, container_name變量用你自己的價值觀。

附庸風雅的
新的貢獻者二世

非常感謝,@Suteja卡努裏人。

我想我唯一需要的是了解哪些文件被加載,對吧?

我能以某種方式查詢自動裝卸機檢查站,或者我需要跟蹤文件含有類似input_file_name()被添加到我的數據?

匿名
不適用

@Artem Sachuk:

是的,跟蹤哪些文件被自動裝卸機加載,您可以添加input_file_name()函數作為一個新列在加載過程中數據幀。通過這種方式,您可以確切地知道哪些文件每一行的數據來源。

另外,您還可以使用自動裝卸機的檢查點機製來跟蹤哪些文件已經成功加載。檢查點機製存儲文件元數據(如文件名、大小和修改時間)在一個檢查點文件中。您可以查詢該檢查點文件,看看哪些文件已經處理,加載成功。

使檢查點,您需要指定一個檢查點目錄當您創建自動裝卸機流。例如:

checkpoint_dir自動裝彈機= =“/ mnt /檢查站”火花。readStream \ .format (cloudFiles) \ .option (“cloudFiles。checkpointLocation”, checkpoint_dir) \ .load(路徑)

檢查點目錄可以是任何在支持寫訪問的文件係統路徑。確保檢查點持久文件係統上的目錄(例如HDFS或Azure Blob存儲),而不是本地磁盤,檢查點信息需要生存驅動程序或執行器故障。

一旦目錄設置檢查站,自動裝卸機流將在該目錄創建檢查點文件,跟蹤其進展。您可以查詢使用StreamingQuery檢查點文件。recentProgress方法來獲取最新的批處理文件的信息。

例如:

查詢=自動裝卸機。writeStream \ .outputMode(“追加”)\ .format(“控製台”)\ .start()而不是查詢。isActive: time . sleep(1),查詢。isActive:進展查詢。recentProgress:如果“numInputRows”進展:打印(“{}{}行從文件加載”.format(進步(“numInputRows”),進步[" inputFiles "])) time . sleep (1) query.awaitTermination ()

這段代碼將打印的行數加載和每個批處理文件的文件名。

@Suteja卡努裏人謝謝!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map