取消
顯示的結果
而不是尋找
你的意思是:

如何保持數據的基於時間的局部集群在加入嗎?

Erik_L
因素二世

我有一群來自不同數據源的數據幀。他們都是時間序列數據的列時間戳,這是一個int32 Unix時間戳。我可以加入他們一起和另一個列join_idx這基本上是一個整數索引命令時間戳。當我加入這些數據幀在一起,生成的數據幀出故障了,其結果非常緩慢之間查詢。我需要能夠鬥時代non-rolling windows範圍查詢點的一部分。有沒有辦法,我可以使用磚定位基於時間戳將收集的數據?

df_all = (df_1。加入(df_2 =(“時間戳”,“join_idx”),如何=“內在”). join (df_3, =(“時間戳”,“join_idx”),如何=“內在”). join (df_4, =(“時間戳”,“join_idx”),如何=“內在”).withColumn (“id”,點燃(id))) .writeTo (joined_table) .createOrReplace ()

1接受解決方案

接受的解決方案

匿名
不適用

@Erik路易:

有效執行之間的查詢在你加入了數據幀,你可以試著用桶裝基於時間戳列的數據。用桶裝確保數據與時間戳是存儲在同一個文件,進行範圍查詢更快的火花可以跳過文件查詢的範圍之外。

這裏有一個例子如何鬥你的數據:

從pyspark.sql。功能導入桶#桶上的數據時間戳列到100桶num_buckets = 100 bucket_col =“timestamp_bucket”df_bucketed = df_all。withColumn (bucket_col桶(“時間戳”,num_buckets)) #寫鬥數據表df_bucketed.write.mode(“覆蓋”)。bucketBy (num_buckets bucket_col) .sortBy(“時間戳”).saveAsTable (bucketed_table) #查詢桶從pyspark.sql數據使用範圍查詢。從pyspark.sql窗口導入窗口。導入函數row_number w = Window.orderBy(“時間戳”)df_query = (spark.table .filter (“bucketed_table”)(“時間戳> = {start_time}和時間戳< = {end_time} ") .withColumn (“row_num row_number () .over (w)) .filter (“row_num {start_row}和{end_row} "))

在這個例子中,我們第一桶上的數據時間戳列成100桶使用桶函數。然後我們把桶使用bucketBy和sortBy數據表。最後,我們在桶上執行一係列查詢使用過濾和row_number數據。

請注意,您將需要調整大小的桶的數量,以適應您的數據和時間戳你查詢的範圍。您可能還需要嚐試不同的用桶裝策略為你的用例找到最佳的性能。

在原帖子查看解決方案

3回複3

匿名
不適用

@Erik路易:

如果數據幀有不同的時區,您可以使用磚的時區轉換函數將它們轉換為一個共同的時區。您可以使用from_utc_timestamp或to_utc_timestamp

函數將時間戳列UTC時間戳,然後使用date_format函數,將它轉換為一個時間戳字符串在一個特定的時區。

例如,如果您的數據幀在不同時區的時間戳,可以使用下麵的代碼來將它們轉換為一個共同的時區:

從pyspark.sql。功能導入from_utc_timestamp、to_utc_timestamp date_format #時間戳列轉換為UTC時間戳df_1_utc = df_1。withColumn (timestamp_utc, from_utc_timestamp (df_1。時間戳,df_1.timezone)) df_2_utc = df_2。withColumn (timestamp_utc, from_utc_timestamp (df_2。時間戳,df_2.timezone)) # Convert to a timestamp string in a particular time zone df_1_localized = df_1_utc.withColumn('timestamp_local', date_format(to_utc_timestamp(df_1_utc.timestamp_utc, 'America/Los_Angeles'), 'yyyy-MM-dd HH:mm:ss')) df_2_localized = df_2_utc.withColumn('timestamp_local', date_format(to_utc_timestamp(df_2_utc.timestamp_utc, 'America/Los_Angeles'), 'yyyy-MM-dd HH:mm:ss')) # Join the data frames on the localized timestamp column df_all = ( df_1_localized.join(df_2_localized, on=['timestamp_local', 'join_idx'], how="inner") .join(df_3, on=['timestamp_local', 'join_idx'], how="inner") .join(df_4, on=['timestamp_local', 'join_idx'], how="inner") .withColumn("id", lit(id)) ).writeTo("joined_table").createOrReplace()

這段代碼將時間戳列使用from_utc_timestamp UTC時間戳,然後將其轉換為一個局部使用to_utc_timestamp和date_format時間戳字符串。最後,它連接上的數據幀本地化時間戳列。

Erik_L
因素二世

我的道歉——似乎標題誤導你。我欣賞和徹底回答另一個問題,但我的問題是如何保持數據在本地文件集群效率之間的查詢。

匿名
不適用

@Erik路易:

有效執行之間的查詢在你加入了數據幀,你可以試著用桶裝基於時間戳列的數據。用桶裝確保數據與時間戳是存儲在同一個文件,進行範圍查詢更快的火花可以跳過文件查詢的範圍之外。

這裏有一個例子如何鬥你的數據:

從pyspark.sql。功能導入桶#桶上的數據時間戳列到100桶num_buckets = 100 bucket_col =“timestamp_bucket”df_bucketed = df_all。withColumn (bucket_col桶(“時間戳”,num_buckets)) #寫鬥數據表df_bucketed.write.mode(“覆蓋”)。bucketBy (num_buckets bucket_col) .sortBy(“時間戳”).saveAsTable (bucketed_table) #查詢桶從pyspark.sql數據使用範圍查詢。從pyspark.sql窗口導入窗口。導入函數row_number w = Window.orderBy(“時間戳”)df_query = (spark.table .filter (“bucketed_table”)(“時間戳> = {start_time}和時間戳< = {end_time} ") .withColumn (“row_num row_number () .over (w)) .filter (“row_num {start_row}和{end_row} "))

在這個例子中,我們第一桶上的數據時間戳列成100桶使用桶函數。然後我們把桶使用bucketBy和sortBy數據表。最後,我們在桶上執行一係列查詢使用過濾和row_number數據。

請注意,您將需要調整大小的桶的數量,以適應您的數據和時間戳你查詢的範圍。您可能還需要嚐試不同的用桶裝策略為你的用例找到最佳的性能。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map