得到所有文件最後修改時間在自動加載程序和批處理作業

定義一個UDF列出所有文件路徑和為每一個返回的最後修改時間。

寫的DD沙瑪

去年發表在:2022年12月1日

您正在運行一個流的工作自動加載器(AWS|Azure|GCP),想要每個文件的最後修改時間存儲賬戶。

指令

得到文件的路徑被自動加載程序文章描述了如何得到所有文件的文件名和路徑被自動加載程序。在本文中,我們建立在這一基礎上,並使用示例代碼演示如何使用一個定製的UDF,然後提取文件的最後修改時間。

  1. 首先定義您的進口和變量。您需要定義< storage-base-path >,以及< input-dir >,< output-dir >您正在使用。
    進口org.apache.hadoop.conf。配置進口org.apache.hadoop.fs。{FileStatus、文件係統、路徑}org.apache.spark.sql.functions進口。{input_file_name坳,udf, from_unixtime} org.apache.spark.sql.types進口。_ val basePath = " < storage-base-path > " val inputLocation = basePath + " < input-dir > " val outputLocation = basePath +“< output-dir >”
  2. 對於本例,我們需要生成示例數據並將其存儲在DataFrame。在實際用例中,您將讀取數據存儲桶。
    進口org.apache.spark.sql.types。_ val sampleData = Seq(行(1,“詹姆斯”,“M”, 1000年)、行(1,“邁克爾”,20歲的“F”, 2000年)、行(2,“羅伯特”,30歲的“M”, 3000年)、行(2“瑪麗亞”40歲的“F”, 4000年)、行(3,“珍”,50歲的“M”, 5000)) val sampleSchema = StructType(數組(StructField (“id”, IntegerType,真的),StructField(“名字”,StringType,真),StructField(“時代”,IntegerType,真的),StructField(“性別”,StringType,真的),StructField(“工資”,IntegerType,真)))val df = spark.createDataFrame (sc.parallelize (sampleData) sampleSchema) df.coalesce (1) .write.format(“鋪”)。partitionBy (“id”,“年齡”).mode(“追加”).save (inputLocation);spark.read.format(“鋪”).load (inputLocation) .count ();
  3. 創建一個定製的UDF列出所有文件的存儲路徑和返回每個文件的最後修改時間。
    val getModificationTimeUDF = udf((路徑:字符串)= > {val finalPath =新路徑(路徑)val fs = finalPath.getFileSystem(參看)如果(fs.exists (finalPath)) {fs。.head listStatus(新路徑(路徑))。其他getModificationTime}{1 / /或其他價值基於業務決定}})
  4. 應用UDF的批處理作業。UDF返回每個文件的最後修改時間在UNIX格式。將其轉換成一個人類可讀的格式除以1000,然後丟的時間戳
    val df = spark.read.format(“鋪”).load (inputLocation) .withColumn (“filePath input_file_name ()) .withColumn (“fileModificationTime getModificationTimeUDF (col (“filePath”))) .withColumn (“fileModificationTimestamp from_unixtime(美元/ 1000“fileModificationTime”,“yyyy-MM-dd HH: mm: ss”) .cast (TimestampType)。as(“時間戳”)).drop (“fileModificationTime”)顯示(df)
  5. 應用UDF汽車加載程序流的工作。
    val自衛隊= spark.readStream.format (“cloudFiles”) . schema (sampleSchema) .option (“cloudFiles。格式”、“鋪”).option (“cloudFiles。includeExistingFiles”、“真實”).option (“cloudFiles。connectionString”, connectionString) .option (“cloudFiles。resourceGroup”, resourceGroup) .option (“cloudFiles。subscriptionId”, subscriptionId) .option (“cloudFiles。tenantId”, tenantId) .option (“cloudFiles。clientId”, clientId) .option (“cloudFiles。clientSecret”, clientSecret) .option (“cloudFiles。useNotifications”、“真實”).load (inputLocation) .withColumn (“filePath input_file_name ()) .withColumn (“fileModificationTime getModificationTimeUDF (col (“filePath”))) .withColumn (“fileModificationTimestamp from_unixtime (“fileModificationTime”/ 1000美元,yyyy-MM-dd HH: mm: ss) .cast (TimestampType)。as(“時間戳”)).drop (“fileModificationTime”)顯示(sdf)


回顧一下,input_file_name ()是用來讀取文件絕對路徑,包括文件名。然後,我們創建一個定製的UDF列出所有文件的存儲路徑。你可以得到文件的最後修改時間從每個文件但它是列在UNIX時間格式。UNIX時間格式轉換成可讀的格式UNIX時間除以1000,將它轉換為一個時間戳。




這篇文章有用嗎?