嗨,我想mongo加載到s3使用pyspark 3.1.1通過閱讀他們拚花。
我的代碼片段:
df =火花\
.read \
.format \ (“mongo”)
.options (* * read_options) \
.load(模式=)
df = df.coalesce (64)
write_df_to_delta(火花,df s3_path)
read_count = df.count ()
s3_path inserted_df = read_delta_to_df(火花)
inserted_count = inserted_df.count ()
所有sparksession,蒙戈連接和s3路徑配置。我發現read_count inserted_df數不匹配,有一個缺口約300 - 1200行。但我寫δ沒有給我任何錯誤。我不知道為什麼出現這樣的情況?是什麼導致了它呢?
我可以看到形式農場主:“read_count”: 1373432,“inserted_count”: 1372492
def read_delta_to_df(
火花:SparkSession,
s3_path: str
)- > DataFrame:
log.info(“閱讀三角洲表從路徑{}df”.format (s3_path))
df =火花\
.read \
.format \(“δ”)
.load (s3_path)
返回df
def write_df_to_delta(
火花:SparkSession,
df: DataFrame,
s3_path: str,
模式:可選(str) =“覆蓋”,
partition_by:可選(聯盟[str、列表(str)]] =沒有
保留:可選(int) = 0
)- - - >沒有:
log.info(“寫作df三角洲表,{}“.format (s3_path))
df.printSchema ()
試一試:
df \
.write \
.format \(“δ”)
.mode \(模式)
.option (“overwriteSchema”,“真正的”)\
.save (
路徑= s3_path,
partitionBy = partition_by)
除了例外e:
日誌。錯誤(f”錯誤發生錯誤味精:{e}”)