取消
顯示的結果
而不是尋找
你的意思是:

攝取卡夫卡Avroδ直播表

jm99
新的貢獻者三世

使用Azure磚:

我可以創建一個DLT表在python中使用

進口dlt pyspark.sql進口。從pyspark.sql fn功能。類型進口StringType @dlt。表(name = " < < landingTable > >”,路徑=“< <存儲路徑> >”,評論=“< <描述性評論> >”)def landingTable (): jasConfig =“kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名=“{}”密碼=“{}”;“。格式(confluentApiKey confluentSecret) binary_to_string = fn。udf(λx: str (int.from_bytes (x, byteorder = '大')),StringType ()) kafkaOptions = {" kafka.bootstrap。kafka.security服務器”:confluentBootstrapServers。”協議”:“SASL_SSL”、“kafka.sasl.jaas。kafka.ssl.endpoint.identification配置”:jasConfig。”算法”:“https”、“kafka.sasl。機製”:“普通”、“訂閱”:confluentTopicName,”startingOffsets”:“最早”、“failOnDataLoss”:“false”}返回(火花.readStream .format(“卡夫卡”).options (* * kafkaOptions) .load () .withColumn(“關鍵”,fn.col(“關鍵”).cast (StringType ())) .withColumn (valueSchemaId, binary_to_string (fn。expr(“子串(價值,2、4)))).withColumn (avroValue, fn。expr(“子串(價值6長度(值)5)”)).select(“主題”、“分區”,“抵消”,“時間戳”,‘timestampType’,‘鑰匙’,‘valueSchemaId’,‘avroValue’)

但不知道進展:

  1. 確保DLT表增量/流
  2. 反序列化的AVRO:
  • 融合性的模式注冊客戶端
  • 在Azure存儲賬戶avsc文件
  • 硬編碼在python UDF

我目前的假設是將原始avro消息存儲在青銅/著陸STreamiung住表然後使用流媒體實時視圖,python UDF,執行deserializtion。

隻是不知道如何到達那裏

2回答2

lninza
新的貢獻者二世

嗨@John馬修斯

你找到一種進步嗎?

我被困在同一點…

jm99
新的貢獻者三世

不,我沒有。事實上我不得不停止使用DLT當另一個問題出現在執行部分/流增量的白金聚合表。我最終回到使用:

  • 卡夫卡讀者(見從Apache卡夫卡消費數據)
  • 流dataframes
  • 三角洲表:使enableChangeDataFeed和處理“readChangeFeed”
  • 使用foreachBatch()方法在流應用所需的聚合和SCD1 upsert作家
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map