開始
加載和管理數據
處理數據
政府
引用和資源
2023年8月3日更新
給我們反饋
這篇文章解釋了如何使用三角洲生活表來處理消息從Azure事件中心。你不能使用結構化流事件中心的連接器因為這個庫不可用作為磚運行時的一部分,和δ生活表不允許您使用第三方JVM庫。
Azure事件中心提供了一個端點與Apache卡夫卡兼容,您可以使用結構化流卡夫卡連接器磚中可用的運行時,處理消息從Azure事件中心。關於Azure事件的更多信息中心和Apache卡夫卡兼容性,明白了從Apache卡夫卡應用程序使用Azure事件中心。
以下步驟描述三角洲住表管道連接到現有的事件中心實例從主題和使用事件。完成這些步驟,您需要以下事件中心連接值:
活動中心名稱空間的名稱。
事件中心實例的名稱在中心名稱空間。
共享訪問政策名稱和政策的關鍵事件中心。默認情況下,RootManageSharedAccessKey政策是為每個事件創建中心名稱空間。這個政策管理,發送和聽權限。如果你的管道隻讀取事件中心,磚建議創建一個新的政策,隻聽許可。
RootManageSharedAccessKey
管理
發送
聽
關於事件的中心連接字符串的更多信息,見得到一個事件中心連接字符串。
請注意
Azure事件中心提供了OAuth 2.0和共享訪問簽名(SAS)選擇授權訪問安全資源。這些指令使用SAS-based身份驗證。
如果你得到事件從Azure門戶樞紐連接字符串,它可能不包含EntityPath價值。的EntityPath價值隻有在需要使用結構化流事件中心的連接器。使用結構化流卡夫卡連接器隻需要提供主題名稱。
EntityPath
因為政策關鍵是敏感信息,磚建議不要硬編碼管道代碼的價值。相反,使用磚秘密存儲和管理的關鍵。
下麵的例子使用磚CLI創建一個秘密範圍和存儲秘密範圍的關鍵。在管道代碼中,使用dbutils.secrets.get ()函數與scope-name和shared-policy-name獲取鍵值。
dbutils.secrets.get ()
scope-name
shared-policy-name
磚——概要文件<配置文件名稱>秘密創建範圍範圍< scope-name >磚——概要文件<配置文件名稱>秘密把範圍< scope-name >——< shared-policy-name >鍵字符串值< shared-policy-key >
磚秘密的更多信息,請參閱保密管理。關於使用秘密CLI的更多信息,請參閱秘密CLI(遺留)。
下麵的例子看物聯網事件從一個話題,但是你可以適應的示例應用程序的需求。作為一項最佳實踐,磚建議使用三角洲住表管道設置來配置應用程序變量。然後使用你的管道代碼spark.conf.get ()函數來檢索值。更多信息使用管道設置參數化你的管道,明白了參數化管道。
spark.conf.get ()
進口dlt進口pyspark.sql.types作為T從pyspark.sql.functions進口*#事件中心配置EH_NAMESPACE=火花。相依。得到(“iot.ingestion.eh.namespace”)EH_NAME=火花。相依。得到(“iot.ingestion.eh.name”)EH_CONN_SHARED_ACCESS_KEY_NAME=火花。相依。得到(“iot.ingestion.eh.accessKeyName”)SECRET_SCOPE=火花。相依。得到(“io.ingestion.eh.secretsScopeName”)EH_CONN_SHARED_ACCESS_KEY_VALUE=dbutils。秘密。得到(範圍=SECRET_SCOPE,關鍵=EH_CONN_SHARED_ACCESS_KEY_NAME)EH_CONN_STR=f“端點=某人:/ /{EH_NAMESPACE}.servicebus.windows.net/; SharedAccessKeyName ={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey ={EH_CONN_SHARED_ACCESS_KEY_VALUE}”#卡夫卡消費者配置KAFKA_OPTIONS={“kafka.bootstrap.servers”:f”{EH_NAMESPACE}.servicebus.windows.net: 9093”,“訂閱”:EH_NAME,“kafka.sasl.mechanism”:“普通”,“kafka.security.protocol”:“SASL_SSL”,“kafka.sasl.jaas.config”:f“kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名=\”ConnectionString美元\”密碼=\”{EH_CONN_STR}\”;“,“kafka.request.timeout.ms”:火花。相依。得到(“iot.ingestion.kafka.requestTimeout”),“kafka.session.timeout.ms”:火花。相依。得到(“iot.ingestion.kafka.sessionTimeout”),“maxOffsetsPerTrigger”:火花。相依。得到(“iot.ingestion.spark.maxOffsetsPerTrigger”),“failOnDataLoss”:火花。相依。得到(“iot.ingestion.spark.failOnDataLoss”),“startingOffsets”:火花。相依。得到(“iot.ingestion.spark.startingOffsets”)}#負載模式payload_ddl=””“battery_level BIGINT c02_level BIGINT, cca2字符串,cca3字符串,cn字符串,device_id BIGINT, device_name字符串,濕度長整型數字ip字符串,緯度雙lcd字符串,經度翻倍,規模字符串,臨時長整型數字,時間戳BIGINT“”“payload_schema=T。_parse_datatype_string(payload_ddl)#基本記錄解析和添加ETL審計列def解析(df):返回(df。withColumn(“記錄”,上校(“價值”)。投(“字符串”))。withColumn(“parsed_records”,from_json(上校(“記錄”),payload_schema))。withColumn(“iot_event_timestamp”,expr(”(from_unixtime (parsed_records。時間戳/ 1000)作為時間戳)”))。withColumn(“eh_enqueued_timestamp”,expr(“時間戳”))。withColumn(“eh_enqueued_date”,expr(“to_date(時間戳)”))。withColumn(“etl_processed_timestamp”,上校(“current_timestamp”))。withColumn(“etl_rec_uuid”,expr(“uuid ()”))。下降(“記錄”,“價值”,“關鍵”))@dlt。create_table(評論=“原始物聯網事件”,table_properties={“質量”:“青銅”,“pipelines.reset.allowed”:“假”#保護三角洲中的數據表如果你完全刷新},partition_cols=(“eh_enqueued_date”])@dlt。預計(“valid_topic”,“話題不是零”)@dlt。預計(“有效的記錄”,“parsed_records NOT NULL”)defiot_raw():返回(火花。readStream。格式(“卡夫卡”)。選項(* *KAFKA_OPTIONS)。負載()。變換(解析))
創建一個新的管道使用以下設置,用適當的值替換占位符的值對於您的環境。
{“集群”:({“標簽”:“默認”,“spark_conf”:{“spark.hadoop.fs.azure.account.key < storage-account-name > .dfs.core.windows.net”。:“{{秘密/ < scope-name > / <秘密名字>}}”},“num_workers”:4}),“發展”:真正的,“連續”:假,“通道”:“當前”,“版”:“高級”,“光子”:假,“庫”:({“筆記本”:{“路徑”:“< path-to-notebook >”}}),“名稱”:“dlt_eventhub_ingestion_using_kafka”,“存儲”:“abfss: / / <容器名稱> @ < storage-account-name >.dfs.core.windows.net/iot/”,“配置”:{“iot.ingestion.eh.namespace”:“< eh-namespace >”,“iot.ingestion.eh.accessKeyName”:“< eh-policy-name >”,“iot.ingestion.eh.name”:“< eventhub >”,“io.ingestion.eh.secretsScopeName”:“< secret-scope-name >”,“iot.ingestion.spark.maxOffsetsPerTrigger”:“50000”,“iot.ingestion.spark.startingOffsets”:“最新”,“iot.ingestion.spark.failOnDataLoss”:“假”,“iot.ingestion.kafka.requestTimeout”:“60000”,“iot.ingestion.kafka.sessionTimeout”:“30000”},“目標”:“< target-database-name >”}
取代
<容器名稱>容器Azure存儲賬戶的名稱。
<容器名稱>
< storage-account-name >的名字ADLS Gen2存儲賬戶。
< storage-account-name >
< eh-namespace >與你的活動中心名稱空間的名稱。
< eh-namespace >
< eh-policy-name >的秘密範圍鍵事件中心政策的關鍵。
< eh-policy-name >
< eventhub >事件中心實例的名稱。
< eventhub >
< secret-scope-name >與磚秘密範圍的名稱包含事件中心政策的關鍵。
< secret-scope-name >
作為一項最佳實踐,這個管道不使用默認DBFS存儲路徑,而是使用Azure代(ADLS Gen2)存儲數據湖存儲賬戶。為更多的信息關於配置的身份驗證ADLS Gen2存儲賬戶,明白了安全地訪問存儲憑證與秘密管道。