Here are simple steps to reproduce it. Note that the columns "foo" and "bar" are just padding columns, there to make sure the dataframe doesn't fit into a single partition.
```scala
// generate a random df
val rand = new scala.util.Random
val df = (1 to 3000).map(i => (rand.nextInt, "foo" * 50000, "bar" * 50000)).toSeq
  .toDF("col1", "foo", "bar")
  .orderBy(desc("col1"))
  .cache()

// this gives the correct results
df.orderBy(desc("col1")).limit(5).show()
/* output:
+----------+--------------------+--------------------+
|      col1|                 foo|                 bar|
+----------+--------------------+--------------------+
|2146781842|foofoofoofoofoofo...|barbarbarbarbarba...|
|2146642633|foofoofoofoofoofo...|barbarbarbarbarba...|
|2145715082|foofoofoofoofoofo...|barbarbarbarbarba...|
|2136356447|foofoofoofoofoofo...|barbarbarbarbarba...|
|2133539394|foofoofoofoofoofo...|barbarbarbarbarba...|
+----------+--------------------+--------------------+
*/

// however it seems not true when I call limit().rdd.collect on the cached
// dataframe without ordering by again; show() and take() return the correct
// results but rdd.collect doesn't
df.limit(5).select("col1").show()
/* this is correct
+----------+
|      col1|
+----------+
|2146781842|
|2146642633|
|2145715082|
|2136356447|
|2133539394|
+----------+
*/

df.select("col1").take(5)
/* this is also correct
Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2136356447], [2133539394])
*/

df.limit(5).select("col1").rdd.collect
/* this is incorrect
Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2133000691], [2130499969])
*/
```
Is it expected that calling cache() preserves the row order? Also, what causes the difference between limit(5).rdd.collect versus take(5) and limit(5).show()? According to the Spark SQL documentation the result should be deterministic. What am I missing?
"The LIMIT clause is used to constrain the number of rows returned by the SELECT statement. In general, this clause is used in conjunction with ORDER BY to ensure that the results are deterministic."
```
// additionally, below is my cluster setup
// runtime: 11.3 LTS (Scala 2.12, Spark 3.3.0)
// 2 r5.xlarge + 1 r5.2xlarge
spark.sql.autoBroadcastJoinThreshold 1
spark.driver.extraJavaOptions -Xss16m
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.sql.files.ignoreCorruptFiles true
spark.hadoop.fs.s3a.acl.default BucketOwnerFullControl
spark.hadoop.mapreduce.use.parallelmergepaths true
spark.driver.maxResultSize 64g
spark.hadoop.fs.s3a.canned.acl BucketOwnerFullControl
spark.sql.shuffle.partitions 1200
spark.network.timeout 180
spark.sql.broadcastTimeout 30000
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.executor.extraJavaOptions -Xss16m
spark.dynamicAllocation.executorIdleTimeout 1s
spark.default.parallelism 1200
spark.port.maxRetries 70
spark.dynamicAllocation.schedulerBacklogTimeout 1
```
@Jerry Xu:
The behavior you are seeing is expected, because the cache() operation does not guarantee the order of the rows in the cached DataFrame. When you call limit(5) on the cached DataFrame without an explicit orderBy(), the Spark execution engine will pick whichever 5 rows from the cache are available, which are not necessarily the top 5 rows of the original ordering. When you call show() or take() after orderBy() and limit(), Spark runs a new query and generates a new execution plan that includes the orderBy() clause, so the rows come back correctly ordered. When you call rdd.collect() on the cached DataFrame without an explicit orderBy(), Spark uses the cached data directly, which may not be in the correct order.
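Following that explanation, a minimal workaround (a sketch, assuming the cached `df` and the SparkSession from the question's snippet) is to re-apply the sort at action time, so the plan feeding `collect` includes the ordering rather than reading whichever cached partitions arrive first:

```scala
import org.apache.spark.sql.functions.desc

// Sketch: re-apply orderBy immediately before limit so the collected rows
// come from a sorted plan. Assumes `df` is the cached DataFrame above.
val top5 = df.orderBy(desc("col1")).limit(5).select("col1").rdd.collect()
```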
```scala
val df = (1 to 3000).map(i => (rand.nextInt, "foo" * 50000, "bar" * 50000)).toSeq
  .toDF("col1", "foo", "bar")
  .orderBy(desc("col1"))
  .cache()
```
This will ensure the rows are cached in the correct order. Hope this helps!
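Since the question observed that `take(n)` returned rows in the expected order in that run while `limit(n).rdd.collect` did not, another option (again a sketch against the cached `df` above) is to fetch the top rows through the Dataset API instead of dropping down to the RDD:

```scala
// Sketch: read the first rows via the Dataset execution path; per the
// observations in the question, take() returned the expected order here,
// whereas limit().rdd.collect did not.
val rows = df.select("col1").take(5)
```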