如何在磚和刪除文件列表更快

學習如何和刪除文件列表在磚更快。

寫的亞當Pavlacka

去年發表在:2022年5月31日

場景

假設您需要刪除一個分區表一年,,日期,地區,服務。然而,桌子上是巨大的,將會有1000左右部分每個分區的文件。你能列出在每個分區的所有文件,然後刪除它們使用Apache火花工作。

例如,假設您有一個表分區的,b和c:

% scala Seq ((1、2、3、4、5), (2、3、4、5、6), (3、4、5、6、7), (4、5、6、7、8)) .toDF (“a”、“b”、“c”,“d”,“e”) .write.mode .partitionBy(“覆蓋”)(“a”、“b”、“c”) .parquet (“/ mnt /道路/表”)

列表文件

您可以使用這個函數所有的部分文件列表:

% scala org.apache.hadoop.conf進口。配置進口org.apache.hadoop.fs。{路徑,文件係統}進口org.apache.spark.deploy.SparkHadoopUtil org.apache.spark.sql.execution.datasources進口。InMemoryFileIndeximport java.net.URI def listFiles(basep: String, globp: String): Seq[String] = { val conf = new Configuration(sc.hadoopConfiguration) val fs = FileSystem.get(new URI(basep), conf) def validated(path: String): Path = { if(path startsWith "/") new Path(path) else new Path("/" + path) } val fileCatalog = InMemoryFileIndex.bulkListLeafFiles( paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))), hadoopConf = conf, filter = null, sparkSession = spark, areRootPaths=true) // If you are using Databricks Runtime 6.x and below, // remove  from the bulkListLeafFiles function parameter. fileCatalog.flatMap(_._2.map(_.path)) } val root = "/mnt/path/table" val globp = "[^_]*" // glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*" val files = listFiles(root, globp) files.toDF("path").show()
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | |路徑+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | dbfs: / mnt /道路/表/ = 1 / b = 2 / c = 3 / - 00000部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——5.。拚花| | dbfs: / mnt /道路/表/ = 2 / b = 3 / c = 4 /部分- 00001 tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——6.。拚花| | dbfs: / mnt /道路/表/ = 3 / b = 4 / c - 00002 = 5 /部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——7.。拚花| | dbfs: / mnt /道路/表/ = 4 / b = 5 / - 00003 c = 6 /部分tid - 444 - afa32967 b7db - 5046671251912249212 e - b895 d12d68c05500 c000.snappy——8.。拚花| + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +

listFiles函數接受一個基地路徑和一個一團路徑作為參數,掃描的文件和匹配一團模式,然後返回所有匹配葉文件作為一個字符串序列。

還使用效用函數的函數globPathSparkHadoopUtil包中。這個函數列出所有在一個目錄路徑指定的前綴,並沒有進一步列表葉孩子(文件)。路徑傳入的列表InMemoryFileIndex.bulkListLeafFiles方法,這是一個引發內部API分布式文件清單。

這兩個清單效用函數單獨工作。通過結合他們你可以得到你想要的頂級目錄列表列出使用globPath函數,它將運行在司機,您可以分發清單為所有孩子離開的頂級目錄引發工人使用bulkListLeafFiles

周圍的加速可以根據Amdahl法則20-50x更快。原因是,您可以很容易地控製一團文件路徑根據實際物理布局和控製的並行性spark.sql.sources.parallelPartitionDiscovery.parallelismInMemoryFileIndex

刪除文件

當你從一個非托管表刪除文件或分區,您可以使用磚效用函數dbutils.fs.rm。這個函數利用原生雲存儲文件係統API,這是所有文件操作的優化。然而,您不能刪除一個巨大的表直接使用dbutils.fs.rm(“路徑/ / /表”)

你可以使用上麵的腳本列表文件有效。對於小表,收集路徑的文件刪除適合司機內存,那麼您可以使用一個火花工作分發文件刪除任務。

對於巨大的表,即使對於一個頂級分區,文件路徑的字符串表示不能適應司機記憶。解決這個問題最簡單的方法是遞歸地收集內部分區的路徑,並行路徑列表,刪除它們。

% scala scala.util進口。{嚐試,成功,失敗}def刪除(p:字符串):單位= {dbutils.fs.ls (p) . map (_.path) .toDF。= > dbutils.fs.rm foreach{文件(文件(0)。toString, true) println (s“刪除文件:$文件”)}}最後def walkDelete(根:字符串)(水平:Int):單位= {dbutils.fs.ls(根). map (_.path)。為each { p => println(s"Deleting: $p, on level: ${level}") val deleting = Try { if(level == 0) delete(p) else if(p endsWith "/") walkDelete(p)(level-1) // // Set only n levels of recursion, so it won't be a problem // else delete(p) } deleting match { case Success(v) => { println(s"Successfully deleted $p") dbutils.fs.rm(p, true) } case Failure(e) => println(e.getMessage) } } }

同時確保代碼刪除內部分區被刪除的分區是足夠小。它通過每個級別的分區遞歸搜索,隻有開始刪除當它擊中級別設置。例如,如果你想開始刪除頂級分區,使用walkDelete(根)(0)。火花將會刪除所有的文件dbfs: / mnt /道路/表/ = 1 /,然後刪除= 2 /…/模式後,直到精疲力竭。

火花的工作分配刪除任務使用刪除功能上麵,清單文件dbutils.fs.ls與假設子分區的數量在這個級別很小。你也可以通過替換更有效率dbutils.fs.ls函數與listFiles功能上麵,隻有輕微的修改。

總結

這兩種方法突出的方法清單和刪除的表。他們用一些火花效用函數和函數具體磚環境。即使你不能直接使用它們,您可以創建自己的效用函數在一個類似的方式解決這個問題。

這篇文章有用嗎?