取消
顯示的結果
而不是尋找
你的意思是:

我整個代碼驅動節點上運行,我想讓我的代碼運行在工作節點的內存驅動節點不是筋疲力盡。請告訴我改進我的代碼。我經常引發崩潰時,把數據從s3是巨大的。

uzairm
新的貢獻者三世

我運行過程中有4個步驟。

  1. 查詢s3文件路徑從發電機數據庫根據用戶給出的某些參數。(功能由客戶提供,隻需要導入)。返回一個列表的文件
  2. 檢查這些文件路徑已經查詢。得到不同的文件和δ表附加到一個文件。
  3. 早些時候從s3獲取數據文件路徑查詢(功能由客戶提供,隻需要導入,並給文件路徑作為參數)。返回一個對象列表,其中的關鍵是“時間戳”,值為“pd.DataFrame”。
  4. 我連接的所有dataframes列表中的所有對象並將它附加到dataframe三角洲表。

代碼:

def querying_dynamodb (start_date end_date): pitd_objects = [] wmp_file_meta_data = [] query_timestamp1 = time.time () query_resp = perform_multipart_accel_data_query (env, id、start_date end_date) #從dynamoDb遍曆查詢數據並把wmp_metadata列表。這將是用於避免數據重疊。如果len (query_resp) ! = 0:為響應query_resp: id_new = int(響應[" id "] [' N ']) wmp_file_path =響應[' file_path '] [S] accel_data_file_path =響應[' accel_data '] [S] ts =響應[“時間戳”][' N '] wmp_file_meta_data.append ((query_uuid、start_date end_date, id_new, wmp_file_path)):返回([],”)# dbutils.notebook.exit (“True”) query_timestamp2 = time.time () query_difference = (query_timestamp2 - query_timestamp1)返回(wmp_file_meta_data) def get_distinct_wmp_files (wmp_file_meta_data):列= [“uuid”、“start_date”,“end_date”、“id”、“wmp_file_path”] dataframe =火花。createDataFrame (wmp_file_meta_data列)table_name = ' wmp_metadata_temp_ + str (id) # dataframe轉化為三角洲表dataframe.persist (StorageLevel.MEMORY_AND_DISK) dataframe.createOrReplaceTempView (table_name) new_wmp_files =火花。sql (“SELECT *{}中不存在(選擇1從wmp_metadata_partitioned {}。id = wmp_metadata_partitioned。id和{}。wmp_file_path = wmp_metadata_partitioned.wmp_file_path)“.format (table_name、table_name table_name)) #不同買理財產品買文件,避免數據重複返回new_wmp_files def convert_to_pitd_wout_collect (wmp_file_meta_data):““”功能將買理財產品買文件PITD參數:wmp_files:列表輸出:PITD對象:列表”“”打印(“PITD轉換”)打印()pitd_objects = [] pitd_timestamp1 = time.time wmp_file_meta_data()文件:試題:#抓取PITD對象從s3和保存列表。# wmp_file_path =文件(“wmp_file_path”) ts = int (files.split (“/”) [1] .split (' . ') [0]) pitd = get_pitd_for_file_path (file_path =文件、data_retriever = s3_dr timestamp = ts, id = id) pitd_objects.append (pitd.to_dict ()) botocore.exceptions除外。ClientError e:如果e。反應(“錯誤”)(“代碼”)= = " 404 ":打印(“對象不存在。”)pitd_timestamp2 = time.time () pitd_difference = pitd_timestamp2 - pitd_timestamp1打印(" PITD轉換成功! ")返回(pitd_objects) def process_pitd_objects (time_data): section_frames =[]我,部分列舉(time_data):嚐試:td_df = pd.DataFrame.from_dict (time_data[節])td_df。指數= td_df。在dex + int(section) td_df[['a1', 'a2', 'a3', 'roll', 'pitch']] = td_df[['a1', 'a2', 'a3', 'roll', 'pitch']].astype('float64') section_frames.append(td_df) except Exception as e: pass if section_frames: complete_frame = pd.concat(section_frames) complete_frame["index"] = complete_frame.index complete_frame["id"] = id complete_frame["uuid"] = query_uuid return complete_frame def process_dataframes(pitd_objects, id, uuid): if pitd_objects: print("Processing PITD objects...") pitd_objects_rdd = sc.parallelize(pitd_objects) section_frames_rdd = pitd_objects_rdd.map(process_pitd_objects) print("Processing completed. Concatenating dataframes....") # Flatten the RDD of lists into an RDD of DataFrames result_df = section_frames_rdd.treeReduce(lambda x, y: pd.concat([x,y])) print("Concatenation completed. Dumping to delta table...") status = dump_accel_data(spark.createDataFrame(result_df)) if status: print("Dumped succesfully...") else: print("Dumping Failure.") return spark.createDataFrame(result_df) wmp_files_paths = querying_dynamodb(start_date, end_date) new_wmp_files = get_distinct_wmp_files(wmp_files_paths) # returns a pyspark dataframe wmp_file_list = new_wmp_files.rdd.map(lambda x: x.wmp_file_path).collect() # convert pyspark dataframe column (wmp_file_path) to a list pitd_objects = convert_to_pitd_wout_collect(wmp_file_list) process_dataframes(pitd_objects, id, query_uuid)

