與Apache卡夫卡和磚流處理

本文描述了如何使用Apache卡夫卡作為源或彙結構化流負載運行時數據磚。

卡夫卡,看到卡夫卡的文檔

從卡夫卡讀取數據

下麵是一個示例從卡夫卡流讀:

df=(火花readStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,“<服務器:ip >”)選項(“訂閱”,“<主題>”)選項(“startingOffsets”,“最新”)負載())

磚還支持批量讀卡夫卡數據源語義,如以下示例所示:

df=(火花格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,“<服務器:ip >”)選項(“訂閱”,“<主題>”)選項(“startingOffsets”,“最早”)選項(“endingOffsets”,“最新”)負載())

增量的批量加載,磚建議使用卡夫卡Trigger.AvailableNow。看到配置增量的批處理

在磚運行時的13.1及以上,磚提供了閱讀卡夫卡的SQL函數的數據。與SQL支持流媒體隻有在三角洲住在磚SQL表或流表。看到read_kafka表值函數

配置卡夫卡結構化流讀者

磚提供了卡夫卡關鍵字作為數據格式配置連接卡夫卡0.10 +。

以下是對卡夫卡最常見的配置:

有多種方法的指定主題訂閱。你應該隻提供其中一個參數:

選項

價值

描述

訂閱

一個以逗號分隔的話題。

主題訂閱列表。

subscribePattern

Java正則表達式字符串。

該模式用於訂閱主題(s)。

分配

JSON字符串{“局部藥”:[0,1],“主題”:(2、4)}

具體topicPartitions消費。

其他值得注意的配置:

選項

價值

默認值

描述

kafka.bootstrap.servers

以逗號分隔的主持人:端口。

[要求]卡夫卡bootstrap.servers配置。如果你發現沒有數據從卡夫卡,首先檢查代理地址列表。如果代理地址列表不正確,可能沒有任何錯誤。這是因為卡夫卡端假設經紀人最終會變得可用,在發生網絡錯誤重試,直到永遠。

failOnDataLoss

真正的

真正的

(可選的)是否失敗的查詢時數據丟失的可能。查詢可以從卡夫卡永久無法讀取數據,由於許多場景如刪除話題,話題截斷前處理,等等。我們試圖估計保守是否數據可能丟失。有時這可能導致假警報。設置這個選項如果它不正常工作,或者你想要查詢繼續處理盡管數據丟失。

minPartitions

整數> = 0,0 =禁用。

0(禁用)

(可選的)最小數量的分區從卡夫卡讀取。您可以配置火花使用任意最小的分區從卡夫卡使用讀取minPartitions選擇。通常火花的1:1映射卡夫卡topicPartitions從卡夫卡火花分區使用。如果你設置minPartitions選擇一個值大於你的卡夫卡topicPartitions,火花將分配大型卡夫卡分區小的碎片。這個選項可以設置在高峰負荷,所以數據傾斜,當你流落後提高處理速度。時初始化卡夫卡在每個觸發消費者的成本,這可能會影響性能,如果你使用SSL連接到卡夫卡。

kafka.group.id

卡夫卡消費者組ID。

沒有設置

(可選的)組ID使用從卡夫卡在閱讀。小心地使用這個。默認情況下,每個查詢生成一個獨特的組ID讀取數據。這可以確保每個查詢都有自己的消費群體,沒有麵臨幹擾其他消費者一樣,因此可以閱讀所有分區的訂閱的主題。在某些情況下(例如,卡夫卡組的授權),您可能需要使用特定的授權組id讀取數據。您可以選擇設置組ID。然而,這個要特別小心,因為它可能導致不可預測的行為。

  • 並發運行查詢(包括批處理和流媒體)使用相同的組ID可能會互相幹擾,導致每個查詢隻讀數據的一部分。

  • 這也可能出現在查詢開始接二連三地/重新啟動。減少這樣的問題,設置卡夫卡消費者配置session.timeout.ms是非常小的。

startingOffsets

最早的,最新的

最新的

(可選的)查詢時的起點開始,要麼是“最早”,從最早的偏移量,或一個json字符串指定為每個TopicPartition的起始偏移量。在json, 2作為一個抵消最早可以用來參考,最新的1。注意:對於批處理查詢,最新(隱式或通過使用1以json)是不允許的。對於流媒體查詢,這隻適用於當開始一個新的查詢和恢複總是撿起從哪裏查詢。新發現的分區在查詢將從最早開始。

