My pipeline has 4 steps.
Code:
import time
import botocore
import pandas as pd
from pyspark import StorageLevel

# env, id, query_uuid, start_date, end_date, s3_dr, perform_multipart_accel_data_query,
# get_pitd_for_file_path and dump_accel_data are defined elsewhere in the notebook.

def querying_dynamodb(start_date, end_date):
    pitd_objects = []
    wmp_file_meta_data = []
    query_timestamp1 = time.time()
    # Query DynamoDB and collect wmp_metadata into a list; this is used to avoid data overlap.
    query_resp = perform_multipart_accel_data_query(env, id, start_date, end_date)
    if len(query_resp) != 0:
        for response in query_resp:
            id_new = int(response["id"]["N"])
            wmp_file_path = response["file_path"]["S"]
            accel_data_file_path = response["accel_data"]["S"]
            ts = response["timestamp"]["N"]
            wmp_file_meta_data.append((query_uuid, start_date, end_date, id_new, wmp_file_path))
    else:
        return ([], "")  # dbutils.notebook.exit("True")
    query_timestamp2 = time.time()
    query_difference = query_timestamp2 - query_timestamp1
    return wmp_file_meta_data

def get_distinct_wmp_files(wmp_file_meta_data):
    columns = ["uuid", "start_date", "end_date", "id", "wmp_file_path"]
    dataframe = spark.createDataFrame(wmp_file_meta_data, columns)
    table_name = "wmp_metadata_temp_" + str(id)
    # Cache the dataframe and register it as a temp view so it can be joined against the Delta table.
    dataframe.persist(StorageLevel.MEMORY_AND_DISK)
    dataframe.createOrReplaceTempView(table_name)
    # Keep only distinct wmp files to avoid data duplication.
    new_wmp_files = spark.sql(
        "SELECT * FROM {} WHERE NOT EXISTS ("
        "SELECT 1 FROM wmp_metadata_partitioned "
        "WHERE {}.id = wmp_metadata_partitioned.id "
        "AND {}.wmp_file_path = wmp_metadata_partitioned.wmp_file_path)".format(table_name, table_name, table_name)
    )
    return new_wmp_files

def convert_to_pitd_wout_collect(wmp_file_meta_data):
    """
    Convert wmp files to PITD.
    Parameters:
        wmp_files: list
    Output:
        PITD objects: list
    """
    print("PITD conversion")
    print()
    pitd_objects = []
    pitd_timestamp1 = time.time()
    for files in wmp_file_meta_data:
        try:
            # Grab the PITD object from S3 and save it to a list.
            # wmp_file_path = files["wmp_file_path"]
            ts = int(files.split("/")[1].split(".")[0])
            pitd = get_pitd_for_file_path(file_path=files, data_retriever=s3_dr, timestamp=ts, id=id)
            pitd_objects.append(pitd.to_dict())
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                print("Object does not exist.")
    pitd_timestamp2 = time.time()
    pitd_difference = pitd_timestamp2 - pitd_timestamp1
    print("PITD conversion successful!")
    return pitd_objects

def process_pitd_objects(time_data):
    section_frames = []
    for i, section in enumerate(time_data):
        try:
            td_df = pd.DataFrame.from_dict(time_data[section])
            td_df.index = td_df.index + int(section)
            td_df[['a1', 'a2', 'a3', 'roll', 'pitch']] = td_df[['a1', 'a2', 'a3', 'roll', 'pitch']].astype('float64')
            section_frames.append(td_df)
        except Exception as e:
            pass
    if section_frames:
        complete_frame = pd.concat(section_frames)
        complete_frame["index"] = complete_frame.index
        complete_frame["id"] = id
        complete_frame["uuid"] = query_uuid
        return complete_frame

def process_dataframes(pitd_objects, id, uuid):
    if pitd_objects:
        print("Processing PITD objects...")
        pitd_objects_rdd = sc.parallelize(pitd_objects)
        section_frames_rdd = pitd_objects_rdd.map(process_pitd_objects)
        print("Processing completed. Concatenating dataframes....")
        # Flatten the RDD of lists into an RDD of DataFrames
        result_df = section_frames_rdd.treeReduce(lambda x, y: pd.concat([x, y]))
        print("Concatenation completed. Dumping to delta table...")
        status = dump_accel_data(spark.createDataFrame(result_df))
        if status:
            print("Dumped successfully...")
        else:
            print("Dumping failure.")
        return spark.createDataFrame(result_df)

wmp_files_paths = querying_dynamodb(start_date, end_date)
new_wmp_files = get_distinct_wmp_files(wmp_files_paths)  # returns a PySpark dataframe
wmp_file_list = new_wmp_files.rdd.map(lambda x: x.wmp_file_path).collect()  # convert the wmp_file_path column to a list
pitd_objects = convert_to_pitd_wout_collect(wmp_file_list)
process_dataframes(pitd_objects, id, query_uuid)
This code lives in a notebook, and multiple (read: hundreds of) instances of that notebook are run in parallel through a ThreadPoolExecutor in Python. When there is too much data, my Spark cluster crashes. How can I improve this code?
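For reference, the fan-out described above is driven by something like the following. This is only a minimal sketch: the notebook path, parameter names, date ranges, and worker count are illustrative placeholders, not taken from the actual pipeline.

from concurrent.futures import ThreadPoolExecutor

# Hypothetical driver: each task launches one instance of the processing notebook
# via dbutils.notebook.run(path, timeout_seconds, arguments). The path and the
# argument names below are placeholders.
date_ranges = [("2023-01-01", "2023-01-02"), ("2023-01-02", "2023-01-03")]  # illustrative

def run_notebook(dates):
    start_date, end_date = dates
    return dbutils.notebook.run(
        "/Repos/pipeline/process_wmp",                         # placeholder notebook path
        3600,                                                  # timeout in seconds
        {"start_date": start_date, "end_date": end_date},      # notebook parameters
    )

with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(run_notebook, date_ranges))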
Cluster details: Driver: i3.4xlarge · Workers: c4.4xlarge · 4-8 workers · On-demand and Spot, fall back to On-demand · 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12) · us-east-1a (12-20 DBU)
@uzair Mustafa: Here is a framework to consider, based on parallel S3 file processing, Spark's caching capabilities, and Delta Lake's time-travel feature. Please take a look; it should help you get started:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable

# Define the schema of the S3 files
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("data", StringType()),
])

# Read the list of S3 file paths from DynamoDB
file_paths = read_file_paths_from_dynamo_db()

# Filter out the file paths that have already been processed
# (collected to a driver-side set so the membership test below works)
delta_table = DeltaTable.forPath(spark, "delta_table_path")
processed_files = {row.file_path for row in delta_table.toDF().select("file_path").distinct().collect()}
file_paths = [f for f in file_paths if f not in processed_files]

# Parallelize the file processing
file_paths_rdd = sc.parallelize(file_paths, len(file_paths))
data_rdd = file_paths_rdd.map(lambda f: (f, read_data_from_s3(f)))
data_df = (
    data_rdd.toDF(["file_path", "data"])
    .select(
        col("file_path"),
        col("data.timestamp").alias("timestamp"),
        col("data.data").alias("data"),
    )
)

# Write the data to a Delta table
data_df.write.format("delta").mode("append").save("delta_table_path")
In this example, we read the list of S3 file paths from DynamoDB and filter out the files that have already been processed by querying a Delta table. We then parallelize the file processing using Spark's parallelize method and a map function. Finally, we write the result to a Delta table using the write method. Note that the read_data_from_s3 function is assumed to be supplied by you, and it should be adapted so that the S3 data is processed in parallel.
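If it helps, here is a minimal sketch of what read_data_from_s3 could look like, assuming boto3 and that each object key points to a small JSON file with "timestamp" and "data" fields; the bucket name, key layout, and file format are illustrative assumptions only.

import json
import boto3
from pyspark.sql import Row

def read_data_from_s3(file_path, bucket="my-wmp-bucket"):
    """Hypothetical helper: fetch one S3 object and return its fields.

    Assumes file_path is the S3 object key and the object body is JSON with
    "timestamp" and "data" fields; adjust to the real bucket and file format.
    Returning a Row gives the "data" column named struct fields, so that
    col("data.timestamp") in the snippet above resolves.
    """
    s3 = boto3.client("s3")
    body = s3.get_object(Bucket=bucket, Key=file_path)["Body"].read()
    record = json.loads(body)
    return Row(timestamp=record["timestamp"], data=record["data"])

Because this runs inside an RDD map, it can be worth creating the boto3 client once per partition (for example with mapPartitions) instead of once per record, to avoid repeated client setup.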