砸我們的頭對這個一段時間,但我認為這更多的火花問題比磚,希望得到你的想法。基本要點是:
不知怎麼的,盡管我們隻是從DF2選擇一列在第二種情況下,它會導致什麼似乎是一個dropDuplicates函數的重新評估,重新評估以不同的順序,選擇後我們看到其他行比我們看到之前的選擇。可重複的結果對我們是很重要的在這種情況下,價值的otherColumn用作標識符刪除重複的家屬從另一個表的基礎上重複父母的。
一些其他事情似乎影響結果
改變這些條件似乎影響記錄我們看到在選擇但還沒有看到任何足以提供一個一致的解釋。
我可以假設,dropDuplicates是一個懶惰的函數,最終可能運行多個時候使用DF整個管道,dropDuplicates的行為在很大程度上是未定義的,至於它保持什麼。運行窗口函數分配一個行號在原始查詢和過濾,而不是使用dropDuplicates似乎提供一致的行為,但很難接受dropDuplicates很大程度上是無法在這種情況下,如果沒有一定的確認何時和為什麼。
我對你的問題是:
這是因為懶惰評估。
在分配DF2 DF.dropDuplicates ([' someColumn ']),沒有行動是在後台執行。事實上,沒有數據加載在DF2除非執行一個動作。
接下來,執行行動顯示()或()顯示,在DF dropDuplicate轉換執行,然後存儲在DF2。這是懶惰的評價是如何工作的。
您調用的次數行動,在後台新鮮轉換將被執行。因為有兩個不同的值someColumn值“1”,它會隨機挑選otherColumn值。
懶惰evalutaion方法有助於火花在過濾或執行某個操作時跳過不必要的轉換。
顯示器(DF2.select (col (otherColumn)))顯示(DF2)
如何避免呢?
好吧,如果你想為otherColumn總是有一個獨特的價值,那麼你應該包括另一個相應的列來過濾數據。一個很常見的解決方案是使用時間戳,比如插入或更新時間戳與記錄相關聯。
然而,如果你沒有任何這樣的列和你要確保每次使用相同的值從DF2整個代碼,然後使用持續()操作。它將持續dataframe穿越的內容操作後第一次計算。
從pyspark。storagelevel進口storagelevel DF2 = DF1.drop_duplicates ([' someCol ']) .persist (StorageLevel.MEMORY_AND_DISK)
偉大的回答@Aman Sehgal。我還收到了另一個答案來自@Ryan Chynoweth我將粘貼在這裏:
1)你以前見過這樣的,如果是這樣的話,你能提供任何有關嗎?
是的這發生由於延遲執行的火花,由於數據集分布的。專門和dropDuplicates基本上保持這行返回第一,可以改變如果行不同節點和/或超過1分區。有一個操作發生的複製是不保證使用此函數。
2)這是一個單純的火花或底層框架的行為,或者可能這與磚運行時我們在做什麼?
這種行為不是特定於火花(一般或差),但更相關的方式dropDuplicates被創建。同樣,df.first()函數簡單地接受過行返回第一和在分布式數據。