取消
顯示的結果
而不是尋找
你的意思是:

連接彙合的磚。

Mbinyala
新的貢獻者二世

嗨! !

誰能告訴我如何連接彙合的雲數據磚嗎?我是新手所以請詳細說明你的答案。

2回答2

匿名
不適用

這是一個循序漸進的指南連接彙合的雲數據磚:

步驟1:建立一個融合性的雲計算集群

  • 注冊一個支流雲賬戶https://confluent.cloud/,如果您還沒有創建一個新的集群。
  • 一旦準備好了您的集群,記下以下信息:
    • 引導服務器(例如,kafka-brokers.example.com: 9092)
    • API密匙和秘密(身份驗證)

      步驟2:配置磚

      • 磚的工作區中創建一個新的筆記本或打開一個現有的。
      • 設置必要的配置使用磚秘密範圍的特性。秘密讓你安全地存儲敏感信息,如憑證。創建一個秘密範圍和添加的秘密,遵循這些步驟:
        • 打開筆記本電腦,運行以下命令創建一個秘密
        • 範圍:dbutils.secrets.createScope (scopeName)

          通過添加引導服務器配置彙合的雲的秘密,API鍵,創建的秘密和秘密範圍:

          dbutils.secrets。把(範圍= scopeName鍵= " kafka.bootstrap。服務器”,價值= " kafka-brokers.example.com: 9092”)
          dbutils.secrets。把(範圍= scopeName鍵= " kafka.security。協議”,價值= " SASL_SSL”)
          dbutils.secrets。把(範圍= scopeName鍵= " kafka.sasl。機製”,價值=“平原”)
          dbutils.secrets。把(範圍= scopeName鍵= " kafka.sasl.jaas。配置”,價值= " org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名= \ " < API_KEY > \ "密碼= \“< API_SECRET > \”;”)

          步驟3:創建一個流DataFrame磚

          • 在同一數據磚筆記本,您現在可以創建一個流DataFrame消費數據彙合的雲。下麵是一個示例代碼片段:
            python
            從pyspark.sql.functions進口from_json坳從pyspark.sql.types進口StructType StringType倍增式#定義傳入的數據模式的模式= StructType閥門()(“名字”,StringType閥門())(“年齡”,倍增式())#從卡夫卡主題kafka_bootstrap_servers = dbutils.secrets讀取數據。(範圍= scopeName鍵=“kafka.bootstrap.servers”) df = \ .readStream \火花。格式(“卡夫卡”)\ .option (“kafka.bootstrap。服務器”,kafka_bootstrap_servers) \ .option (“訂閱”,“主題名稱”)\ .option (“startingOffsets”,“最早”)\ .load ()#提取和處理數據processed_df = df \ .select (from_json(坳(“價值”).cast (“字符串”),模式).alias (“數據”))\ .select (“data.name”,“data.age”)# = processed_df開始流查詢查詢。writeStream \ .outputMode (“附加”)\。格式(“控製台”)\ .start () query.awaitTermination ()

            第四步:按您的需求定製代碼

            • 修改上麵的代碼片段以適合您的特定的用例。更新模式定義,卡夫卡主題名稱,並根據需要輸出任何數據轉換或下沉

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map