取消
顯示的結果
而不是尋找
你的意思是:

在DLT ignoreDeletes管道

梅花鹿
新的貢獻者二世

你好,

所以我有一個DLT管道:

原始- >清洗(SCD2) - >策劃。

“生”是利用自動裝卸機,從datalake反複閱讀文件。這些文件可以包含大量重複,導致我們的原始表變得很大。因此,我們想截斷原始定期保存存儲大小。

我添加了“pipelines.reset。允許“:”假“潔淨的表,以確保我們不失去我們的曆史變化,我也試過添加ignoreDeletes參數原始和潔淨的沒有任何成功。

我該如何組織我的管道,如果我希望能夠定期刪除原始表嗎?

4回複4

Kaniz
社區經理
社區經理

嗨@Simon Kragh,如果你需要截斷的原始表定期節省存儲空間,你可以考慮修改DLT管道如下:

  1. 創建一個新表來存儲原始數據。
  2. 使用自動裝卸機連續數據加載到新的原始表。
  3. 建立一個流程定期將數據從原始表移動到潔淨的表。
  4. 使用工具或腳本定期截斷後的原始表的數據已經成功加載到潔淨的表。

通過創建一個新的原始表,定期刪除它,你可以避免丟失曆史數據清理桌子。此外,這種方法可以幫助管理存儲空間的原始表中刪除重複數據。

或者,您可以考慮實施一項數據保留策略,從原始表中刪除舊數據基於一個指定的時間段。這種方法將使你保持一定數量的原始表中的曆史數據,同時管理存儲空間。

梅花鹿
新的貢獻者二世

嗨Kaniz,

這就是我今天。我們利用“apply_changes_into”,不斷將數據移動到潔淨了。你建議的方法將不會工作,因為它會觸發一個警告,刪除源表中的已發現,和DLT僅追加。這將執行一個完整的更新將截斷潔淨表我們寬鬆的所有曆史數據在哪裏?

嗨@Simon Kragh,如果你需要保持“潔淨”的曆史數據表,DLT擴展。你必須找到另一種方法來更新“潔淨”表中的數據在不觸發一個完整的刷新。一個潛在的策略是創建一個新表,“cleansed_temp”,反映“潔淨”的模式,但與一個不同的名稱。然後,用“apply_changes_into”向“cleansed_temp”不斷移動數據。一旦數據被成功加載到“cleansed_temp”,然後您可以使用merge語句更新“潔淨”表中的數據與新數據“cleansed_temp”。

merge語句將基於一個唯一鍵匹配記錄,允許您更新現有記錄,添加新的形式,並保存曆史數據。合並完成後,您可以將“cleansed_temp”表。這種方法可以避免觸發一個完整的“潔淨”表的更新,讓你保持曆史數據。

梅花鹿
新的貢獻者二世

好的,我會做一個添加額外的細節。首先:下麵的關係圖顯示了我們當前的數據流:

dbworkflow

我們的原始表的定義是這樣的:

表=(“表1”、“表”)def generate_tables (table_name): @dlt。表(name = f 'raw_ {table_name}’, table_properties ={“質量”:“青銅”})def create_table():返回(spark.readStream.format .option (“cloudfiles (“cloudfiles”)。格式”、“鋪”).option (‘pathGlobfilter’,‘* .parquet‘) .option (“cloudFiles。useNotifications”、“真實”).option (cloudFiles。clientId’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-client-id)) .option (“cloudFiles。clientSecret’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-client-secret)) .option (“cloudFiles。connectionString’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-connection-string)) .option (“cloudFiles。resourceGroup’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-resource-group)) .option (“cloudFiles。subscriptionId’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-subscription-id)) .option (“cloudFiles。tenantId’, dbutils.secrets。get (SECRET_SCOPE db-autoloader-tenant-id)) .option (“mergeSchema”,“真正的”).load (f 'dbfs: / mnt /生/ {table_name} / * .parquet”) .withColumn (Meta_SourceFile, input_file_name ()) .withColumn (Meta_IngestionTS, current_timestamp()))的t表:generate_tables (t)

和我們的清洗(SCD2)創建為這樣:

def generate_scd_tables (table_name、鑰匙、seq_col、exc_cols scd_type): dlt。create_streaming_live_table (f 'cleansed_ {table_name} _scd {scd_type}”, table_properties ={'三角洲。enableChangeDataFeed”:“真正的”、“pipelines.reset。允許”:“假”、“質量”:“銀'})dlt。apply_changes(目標= f 'cleansed_ {table_name} _scd {scd_type}”,源= f 'raw_ {table_name}’,鍵=鍵sequence_by =坳(seq_col) track_history_except_column_list = exc_cols #輸入必須給出一個列表,即。g (" Id ") stored_as_scd_type = scd_type) generate_scd_tables (table_name =“tabel1”鍵= [“Id”], seq_col = Meta_IngestionTS, exc_cols =[‘身份證’,‘Meta_IngestionTS’, ' Meta_SourceFile '], scd_type = 2)

由於我們收到的數據量,我們想截斷原始表定期。然而,如果我們刪除或者截斷的原始表到目前為止,整個三角洲住管道將失敗給以下eror信息:從今以後流源在191版本。這是目前不支持。如果你想忽略刪除,設置選項“ignoreDeletes”到“真正的”

但是我們如何設置這個選項?我們已經嚐試生和SCD2沒有成功。我寧願不像你建議引入一個臨時表,該表將不是實時的。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map