這個代碼是在一個筆記本,筆記本的多個(閱讀:數百)實例通過threadpool執行人在python中並行運行。我的火花碰撞數據太多了。我該怎樣才能提高代碼?

集群細節:司機:i3.4xlarge·工人:c4.4xlarge·4 - 8工人·點播和現貨·回落至11.3按需·LTS(包括Apache火花3.3.0,Scala 2.12)·us-east-1a(12-20 DBU)

2回答2

匿名
不適用

@uzair穆斯塔法:我給一個框架考慮基於並行S3文件處理,火花的緩存功能和使用三角洲湖的時間旅行的能力,請看看這個可以幫助你開始

從pyspark.sql。函數從pyspark.sql進口坳。類型進口StructType、StructField StringType # Define S3的模式文件模式= StructType ([StructField(“時間戳”,StringType ()), StructField(“數據”,StringType()))) #讀S3的列表文件路徑從DynamoDB file_paths = read_file_paths_from_dynamo_db() #過濾器的文件路徑已經delta_table = DeltaTable處理。forPath(火花,“delta_table_path”) processed_files = delta_table.toDF () .select (file_path) .distinct () file_paths = f (f, f file_paths如果不是processed_files] #並行化文件處理file_paths_rdd = sc.parallelize (file_paths, len (file_paths)) data_rdd = file_paths_rdd。地圖(λf: (f, read_data_from_s3 (f))) data_df = data_rdd。toDF ([“file_path”、“數據”])。選擇(坳(“file_path”),坳(data.timestamp) .alias(“時間戳”),坳(data.data) .alias(“數據”))#寫數據到三角洲表data_df.write.format .mode(“δ”)(“追加”).save (“delta_table_path”)

在這個例子中,我們從DynamoDB讀取S3的列表文件路徑和過濾的文件已經通過查詢一個三角洲表處理。然後,我們並行化文件處理使用火花的並行化方法和映射函數。最後,我們結果寫入一個三角洲表使用寫方法。注意read_data_from_s3函數被認為是客戶提供的,並且應該被修改以並行處理S3數據。

Vartika
主持人
主持人

嗨@uzair穆斯塔法

謝謝你發布你的問題在我們的社區!我們很高興幫助你。

做@Suteja卡努裏人的回答有幫助嗎?如果是這樣,你會很高興它標記為最好?

這將有助於其他社區成員在未來可能也有類似的問題。謝謝你的參與,讓我們知道如果你需要任何進一步的援助!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map