取消
顯示的結果
而不是尋找
你的意思是:

數據從azure獲得閱讀時錯過了事件使用火花流中心

Rishi045
新的貢獻者二世

你好,

我麵臨一個問題的數據被錯過了。

我在讀azure事件的數據中心和壓扁後json數據我將它存儲在檢查機關文件,然後使用另一個磚筆記本δ表上執行合並操作通過添加一些etl列。

然而在某個地方的記錄越來越想念。

我已經安排了工作,每小時運行一次。

有人能幫幫我。

11日回複11

werners1
尊敬的貢獻者三世

你土地的事件中心數據未處理數據湖?如果是這樣,你可以檢查如果一切。
如果是這樣:檢查下一步等等。
如果你不保存原始數據,和一些運氣你仍在活動中心。

Rishi045
新的貢獻者二世

沒有數據處理之前datalake著陸

Rishi045
新的貢獻者二世

我找不到那些丟失的記錄數據湖

Rishi045
新的貢獻者二世

使用下麵的代碼:

參看= {}
df = spark.readStream.format (eventhubs) .options (* * conf) .load ()
dataDF = df.select(坳(“身體”).cast(“字符串”))
data = dataDF.select (json_tuple(坳(“身體”),“表”、“op_type”、“記錄”、“op_ts”)) \
.toDF(“表”、“op_type”、“記錄”、“op_ts”)
final_data = data.withColumn (“records_json from_json(坳(“記錄”),reqSchema))
final_data = final_data.select (
*(坳(“records_json。”+ field).alias(field) for field in reqSchema.fieldNames()],
坳(“op_type”),
坳(“op_ts”))
final_data.orderBy(坳(op_ts) .desc ())
final_data = final_data.dropDuplicates ([primaryKey])
final_data = final_data.distinct ()
final_data = final_data.drop (final_data.op_ts)
final_data = final_data.drop (final_data.op_type)
final_data.coalesce (1)。writeStream \
.format \“鋪”
.outputMode \(“追加”)
.option (“checkpointLocation”, checkPoint_url) \
.trigger \(一旦= True)
.start rawFilePath_url \
.awaitTermination ()
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map