取消
顯示的結果
而不是尋找
你的意思是:

過程批次流管道,確認刪除

Kearon
新的貢獻者三世

好的。所以我想我可能錯過了顯而易見的,把自己在海裏。

這是場景:

  1. 批處理數據集到json格式的Azure數據湖
  2. 每一批是一套完整的“當前”記錄(完整的表)
  3. 這些處理中使用自動裝卸機流管道(因為我們有其他相關流程運行流數據管道,因為自動裝卸機使生活簡單)
  4. 重複數據刪除從提取行
  5. 變化捕獲通過內置的DLT SCD2處理

這都很好地工作。

現在,我需要確定當記錄已被刪除(刪除),不再出現在批次。

SCD表,這將導致記錄的狀態改變從“當前”到“前”

我不管理為實現這一目標。

這是有關管道代碼:

#自動裝卸機拿起文件導入dlt @dlt。表def currStudents_streamFiles():返回(spark.readStream.format .option (“cloudFiles (“cloudFiles”)。格式”、“json”) .option (“cloudFiles。在ferColumnTypes", "true") .load("abfss://*****@********.dfs.core.windows.net/current-students/restAPI") .select("*","_metadata", "_metadata.file_modification_time") ) -- extract the rows from the json (each file is a complete dataset with a few hundred rows) CREATE OR REFRESH STREAMING LIVE TABLE currStudents_ingest AS SELECT col.* ,"current" AS status ,file_modification_time FROM ( SELECT fi.file_modification_time, EXPLODE_OUTER (fi.students) FROM STREAM(LIVE.currStudents_streamFiles) AS fi ) WHERE col.id IS NOT NULL ; #run some de-duplication due to most records being identical each time import dlt from pyspark.sql.functions import lit @dlt.table def currStudents_dedup(): df = spark.readStream.format("delta").table("live.currStudents_ingest") return ( df.dropDuplicates([col for col in df.columns if col != "file_modification_time"]) .select('*') .withColumn('status', lit('current')) ) -- capture SCD2 change history CREATE OR REFRESH STREAMING LIVE TABLE students_SCD; APPLY CHANGES INTO live.currStudents_SCD FROM STREAM(live.currStudents_dedup) KEYS (id) SEQUENCE BY file_modification_time STORED AS SCD TYPE 2 TRACK HISTORY ON * EXCEPT (file_modification_time) -- match the latest batch (from json file) against "current" version and identify missing records -- attempt to push identified records back through SCD with a new status "former" -- DLT pipeline doesn't like INSERT in the apply changes into .... CREATE TEMPORARY LIVE VIEW former_students_view AS SELECT *, "former" AS status -- all records from the last batch processed FROM ( SELECT * FROM STREAM(live.currStudents_ingest) WHERE file_modification_time = ( SELECT MAX(file_modification_time) FROM STREAM(live.currstudents_streamfiles) ) ) t1 WHERE NOT EXISTS ( -- "current" version of the table held in Databricks SELECT 1 FROM ( SELECT schoolId FROM STREAM(live.students_SCD) WHERE `__END_AT` IS NULL AND status != "former" ) t2 WHERE t1.schoolId = t2.schoolId ); APPLY CHANGES INTO live.currStudents_dedup FROM former_students_view KEYS (schoolId) INSERT ALL

所有的幫助感激地接受。

我知道我可能會用錯誤的方式。

11日回複11

匿名
不適用

@Kearon McNicol:

看來你想捕捉記錄已被刪除(刪除)的批處理數據集到JSON格式在Azure數據湖。你可以把一個方法是使用三角洲湖和利用其合並功能來識別和更新記錄SCD2表中的記錄的“前”的狀態,不再出現在批處理數據集。

這裏有一個例子如何修改管道代碼實現:

進口pyspark.sql。函數作為F進口dlt @dlt。表def currStudents_streamFiles():返回(spark.readStream.format .option (“cloudFiles (“cloudFiles”)。格式”、“json”) .option (“cloudFiles。在ferColumnTypes", "true") .load("**/restAPI") .select("*","_metadata", "_metadata.file_modification_time") ) @dlt.table def currStudents_ingest(): df = ( spark.readStream.format("delta").table("live.currStudents_streamFiles") .select(F.explode_outer("students").alias("data"), "file_modification_time") .select("data.*", "file_modification_time") .dropDuplicates() .withColumn("status", F.lit("current")) ) return df @dlt.table def students_SCD(): df = spark.readStream.format("delta").table("live.currStudents_ingest") return df.writeStream.format("delta").table("live.students_SCD") @dlt.table def former_students_view(): latest_batch_df = ( spark.readStream.format("delta").table("live.currStudents_ingest") .groupBy().agg(F.max("file_modification_time").alias("file_modification_time")) ) current_df = ( spark.readStream.format("delta").table("live.students_SCD") .filter(F.col("__END_AT").isNull()) .filter(F.col("status") != "former") .select("schoolId") ) return ( latest_batch_df.join(current_df, F.col("schoolId") == F.col("schoolId"), "left_anti") .withColumn("status", F.lit("former")) ) students_SCD_data = students_SCD() former_students_data = former_students_view() ( students_SCD_data .merge( former_students_data, "schoolId" ) .whenMatchedUpdate( set={ "status": F.col("merge_action").getField("status") } ) .execute() )

在上麵的代碼中,我們使用三角洲湖創建表當前學生攝取,學生SCD,和前視圖。當前學生攝取表是由閱讀的數據批處理數據和刪除重複。學生SCD表是由讀取的數據表和應用從當前學生攝取SCD2處理。表是由以前的學生的觀點確定在最新一批記錄,不存在在當前學生SCD表,並將它們標記“前”的狀態。然後我們使用三角洲湖合並功能更新記錄SCD2表中的新地位。

我希望這可以幫助!

Kearon
新的貢獻者三世

@Suteja卡努裏人。謝謝你再一次。我現在檢查你的回複。

我意識到我離開湖賬戶信息在我的代碼,我複製數據。雖然這不是一個主要的風險,你介意把它從你的回複好嗎?謝謝你!

匿名
不適用

@Kearon McNicol:當然,完成,幹杯

Kearon
新的貢獻者三世

嗨@Suteja卡努裏人,

再次感謝。

我得到這個錯誤:

AttributeError:“數據集”對象沒有屬性“合並”,地圖(),(),地圖地圖列表(),(),())

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map