我有一個從一個數據流管道吸入json文件使用自動裝卸機湖。這些文件被定期。主要是文件包含重複數據,但偶爾會有一些變化。我想處理這些文件到一個數據倉庫使用存儲為SCD 2選項的三角洲住表管道。一切都似乎工作好,但我得到分割表中重複的行——即使沒有數據變化。
在管道集群配置,我補充道
管道。enableTrackHistory真實
這是SQL SCD表:
創建或更新直播表currStudents_SCD;應用到生活變化。currStudents_SCD從流(live.currStudents_ingest)鍵(id)序列file_modification_time存儲為SCD 2型跟蹤曆史*除了(file_modification_time);
任何想法我為什麼得到這個和我怎麼能阻止這些副本嗎?
我找不到任何信息在文檔來幫助。除了可能模糊的暗示,化合物2,將新記錄時沒有列變化。但跟蹤曆史文檔似乎表明,隻有改變監控列將導致一個新記錄....
為了清晰起見,這裏是最後的代碼,避免重複,使用@Suteja卡努裏人的建議:
進口dlt @dlt。表def currStudents_dedup (): df = spark.readStream.format(“δ”).table (live.currStudents_ingest)返回(df。dropDuplicates df([坳坳。列如果坳! = " file_modification_time "]) .select (“*”))
我很好奇,如果有一種方法可以在SQL的呢?我隻找到python文檔。
@Kearon McNicol:
是的,可以刪除重複的SQL。這裏有一個例子查詢,達到相同的結果作為Python代碼你提供:
從生活選擇不同的*。currStudents_ingest file_modification_time不是空的地方;
在這個SQL查詢,不同的關鍵字用於刪除重複的結果集。WHERE子句過濾掉任何行file_modification_time列是空的地方。
注意,這個查詢假設生活。currStudents_ingest引發環境中是一個三角洲表,你可以訪問運行SQL查詢。