取消
顯示的結果
而不是尋找
你的意思是:

任務不是序列化:. io .NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter

Sandesh87
新的貢獻者三世

我有getS3Object函數(json)位於aws s3對象

client_connect擴展對象序列化{val s3_get_path = " / dbfs / mnt / s3response“def getS3Objects (s3ObjectName:字符串,s3Client: AmazonS3):字符串s = {val objectKey = " $ {s3ObjectName} " val inputS3Stream = s3Client。getObject (myS3Bucket objectKey)。getObjectContent val inputS3String = IOUtils。toString (inputS3Stream,“utf - 8”) val filePath = s“$ {s3_get_path} / $ {objectKey} " val =新文件(filePath) val fileWriter = new fileWriter(文件)val bw = new BufferedWriter (fileWriter) bw.write (inputS3String) bw.close () fileWriter.close () inputS3String}}

攝取使用消息流框架

源流dataframe source_df的讀取從azure事件中心看起來像下麵

身體| |

| 8 c44f2715ab81c16ecb31d527e18465d.json ~ 2021-05-26 ~ 13-14-56 ~哦|

| a4f9e914c1a40e5828b0eb129b1234b2.json ~ 2022 = 05-09 ~ 15-12-22 ~ |

“身體”列包含字符串值由“~”分隔開的,第一個元素的對象id作為參數傳遞到getS3Object函數

該函數的第二個參數是S3client用於連接aws S3是一個可序列化的類內定義的。

最終類s3clientBuild()擴展了可序列化的{def s3connection (AccessKey:字符串,SecretKey: String) = {val clientRegion:區域=區域。US_EAST_1 val信譽= new BasicAWSCredentials (AccessKey SecretKey) AmazonS3ClientBuilder.standard () .withRegion (clientRegion) .withCredentials(新AWSStaticCredentialsProvider(信譽)).build ()}}

val AccessKey = dbutils.secrets。得到(=“範圍”,範圍鍵=“AccessKey-ID”)

val SecretKey = dbutils.secrets。得到(=“範圍”,範圍鍵=“AccessKey-Secret”)

寫流:

val streamWriter = source_df .writeStream .queryName .option (“Write_stream”) (“checkpointLocation”, chk_pt) .trigger(觸發器。ProcessingTime(3秒)).outputMode(“追加”).foreachBatch ((batchDF: DataFrame batchId:長)= > {batchDF.persist嚐試()val object_df = batchDF.select(分裂(坳(“身體”)、“~”).getItem (0)。as (“ObjectID”)) val df_response = object_df.repartition (2)。mapPartitions(迭代器= > {val api_connect = new s3clientBuild () val s3client = api_connect。s3connection (AccessKey SecretKey) val resp =迭代器。地圖(行= > {val name = cli_connector.getS3Objects (row.getString (0) s3client)(名稱)})resp}) .toDF(“價值”).select (from_json(“價值”.cast美元(“字符串”),MySchema)作為“字段”).select(“。*”美元)df_response.count () batchDF.unpersist()}與{成功(_)= >案件失敗(e) = >{扔e}})

然而我得到以下錯誤信息:-

任務不是序列化:. io .NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter序列化stack: - object not serializable (class: org.apache.spark.sql.streaming.DataStreamWriter, value: org.apache.spark.sql.streaming.DataStreamWriter@1f1c5f4f) - field (class: $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sensorwriter_0526, type: class org.apache.spark.sql.streaming.DataStreamWriter) - object (class $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@41c5f54f) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda)

該如何解決呢?

2回答2

werners1
尊敬的貢獻者三世

@Sandesh Puligundla嗯很難說,但我確信那是因為你寫的對象。

請記住,引發分布。

你可能想查看這些鏈接:

https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-s..。

https://stackoverflow.com/questions/40596871/how-spark-handles-object

不是一個實際的回答你的問題,對不起。但是這個錯誤很難查明(對我而言,希望一些好的程序員可以解決這個問題)。

匿名
不適用

嘿@Sandesh Puligundla

希望一切都好!

隻是想檢查如果你能解決你的問題,你會很高興分享解決方案或答案標記為最佳?其他的請讓我們知道如果你需要更多的幫助。

我們很想聽到你的聲音。

謝謝!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map