取消
顯示的結果
而不是尋找
你的意思是:

從MongoDB Structered Streamin阿特拉斯無法正確解析JSON

sharonbjehome
新的因素

你好,

我有一個表在MongoDB阿特拉斯,我試圖不斷讀到內存,然後將最終寫文件。然而,當我看內存中的表沒有正確的模式。

代碼:

從pyspark.sql。類型進口StructType、LongType StringType IntegerType

從pyspark進口SparkContext

從pyspark。流進口StreamingContext

從pyspark。sql進口SparkSession

從pyspark.sql。功能導入*

火花= SparkSession。構建器\

.appName \ (“pdm_messagesStream”)

. config(“火花。瓶”、“org.mongodb.spark: mongo-spark-connector: 10.0.5”) \

.getOrCreate ()

readSchema = (StructType () \

閥門(_id, StringType ()) \

閥門(deviceToken, StringType ()) \

閥門(‘消息’,StringType ()) \

閥門(消息id, StringType ()) \

閥門(createdAt, StringType ()) \

閥門(createdAtEpochSeconds, StringType ())

)

dataStreamWriter =(火花。readStream \

.format \ (“mongodb”)

.option (“spark.mongodb.connection。uri”、“mongodb + srv: / / xxxx@ * * * / ? retryWrites = true&readPreference = secondary&readPreferenceTags = nodeType: ANALYTICS&w \ =多數”)

.option (“spark.mongodb。數據庫”、“數據”)\

.option (“spark.mongodb。收集”、“消息”)\

.option (“forceDeleteTempCheckpointLocation”,“真正的”)\

. schema (readSchema)

.load () \

.writeStream \

.format \(“內存”)

.queryName \(“信息”)

.trigger(連續=“1秒”)

)

查詢= dataStreamWriter.start ()

結果從spark.table (“pdm_messages”),告訴(截斷= False):

image.png任何幫助將不勝感激。

謝謝

沙龍

1回複1

Debayan
尊敬的貢獻者三世
尊敬的貢獻者三世

嗨@sharonbjehome,這必須通過支持徹底檢查票,你跟進

:https://docs.m.eheci.com/external-data/mongodb.html

同時,請檢查與mongodb支持,這是在工作嗎?

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map