我試圖連接到我的卡夫卡從火花,但得到一個錯誤:
卡夫卡版本:2.4.1
火花版本:3.3.0
我用jupyter筆記本下麵執行pyspark代碼:
' ' '
從pyspark.sql。功能導入*
從pyspark.sql。導入類型*
#導入庫
進口操作係統
從pyspark。sql進口SparkSession
操作係統。環境(“PYSPARK_SUBMIT_ARGS”) = '——包org.apache.spark: spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell”
sc = SparkSession.builder.appName (Pyspark_kafka) .getOrCreate ()
df = sc \
.readStream \
.format \(“卡夫卡”)
.option (“kafka.bootstrap。服務器”、“zonos.engrid.in: \ 9092”)
.option(“訂閱”、“ext_device-event_10121”) \
.option (“startingOffsets”、“最早”)\
.option (“endingOffsets”、“最新”)\
.load ()
' ' '
這給了我以下錯誤:
' ' '
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
AnalysisException回溯(最近調用最後)
< ipython -輸入- 18 - 409 - d93832e70 >在<模塊>
5 .option(“訂閱”,“ext_device-event_10121”) \
6 .option (“startingOffsets”,“最早”)\
- - - - - > 7 .option (“endingOffsets”、“最新”)\
8 .load ()
/ opt /火花/ spark-3.3.0-bin-hadoop3 / python / pyspark / sql /流。py負載(自我、路徑、格式、模式* *選項)
467年返回self._df (self._jreader.load(路徑))
468年:
- - > 469年返回self._df (self._jreader.load ())
470年
471 def json (
/usr/local/lib/python3.6/site-packages / py4j / java_gateway。py __call__(自我,* args)
1255年的答案= self.gateway_client.send_command(命令)
1256年return_value = get_return_value (
- > 1257年回答,自我。gateway_client,自我。target_id self.name)
1258年
1259年在temp_args temp_arg:
/ opt /火花/ spark-3.3.0-bin-hadoop3 / python / pyspark / sql /跑龍套。py在德科(*,* *千瓦)
194 #隱藏除了來自哪裏顯示non-Pythonic
195 # JVM異常消息。
- - > 196年提高從沒有轉換
197年:
198年籌集
AnalysisException:沒有找到數據來源:卡夫卡。請部署應用程序的部署部分按“結構化流+卡夫卡集成指南”。
' ' '
不知道怎麼了連接器或代碼請幫幫我。
謝謝
這是我如何配置運行PySpark (scala 2.12火花3.2.1)結構流與卡夫卡jupyter實驗室(需要下載2 spark-sql-kafka-0-10_2.12-3.2.1 jar文件。kafka-clients-2.1.1 jar。jar文件夾罐)
火花= SparkSession \
.builder \
. config(“火花。罐子”,os.getcwd() +”/罐/ spark-sql-kafka-0-10_2.12-3.2.1。jar“+”、“+ os.getcwd() + " \ /罐/ kafka-clients-2.1.1.jar”)
.appName \ (“Structured_Redpanda_WordCount”)
.getOrCreate ()
spark.conf.set (“spark.sql.shuffle。分區”,1)
在我的例子中解決,隻是下載jar。