與Apache卡夫卡和磚流處理
本文描述了如何使用Apache卡夫卡作為源或彙結構化流負載運行時數據磚。
卡夫卡,看到卡夫卡的文檔。
從卡夫卡讀取數據
下麵是一個示例從卡夫卡流讀:
df=(火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,“<服務器:ip >”)。選項(“訂閱”,“<主題>”)。選項(“startingOffsets”,“最新”)。負載())
磚還支持批量讀卡夫卡數據源語義,如以下示例所示:
df=(火花。讀。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,“<服務器:ip >”)。選項(“訂閱”,“<主題>”)。選項(“startingOffsets”,“最早”)。選項(“endingOffsets”,“最新”)。負載())
增量的批量加載,磚建議使用卡夫卡Trigger.AvailableNow
。看到配置增量的批處理。
在磚運行時的13.1及以上,磚提供了閱讀卡夫卡的SQL函數的數據。與SQL支持流媒體隻有在三角洲住在磚SQL表或流表。看到read_kafka表值函數。
配置卡夫卡結構化流讀者
磚提供了卡夫卡
關鍵字作為數據格式配置連接卡夫卡0.10 +。
以下是對卡夫卡最常見的配置:
有多種方法的指定主題訂閱。你應該隻提供其中一個參數:
選項 |
價值 |
描述 |
---|---|---|
訂閱 |
一個以逗號分隔的話題。 |
主題訂閱列表。 |
subscribePattern |
Java正則表達式字符串。 |
該模式用於訂閱主題(s)。 |
分配 |
JSON字符串 |
具體topicPartitions消費。 |
其他值得注意的配置:
選項 |
價值 |
默認值 |
描述 |
---|---|---|---|
kafka.bootstrap.servers |
以逗號分隔的主持人:端口。 |
空 |
[要求]卡夫卡 |
failOnDataLoss |
|
|
(可選的)是否失敗的查詢時數據丟失的可能。查詢可以從卡夫卡永久無法讀取數據,由於許多場景如刪除話題,話題截斷前處理,等等。我們試圖估計保守是否數據可能丟失。有時這可能導致假警報。設置這個選項 |
minPartitions |
整數> = 0,0 =禁用。 |
0(禁用) |
(可選的)最小數量的分區從卡夫卡讀取。您可以配置火花使用任意最小的分區從卡夫卡使用讀取 |
kafka.group.id |
卡夫卡消費者組ID。 |
沒有設置 |
(可選的)組ID使用從卡夫卡在閱讀。小心地使用這個。默認情況下,每個查詢生成一個獨特的組ID讀取數據。這可以確保每個查詢都有自己的消費群體,沒有麵臨幹擾其他消費者一樣,因此可以閱讀所有分區的訂閱的主題。在某些情況下(例如,卡夫卡組的授權),您可能需要使用特定的授權組id讀取數據。您可以選擇設置組ID。然而,這個要特別小心,因為它可能導致不可預測的行為。
|
startingOffsets |
最早的,最新的 |
最新的 |
(可選的)查詢時的起點開始,要麼是“最早”,從最早的偏移量,或一個json字符串指定為每個TopicPartition的起始偏移量。在json, 2作為一個抵消最早可以用來參考,最新的1。注意:對於批處理查詢,最新(隱式或通過使用1以json)是不允許的。對於流媒體查詢,這隻適用於當開始一個新的查詢和恢複總是撿起從哪裏查詢。新發現的分區在查詢將從最早開始。 |
看到結構化流卡夫卡集成指南其他可選的配置。
模式卡夫卡記錄
卡夫卡記錄的模式是:
列 |
類型 |
---|---|
關鍵 |
二進製 |
價值 |
二進製 |
主題 |
字符串 |
分區 |
int |
抵消 |
長 |
時間戳 |
長 |
timestampType |
int |
的關鍵
和價值
總是反序列化為字節數組ByteArrayDeserializer
。使用DataFrame操作(例如鑄造(“字符串”)
)顯式地反序列化鍵和值。
寫數據到卡夫卡
下麵是一個示例流寫入卡夫卡:
(df。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,“<服務器:ip >”)。選項(“主題”,“<主題>”)。開始())
磚還支持批處理寫語義下沉卡夫卡數據,如以下示例所示:
(df。寫。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,“<服務器:ip >”)。選項(“主題”,“<主題>”)。保存())
配置卡夫卡結構化流的作家
重要的
在磚運行時的13.1及以上,一個更新版本的kafka-clients
圖書館使用,使冪等默認寫道。如果卡夫卡水槽使用版本2.8.0或低於acl配置但沒有IDEMPOTENT_WRITE
啟用時,寫失敗和錯誤消息org.apache.kafka.common.KafkaException:不能執行事務方法因為我們是在一個錯誤狀態
。
可以通過升級解決這個錯誤卡夫卡版本2.8.0以上或通過設置.option (“kafka.enable.idempotence”,“假”)
雖然配置結構化流的作家。
模式提供給DataStreamWriter與卡夫卡的水槽。您可以使用以下字段:
列名 |
必需的或可選的 |
類型 |
---|---|---|
|
可選 |
|
|
要求 |
|
|
可選 |
|
|
可選(忽略了如果 |
|
|
可選 |
|
以下是常見的選項設置卡夫卡在寫:
選項 |
價值 |
默認值 |
描述 |
---|---|---|---|
|
一個以逗號分隔的 |
沒有一個 |
[要求]卡夫卡 |
|
|
沒有設置 |
(可選的)集所有行可以寫的主題。這個選擇將會重寫任何話題存在於數據的列。 |
|
|
|
(可選的)是否包括卡夫卡標題行。 |
看到結構化流卡夫卡集成指南其他可選的配置。
檢索卡夫卡指標
請注意
在磚運行時8.1及以上。
你可以得到平均最小值和最大值的補償的數量背後的流媒體查詢最新抵消在所有的訂閱的主題avgOffsetsBehindLatest
,maxOffsetsBehindLatest
,minOffsetsBehindLatest
指標。看到閱讀指標交互。
請注意
在磚運行時9.1及以上。
獲得的字節總數估計查詢過程沒有消耗從訂閱的主題通過檢查的價值estimatedTotalBytesBehindLatest
。這估計是基於批量加工的最後300秒。估計的時間框架是基於可以改變通過設置選項bytesEstimateWindowLength
到一個不同的值。例如,將其設置為10分鍾:
df=(火花。readStream。格式(“卡夫卡”)。選項(“bytesEstimateWindowLength”,“10 m”)#米幾分鍾,您還可以使用“600年代”600秒)
如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據流查詢進展儀表板選項卡:
{“源”:({“描述”:“KafkaV2訂閱(主題)”,“指標”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}
使用SSL連接磚卡夫卡
啟用SSL連接卡夫卡,聽從指示的融合性的文檔通過SSL加密和身份驗證。您可以提供描述的配置,前綴卡夫卡。
,如選項。例如,您指定的信任存儲位置屬性kafka.ssl.truststore.location
。
磚建議你:
下麵的例子使用對象存儲位置和磚秘密啟用SSL連接:
df=(火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,…)。選項(“kafka.security.protocol”,“SASL_SSL”)。選項(“kafka.ssl.truststore.location”,<信任存儲庫- - - - - -位置>)。選項(“kafka.ssl.keystore.location”,<密鑰存儲庫- - - - - -位置>)。選項(“kafka.ssl.keystore.password”,dbutils。秘密。得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <密鑰存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>))。選項(“kafka.ssl.truststore.password”,dbutils。秘密。得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <信任存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>)))