@Artem Sachuk:
是的,跟蹤哪些文件被自動裝卸機加載,您可以添加input_file_name()函數作為一個新列在加載過程中數據幀。通過這種方式,您可以確切地知道哪些文件每一行的數據來源。
另外,您還可以使用自動裝卸機的檢查點機製來跟蹤哪些文件已經成功加載。檢查點機製存儲文件元數據(如文件名、大小和修改時間)在一個檢查點文件中。您可以查詢該檢查點文件,看看哪些文件已經處理,加載成功。
使檢查點,您需要指定一個檢查點目錄當您創建自動裝卸機流。例如:
checkpoint_dir自動裝彈機= =“/ mnt /檢查站”火花。readStream \ .format (cloudFiles) \ .option (“cloudFiles。checkpointLocation”, checkpoint_dir) \ .load(路徑)
檢查點目錄可以是任何在支持寫訪問的文件係統路徑。確保檢查點持久文件係統上的目錄(例如HDFS或Azure Blob存儲),而不是本地磁盤,檢查點信息需要生存驅動程序或執行器故障。
一旦目錄設置檢查站,自動裝卸機流將在該目錄創建檢查點文件,跟蹤其進展。您可以查詢使用StreamingQuery檢查點文件。recentProgress方法來獲取最新的批處理文件的信息。
例如:
查詢=自動裝卸機。writeStream \ .outputMode(“追加”)\ .format(“控製台”)\ .start()而不是查詢。isActive: time . sleep(1),查詢。isActive:進展查詢。recentProgress:如果“numInputRows”進展:打印(“{}{}行從文件加載”.format(進步(“numInputRows”),進步[" inputFiles "])) time . sleep (1) query.awaitTermination ()
這段代碼將打印的行數加載和每個批處理文件的文件名。
@Artem Sachuk:
實現著陸區清洗的方法之一是使用一個腳本中的存儲Azure SDK或工作成功後通過自動裝卸機加載的文件。
首先,您可以使用磚dbutils.fs.ls()命令獲取著陸區目錄中的文件列表。然後,使用Azure存儲SDK,可以刪除的文件已經被加載。
下麵是一個示例Python代碼片段演示了這種方法:
從azure.storage進口操作係統。blob進口BlobServiceClient #設置Azure存儲賬戶細節account_name =“your_account_name account_key”=“your_account_key”#設置你的著陸區目錄路徑landing_zone_path =“/ mnt / landing_zone”#創建BlobServiceClient對象連接到存儲賬戶connect_str =“DefaultEndpointsProtocol = https;帳號名稱= {};AccountKey = {}; EndpointSuffix = core.windows.net”.format (account_name, account_key) blob_service_client = BlobServiceClient.from_connection_string (connect_str) #的著陸區目錄中的文件列表landing_zone_files = dbutils.fs.ls (landing_zone_path) #循環的著陸區文件中的每個文件landing_zone_files: file_path =文件。路徑file_name = os.path.basename (file_path) #檢查文件是否已經被加載(您可能需要修改該邏輯根據你具體的用例)如果file_name.startswith (“loaded_”): #刪除文件從Azure存儲container_name =“your_container_name”blob_client = blob_service_client。get_blob_client(容器= container_name blob = file_name) blob_client.delete_blob從著陸區()#刪除文件目錄dbutils.fs.rm (file_path)
注意,在這個例子中,已經加載的著陸區文件被認為有前綴“loaded_”。你可能需要修改以適合您的特定用例的邏輯。確保取代account_name, account_key landing_zone_path, container_name變量用你自己的價值觀。
@Artem Sachuk:
是的,跟蹤哪些文件被自動裝卸機加載,您可以添加input_file_name()函數作為一個新列在加載過程中數據幀。通過這種方式,您可以確切地知道哪些文件每一行的數據來源。
另外,您還可以使用自動裝卸機的檢查點機製來跟蹤哪些文件已經成功加載。檢查點機製存儲文件元數據(如文件名、大小和修改時間)在一個檢查點文件中。您可以查詢該檢查點文件,看看哪些文件已經處理,加載成功。
使檢查點,您需要指定一個檢查點目錄當您創建自動裝卸機流。例如:
checkpoint_dir自動裝彈機= =“/ mnt /檢查站”火花。readStream \ .format (cloudFiles) \ .option (“cloudFiles。checkpointLocation”, checkpoint_dir) \ .load(路徑)
檢查點目錄可以是任何在支持寫訪問的文件係統路徑。確保檢查點持久文件係統上的目錄(例如HDFS或Azure Blob存儲),而不是本地磁盤,檢查點信息需要生存驅動程序或執行器故障。
一旦目錄設置檢查站,自動裝卸機流將在該目錄創建檢查點文件,跟蹤其進展。您可以查詢使用StreamingQuery檢查點文件。recentProgress方法來獲取最新的批處理文件的信息。
例如:
查詢=自動裝卸機。writeStream \ .outputMode(“追加”)\ .format(“控製台”)\ .start()而不是查詢。isActive: time . sleep(1),查詢。isActive:進展查詢。recentProgress:如果“numInputRows”進展:打印(“{}{}行從文件加載”.format(進步(“numInputRows”),進步[" inputFiles "])) time . sleep (1) query.awaitTermination ()
這段代碼將打印的行數加載和每個批處理文件的文件名。