我試圖複製現有火花DLT的管道。
我不使用DLT能夠實現預期的結果。
目前管道:
源設置:CSV文件使用SCP攝取在青銅
頻率:每月一次
青銅dir: / cntdlt /青銅/電磁脈衝/年= 2022/月= 1
銀:讀青銅和執行一些轉換和寫在銀同一個分區列
val df = spark.read.option (…) . csv (s“{basepath} /年= ${一}/月= ${月}”)。withColumn(“邏輯”、“”)火花。sql (s“創建數據庫如果不存在美元dbName”) spark.conf.set (“spark.sql.sources.partitionOverwriteMode”、“動態”)= val分區列表(“年”、“月”)df.write.partitionBy(分區:_ *).mode (saveMode) .option(“路徑”,silverpath) .saveAsTable (“$ {dbName}。${表}”)火花。sql (s”msck修理表dbName。美元的表”)
場景:源數據考慮一些問題,如果我想重新加載數據,一些上個月在我火花pipleine設置我隻是運行特定的年和月的工作價值(由airlfow提供)
因為spark.conf.set (“spark.sql.sources.partitionOverwriteMode”、“動態”)
它更新數據在分區處理。
我試圖使用DLT管道實現類似的設置。
DLT設置
從pyspark.sql進口dlt。函數從pyspark.sql進口*。從datetime類型導入*從pyspark進口日期。sql導入函數F模式= ' id int \日期日期\工資int, \城市字符串,\名稱字符串,int歲\ \年int, int \月”@dlt。create_table(評論= "原始的員工,從/ databricks-datasets攝取。”diggibyte_emp table_properties = {”。”:“青銅”、“pipelines.autoOptimize質量。管理”:“真正的”、“三角洲。appendOnly”:“false”}) def t_emp_raw():返回(spark.readStream.format .option (“cloudFiles (“cloudFiles”)。格式,json) .option (“partitionColumns”、“年、月”).option (“cloudFiles。allowOverwrites”、“真實”). schema(模式).load (“/ mnt / cntdlt /銅/電磁脈衝/ '))@dlt。create_table(評論= "的清洗emp reocrd分區,月”,partition_cols =“年”、“月”,table_properties = {" diggibyte_emp。質量”:“銀”、“pipelines.autoOptimize。管理”:“真正的”、“三角洲。appendOnly”:“false”}) def t_emp_silver():返回(dlt.readStream .withColumn (“t_emp_raw”) (“load_date”,當前日期()))
管道設計
{" id ": " 8496 fc69-5ee3-4d61-9db5-47a38f130785”、“集群”:[{“標簽”:“默認”,“num_workers”: 1}],“發展”:真的,“連續”:假的,“渠道”:“當前”,“版”:“先進”、“光子”:假的,“庫”:[{“筆記本”:{“路徑”:“/演示/ notebook_employe}}],“名字”:“monthly_load_employe”,“存儲”:“dbfs: /管道/ 8496 fc69-5ee3-4d61-9db5-47a38f130785”,“目標”:“org”}
DLT管道:
我已經運行管道當青銅隻有1分區。
/ cntdlt /青銅/電磁脈衝/年= 2022/月= 1(包括10個記錄)
導致
t_emp_raw為分區年= 2022/月= 1 = 10記錄
t_empl_silver為分區年= 2022/月= 1 = 10記錄
添加銅源中的數據
/ cntdlt /青銅/電磁脈衝/年= 2022/月= 2(包含10個記錄)
結果
t_emp_raw為分區年= 2022/月= 2 = 10記錄
t_empl_silver為分區年= 2022/月= 2 = 10記錄
現在考慮一下,源發送更新的數據進行1個月了
我的分區01需要更新和覆蓋
/ cntdlt /青銅/電磁脈衝/年= 2022/月= 1(包括5條記錄)
預期的結果
t_emp_raw為分區年= 2022/月= 1 /:5條記錄
t_empl_silver為分區年= 2022/月= 1 /:5條記錄
實際結果
t_emp_raw為分區年= 2022/月= 1 /:15記錄
t_empl_silver為分區年= 2022/月= 1 /:15記錄
我怎麼能實現分區更新時的覆蓋?
好的,我會的。謝謝你幫助我。我想知道如何在DLT管道覆蓋分區,我發現我的答案在這裏。我很高興今天我在線尋找你的帖子的時候,我也發現了https://writinguniverse.com/free-essay-examples/graphic-design/網站,我發現作業方案,現在我不會低分的論文作業。