取消
顯示的結果
而不是尋找
你的意思是:

Dataframe行失蹤write_to_delta read_from_delta

mimezzz
貢獻者

嗨,我想mongo加載到s3使用pyspark 3.1.1通過閱讀他們拚花。

我的代碼片段:

df =火花\

.read \

.format \ (“mongo”)

.options (* * read_options) \

.load(模式=)

df = df.coalesce (64)

write_df_to_delta(火花,df s3_path)

read_count = df.count ()

s3_path inserted_df = read_delta_to_df(火花)

inserted_count = inserted_df.count ()

所有sparksession,蒙戈連接和s3路徑配置。我發現read_count inserted_df數不匹配,有一個缺口約300 - 1200行。但我寫δ沒有給我任何錯誤。我不知道為什麼出現這樣的情況?是什麼導致了它呢?

我可以看到形式農場主:“read_count”: 1373432,“inserted_count”: 1372492

def read_delta_to_df(

火花:SparkSession,

s3_path: str

)- > DataFrame:

log.info(“閱讀三角洲表從路徑{}df”.format (s3_path))

df =火花\

.read \

.format \(“δ”)

.load (s3_path)

返回df

def write_df_to_delta(

火花:SparkSession,

df: DataFrame,

s3_path: str,

模式:可選(str) =“覆蓋”,

partition_by:可選(聯盟[str、列表(str)]] =沒有

保留:可選(int) = 0

)- - - >沒有:

log.info(“寫作df三角洲表,{}“.format (s3_path))

df.printSchema ()

試一試:

df \

.write \

.format \(“δ”)

.mode \(模式)

.option (“overwriteSchema”,“真正的”)\

.save (

路徑= s3_path,

partitionBy = partition_by)

除了例外e:

日誌。錯誤(f”錯誤發生錯誤味精:{e}”)

1接受解決方案

接受的解決方案

mimezzz
貢獻者

所以我認為我這裏已經揭開了謎底:grinning_face:這是與保留配置。通過設置retentionEnabled真實rention小時0,我們有點失去了幾行第一個文件他們錯了文件從上節課和剛吸塵。進一步閱讀,請參見:https://learn.microsoft.com/en-us/azure/databricks/kb/delta/data-missing-vacuum-parallel-write

在原帖子查看解決方案

8回答說8

匿名
不適用

一般來說,避免rm表是一個好主意δ的事務日誌可以防止最終一致性問題在大多數情況下;然而,當您刪除並重新創建一個表在很短的時間內,不同版本的事務日誌可以閃爍的存在。

相反,我建議使用事務元素提供的三角洲。例如,要覆蓋一個表中的數據,您可以:

df。write.format(“δ”).mode(“覆蓋”).save(“/δ/事件”)

嗨@May Olszewski謝謝回複。我使用的模式是“覆蓋”開始了,我忘了把它放在上麵演示代碼抱歉的預定義的。還有其他建議嗎?之前我也做了vacume該目錄編寫新的增量表

Debayan
尊敬的貢獻者三世
尊敬的貢獻者三世

嗨@mime liu以外你還有其他錯誤消息報道的?

嗨Debayan thruout沒有沒有錯誤的報道

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map