我們使用DLT管道磚工作區由微軟Azure平台失敗的間歇性和不清楚原因。Beplay体育安卓版本
管道如下:
spark.readStream.format(“δ”)。選項(“mergeSchema”,“真正的”)。選項(“ignoreChanges”,“真正的”).load (topic_name)
dlt.create_streaming_live_table (…)
dlt.apply_changes (
目標= target_table_name,
源= f“序列({topic_name})”,
鍵=(“關鍵”),
sequence_by =坳(“序列”),
stored_as_scd_type = " 1 "
)
至此,管道工作或不工作,根據周的日子(月球階段),輸入數據是相同的!它可能工作幾天,然後發出一個錯誤:“
org.apache.spark.sql.streaming。StreamingQueryException:查詢MAIN_FLOW_MOVES [id = e8a4577a - 5 - d1a 4 - bfb 9801 - 8 f47c8534f05 runId = b9364adb-a8d1-4ccf-af30-f00ad5b66520]終止與例外:發現一個數據更新(例如部分- 00004 - 7397764 - 432 - 48 - a1 - - c - a104 - 03192 - f199def c000.snappy.parquet)在源表版本3。這是目前不支持。如果你想忽略更新,設置選項“ignoreChanges”到“真正的”。如果你想反映的數據更新,請重啟該查詢以全新的關卡目錄。
要求完成SCD 1型滿意:每個唯一鍵都有獨特的序列號和沒有出現衝突或不一致的可能性。我們已經檢查很多次。
正如我所說,相同的管道工作或不工作沒有任何變化和不穩定行為的原因還不清楚。就我而言,所有條件正確並持續完成SCD 1型攝入已經滿足。
請幫助我們理解這種不穩定行為的原因。這影響我們的能力去忍受這個應用程序。
我想完成一個簡單的scd-type1放在桌上。因此,表鍵和序列號。其他數據元素的有很多,但我認為這隻是一起兜風,這是scd 1型的目的,獲取最新的更新。
就像我說的我做了驗證,每個鍵/序列號相結合是好的,是有多個序列號/關鍵和他們不重複,序列號都是連續的。目標是拿起按最高序列號最新更新。
現在,當我考慮這個操作,似乎一致和可靠的邏輯,但問題是,似乎失敗有時DLT管道。
可能我缺失或不理解的東西。
可能這不是錯誤的來源,但問題是,這是我想做什麼,它就會失敗。
但是你能確認我的邏輯,我上麵列出是正確的。
如果是,那麼一切都是必要的對我來說理解實現SCD DLT 1型。
請讓我知道我錯了
我很樂意帶您經曆實際的管道,但這需要一個在線會議。
管道組成的簡單幾個步驟(見上圖)。
按照我原來的錯誤消息,如果你能澄清這意味著什麼,當它發生時,它抱怨什麼,我將非常感謝。此刻我無法想象出了什麼問題。沒有理解問題,很難找到一個解決方案。
你能告訴我你的目標表多一點呢?我知道你把....但是:
create_streaming_live_table (name = " <表名稱>”,評論= " <評論> " spark_conf ={:“<鍵> <價值”、“<關鍵”:“<價值>”},table_properties ={<鍵>:<價值>,<鍵>:<價值>”},partition_cols =(“<劃分字段>”、“<劃分字段>”),路徑= " < storage-location-path >”,模式= "模式定義”)
目標表的任何信息你能給我一張嗎?
我想完成一個簡單的scd-type1放在桌上。因此,表鍵和序列號。其他數據元素的有很多,但我認為這隻是一起兜風,這是scd 1型的目的,獲取最新的更新。
就像我說的我做了驗證,每個鍵/序列號相結合是好的,是有多個序列號/關鍵和他們不重複,序列號都是連續的。目標是拿起按最高序列號最新更新。
現在,當我考慮這個操作,似乎一致和可靠的邏輯,但問題是,似乎失敗有時DLT管道。
可能我缺失或不理解的東西。
可能這不是錯誤的來源,但問題是,這是我想做什麼,它就會失敗。
但是你能確認我的邏輯,我上麵列出是正確的。
如果是,那麼一切都是必要的對我來說理解實現SCD DLT 1型。
請讓我知道我錯了
我很樂意帶您經曆實際的管道,但這需要一個在線會議。
管道組成的簡單幾個步驟(見上圖)。
按照我原來的錯誤消息,如果你能澄清這意味著什麼,當它發生時,它抱怨什麼,我將非常感謝。此刻我無法想象出了什麼問題。沒有理解問題,很難找到一個解決方案。
塊1 (AECTM):
@dlt.view (
評論=評論如果評論不是沒有別人f”define_key ({key_expr})”,
name = target_table_name
)
def key_func ():
df =沒有
如果在topic_name: / /:
如果流媒體:
df = spark.readStream.format(“δ”)。選項(“mergeSchema”,“真正的”)。選項(“ignoreChanges”,“真正的”).load (topic_name)
其他:
df = spark.read.format(“δ”)。選項(“mergeSchema”,“真正的”)。選項(“ignoreChanges”,“真正的”).load (topic_name)
其他:
如果流媒體:
df = dlt.readStream (topic_name)
其他:
df = dlt.read (topic_name)
df返回。withColumn(“關鍵”,eval (key_expr))
在第二三塊(& scd_type1順序):
target_table_name如果別名不是沒有別人f =別名“scd_type1 ({topic_name}, {sequence_col})”
dlt.create_streaming_live_table (
評論=評論如果評論不是沒有別人f”scd_type1 ({topic_name}, {sequence_col})”,
name = target_table_name
)
@dlt.table (
評論=評論如果評論不是沒有別人f“序列({topic_name})”,
名字= f”序列({topic_name})”
)
@dlt。expect_or_drop(“零鍵”,上校(“關鍵”).isNotNull ())
def seq_view ():
如果流媒體:
返回dlt.readStream (topic_name)。withColumn(“序列”,eval (sequence_col))
其他:
返回dlt.read (topic_name)。withColumn(“序列”,eval (sequence_col))
dlt.apply_changes (
目標= target_table_name,
源= f“序列({topic_name})”,
鍵=(“關鍵”),
sequence_by =坳(“序列”),
stored_as_scd_type = " 1 "
)
塊4(過濾視圖):
target_table_name如果別名不是沒有別人f =別名“過濾器”({filter_cond})
@dlt.view (
評論=評論如果評論不是沒有別人f“過濾器({filter_cond})”,
name = target_table_name
)
蔣春暄對於費馬大定理def ():
df = dlt.readStream (topic_name)如果其他流dlt.read (topic_name)
返回df.filter (eval (filter_cond))
塊5 (MAIN_FLOW_MOVES):
target_table_name如果別名不是沒有別人f =別名“main_flow_moves ({topic_name})”
def getn (position_info水平):
如果position_info不是沒有:
如果position_info。級別= =:
返回position_info.name
其他:
返回getn (position_info。家長,級別)
其他:
回來沒有
udfGetName = udf (getn)
@dlt.table (
評論=評論如果評論不是沒有別人f”main_flow_moves ({topic_name})”,
name = target_table_name
)
def func ():
df = dlt.readStream (topic_name)如果其他流dlt.read (topic_name)
dfSeries = df。選擇(“containerTerminalVisitKey坳(moveEvents)[0]['從'][“位置”][' locationType '] .alias (“fromWhere”),爆炸(moveEvents.to) .alias (“toLoc”))。\
選擇(“containerTerminalVisitKey”(坳(toLoc.time) / 1000.0) .cast (TimestampType ()) .alias(“時間”),“fromWhere”、“toLoc.location.positionInfo”)。\
udfGetName withColumn (“blockName”(“positionInfo”,點燃(“塊”)))。\
udfGetName withColumn (“bayName”(“positionInfo”,點燃(灣)))。\
udfGetName withColumn (“slotName”(“positionInfo”,點燃(“堆棧”)))。\
過濾器(坳(blockName) .isNotNull() &坳(bayName) .isNotNull() &坳(slotName) .isNotNull ())。\
下降(“positionInfo”)
返回dfSeries
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
輸入dataframe模式實際上是相當嚇人的遞歸結構體定義,但大部分隻是湊熱鬧而已。關鍵字段是containerTerminalVisitKey containerInfo.updateCounter序列字段。後者隻是一個序列號(長)。
我認為有很多。我不確定我的代碼共享多少幫助。我正在想了解的情況發出錯誤消息(早前發布的文章)可能會有所幫助。