我們正在努力寫多個來源相同的目標表使用DLT,但是得到以下錯誤。
不知道我們在這裏失蹤在代碼....
文件/磚/火花/ python / dlt / api。py: 817年apply_changes(目標、源、鑰匙、sequence_by, ignore_null_updates, apply_as_deletes, apply_as_truncates, column_list, except_column_list, stored_as_scd_type, track_history_column_list, track_history_except_column_list, flow_name) 814年籌集RuntimeError(“隻有SCD 1型和SCD 2型現在支持。”)
dlt.create_streaming_table
(
name =
“unified_events_test_11”
)
dlt.apply_changes
(
目標=
“unified_events_test_11”
,
源=
“unified_events_pv_raw”
,
鍵=
(
“event_id”
),
sequence_by = F.col
(
“cdcTimestamp”
),
apply_as_deletes = F。
expr (
”
操作
=
' D '
”
),
except_column_list =
(
“操作”
,
“cdcTimestamp”
),
)
dlt.apply_changes
(
目標=
“unified_events_test_11”
,
源=
“unified_events_wc_raw”
,
鍵=
(
“event_id”
),
sequence_by = F.col
(
“cdcTimestamp”
),
apply_as_deletes = F。
expr (
”
操作
=
' D '
”
),
except_column_list =
(
“操作”
,
“cdcTimestamp”
),
)
注意:unified_events_pv_raw和unified_events_wc_raw流表
@dlt
.table
(
name =
“unified_events_wc_raw”
)
def
unified_events_wc_raw
():
df =
(
spark.readStream。
格式
(
“cloudFiles”
)
.option
(
“cloudFiles.format”
,
“csv”
)
.option
(
“9”
,
“| | | |”
)
. schema
(
wallet_connect_schema
)
.load
(
“dbfs: / FileStore / wallet_connect_events”
)
)