取消
顯示的結果
而不是尋找
你的意思是:

火花3.3.0連接卡夫卡問題

avnish26
新的貢獻者二世

我試圖連接到我的卡夫卡從火花,但得到一個錯誤:

卡夫卡版本:2.4.1

火花版本:3.3.0

我用jupyter筆記本下麵執行pyspark代碼:

' ' '

從pyspark.sql。功能導入*

從pyspark.sql。導入類型*

#導入庫

進口操作係統

從pyspark。sql進口SparkSession

操作係統。環境(“PYSPARK_SUBMIT_ARGS”) = '——包org.apache.spark: spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell”

sc = SparkSession.builder.appName (Pyspark_kafka) .getOrCreate ()

df = sc \

.readStream \

.format \(“卡夫卡”)

.option (“kafka.bootstrap。服務器”、“zonos.engrid.in: \ 9092”)

.option(“訂閱”、“ext_device-event_10121”) \

.option (“startingOffsets”、“最早”)\

.option (“endingOffsets”、“最新”)\

.load ()

' ' '

這給了我以下錯誤:

' ' '

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

AnalysisException回溯(最近調用最後)

< ipython -輸入- 18 - 409 - d93832e70 >在<模塊>

5 .option(“訂閱”,“ext_device-event_10121”) \

6 .option (“startingOffsets”,“最早”)\

- - - - - > 7 .option (“endingOffsets”、“最新”)\

8 .load ()

/ opt /火花/ spark-3.3.0-bin-hadoop3 / python / pyspark / sql /流。py負載(自我、路徑、格式、模式* *選項)

467年返回self._df (self._jreader.load(路徑))

468年:

- - > 469年返回self._df (self._jreader.load ())

470年

471 def json (

/usr/local/lib/python3.6/site-packages / py4j / java_gateway。py __call__(自我,* args)

1255年的答案= self.gateway_client.send_command(命令)

1256年return_value = get_return_value (

- > 1257年回答,自我。gateway_client,自我。target_id self.name)

1258年

1259年在temp_args temp_arg:

/ opt /火花/ spark-3.3.0-bin-hadoop3 / python / pyspark / sql /跑龍套。py在德科(*,* *千瓦)

194 #隱藏除了來自哪裏顯示non-Pythonic

195 # JVM異常消息。

- - > 196年提高從沒有轉換

197年:

198年籌集

AnalysisException:沒有找到數據來源:卡夫卡。請部署應用程序的部署部分按“結構化流+卡夫卡集成指南”。

' ' '

不知道怎麼了連接器或代碼請幫幫我。

謝謝

2回答2

Nghiaht1
新的貢獻者三世

這是我如何配置運行PySpark (scala 2.12火花3.2.1)結構流與卡夫卡jupyter實驗室(需要下載2 spark-sql-kafka-0-10_2.12-3.2.1 jar文件。kafka-clients-2.1.1 jar。jar文件夾罐)

火花= SparkSession \

.builder \

. config(“火花。罐子”,os.getcwd() +”/罐/ spark-sql-kafka-0-10_2.12-3.2.1。jar“+”、“+ os.getcwd() + " \ /罐/ kafka-clients-2.1.1.jar”)

.appName \ (“Structured_Redpanda_WordCount”)

.getOrCreate ()

spark.conf.set (“spark.sql.shuffle。分區”,1)

weldermartins
尊敬的貢獻者

在我的例子中解決,隻是下載jar。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map