開始
加載和管理數據
處理數據
政府
引用和資源
2023年8月3日更新
給我們反饋
Apache Avro是一種常用的數據序列化係統流的世界。一個典型的解決方案是將數據在Apache卡夫卡Avro格式,元數據融合性的模式注冊表,然後運行查詢的流媒體框架連接到卡夫卡和模式注冊表。
磚支持from_avro和to_avro功能在卡夫卡建立與Avro數據流管道在注冊表模式和元數據。這個函數to_avro在Avro二進製格式和編碼專欄from_avro解碼Avro二進製數據列。函數變換到另一列一列,和輸入/輸出的SQL數據類型可以是一個複雜類型或一個原始類型。
from_avro
to_avro
請注意
的from_avro和to_avro功能:
可在PythonScala和Java。
可以傳遞給SQL函數在兩個批處理和流媒體查詢。
也看到Avro文件數據源。
類似於from_json和to_json,你可以使用from_avro和to_avro與任何二進製列,但你必須指定Avro手動模式。
進口org。apache。火花。sql。avro。功能。_進口org。apache。avro。SchemaBuilder/ /當閱讀卡夫卡的鍵和值的話題,解碼/ /二進製(Avro)數據結構化數據。/ /結果DataFrame的模式是:<關鍵:字符串,價值:int >瓦爾df=火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“訂閱”,“t”)。負載()。選擇(from_avro(美元“關鍵”,SchemaBuilder。構建器()。stringType())。作為(“關鍵”),from_avro(美元“價值”,SchemaBuilder。構建器()。intType())。作為(“價值”))/ /將結構化數據轉換為二進製字符串(鍵列)和/ / int(值列)並保存到卡夫卡的主題。dataDF。選擇(to_avro(美元“關鍵”)。作為(“關鍵”),to_avro(美元“價值”)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。開始()
您還可以指定一個模式作為一個JSON字符串。例如,如果/ tmp / user.avsc是:
/ tmp / user.avsc
{“名稱”:“example.avro”,“類型”:“記錄”,“名稱”:“用戶”,“字段”:({“名稱”:“名稱”,“類型”:“字符串”},{“名稱”:“favorite_color”,“類型”:(“字符串”,“零”]}]}
您可以創建一個JSON字符串:
從pyspark.sql.avro.functions進口from_avro,to_avrojsonFormatSchema=開放(“/ tmp / user.avsc”,“r”)。讀()
然後使用模式from_avro:
# 1。解碼Avro數據結構。# 2。過濾器由列“favorite_color”。# 3。以Avro格式編碼的“名稱”列。輸出=df\。選擇(from_avro(“價值”,jsonFormatSchema)。別名(“用戶”))\。在哪裏(的用戶。favorite_color == "red"')\。選擇(to_avro(“user.name”)。別名(“價值”))
如果您的集群模式注冊表服務,from_avro可以使用它,這樣你不需要指定Avro手動模式。
下麵的例子演示了閱讀卡夫卡的主題“t”,假設鍵和值已經注冊在注冊表模式主題“t鍵”和“值”的類型字符串和INT:
字符串
INT
進口org。apache。火花。sql。avro。功能。_瓦爾schemaRegistryAddr=“https://myhost: 8081”瓦爾df=火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“訂閱”,“t”)。負載()。選擇(from_avro(美元“關鍵”,“t鍵”,schemaRegistryAddr)。作為(“關鍵”),from_avro(美元“價值”,“值”,schemaRegistryAddr)。作為(“價值”))
為to_avro,默認輸出Avro模式可能不匹配模式的目標主題注冊表服務有以下原因:
從火花SQL類型映射到Avro模式不是一對一的。看到支持類型的火花SQL - > Avro轉換。
如果轉換輸出Avro模式記錄類型,記錄的名字是topLevelRecord沒有默認名稱空間。
topLevelRecord
如果默認輸出模式to_avro匹配模式的目標主題,您可以執行以下操作:
/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF。選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。開始()
否則,你必須提供模式的目標對象to_avro功能:
/ / Avro模式的JSON字符串格式的“值”。瓦爾avroSchema=…/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF。選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr,avroSchema)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。開始()
在磚運行時的12.1及以上,可以驗證外部彙合的模式注冊表。下麵的例子演示如何配置您的模式注冊表選項包括身份驗證憑據和API密鑰。
進口org。apache。火花。sql。avro。功能。_進口scala。集合。JavaConverters。_瓦爾schemaRegistryAddr=“https://confluent-schema-registry-endpoint”瓦爾schemaRegistryOptions=地圖(“confluent.schema.registry.basic.auth.credentials.source”- >“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”- >“confluentApiKey: confluentApiSecret”)瓦爾df=火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“訂閱”,“t”)。負載()。選擇(from_avro(美元“關鍵”,“t鍵”,schemaRegistryAddr,schemaRegistryOptions。asJava)。作為(“關鍵”),from_avro(美元“價值”,“值”,schemaRegistryAddr,schemaRegistryOptions。asJava)。作為(“價值”))/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF。選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr,schemaRegistryOptions。asJava)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr,schemaRegistryOptions。asJava)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。保存()/ / Avro模式的JSON字符串格式的“值”。瓦爾avroSchema=…/ /轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。dataDF。選擇(to_avro(美元“關鍵”,點燃(“t鍵”),schemaRegistryAddr,schemaRegistryOptions。asJava)。作為(“關鍵”),to_avro(美元“價值”,點燃(“值”),schemaRegistryAddr,schemaRegistryOptions。asJava,avroSchema)。作為(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。保存()
從pyspark.sql.functions進口上校,點燃從pyspark.sql.avro.functions進口from_avro,to_avroschema_registry_address=“https://confluent-schema-registry-endpoint”schema_registry_options={“confluent.schema.registry.basic.auth.credentials.source”:“USER_INFO”,“confluent.schema.registry.basic.auth.user.info”:f”{關鍵}:{秘密}”}df=(火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“訂閱”,“t”)。負載()。選擇(from_avro(數據=上校(“關鍵”),選項=schema_registry_options,主題=“t鍵”,schemaRegistryAddress=schema_registry_address)。別名(“關鍵”),from_avro(數據=上校(“價值”),選項=schema_registry_options,主題=“值”,schemaRegistryAddress=schema_registry_address)。別名(“價值”)))#轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。data_df。選擇(to_avro(數據=上校(“關鍵”),主題=點燃(“t鍵”),schemaRegistryAddress=schema_registry_address,選項=schema_registry_options)。別名(“關鍵”),to_avro(數據=上校(“價值”),主題=點燃(“值”),schemaRegistryAddress=schema_registry_address,選項=schema_registry_options)。別名(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。保存()# Avro模式的JSON字符串格式的“值”。avro_schema=…#轉換後的數據保存到卡夫卡作為卡夫卡主題“t”。data_df。選擇(to_avro(上校(“關鍵”),點燃(“t鍵”),schema_registry_address,schema_registry_options)。別名(“關鍵”),to_avro(上校(“價值”),點燃(“值”),schema_registry_address,schema_registry_options,avro_schema)。別名(“價值”))。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,服務器)。選項(“主題”,“t”)。保存()