你好阿,
基於數據我煮熟的代碼將讀取文件並生成一個dataframe刷新類型將被添加一個新列和其他列將被分成不同的列
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
#進口Libararies
從pyspark.sql。功能導入input_file_name、點燃、分裂,上校
#宣布UDF記錄進行排序(後來用於刪除標題)
def dfZipWithIndex (df,抵消= 1,colName =“rowId”):
“‘
列舉了dataframe行是本地訂單,就像rdd.ZipWithIndex(),但dataframe
和保護模式
:param df: dataframe來源
:param抵消:調整zipWithIndex()的索引
:param colName:索引列的名稱
“‘
new_schema = StructType (
[StructField (colName LongType(),真的)]#新添加字段在前麵
+ df.schema。字段#以前的模式
)
zipped_rdd = df.rdd.zipWithIndex ()
new_rdd = zipped_rdd。地圖(λ參數:((args[1] +偏移量)+列表(args [0])))
返回的火花。createDataFrame (new_rdd new_schema)
#讀取數據文件
df = sqlContext.read.format (com.databricks.spark.csv) .load (“/ FileStore / test.csv”)
#獲取第一行刷新類型
x =點燃(str (df.limit (1) .collect () [0] [0]))
#添加刷新類型作為一個新列
df = df.withColumn (“RefreshType”, x)
#刪除/過濾器從DATAFRAME刷新類型
df2 = dfZipWithIndex (df)
df2 = df2.filter (df2.rowId > 1) .drop (“rowId”)
df2.show ()
#分裂UDF
split_col = (df2分裂。_c0 ' \ \ | ',)
#最後分裂ID和名稱列並創建一個數據幀
df3 = df2.withColumn (“ID”, split_col.getItem (0)) \
.withColumn(“名字”,split_col.getItem (1)) \
.drop(坳(“_c0”))
df3.show ()
想法是,最後你可以保存這個dataframe到一個臨時表,然後它會方便你做的if - else語句。我目前的理解是,你可能會寫完整的原始數據集就像完整的數據到一個單獨的列成表(如下麵)
讓我知道如果我們有不同的理解。
你好阿,
基於數據我煮熟的代碼將讀取文件並生成一個dataframe刷新類型將被添加一個新列和其他列將被分成不同的列
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
#進口Libararies
從pyspark.sql。功能導入input_file_name、點燃、分裂,上校
#宣布UDF記錄進行排序(後來用於刪除標題)
def dfZipWithIndex (df,抵消= 1,colName =“rowId”):
“‘
列舉了dataframe行是本地訂單,就像rdd.ZipWithIndex(),但dataframe
和保護模式
:param df: dataframe來源
:param抵消:調整zipWithIndex()的索引
:param colName:索引列的名稱
“‘
new_schema = StructType (
[StructField (colName LongType(),真的)]#新添加字段在前麵
+ df.schema。字段#以前的模式
)
zipped_rdd = df.rdd.zipWithIndex ()
new_rdd = zipped_rdd。地圖(λ參數:((args[1] +偏移量)+列表(args [0])))
返回的火花。createDataFrame (new_rdd new_schema)
#讀取數據文件
df = sqlContext.read.format (com.databricks.spark.csv) .load (“/ FileStore / test.csv”)
#獲取第一行刷新類型
x =點燃(str (df.limit (1) .collect () [0] [0]))
#添加刷新類型作為一個新列
df = df.withColumn (“RefreshType”, x)
#刪除/過濾器從DATAFRAME刷新類型
df2 = dfZipWithIndex (df)
df2 = df2.filter (df2.rowId > 1) .drop (“rowId”)
df2.show ()
#分裂UDF
split_col = (df2分裂。_c0 ' \ \ | ',)
#最後分裂ID和名稱列並創建一個數據幀
df3 = df2.withColumn (“ID”, split_col.getItem (0)) \
.withColumn(“名字”,split_col.getItem (1)) \
.drop(坳(“_c0”))
df3.show ()
想法是,最後你可以保存這個dataframe到一個臨時表,然後它會方便你做的if - else語句。我目前的理解是,你可能會寫完整的原始數據集就像完整的數據到一個單獨的列成表(如下麵)
讓我知道如果我們有不同的理解。