這是一個循序漸進的指南連接彙合的雲數據磚:
步驟1:建立一個融合性的雲計算集群
kafka-brokers.example.com: 9092)
步驟2:配置磚
範圍:dbutils.secrets.createScope (scopeName)
通過添加引導服務器配置彙合的雲的秘密,API鍵,創建的秘密和秘密範圍:
dbutils.secrets。把(範圍= scopeName鍵= " kafka.bootstrap。服務器”,價值= " kafka-brokers.example.com: 9092”)
dbutils.secrets。把(範圍= scopeName鍵= " kafka.security。協議”,價值= " SASL_SSL”)
dbutils.secrets。把(範圍= scopeName鍵= " kafka.sasl。機製”,價值=“平原”)
dbutils.secrets。把(範圍= scopeName鍵= " kafka.sasl.jaas。配置”,價值= " org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名= \ " < API_KEY > \ "密碼= \“< API_SECRET > \”;”)
步驟3:創建一個流DataFrame磚
從pyspark.sql.functions進口from_json坳從pyspark.sql.types進口StructType StringType倍增式#定義傳入的數據模式的模式= StructType閥門()(“名字”,StringType閥門())(“年齡”,倍增式())#從卡夫卡主題kafka_bootstrap_servers = dbutils.secrets讀取數據。(範圍= scopeName鍵=“kafka.bootstrap.servers”) df = \ .readStream \火花。格式(“卡夫卡”)\ .option (“kafka.bootstrap。服務器”,kafka_bootstrap_servers) \ .option (“訂閱”,“主題名稱”)\ .option (“startingOffsets”,“最早”)\ .load ()#提取和處理數據processed_df = df \ .select (from_json(坳(“價值”).cast (“字符串”),模式).alias (“數據”))\ .select (“data.name”,“data.age”)# = processed_df開始流查詢查詢。writeStream \ .outputMode (“附加”)\。格式(“控製台”)\ .start () query.awaitTermination ()
第四步:按您的需求定製代碼