看到結構化流卡夫卡集成指南其他可選的配置。

模式卡夫卡記錄

卡夫卡記錄的模式是:

類型

關鍵

二進製

價值

二進製

主題

字符串

分區

int

抵消

時間戳

timestampType

int

關鍵價值總是反序列化為字節數組ByteArrayDeserializer。使用DataFrame操作(例如鑄造(“字符串”))顯式地反序列化鍵和值。

寫數據到卡夫卡

下麵是一個示例流寫入卡夫卡:

(dfwriteStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,“<服務器:ip >”)選項(“主題”,“<主題>”)開始())

磚還支持批處理寫語義下沉卡夫卡數據,如以下示例所示:

(df格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,“<服務器:ip >”)選項(“主題”,“<主題>”)保存())

配置卡夫卡結構化流的作家

重要的

在磚運行時的13.1及以上,一個更新版本的kafka-clients圖書館使用,使冪等默認寫道。如果卡夫卡水槽使用版本2.8.0或低於acl配置但沒有IDEMPOTENT_WRITE啟用時,寫失敗和錯誤消息org.apache.kafka.common.KafkaException:不能執行事務方法因為我們一個錯誤狀態

可以通過升級解決這個錯誤卡夫卡版本2.8.0以上或通過設置.option (“kafka.enable.idempotence”,“假”)雖然配置結構化流的作家。

模式提供給DataStreamWriter與卡夫卡的水槽。您可以使用以下字段:

列名

必需的或可選的

類型

關鍵

可選

字符串二進製

價值

要求

字符串二進製

可選

數組

主題

可選(忽略了如果主題是設置為作家選項)

字符串

分區

可選

INT

以下是常見的選項設置卡夫卡在寫:

選項

價值

默認值

描述

kafka.boostrap.servers

一個以逗號分隔的<主機:端口>

沒有一個

[要求]卡夫卡bootstrap.servers配置。

主題

字符串

沒有設置

(可選的)集所有行可以寫的主題。這個選擇將會重寫任何話題存在於數據的列。

includeHeaders

布爾

(可選的)是否包括卡夫卡標題行。

看到結構化流卡夫卡集成指南其他可選的配置。

檢索卡夫卡指標

請注意

在磚運行時8.1及以上。

你可以得到平均最小值和最大值的補償的數量背後的流媒體查詢最新抵消在所有的訂閱的主題avgOffsetsBehindLatest,maxOffsetsBehindLatest,minOffsetsBehindLatest指標。看到閱讀指標交互

請注意

在磚運行時9.1及以上。

獲得的字節總數估計查詢過程沒有消耗從訂閱的主題通過檢查的價值estimatedTotalBytesBehindLatest。這估計是基於批量加工的最後300秒。估計的時間框架是基於可以改變通過設置選項bytesEstimateWindowLength到一個不同的值。例如,將其設置為10分鍾:

df=(火花readStream格式(“卡夫卡”)選項(“bytesEstimateWindowLength”,“10 m”)#米幾分鍾,您還可以使用“600年代”600秒)

如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據流查詢進展儀表板選項卡:

{“源”:({“描述”:“KafkaV2訂閱(主題)”,“指標”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}

使用SSL連接磚卡夫卡

啟用SSL連接卡夫卡,聽從指示的融合性的文檔通過SSL加密和身份驗證。您可以提供描述的配置,前綴卡夫卡。,如選項。例如,您指定的信任存儲位置屬性kafka.ssl.truststore.location

磚建議你:

下麵的例子使用對象存儲位置和磚秘密啟用SSL連接:

df=(火花readStream格式(“卡夫卡”)選項(“kafka.bootstrap.servers”,)選項(“kafka.security.protocol”,“SASL_SSL”)選項(“kafka.ssl.truststore.location”,<信任存儲庫- - - - - -位置>)選項(“kafka.ssl.keystore.location”,<密鑰存儲庫- - - - - -位置>)選項(“kafka.ssl.keystore.password”,dbutils秘密得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <密鑰存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>))選項(“kafka.ssl.truststore.password”,dbutils秘密得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <信任存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>)))