我有一個dms任務處理全負荷和複製正在進行的任務
從源(該)目標(AWS S3)
然後使用三角洲湖處理中心日誌
我一個筆記本,將數據插入該軟件不斷與id(主鍵)
然後從該軟件直接刪除一些記錄
三角洲湖工作完全當隻有插入記錄
但不刪除的
它會有一些隨機記錄被刪除,適用於三角洲湖
為例。
我第一次插入10000條記錄
三角洲湖應用插入動作完美
但是當我刪除一些記錄(id > 8000)在三角洲湖從該軟件和統計記錄
它不會正確運用(計數結果> 8000)
不知道我錯過了任何一步
謝謝
這個圖
代碼
1。連續插入工作
#磚筆記本”“源”的這個腳本用於數據加載到sql server數據庫。”““命令- - - - - - - - - - - # #魔法% pip安裝攤販#命令- - - - - - - - - - -從攤販從pyspark進口偽裝者。sql進口SparkSession,函數,從pyspark.sql DataFrame。類型進口StructField、StructType DecimalType從輸入導入列表#命令- - - - - - - - - - - = SparkSession.builder火花。\瀏覽器名稱(“ExamplePySparkSubmitTask”)。\ config (“spark.databricks.hive.metastore.glueCatalog。啟用”、“真正的”)。\ enableHiveSupport ()。\ getOrCreate() #命令- - - - - - - - - - -攤販=騙子()#命令- - - - - - - - - - - def create_sample_profiles (_faker:騙子()):““這功能是創建示例概要文件”“資料= [_faker.profile _()的範圍(1000)]返回資料#命令- - - - - - - - - - - def read_profiles_as_dataframe (_spark: SparkSession _profiles:列表[dict]):“”這個函數就是配置文件讀dataframe”“df = _spark \ .createDataFrame (_profiles)返回df #命令- - - - - - - - - - - def process_data (_df: dataframe _count: int):““這功能是過程數據與需求”“processed_df = _df \ .coalesce (1) \ .withColumn (“id”, (1000 * _count) + (functions.monotonically_increasing_id () + 1)) \ .withColumn(“年齡”,functions.round (functions.rand () * 130)) \ .withColumn(緯度,functions.col (current_location._1)) \ .withColumn(經度,functions.col (current_location._2)) \ .withColumn(“網站”功能。concat_ws(', ', '網站'))\ .drop (current_location)返回processed_df #命令- - - - - - - - - - - def write_dataframe_to_sql_server (_df: DataFrame):”“這個函數是寫DataFrame sql server”“jdbc_options = {“url”:“< url >”,“用戶”:“管理”,“密碼”:“<密碼>”,“數據表”:“概要”}_df \ .write \ .format (jdbc) \ .options (* * jdbc_options) \ .mode(附加)\ .save() #命令- - - - - - - - - - - def主要(_count: int):““這是主要功能”“試題:資料= create_sample_profiles (_faker =攤販)df = read_profiles_as_dataframe (_spark =火花,_profiles =概要文件)processed_df = process_data (_df = df, _count = _count) write_dataframe_to_sql_server (_df = processed_df)除了:提高#命令- - - - - - - - - - - if __name__ = =“__main__”:主要(0)#命令- - - - - - - - - - -進口時間數= 0(計數< 5):打印(計數)主要(_count = count)計數+ = 1 #命令- - - - - - - - - - -
2。DLT工作
#磚筆記本”“源”這個腳本是處理三角洲湖”“#命令- - - - - - - - - - -從pyspark.sql進口dlt。功能導入坳,- - - - - - - - - - - @dlt expr #命令。表(name = " profiles_changes "臨時= True) # @dlt。期望(“有效的時代”,“年齡> 0和年齡< = 100 ")def profiles_changes():模式=“Op字符串,cdc_load_timestamp時間戳,地址字符串,生日日期、公司字符串,字符串,工作郵件字符串,字符串名稱,住所字符串,ssn字符串,用戶名的字符串,字符串,網站id長,年齡兩倍,緯度小數(38歲,18),經度小數(38歲,18)的返回火花\ .readStream \ .format (cloudFiles) \ .option (“cloudFiles。格式”、“鋪”)\ .option (“cloudFiles。schemaHints”模式). schema(模式)\ \ .load (s3: / / < replication_bucket > / sql_server_repication / dbo / profiles / * / * / * / * / * .parquet”) - - - - - - - - - - - dlt #命令。create_streaming_live_table (name =“配置文件”,路徑=“s3: / / < staging_bucket > /臨時/ profiles /”) - - - - - - - - - - - dlt #命令。apply_changes(目標=“配置文件”,源=“profiles_changes”鍵= (" id "), sequence_by =坳(“cdc_load_timestamp”), apply_as_deletes = expr (“Op = ' D '”), apply_as_truncates = expr (“Op = T”), except_column_list =[“人事處”、“cdc_load_timestamp”], stored_as_scd_type = 1)
#觀察結果
不適用三角洲湖CDC日誌正確(在該計數和三角洲湖)
#預期結果
三角洲湖應該正確應用中心日誌(在該計數和三角洲湖)
#環境信息