使用Azure磚:
我可以創建一個DLT表在python中使用
進口dlt pyspark.sql進口。從pyspark.sql fn功能。類型進口StringType @dlt。表(name = " < < landingTable > >”,路徑=“< <存儲路徑> >”,評論=“< <描述性評論> >”)def landingTable (): jasConfig =“kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名=“{}”密碼=“{}”;“。格式(confluentApiKey confluentSecret) binary_to_string = fn。udf(λx: str (int.from_bytes (x, byteorder = '大')),StringType ()) kafkaOptions = {" kafka.bootstrap。kafka.security服務器”:confluentBootstrapServers。”協議”:“SASL_SSL”、“kafka.sasl.jaas。kafka.ssl.endpoint.identification配置”:jasConfig。”算法”:“https”、“kafka.sasl。機製”:“普通”、“訂閱”:confluentTopicName,”startingOffsets”:“最早”、“failOnDataLoss”:“false”}返回(火花.readStream .format(“卡夫卡”).options (* * kafkaOptions) .load () .withColumn(“關鍵”,fn.col(“關鍵”).cast (StringType ())) .withColumn (valueSchemaId, binary_to_string (fn。expr(“子串(價值,2、4)))).withColumn (avroValue, fn。expr(“子串(價值6長度(值)5)”)).select(“主題”、“分區”,“抵消”,“時間戳”,‘timestampType’,‘鑰匙’,‘valueSchemaId’,‘avroValue’)
但不知道進展:
我目前的假設是將原始avro消息存儲在青銅/著陸STreamiung住表然後使用流媒體實時視圖,python UDF,執行deserializtion。
隻是不知道如何到達那裏
不,我沒有。事實上我不得不停止使用DLT當另一個問題出現在執行部分/流增量的白金聚合表。我最終回到使用: