與Apache卡夫卡和磚流處理
本文描述了如何使用Apache卡夫卡作為源或彙結構化流負載運行時數據磚。
卡夫卡,看到卡夫卡的文檔。
從卡夫卡讀取數據
下麵是一個例子從卡夫卡讀取數據:
df=(火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,“<服務器:ip >”)。選項(“訂閱”,“<主題>”)。選項(“startingOffsets”,“最新”)。負載())
寫數據到卡夫卡
下麵是一個示例卡夫卡寫入數據:
(df。writeStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,“<服務器:ip >”)。選項(“主題”,“<主題>”)。選項(“checkpointLocation”,“< checkpoint_path >”)。開始())
為Apache配置卡夫卡火花磚
磚提供了卡夫卡
關鍵字作為數據格式配置連接卡夫卡0.10 +。
以下是對卡夫卡最常見的配置:
有多種方法的指定主題訂閱。你應該隻提供其中一個參數:
選項 |
價值 |
描述 |
---|---|---|
訂閱 |
一個以逗號分隔的話題。 |
主題訂閱列表。 |
subscribePattern |
Java正則表達式字符串。 |
該模式用於訂閱主題(s)。 |
分配 |
JSON字符串 |
具體topicPartitions消費。 |
其他值得注意的配置:
選項 |
價值 |
默認值 |
描述 |
---|---|---|---|
kafka.bootstrap.servers |
以逗號分隔的主持人:端口。 |
空 |
[要求]卡夫卡 |
failOnDataLoss |
|
|
(可選的)是否失敗的查詢時數據丟失的可能。查詢可以從卡夫卡永久無法讀取數據,由於許多場景如刪除話題,話題截斷前處理,等等。我們試圖估計保守是否數據可能丟失。有時這可能導致假警報。設置這個選項 |
minPartitions |
整數> = 0,0 =禁用。 |
0(禁用) |
(可選的)最小數量的分區從卡夫卡讀取。您可以配置火花使用任意最小的分區從卡夫卡使用讀取 |
kafka.group.id |
卡夫卡消費者組ID。 |
沒有設置 |
(可選的)組ID使用從卡夫卡在閱讀。小心地使用這個。默認情況下,每個查詢生成一個獨特的組ID讀取數據。這可以確保每個查詢都有自己的消費群體,沒有麵臨幹擾其他消費者一樣,因此可以閱讀所有分區的訂閱的主題。在某些情況下(例如,卡夫卡組的授權),您可能需要使用特定的授權組id讀取數據。您可以選擇設置組ID。然而,這個要特別小心,因為它可能導致不可預測的行為。
|
startingOffsets |
最早的,最新的 |
最新的 |
(可選的)查詢時的起點開始,要麼是“最早”,從最早的偏移量,或一個json字符串指定為每個TopicPartition的起始偏移量。在json, 2作為一個抵消最早可以用來參考,最新的1。注意:對於批處理查詢,最新(隱式或通過使用1以json)是不允許的。對於流媒體查詢,這隻適用於當開始一個新的查詢和恢複總是撿起從哪裏查詢。新發現的分區在查詢將從最早開始。 |
看到結構化流卡夫卡集成指南其他可選的配置。
模式卡夫卡記錄
卡夫卡記錄的模式是:
列 |
類型 |
---|---|
關鍵 |
二進製 |
價值 |
二進製 |
主題 |
字符串 |
分區 |
int |
抵消 |
長 |
時間戳 |
長 |
timestampType |
int |
的關鍵
和價值
總是反序列化為字節數組ByteArrayDeserializer
。使用DataFrame操作(例如鑄造(“字符串”)
)顯式地反序列化鍵和值。
檢索卡夫卡指標
請注意
在磚運行時8.1及以上。
你可以得到平均最小值和最大值的補償的數量背後的流媒體查詢最新抵消在所有的訂閱的主題avgOffsetsBehindLatest
,maxOffsetsBehindLatest
,minOffsetsBehindLatest
指標。看到閱讀指標交互。
請注意
在磚運行時9.1及以上。
獲得的字節總數估計查詢過程沒有消耗從訂閱的主題通過檢查的價值estimatedTotalBytesBehindLatest
。這估計是基於批量加工的最後300秒。估計的時間框架是基於可以改變通過設置選項bytesEstimateWindowLength
到一個不同的值。例如,將其設置為10分鍾:
df=(火花。readStream。格式(“卡夫卡”)。選項(“bytesEstimateWindowLength”,“10 m”)#米幾分鍾,您還可以使用“600年代”600秒)
如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據流查詢進展儀表板選項卡:
{“源”:({“描述”:“KafkaV2訂閱(主題)”,“指標”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}
使用SSL連接磚卡夫卡
啟用SSL連接卡夫卡,聽從指示的融合性的文檔通過SSL加密和身份驗證。您可以提供描述的配置,前綴卡夫卡。
,如選項。例如,您指定的信任存儲位置屬性kafka.ssl.truststore.location
。
磚建議你:
下麵的例子使用對象存儲位置和磚秘密啟用SSL連接:
df=(火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,…)。選項(“kafka.security.protocol”,“SASL_SSL”)。選項(“kafka.ssl.truststore.location”,<信任存儲庫- - - - - -位置>)。選項(“kafka.ssl.keystore.location”,<密鑰存儲庫- - - - - -位置>)。選項(“kafka.ssl.keystore.password”,dbutils。秘密。得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <密鑰存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>))。選項(“kafka.ssl.truststore.password”,dbutils。秘密。得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <信任存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>)))