你好,
我想知道如果它是可能的目標模式,以編程方式,在一個DLT。
DLT管道設置目的地,目標模式。
我想更冪等管道運行。例如,我的目標表字段:reference_date, customer_id, val1, val2等等
之前,我想寫作,馬克斯reference_date目前目標表,所以我可以查詢合適的源表。
目前DLT不允許創建另一個表之前閱讀生活。
我可以用的火花。表”,但讓它完全自動,我想有目標模式。
target_table_name = whatevertable @dlt。表(name = target_table_name) def update_target (): max_date =(火花.table (get_dlt_target_schema () + '。' + target_table_name) .select (F.max (reference_date))) .... .... ....
謝謝你!
@Gustavo馬丁斯:
我明白了,謝謝你的澄清。在這種情況下,您可以使用spark.conf.get (“spark.sql.warehouse.dir”)方法得到默認的數據庫目錄的位置,這是典型的磚文件係統的根目錄(DBFS)。從這裏,您可以構建的路徑表的元數據文件,其中包括模式信息。
這裏有一個例子:
target_table_name = whatevertable @dlt。表(name = target_table_name) def update_target (): dbfs_root = spark.conf.get target_table_path = f (“spark.sql.warehouse.dir”)“{dbfs_root} / {target_table_name}”target_table_metadata = spark.read.format(“鋪”).load (f“{target_table_path} / _metadata”) target_table_schema = target_table_metadata.schema.simpleString () max_date =(火花.table (f”“{target_table_schema}”。{target_table_name}”) .select (F.max (reference_date)))……
這段代碼讀取_metadata文件的目標表的模式信息,然後使用它來構建完整的目標表路徑。然後,您可以使用完整的模式限定表名來訪問的表
spark.table()方法。
注意,spark.sql.warehouse。dir配置屬性可能不是設置在某些環境中,在這種情況下,您可能需要手動指定默認的根目錄數據庫。
嗨@Gustavo馬丁斯,可以得到目標模式以編程的方式在一個DLT筆記本通過閱讀使用PySpark三角洲表。
這裏有一個方法來實現這一目標通過定義一個函數,得到了目標模式的目標差值表:
從pyspark。sql進口SparkSession火花= SparkSession.builder.getOrCreate () def get_dlt_target_schema (target_table_name): #閱讀目標三角洲表target_table_df = spark.read.format(“δ”).load (target_table_name) #得到目標表的模式target_schema = target_table_df。模式返回target_schema
然後,您可以使用此函數在你update_target()函數:
從pyspark.sql。函數導入馬克斯F_max target_table_name = whatevertable @dlt。表(name = target_table_name) def update_target (): target_schema = get_dlt_target_schema (target_table_name) max_date =(火花.table (target_table_name) .select (F_max (reference_date))) #……
然而,這種方法有一個限製。因為你試圖讀相同的表使用@dlt定義。表裝飾,您可能會遇到問題,當試圖讀寫同一個表在同一管道。
一個可能的解決方案是使用一種不同的方法來查詢目標表的最大標準之外的DLT管道。你可以閱讀目標表使用PySpark或不同的方法和馬克斯參考日期作為一個參數傳遞給DLT管道。
通過這種方式,您可以避免讀寫同一個表在同一管道。
記得“whatevertable”占位符替換為δ的合適路徑表
嗨@Kaniz開羅,
謝謝你的回複。
的限製,我知道,這就是為什麼我想讀一個表“spark.table()的方法。
需要清楚的是,我的意思是什麼“模式”是:
hive_metastore。< this_schema >。< the_target_table >
但是你認為,使用“spark.table (< the_target_table >)不會工作,因為如果我不使用全名(即。,< this_schema >。< the_target_table >),它將默認模式,這不是目標模式(< this_schema >)。
因為我可以用函數current_database()”,但是,這將返回默認模式,而不是對這個DLT”的目標模式。
@Gustavo馬丁斯:
我明白了,謝謝你的澄清。在這種情況下,您可以使用spark.conf.get (“spark.sql.warehouse.dir”)方法得到默認的數據庫目錄的位置,這是典型的磚文件係統的根目錄(DBFS)。從這裏,您可以構建的路徑表的元數據文件,其中包括模式信息。
這裏有一個例子:
target_table_name = whatevertable @dlt。表(name = target_table_name) def update_target (): dbfs_root = spark.conf.get target_table_path = f (“spark.sql.warehouse.dir”)“{dbfs_root} / {target_table_name}”target_table_metadata = spark.read.format(“鋪”).load (f“{target_table_path} / _metadata”) target_table_schema = target_table_metadata.schema.simpleString () max_date =(火花.table (f”“{target_table_schema}”。{target_table_name}”) .select (F.max (reference_date)))……
這段代碼讀取_metadata文件的目標表的模式信息,然後使用它來構建完整的目標表路徑。然後,您可以使用完整的模式限定表名來訪問的表
spark.table()方法。
注意,spark.sql.warehouse。dir配置屬性可能不是設置在某些環境中,在這種情況下,您可能需要手動指定默認的根目錄數據庫。