由於三角洲住表為您處理的並行性,我會使用一個元數據表,定義了一些變量,讀這些東西,和遍曆的dict三角洲生活表一樣:
”“”假設:1:Dict名叫metadata_dict database_name, schema_name, table_name, file_path, data_format, load_type 2:數據攝入從數據庫- >雲存儲是一個單獨的過程隻覆蓋目的地或附加目的地3:你已經有一個會話作用域連接雲存儲或使用掛載點”“def create_delta_table (database_name: str, schema_name: str, table_name: str, file_path: str, data_format: str, load_type: str) - > DataFrame:“”“帶表元數據包括文件路徑、模式、表名和利用自動裝卸機要麼刪除/重新加載或δ表附加到目的地。注意——三角洲住表當前不支持統一名稱空間目錄三個水平,hive_metastore目錄將被使用。參數:database_name (str):源數據庫名稱的字符串值schema_name (str):源模式名稱的字符串值table_name (str):一個字符串值的源表名file_path (str):一個字符串值定義在雲存儲的表數據所在(s3, gcp adls) data_format (str):雲存儲數據格式的字符串值(json、拚花等)load_type (str):一個字符串值接受(“重新加載”,“添加”)的回報:火花dataframe名叫database_name__schema_name__table_name加載到目標模式中定義的DLT三角洲格式”“管道”accepted_load_types =(“重新加載”,“添加”)如果load_type不在accepted_load_types:提高ValueError (f“負載類型{load_type}不接受負載類型(“重新加載”,“添加”)“destination_table_name = database_name +“_”+ schema_name +“_”+ table_name如果load_type = =“重載”:@dlt.table (name = destination_table_name) def create_or_reload_delta_table (): df =(火花.read .format (f“{data_format}”) .load (file_path)) #做額外的轉換#返回最終的df返回df如果load_type = =“追加”:@dlt.table (name = destination_table_name) def create_or_append_delta_table (): df =(火花.readStream .format .option (“cloudFiles (“cloudFiles”)。f格式”,“{data_format}”) #添加額外的選項就像inferColumnTypes或schemaEvolutionMode .load (file_path)) #做額外的轉換#返回最終的df返回df if __name__ = = " __main__”:在metadata_dict表:database_name =表(“database_name”) schema_name =表(“schema_name”) table_name =表(“table_name”) file_path =表(“file_path”) data_format =表(“data_format”) load_type =表(“load_type”) create_delta_table (database_name = database_name schema_name = schema_name table_name = table_name file_path = file_path data_format = data_format load_type = load_type)