Error "java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException" while writing streaming data to Event Hubs. It works fine if I write the Databricks table to another Databricks table.

Rahul_Tiwary
New Contributor II

import org.apache.spark.sql._
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent._
import scala.collection.immutable._
import org.apache.spark.eventhubs._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

var testConnectionstr = "connection string"

val parameters = EventHubsConf(testConnectionstr).setMaxEventsPerTrigger(5)

val df = spark.readStream.format("delta").table("gold.redemption")

val ds = df
  .selectExpr("RedemptionId", "ProgramId", "ClaimsPK_CL_FILEID")
  .writeStream
  .format("eventhubs")
  .options(parameters.toMap)
  .option("startingOffsets", "latest")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()

Error log --

java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.&lt;init&gt;(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
	at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery$2(EventHubsWriter.scala:53)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.eventhubs.EventHubsWriter$.validateQuery(EventHubsWriter.scala:53)
	at org.apache.spark.sql.eventhubs.EventHubsWriter$.write(EventHubsWriter.scala:70)
	at org.apache.spark.sql.eventhubs.EventHubsSink.addBatch(EventHubsSink.scala:39)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:805)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:803)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:320)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:318)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:803)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$5(MicroBatchExecution.scala:339)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:904)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$2(MicroBatchExecution.scala:336)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)


2 replies

Kaniz
Community Manager

Hi @Rahul Tiwary, could you re-check the path to the checkpoint directory in your actual code?
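For illustration, a minimal sketch of the question's write with an explicit checkpoint path; the "/mnt/checkpoints/redemption-eventhubs" location is an assumed example, not the asker's actual path:

// Sketch only: checkpointLocation must point to a real, writable storage
// location (e.g. a DBFS mount); the path below is an assumed placeholder.
val query = df
  .selectExpr("RedemptionId", "ProgramId", "ClaimsPK_CL_FILEID")
  .writeStream
  .format("eventhubs")
  .options(parameters.toMap)
  .option("checkpointLocation", "/mnt/checkpoints/redemption-eventhubs")
  .start()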

Gepap
New Contributor II

The dataframe to write needs to have the following schema:

Column                   | Type
-------------------------|------------------
body (required)          | string or binary
partitionId (*optional)  | string
partitionKey (*optional) | string

This worked for me (pyspark version):

from pyspark.sql import functions as F

df.withColumn("body", F.to_json(F.struct(*df.columns), options={"ignoreNullFields": False})) \
  .select("body") \
  .write \
  .format("eventhubs") \
  .options(**ehconf) \
  .save()
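Adapted to the Scala stream in the question, the same reshaping might look like the sketch below. The column names, parameters, and checkpoint path are taken from the question; this is an untested sketch of the idea, not a verified fix:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Sketch: serialize the selected columns into a single JSON "body" column,
// the one required column for the eventhubs sink per the schema above.
val query = df
  .select(to_json(struct(col("RedemptionId"), col("ProgramId"), col("ClaimsPK_CL_FILEID"))).alias("body"))
  .writeStream
  .format("eventhubs")
  .options(parameters.toMap)
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()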
