嗨@Simon Kragh,如果你需要截斷的原始表定期節省存儲空間,你可以考慮修改DLT管道如下:
通過創建一個新的原始表,定期刪除它,你可以避免丟失曆史數據清理桌子。此外,這種方法可以幫助管理存儲空間的原始表中刪除重複數據。
或者,您可以考慮實施一項數據保留策略,從原始表中刪除舊數據基於一個指定的時間段。這種方法將使你保持一定數量的原始表中的曆史數據,同時管理存儲空間。
嗨@Simon Kragh,如果你需要保持“潔淨”的曆史數據表,DLT擴展。你必須找到另一種方法來更新“潔淨”表中的數據在不觸發一個完整的刷新。一個潛在的策略是創建一個新表,“cleansed_temp”,反映“潔淨”的模式,但與一個不同的名稱。然後,用“apply_changes_into”向“cleansed_temp”不斷移動數據。一旦數據被成功加載到“cleansed_temp”,然後您可以使用merge語句更新“潔淨”表中的數據與新數據“cleansed_temp”。
merge語句將基於一個唯一鍵匹配記錄,允許您更新現有記錄,添加新的形式,並保存曆史數據。合並完成後,您可以將“cleansed_temp”表。這種方法可以避免觸發一個完整的“潔淨”表的更新,讓你保持曆史數據。
好的,我會做一個添加額外的細節。首先:下麵的關係圖顯示了我們當前的數據流:
我們的原始表的定義是這樣的:
表=(“表1”、“表”)def generate_tables (table_name): @dlt。表(name = f 'raw_ {table_name}’, table_properties ={“質量”:“青銅”})def create_table():返回(spark.readStream.format .option (“cloudfiles (“cloudfiles”)。格式”、“鋪”).option (‘pathGlobfilter’,‘* .parquet‘) .option (“cloudFiles。useNotifications”、“真實”).option (cloudFiles。clientId’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-client-id)) .option (“cloudFiles。clientSecret’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-client-secret)) .option (“cloudFiles。connectionString’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-connection-string)) .option (“cloudFiles。resourceGroup’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-resource-group)) .option (“cloudFiles。subscriptionId’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-subscription-id)) .option (“cloudFiles。tenantId’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-tenant-id)) .option (“mergeSchema”,“真正的”).load (f 'dbfs: / mnt /生/ {table_name} / * .parquet”) .withColumn (Meta_SourceFile, input_file_name ()) .withColumn (Meta_IngestionTS, current_timestamp()))的t表:generate_tables (t)
和我們的清洗(SCD2)創建為這樣:
def generate_scd_tables (table_name、鑰匙、seq_col、exc_cols scd_type): dlt。create_streaming_live_table (f 'cleansed_ {table_name} _scd {scd_type}”, table_properties ={'三角洲。enableChangeDataFeed”:“真正的”、“pipelines.reset。允許”:“假”、“質量”:“銀'})dlt。apply_changes(目標= f 'cleansed_ {table_name} _scd {scd_type}”,源= f 'raw_ {table_name}’,鍵=鍵sequence_by =坳(seq_col) track_history_except_column_list = exc_cols #輸入必須給出一個列表,即。g (" Id ") stored_as_scd_type = scd_type) generate_scd_tables (table_name =“tabel1”鍵= [“Id”], seq_col = Meta_IngestionTS, exc_cols =[‘身份證’,‘Meta_IngestionTS’, ' Meta_SourceFile '], scd_type = 2)
由於我們收到的數據量,我們想截斷原始表定期。然而,如果我們刪除或者截斷的原始表到目前為止,整個三角洲住管道將失敗給以下eror信息:從今以後流源在191版本。這是目前不支持。如果你想忽略刪除,設置選項“ignoreDeletes”到“真正的”
但是我們如何設置這個選項?我們已經嚐試生和SCD2沒有成功。我寧願不像你建議引入一個臨時表,該表將不是實時的。