取消
顯示的結果
而不是尋找
你的意思是:

順序dataframe不是願望後調用緩存()和限製()

jerry-xu-sa”id=
新的貢獻者二世

這是簡單的步驟來複製它。注意,坳“foo”和“酒吧”隻是多餘的關口,確保dataframe不適合單個分區。

/ /生成一個隨機的df val蘭德= new scala.util。隨機val df = (1 - 3000)。地圖(i = >(蘭德。nextInt、“foo”* 50000 * 50000) .toSeq“酒吧”。toDF (“col1”、“foo”、“酒吧”).orderBy (desc (“col1”)) .cache() / /這是正確的結果df.orderBy (desc (“col1”)) .limit(5),告訴()/ *輸出的基準:+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - + | col1 | foo酒吧| | + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - + | 2146781842 | foofoofoofoofoofo…| barbarbarbarbarba……| | 2146642633 | foofoofoofoofoofo…| barbarbarbarbarba……| | 2145715082 | foofoofoofoofoofo…| barbarbarbarbarba……| | 2136356447 | foofoofoofoofoofo…| barbarbarbarbarba……| | 2133539394 | foofoofoofoofoofo…| barbarbarbarbarba……|+----------+--------------------+--------------------+ */ // however it seems not true when I call limit().rdd.collect on the cached dataframe without order by again, show() and take() returns the correct results however rdd.collect doesn't df.limit(5).select("col1").show() /* this is correct +----------+ | col1| +----------+ |2146781842| |2146642633| |2145715082| |2136356447| |2133539394| +----------+ */ df.select("col1").take(5) /*this is also correct Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2136356447], [2133539394]) */ df.limit(5).select("col1").rdd.collect /* this is incorrect Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2133000691], [2130499969]) */

預計,調用緩存()將命令行嗎?也導致區別是什麼限製(5).rdd。收集與(5)和限製(5),告訴()?根據火花sql文檔應該是確定的。我失蹤嗎?

“限製

條款是用來限製返回的行數選擇聲明。總的來說,這一條款結合使用命令為了確保結果是決定性的。”

/ /附加是我的集群設置/ /運行時:11.3 LTS (scala 2.12,火花3.3.0)/ / 2 r5。超大+ 1 r5.2xlarge spark.sql。autoBroadcastJoinThreshold 1 spark.driver。extraJavaOptions -Xss16m spark.dynamicAllocation。真正的spark.shuffle.service啟用。真spark.sql.parquet.fs.optimized.committer.optimization-enabled真spark.sql.files啟用。ignoreCorruptFiles真正spark.hadoop.fs.s3a.acl.default BucketOwnerFullControl spark.hadoop.mapreduce.use。spark.driver parallelmergepaths如此。maxResultSize spark.hadoop.fs.s3a.canned 64克。acl BucketOwnerFullControl spark.sql.shuffle。分區1200 spark.network.timeout 180 spark.sql。broadcastTimeout 30000 spark.hadoop.mapreduce.fileoutputcommitter.algorithm。版本2 spark.executor。extraJavaOptions -Xss16m spark.dynamicAllocation。executorIdleTimeout 1 s spark.default.parallelism 1200 spark.port。maxRetries 70 spark.dynamicAllocation。schedulerBacklogTimeout 1

2回答2

匿名
不適用

@Jerry徐:

行為你看到預計,隨著緩存()操作並不能保證訂單的行緩存DataFrame。當你叫限製緩存(5)DataFrame沒有顯式orderBy(),火花執行引擎將選擇任何可用5行緩存,這未必是前5行原始訂單。當你調用顯示()或()後orderBy極限()和(),火花執行引擎將執行一個新的查詢和生成一個新的執行計劃,其中包括orderBy()條款,將執行正確的命令行。當你叫rdd.collect()沒有顯式的緩存DataFrame orderBy(),火花直接執行引擎將使用緩存數據,這可能不是正確命令。

val df = (1 - 3000)。地圖(i = >(蘭德。nextInt、“foo”* 50000 * 50000) .toSeq“酒吧”。toDF (“col1”、“foo”、“酒吧”).orderBy (desc (“col1”)) .cache ()

這將確保以正確的順序行緩存。希望這可以幫助!

Vidula_Khanna”id=
主持人”id=
主持人

嗨@Jerry徐

謝謝你的問題!幫助你更好的,請花一些時間來檢查答案,讓我知道它是否最適合您的需要。

請幫助我們選擇最好的解決方案通過點擊“選擇最佳”如果它。

您的反饋將幫助我們確保我們提供最好的服務給你。謝謝你!

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map