你好社區:)。
我目前使用DLT實施一些管道。他們正在對我medalion架構json降落在青銅- >銀(使用apply_changes)然後物化金護的看法。
然而,我試圖創建一個新的銀表合並到它從現有銀表。我隻想插入一列(我們稱之為id1),然後我想設置一個第二列(id2)等於我的源列id1。
所以本質上的數據將如下所示:
來源:id =
- >合並
目標:id1 =一個id2 =
這是可能的與apply_changes嗎?
我已經能夠實現傳統上使用deltaTables .merge:
silver_match.alias (“t”)。合並(silver_supporter.alias (“s”),“s。id1 = t。source_supporter_id”)。whenMatchedUpdate(設置= {“source_supporter_id”:“s.id1”})。whenNotMatchedInsert(值= {“source_supporter_id”:“s。id1”、“cbi_id”:“s。cbi_id "}) . execute ()
如果這不是支持使用apply_changes,有沒有推薦的方法(或一般建議)能夠加載我的dlt管道這個表嗎?如果不是,誰有經驗的混合dlt和non-dlt表管道運行。
任何幫助或建議,我可以獲得更多的信息會大大apprecitated !
謝謝,
羅比
@Robert皮爾斯:
可以達到預期的行為在磚使用apply_changes三角洲湖。您可以使用合並操作將數據從源到目標三角洲表,然後使用whenMatchedUpdate更新id2列等於id1列在源數據。
下麵是一個示例代碼片段:
從三角洲。表導入* #定義源和目標表source_table =“bronze_table target_table”=“silver_table”#讀取源數據作為DataFrame source_df = spark.read.table (source_table) #創建一個為目標表target_delta_table = DeltaTable DeltaTable對象。forPath(火花,target_table) #將數據從源表合並到目標表target_delta_table.alias (“t”)。合並(source_df.alias (“s”),“s。id = t。id1”)。whenMatchedUpdate(設置= {" id2”:“s。id "})。whenNotMatchedInsert(值= {“id1”:“s。id”、“id2”:“s。id "})。execute()
這段代碼會將bronze_table源表的數據合並到silver_table目標表。的
whenMatchedUpdate條款更新id2列等於id1列在源數據,和whenNotMatchedInsert條款插入新行id1和id2列設置為
在源數據id列。
如果你有任何問題或需要進一步的指導,磚文檔三角洲湖的合並
操作和apply_changes函數可以是非常有益的。
@Robert皮爾斯:
可以達到預期的行為在磚使用apply_changes三角洲湖。您可以使用合並操作將數據從源到目標三角洲表,然後使用whenMatchedUpdate更新id2列等於id1列在源數據。
下麵是一個示例代碼片段:
從三角洲。表導入* #定義源和目標表source_table =“bronze_table target_table”=“silver_table”#讀取源數據作為DataFrame source_df = spark.read.table (source_table) #創建一個為目標表target_delta_table = DeltaTable DeltaTable對象。forPath(火花,target_table) #將數據從源表合並到目標表target_delta_table.alias (“t”)。合並(source_df.alias (“s”),“s。id = t。id1”)。whenMatchedUpdate(設置= {" id2”:“s。id "})。whenNotMatchedInsert(值= {“id1”:“s。id”、“id2”:“s。id "})。execute()
這段代碼會將bronze_table源表的數據合並到silver_table目標表。的
whenMatchedUpdate條款更新id2列等於id1列在源數據,和whenNotMatchedInsert條款插入新行id1和id2列設置為
在源數據id列。
如果你有任何問題或需要進一步的指導,磚文檔三角洲湖的合並
操作和apply_changes函數可以是非常有益的。