OK. So I think I'm probably missing the obvious here and have got myself all at sea.
Here's the scenario: JSON files arrive from a REST API into Azure Data Lake; each file is a complete snapshot of the current students (a few hundred rows). Autoloader picks the files up, the rows are exploded out of the JSON and de-duplicated, and APPLY CHANGES INTO maintains an SCD Type 2 history table.
This is all working well.
Now, I need to identify when a record has been deleted (removed), i.e. it no longer appears in the batch.
In the SCD table, this should cause the record's status to change from "current" to "former".
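To illustrate the behaviour I'm after, here's a toy sketch (simplified to a single id column; not my real pipeline): if an id disappears between two batches, I want it detected like this:

```python
# Toy illustration only: simplified to a single id column.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

previous_batch = spark.createDataFrame([(1,), (2,), (3,)], ["id"])  # yesterday's file
latest_batch = spark.createDataFrame([(1,), (2,)], ["id"])          # today's file

# id 3 no longer appears, so its SCD record should flip "current" -> "former"
disappeared = previous_batch.join(latest_batch, "id", "left_anti")
disappeared.show()
# +---+
# | id|
# +---+
# |  3|
# +---+
```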
I'm not managing to achieve this.
Here is the relevant pipeline code:
```python
# autoloader picks up files
import dlt

@dlt.table
def currStudents_streamFiles():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("abfss://*****@********.dfs.core.windows.net/current-students/restAPI")
        .select("*", "_metadata", "_metadata.file_modification_time")
    )
```

```sql
-- extract the rows from the json (each file is a complete dataset with a few hundred rows)
CREATE OR REFRESH STREAMING LIVE TABLE currStudents_ingest
AS SELECT
  col.*
  ,"current" AS status
  ,file_modification_time
FROM (
  SELECT fi.file_modification_time, EXPLODE_OUTER(fi.students)
  FROM STREAM(LIVE.currStudents_streamFiles) AS fi
)
WHERE col.id IS NOT NULL;
```

```python
# run some de-duplication due to most records being identical each time
import dlt
from pyspark.sql.functions import lit

@dlt.table
def currStudents_dedup():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    return (
        df.dropDuplicates([col for col in df.columns if col != "file_modification_time"])
        .select("*")
        .withColumn("status", lit("current"))
    )
```

```sql
-- capture SCD2 change history
CREATE OR REFRESH STREAMING LIVE TABLE students_SCD;

APPLY CHANGES INTO live.currStudents_SCD
FROM STREAM(live.currStudents_dedup)
KEYS (id)
SEQUENCE BY file_modification_time
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (file_modification_time)
```

```sql
-- match the latest batch (from json file) against "current" version and identify missing records
-- attempt to push identified records back through SCD with a new status "former"
-- DLT pipeline doesn't like INSERT in the apply changes into ....
CREATE TEMPORARY LIVE VIEW former_students_view
AS SELECT
  *,
  "former" AS status
-- all records from the last batch processed
FROM (
  SELECT *
  FROM STREAM(live.currStudents_ingest)
  WHERE file_modification_time = (
    SELECT MAX(file_modification_time)
    FROM STREAM(live.currstudents_streamfiles)
  )
) t1
WHERE NOT EXISTS (
  -- "current" version of the table held in Databricks
  SELECT 1
  FROM (
    SELECT schoolId
    FROM STREAM(live.students_SCD)
    WHERE `__END_AT` IS NULL AND status != "former"
  ) t2
  WHERE t1.schoolId = t2.schoolId
);

APPLY CHANGES INTO live.currStudents_dedup
FROM former_students_view
KEYS (schoolId)
INSERT ALL
```
All help gratefully received.
I'm aware I may be going about this the wrong way.
@Kearon McNicol:
It looks like you want to capture records that have been deleted (removed) from a batch dataset delivered as JSON into Azure Data Lake. One approach you could take is to use Delta Lake and leverage its merge functionality: identify the records in the SCD2 table that no longer appear in the batch dataset and update their status to "former".
Here's an example of how you could modify your pipeline code to achieve this:
```python
import dlt
import pyspark.sql.functions as F
from delta.tables import DeltaTable

# Autoloader picks up the JSON files
@dlt.table
def currStudents_streamFiles():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("**/restAPI")
        .select("*", "_metadata", "_metadata.file_modification_time")
    )

# Explode the students array, de-duplicate, and tag everything "current"
@dlt.table
def currStudents_ingest():
    return (
        spark.readStream.format("delta").table("live.currStudents_streamFiles")
        .select(F.explode_outer("students").alias("data"), "file_modification_time")
        .select("data.*", "file_modification_time")
        .dropDuplicates()
        .withColumn("status", F.lit("current"))
    )

# Build the SCD2 table from the ingest stream
dlt.create_streaming_table("students_SCD")
dlt.apply_changes(
    target="students_SCD",
    source="currStudents_ingest",
    keys=["schoolId"],
    sequence_by=F.col("file_modification_time"),
    stored_as_scd_type=2,
    track_history_except_column_list=["file_modification_time"],
)

# Records that are still open ("current") in the SCD table but absent from the latest batch
@dlt.table
def former_students_view():
    ingest_df = spark.read.format("delta").table("live.currStudents_ingest")
    latest_ts = ingest_df.agg(F.max("file_modification_time").alias("file_modification_time"))
    latest_batch_df = ingest_df.join(latest_ts, "file_modification_time", "left_semi")

    current_df = (
        spark.read.format("delta").table("live.students_SCD")
        .filter(F.col("__END_AT").isNull())
        .filter(F.col("status") != "former")
        .select("schoolId")
    )
    return (
        current_df.join(latest_batch_df, "schoolId", "left_anti")
        .withColumn("status", F.lit("former"))
    )

# Flip the disappeared records to "former" with a Delta Lake merge.
# (Run this as a separate step against the published tables, not inside the DLT graph itself.)
former_students_df = spark.read.table("former_students_view")
students_SCD_table = DeltaTable.forName(spark, "students_SCD")
(
    students_SCD_table.alias("t")
    .merge(former_students_df.alias("s"), "t.schoolId = s.schoolId")
    .whenMatchedUpdate(
        condition="t.__END_AT IS NULL",       # only touch the open SCD2 row
        set={"status": F.lit("former")},
    )
    .execute()
)
```
In the code above, Delta Lake is used to create the currStudents_ingest, students_SCD, and former_students_view tables. The currStudents_ingest table is built by reading the batch data and dropping duplicates. The students_SCD table is built by applying SCD2 processing to the data read from currStudents_ingest. The former_students_view table identifies records that are still open ("current") in students_SCD but missing from the latest batch, and marks them with a status of "former". Delta Lake's merge functionality is then used to update those records in the SCD2 table with the new status.
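As a quick sanity check after a run, you could query for the rows that were flipped (a sketch; adjust the table name to wherever your pipeline publishes it):

```python
# Students who dropped out of the feed should now show status = 'former'
# on their open SCD2 row. __START_AT / __END_AT are the DLT SCD2 columns.
spark.sql("""
    SELECT schoolId, status, __START_AT, __END_AT
    FROM students_SCD
    WHERE status = 'former'
""").show()
```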
I hope this helps!