目標
本地數據的導入和鞏固GBs / TBs 20 mb的塊鑲花文件到磚/三角洲湖/分區表。
我所做的
我把一小部分的數據,約72.5 GB和攝取使用下麵流。數據的順序和等級排序已經“id / time_bin / smaller_time_bin.parquet”。每個分區應該表示~ 12-20 GB。結果:
為什麼吹那麼大,為什麼要這麼長時間?我以前做的一個項目~ ~ 15 GB,隻花了5 - 10分鍾來攝取1 - 2大一般情況下,所以我想有一個問題我下麵閱讀代碼,否則它應該隻需要~ 30分鍾到1個小時。
注意:brotli原始壓縮,但這是不支持的三角洲緩存,所以我讓它轉換為時髦的(默認壓縮)。
代碼
從pyspark.sql。從pyspark.sql進口*類型。功能導入坳、current_timestamp expr、input_file_name regexp_extract, unix_timestamp, from_unixtime,年,月,dayofmonth,小時#下麵定義變量用於代碼file_path = " s3: / / <桶> /拚花/ < id > / * * / < time_series_data > / *。拚花“用戶名=火花。的sql(“選擇regexp_replace (current_user (),“[^ a-zA-Z0-9] ', ' _ ')”)當代()[0]table_name = f“test_small_1 checkpoint_path = f“/ tmp / {username} / _checkpoint / ingest_test_small_1”#清除數據從之前的演示執行火花。sql (f“DROP TABLE如果存在{table_name}”) dbutils.fs。rm (checkpoint_path,真)模式= StructType ([StructField (“t”, LongType ()), StructField (“x”, FloatType ()), StructField (“y”, FloatType ()), StructField (“z”, FloatType()),])(火花。readStream .format .option (“cloudFiles (“cloudFiles”)。allowOverwrites”,真的).option (“modifiedAfter”、“2022-12-21 00:00:00.000000 UTC + 0”) .option (“cloudFiles。格式”、“鋪”).option (“cloudFiles。schemaLocation”, checkpoint_path) .option (“spark.databricks.io.cache。啟用”,假).option (“mergeSchema”,“真正的”)# . schema(模式).load (file_path) .select (“*”) .withColumn (“id”, regexp_extract (input_file_name (), r年代:/ / <桶> /拚花/ ((a-zA-Z0-9) *) /。* ',1)).withColumn(“時間戳”,expr (“timestamp_micros (BIGINT (ts))”)) .withColumn(“年”,年(col(“時間戳”))).withColumn(“月”,月(col(“時間戳”))).writeStream .partitionBy .option(“年”、“月”)(“checkpointLocation”, checkpoint_path) .trigger (availableNow = True) .toTable (table_name))