I want to train and tune a random forest. At first the cluster handles garbage collection fine, but after a few hours the cluster degrades and garbage collection time rises significantly.
train_df has 6,365,018 records and 31 columns. Before splitting data_df into train and test dataframes (both are Spark dataframes), I write it to a checkpoint location, optimize the number of partitions with spark.sql("OPTIMIZE delta.`location`"), and read the dataframe back again.
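A minimal sketch of that checkpoint-and-OPTIMIZE step, assuming Delta Lake is available; the path /mnt/checkpoints/data_df is a hypothetical placeholder:

checkpoint_path = "/mnt/checkpoints/data_df"  # hypothetical location

# write the dataframe out once, compact the small files, then read it back
data_df.write.format("delta").mode("overwrite").save(checkpoint_path)
spark.sql(f"OPTIMIZE delta.`{checkpoint_path}`")
data_df = spark.read.format("delta").load(checkpoint_path)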
Can anyone suggest further ways to reduce the garbage collection pressure? I can think of lowering the max_depth parameter, but I am struggling to fully understand the problem.
# Feature transformations
from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler
# Model and pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import mlflow

num_partitions = data_df.rdd.getNumPartitions()
parallelism = np.floor((n_cores - 16) / num_partitions).astype(int)

train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=42)

index_output_cols = [x + "Index" for x in categorical_cols]
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")

numeric_cols = [field for (field, dataType) in train_df.dtypes
                if ((dataType == "double") | (dataType == "int")) & (field != "label")]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="ScaledFeatures")

stages = [string_indexer, vec_assembler, scaler]
pipeline = Pipeline(stages=stages)
pipeline_data = pipeline.fit(train_df)
scaled_train_data = pipeline_data.transform(train_df).cache()
scaled_test_data = pipeline_data.transform(test_df).cache()

max_depth_choices = [5, 10, 20]
n_estimator_choices = [80, 100, 120]
subset_choices = ["0.1", "0.3", "0.5"]
impurtiy_choices = ["gini", "entropy"]

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
classifier = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label", seed=42, maxBins=max_cat_size)

# Create the parameter grid and fit the Spark random forest
param_grid = (ParamGridBuilder()
              .addGrid(classifier.maxDepth, max_depth_choices)
              .addGrid(classifier.numTrees, n_estimator_choices)
              .addGrid(classifier.featureSubsetStrategy, subset_choices)
              .addGrid(classifier.impurity, impurtiy_choices)
              .build())

with mlflow.start_run(nested=True, run_name="CrossValidator"):
    mlflow.autolog(log_models=True, log_model_signatures=False)
    cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=param_grid,
                        numFolds=3, seed=42, parallelism=parallelism)
    cv_model = cv.fit(scaled_train_data)
    mlflow.log_param("best_train", np.mean(cv_model.avgMetrics))
Caching is expensive, and I thought about persisting the data to memory and disk instead (in case there is no more space left in memory). I know that in theory it should help, but it can also make things worse. I would just keep:
scaled_train_data = pipeline_data.transform(train_df)
scaled_test_data = pipeline_data.transform(test_df)
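If the data should still be persisted across memory and disk, a minimal sketch of that variant (an assumption about the intended call, not code from the original post):

from pyspark import StorageLevel

# spills partitions to disk instead of failing when executor memory runs out
scaled_train_data = pipeline_data.transform(train_df).persist(StorageLevel.MEMORY_AND_DISK)
scaled_test_data = pipeline_data.transform(test_df).persist(StorageLevel.MEMORY_AND_DISK)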
Then I would analyze the number of partitions and the size of scaled_train_data and scaled_test_data, and repartition so that each partition ends up between 100 and 200 MB. I would make the number of partitions a multiple of the worker cores (for example, with 16-core workers we could end up with 64 partitions of roughly 150 MB each); a small sketch of this calculation follows the repartition snippet below.
To analyze, use:
df.rdd.getNumPartitions()
df.rdd.partitions.length (Scala API)
df.rdd.partitions.size (Scala API)
To repartition, use:
scaled_train_data = pipeline_data.transform(train_df).repartition(optimal_number)
scaled_test_data = pipeline_data.transform(test_df).repartition(optimal_number)
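A small sketch of how optimal_number could be derived from the sizing rule above; the data size and cluster numbers are hypothetical placeholders, not measurements from my cluster:

import math

estimated_size_mb = 9600      # hypothetical: measured size of scaled_train_data
target_partition_mb = 150     # aim for partitions between 100 and 200 MB
total_cores = 64              # hypothetical: e.g. 4 workers x 16 cores

raw_partitions = math.ceil(estimated_size_mb / target_partition_mb)
# round up to a multiple of the total core count so every core gets work
optimal_number = max(total_cores, math.ceil(raw_partitions / total_cores) * total_cores)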