我解決它使用函數monotonically_increasing_id和邏輯設置列名。
這樣做是必要的Java 1.8,因為提高誤差函數的收集()在Java 11。
df = df。withColumn(“指數”,F.monotonically_increasing_id())關口= df。列值= df。過濾器(指數= 0).collect() #定義跳過行我在範圍(len(關口)):如果關口[我]! =“指數”:df = df.select (df.columns)。withColumnRenamed(關口[我],值[0][我])
根據文檔的
spark.read.csv (…)
的路徑
參數可以是一個字符串抽樣:
路徑:str列表字符串或字符串列表,輸入路徑(s),或抽樣的字符串存儲CSV行。
,你可以使用
spark.sparkContext.textFile (…)
結合zipWithIndex (…)
執行必要的行過濾。把東西一起這可能看起來如下:
n_skip_rows = ?row_rdd =火花。sparkContext .textFile (your_csv_file) \ .zipWithIndex () \ .filter(λ行:行[1]> = n_skip_rows) \ . map(λ行:行[0])df = spark_session.read。csv (row_rdd…)
希望有幫助。