我難以有效地讀取和解析Pyspark大量流文件!
上下文這是流的模式在JSON文件,我正在讀。空格編輯為保密的目的。
根|——location_info:數組(nullable = true) | |——元素:結構(containsNull = true) | | |——restaurant_type:字符串(nullable = true) | | | | | | | | |——other_data:數組(nullable = true) | | | |——元素:結構(containsNull = true) | | | | | - other_data_1字符串(nullable = true) | | | | |——other_data_2:字符串(nullable = true) | | | | |——other_data_3:字符串(nullable = true) | | | | |——other_data_4:字符串(nullable = true) | | | | |——other_data_5:字符串(nullable = true) | | | | | |——緯度:字符串(nullable = true) | | |——經度:字符串(nullable = true) | | |——時區:字符串(nullable = true) |——restaurant_id:字符串(nullable = true)
當前讀取和解析的方法(但工作時間太長)
s3: / / bucket_name /生/ 2020/03/05/04 /文件-流- 6 - 2020 - 03 - 05 - 04 - 01 - 04 - 123 - b978 - 2 - e2b - 5672 fa243fs4aeb4
。因此我讀它在作為JSON Pyspark(不確定我會讀它在什麼呢?)' ' '
#閱讀多個文件的dir source_df_1 = spark.read.json (sc.wholeTextFiles (file_path / *) . values ()。flatMap(λx: x .replace ({”restaurant_id ', ' \ n {“restaurant_id) .split (' \ n '))) #在這裏爆炸restaurant_id,和嵌套數據exploded_source_df_1 = source_df_1.select(坳(“restaurant_id”),爆炸(坳(location_info)) .alias (location_info)) #通過SQL操作:這將解決這個問題解析exploded_source_df_1.createOrReplaceTempView (“result_1”)subset_data_1 =火花。sql(“選擇restaurant_id、location_infos.latitude location_infos.longitude, location_infos。從result_1“時區)
' ' '
我希望幫助的東西:. values ()。flatMap(λx: x。取代(“{”restaurant_id”、“\ n”{restaurant_id”)
本身就是一種行動如果我叫堅持()結束時似乎重做整個讀?您可以參考這個線程如何我來到這個解決方案首先:鏈接。非常感謝你的時間
我感興趣的是看到別人想出了。目前我使用Json。normalize()然後采取任何額外的嵌套聲明和使用一個循環- > re-combine拉出來。