取消
顯示的結果
而不是尋找
你的意思是:

Dataframe行失蹤write_to_delta read_from_delta

mimezzz
貢獻者

嗨,我想mongo加載到s3使用pyspark 3.1.1通過閱讀他們拚花。

我的代碼片段:

df =火花\

.read \

.format \ (“mongo”)

.options (* * read_options) \

.load(模式=)

df = df.coalesce (64)

write_df_to_delta(火花,df s3_path)

read_count = df.count ()

s3_path inserted_df = read_delta_to_df(火花)

inserted_count = inserted_df.count ()

所有sparksession,蒙戈連接和s3路徑配置。我發現read_count inserted_df數不匹配,有一個缺口約300 - 1200行。但我寫δ沒有給我任何錯誤。我不知道為什麼出現這樣的情況?是什麼導致了它呢?

我可以看到形式農場主:“read_count”: 1373432,“inserted_count”: 1372492

def read_delta_to_df(

火花:SparkSession,

s3_path: str

)- > DataFrame:

log.info(“閱讀三角洲表從路徑{}df”.format (s3_path))

df =火花\

.read \

.format \(“δ”)

.load (s3_path)

返回df

def write_df_to_delta(

火花:SparkSession,

df: DataFrame,

s3_path: str,

模式:可選(str) =“覆蓋”,

partition_by:可選(聯盟[str、列表(str)]] =沒有

保留:可選(int) = 0

)- - - >沒有:

log.info(“寫作df三角洲表,{}“.format (s3_path))

df.printSchema ()

試一試:

df \

.write \

.format \(“δ”)

.mode \(模式)

.option (“overwriteSchema”,“真正的”)\

.save (

路徑= s3_path,

partitionBy = partition_by)

除了例外e:

日誌。錯誤(f”錯誤發生錯誤味精:{e}”)

8回答說8

Hubert_Dudek1
尊敬的貢獻者三世

代碼是正確的。我能想象的唯一問題是,s3_path,左(像一些丟失的分區)。我認為這將是更好地注冊三角洲metastore和使用.write.table (“table_name”),而不是使用的路徑。

嗨@Hubert杜德克謝謝你的回複,是的,也許值得一試,我也考慮刪除格式(“δ”)如果問題持續下去,診斷是否這是一個delta相關問題

mimezzz
貢獻者

還沒找到一個答案,剛從假期回來。如果我發現任何事業將繼續挖掘將更新。

mimezzz
貢獻者

所以我認為我這裏已經揭開了謎底:grinning_face:這是與保留配置。通過設置retentionEnabled真實rention小時0,我們有點失去了幾行第一個文件他們錯了文件從上節課和剛吸塵。進一步閱讀,請參見:https://learn.microsoft.com/en-us/azure/databricks/kb/delta/data-missing-vacuum-parallel-write

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map