取消
顯示的結果
而不是尋找
你的意思是:

你好專家——我是麵臨的一個技術問題磚SQL - if - else或CASE語句實現當試圖執行兩個獨立設置的查詢基於三角洲表的一個列的值。

Atul_Sharan
新的貢獻者二世

嗨,專家,

我閱讀一個管分隔的源文件第一行不包含數據但包含-替換或更新值顯示刷新類型如果全部刷新或插入。第二行是頭和實際列明智的數據從第三行開始。

我處理這個文件使用DF和保存數據到臨時三角洲表進行進一步處理。不過我麵臨問題if - else邏輯的實現,我可以執行兩個單獨的SQL操作基於三角洲的值替換或更新表列,我們可以使用t - SQL執行。

任何對社會/導致/可能的解決方案在這方麵將不勝感激。謝謝! !

1接受解決方案

接受的解決方案

User16826994569
新的貢獻者三世

你好阿,

基於數據我煮熟的代碼將讀取文件並生成一個dataframe刷新類型將被添加一個新列和其他列將被分成不同的列

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

#進口Libararies

從pyspark.sql。功能導入input_file_name、點燃、分裂,上校

#宣布UDF記錄進行排序(後來用於刪除標題)

def dfZipWithIndex (df,抵消= 1,colName =“rowId”):

“‘

列舉了dataframe行是本地訂單,就像rdd.ZipWithIndex(),但dataframe

和保護模式

:param df: dataframe來源

:param抵消:調整zipWithIndex()的索引

:param colName:索引列的名稱

“‘

new_schema = StructType (

[StructField (colName LongType(),真的)]#新添加字段在前麵

+ df.schema。字段#以前的模式

)

zipped_rdd = df.rdd.zipWithIndex ()

new_rdd = zipped_rdd。地圖(λ參數:((args[1] +偏移量)+列表(args [0])))

返回的火花。createDataFrame (new_rdd new_schema)

#讀取數據文件

df = sqlContext.read.format (com.databricks.spark.csv) .load (“/ FileStore / test.csv”)

#獲取第一行刷新類型

x =點燃(str (df.limit (1) .collect () [0] [0]))

#添加刷新類型作為一個新列

df = df.withColumn (“RefreshType”, x)

#刪除/過濾器從DATAFRAME刷新類型

df2 = dfZipWithIndex (df)

df2 = df2.filter (df2.rowId > 1) .drop (“rowId”)

df2.show ()

#分裂UDF

split_col = (df2分裂。_c0 ' \ \ | ',)

#最後分裂ID和名稱列並創建一個數據幀

df3 = df2.withColumn (“ID”, split_col.getItem (0)) \

.withColumn(“名字”,split_col.getItem (1)) \

.drop(坳(“_c0”))

df3.show ()

圖像

想法是,最後你可以保存這個dataframe到一個臨時表,然後它會方便你做的if - else語句。我目前的理解是,你可能會寫完整的原始數據集就像完整的數據到一個單獨的列成表(如下麵)

圖像

讓我知道如果我們有不同的理解。

在原帖子查看解決方案

5回複5

匿名
不適用

你好@Atul夏朗。我的名字是風笛手,我是一個主持人的磚。歡迎並感謝您的問題!讓我們給它一段時間,看看社區回應。如果有必要,我們會回到這個圈。

User16826994569
新的貢獻者三世

你好阿,

謝謝你的問題。進一步可以請你添加示例數據集作為一個例子。

我的理解是你的數據應該是這個樣子。

文件1

* * * * * *

取代

ID |名稱

1 |亞曆克斯

2 |詹姆斯

3 |史密斯

文件2

* * * * * *

UDPATE

ID |名稱

Alex Ho 1 |

2 |詹姆斯國王

幹杯

GS

User16826994569
新的貢獻者三世

你好阿,

基於數據我煮熟的代碼將讀取文件並生成一個dataframe刷新類型將被添加一個新列和其他列將被分成不同的列

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

#進口Libararies

從pyspark.sql。功能導入input_file_name、點燃、分裂,上校

#宣布UDF記錄進行排序(後來用於刪除標題)

def dfZipWithIndex (df,抵消= 1,colName =“rowId”):

“‘

列舉了dataframe行是本地訂單,就像rdd.ZipWithIndex(),但dataframe

和保護模式

:param df: dataframe來源

:param抵消:調整zipWithIndex()的索引

:param colName:索引列的名稱

“‘

new_schema = StructType (

[StructField (colName LongType(),真的)]#新添加字段在前麵

+ df.schema。字段#以前的模式

)

zipped_rdd = df.rdd.zipWithIndex ()

new_rdd = zipped_rdd。地圖(λ參數:((args[1] +偏移量)+列表(args [0])))

返回的火花。createDataFrame (new_rdd new_schema)

#讀取數據文件

df = sqlContext.read.format (com.databricks.spark.csv) .load (“/ FileStore / test.csv”)

#獲取第一行刷新類型

x =點燃(str (df.limit (1) .collect () [0] [0]))

#添加刷新類型作為一個新列

df = df.withColumn (“RefreshType”, x)

#刪除/過濾器從DATAFRAME刷新類型

df2 = dfZipWithIndex (df)

df2 = df2.filter (df2.rowId > 1) .drop (“rowId”)

df2.show ()

#分裂UDF

split_col = (df2分裂。_c0 ' \ \ | ',)

#最後分裂ID和名稱列並創建一個數據幀

df3 = df2.withColumn (“ID”, split_col.getItem (0)) \

.withColumn(“名字”,split_col.getItem (1)) \

.drop(坳(“_c0”))

df3.show ()

圖像

想法是,最後你可以保存這個dataframe到一個臨時表,然後它會方便你做的if - else語句。我目前的理解是,你可能會寫完整的原始數據集就像完整的數據到一個單獨的列成表(如下麵)

圖像

讓我知道如果我們有不同的理解。

Atul_Sharan
新的貢獻者二世

由於一噸Gurpreet建議方法有助於解決這個問題。感謝你的幫助! !

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map