跳轉到主要內容
公司博客上

優化用戶定義函數與Apache火花™和R在現實世界中:與明尼蘇達雙城伸縮節場景分析第2部分

通過<一個data-external-link="true" href="//m.eheci.com/www/blog/author/rafi-kurlansik" aria-label="Rafi Kurlansik">Rafi Kurlansik,<一個data-external-link="true" href="//m.eheci.com/www/blog/author/tushar-madan" aria-label="Tushar Madan">印度央行馬丹和<一個data-external-link="true" href="//m.eheci.com/www/blog/author/hector-leano" aria-label="Hector Leano">赫克托耳Leano

2020年7月21日<一個data-external-link="true" href="//m.eheci.com/www/blog/category/company/customers">beplay体育app下载地址

分享這篇文章

介紹

在第1部分中我們談論棒球如何操作明尼蘇達雙子隊想要運行20 k模擬1500萬曆史球- 3000億模擬更準確地評估球員的性能。想法很簡單:如果1500萬年曆史音高勾勒出球員的形象表現,從每個球員分布3000億個模擬場地會銳化圖像和提供更可靠的估值。這些數據會影響訓練和人事決策的目的,產生更多的獲勝,進而為俱樂部的收入。

所有的腳本和機器學習模型來生成和得分數據寫在R .即使使用多線程運行這些腳本包在R估計需要3.8年處理所有的模擬。用戶定義函數(udf)在Apache火花和磚我們能夠減少數據處理時間2 - 3天3000億模擬曆史數據集,近乎實時的遊戲數據。通過啟用實時遊戲球得分,這對雙胞胎正在尋求最終優化基於遊戲陣容和戰略決策條件,例如,選擇最好的投手,鑒於麵糊,天氣,局,和速度+旋轉讀數從投手的最後一投。

結合R包的巨大生態係統的可伸縮性和並行性火花,udf威力不僅在體育,跨行業的用例。除了我們的模型推理的用例雙胞胎,考慮下麵的應用程序:

  • 生成銷售預測為成千上萬的消費者產品使用時間序列包<一個target="_blank" href="https://facebook.github.io/prophet/" rel="noopener noreferrer">先知
  • 模擬數以百計的金融投資組合的表現
  • 模擬運輸安排車隊的車輛
  • 找到最好的模型適合通過搜索成千上萬的hyperparameters並行執行

令人興奮和誘人的這些應用程序,他們的權力是有代價的。問人都試過,他們會告訴你,實現一個UDF可以優雅地規模也非常具有挑戰性。這是由於需要有效地管理核心和記憶的集群,以及它們之間的張力。關鍵是結構的工作,火花可以線性擴展。

在這篇文章中我們踏上一段旅程如何工藝udf。成功取決於理解存儲、火花、R和它們之間的交互。

理解udf火花和R

一般來說,當你使用SparkR或sparklyr R Scala代碼翻譯成,引發的母語。在這些情況下,R過程是有限的火花的司機節點集群,而其他的集群在Scala中完成任務。用戶定義的函數,但是,每個工人提供一個R的過程,讓你一個R函數應用到每個集團分區並行的火花DataFrame之前返回結果。

火花是如何安排呢?你可以看到下麵的控製流圖中很明顯。

火花udf,每組可以應用R函數或分區並行的火花DataFrame然後返回結果。

每個任務的火花將創建一個臨時R會話在每個工人,序列化R關閉,然後跨集群分發UDF。當R會話在每個職工活動,R的全功率生態係統內可以利用UDF。當R代碼執行完會話終止,和結果發送回火花上下文。你可以了解更多關於這個通過觀察<一個target="_blank" href="https://www.youtube.com/watch?v=VvhcFlfiQ08" rel="noopener noreferrer">在這裏而在<一個target="_blank" href="//m.eheci.com/www/blog/2020/06/01/vectorized-r-i-o-in-upcoming-apache-spark-3-0.html" rel="noopener noreferrer">這博客文章。

正確udf

“…你會如果你避免做最快的工作放在第一位。”[1]

現在,我們理解了udf是如何執行的基本知識,讓我們仔細看看係統中潛在的瓶頸,如何消除它們。基本上有四個關鍵領域來理解在編寫這些功能:

  1. 數據源
  2. 數據傳輸的火花
  3. 火花和R之間數據傳輸
  4. R過程

1。數據來源:最小化存儲I / O

第一步是計劃在存儲數據是如何組織的。許多R用戶可用於使用平麵文件,但一個關鍵原則是隻有攝取必要的UDF是什麼正常執行。很大一部分工作將I / O和存儲,如果您的數據是目前在一個非文件格式(如CSV)火花可能需要閱讀整個數據集到內存中。這可能是緩慢和低效的,尤其是如果你不需要所有的文件的內容。

為此我們建議以一個可伸縮的格式保存數據<一個target="_blank" href="//m.eheci.com/docs/delta/index.html" rel="noopener noreferrer">三角洲湖。三角洲加速攝入到火花分區中的數據存儲、優化這些分區的大小,並創建一個二級指標<一個target="_blank" href="//m.eheci.com/docs/delta/optimizations/file-mgmt.html" rel="noopener noreferrer">z值。綜上所述,這些特性有助於限製需要訪問的數據量在一個UDF。

所以如何?想象我們分區棒球球場在存儲的數據類型和直接引發閱讀行瀝青類型=“弧線球”。使用三角洲我們可以跳過攝入所有行與其他類型。這減少掃描的數據可以加快閱讀非常——如果隻有10%的你的數據包含曲球球,你可以有效地跳過閱讀你90%的數據到內存中!

通過使用一個存儲層三角洲湖和一個分區策略,對應於UDF將被處理的數據,你會奠定了堅實的基礎,消除一個潛在的瓶頸。

2 a。數據傳輸在火花:優化分區大小在內存中

分區大小的內存可能會影響性能的特征工程/ ETL管道主要包括UDF本身。一般來說,當火花必須執行<一個target="_blank" href="//m.eheci.com/www/glossary/what-are-transformations" rel="noopener noreferrer">廣泛的轉換就像一個加入集團跨集群,數據必須重組。洗牌分區的數量的默認設置是任意設置為200,意義洗牌操作時火花DataFrame中的數據是分布在200個分區。

這可以創建低效率取決於您的數據的大小。如果您的數據集是小,200個分區可能over-parallelizing工作,造成不必要的調度開銷用很少的數據和任務。如果數據集很大你不可能under-parallelizing和有效地使用資源集群。

一般的經驗法則,保持調整分區的大小在128 - 200 mb之間將最大化並行性,同時避免溢出到磁盤的數據。確定有多少洗牌分區應該是,最長使用的火花UI工作洗牌讀大小排序。大洗牌階段讀的大小除以128 mb到達最優分區數量為你工作。然後你可以設置spark.sql.shuffle.partitions在SparkR配置是這樣的:

sparkR。會話(sparkConfig = (spark.sql.shuffle列表。分區= " 400 "))

積極進行重新分區的火花DataFrame也是影響該設置,因為它需要一個洗牌。我們會看到,這種行為可用於管理內存壓力係統的其他部分之間的垃圾收集和數據傳輸等引發和R。

2 b。數據傳輸在火花:垃圾收集和集群規模

當你有一個大數據問題可以容易采用蠻力方法,達到最大的工作類型,但解決方案可能不是那麼簡單。垃圾收集在Java虛擬機(JVM)<一個target="_blank" href="https://www.elastic.co/blog/a-heap-of-trouble" rel="noopener noreferrer">失去控製的當有大型對象不再使用的內存中。使用非常大的工人會加劇這個問題因為有更多的空間來創建大型對象放在第一位。管理對象在內存的大小從而解決方案體係結構的一個重要的考慮因素。

適合這份工作我們發現一些大型的工人或許多小工人沒有執行以及許多中型工人。大量工人會產生過多的垃圾收集導致工作掛下去,而小工人隻會耗盡內存。

為了解決這個問題我們逐漸增加工人的大小,直到我們最終在中間範圍的內存和CPU<一個target="_blank" href="https://www.elastic.co/blog/a-heap-of-trouble" rel="noopener noreferrer">JVM的垃圾收集可以優雅地處理。我們也重新分區輸入火花DataFrame UDF和增加了分區。措施都有效地管理對象的大小在JVM和幫助保持垃圾收集不到10%的總工作時間為每個火花執行人。如果我們想進更多的記錄,我們可以簡單地添加更多的中型工人集群,提高輸入的分區DataFrame以線性方式。

3所示。火花和R之間數據傳輸

下一步要考慮的是如何引發和R之間傳遞數據。我們確定了兩個潛在的瓶頸——總體I / O和相應的(反)序列化進程之間發生的。

首先,隻輸入必需的UDF正常執行。類似於我們如何優化I / O讀取從存儲、過濾輸入火花DataFrame隻包含那些列所必需的UDF。如果我們的火花DataFrame 30列,其中我們隻需要4個UDF,相應的數據子集,轉而使用它作為輸入。這將加快執行通過減少I / O和相關的(反)序列化。

如果你適當地輸入數據子集和仍有出現內存不足的問題,實現分區可以幫助控製多少火花之間傳輸數據和R .例如,應用一個UDF 200 gb的數據在100個分區將導致2 gb的數據發送到R在每個任務。如果我們分區的數量增加到200使用的重新分區()函數從SparkR,然後1 GB將被發送到R在每個任務。多個分區的權衡是更多的(反)序列化任務JVM和R之間,但更少的數據在每個任務和隨後的內存壓力。

你可能認為一個典型的14 gb RAM引發工人能夠處理一個2 gb的數據分區空閑空間,但實際上你需要至少30 gb RAM如果你想避免出現內存不足的錯誤!這是一個粗魯的覺醒對於許多開發人員試圖在火花開始使用udf和R,並可能導致成本飆升。為什麼工人們需要多少內存?

事實是,火花和R表示數據在內存中完全不同。從火花R轉移數據,必須創建一個副本,然後轉換成一個內存中的R可以使用的格式。回想一下,在上麵的UDF架構圖中,對象需要序列化和反序列化每次他們之間移動兩個上下文。這是緩慢而產生巨大的內存開銷,與工人們通常需要一個數量級比預期更大的內存。

我們可以減輕這個瓶頸代替兩個不同的內存格式與一個使用Apache箭頭。箭頭是為了快速有效地不同係統之間傳輸數據——比如火花和R -通過使用柱狀格式類似於拚花。這消除了時間在序列化/反序列化以及增加的內存開銷。看到都並不少見<一個target="_blank" href="https://arrow.apache.org/blog/2019/01/25/r-spark-improvements/" rel="noopener noreferrer">10 - 100 x當工作負載比較有和沒有箭頭。不用說,使用這些優化使用udf時是至關重要的。火花3.0將包括支持<一個target="_blank" href="https://spark.apache.org/docs/3.0.0-preview/sparkr.html" rel="noopener noreferrer">箭頭SparkR,你可以安裝箭頭為R磚遵循指令<一個target="_blank" href="https://github.com/marygracemoesta/R-User-Guide/blob/master/Developing_on_Databricks/Customizing.md" rel="noopener noreferrer">在這裏。同樣值得注意的是有一個類似的開箱即用的優化器可用<一個target="_blank" href="//m.eheci.com/www/blog/2018/08/15/100x-faster-bridge-between-spark-and-r-with-user-defined-functions-on-databricks.html" rel="noopener noreferrer">SparkR磚上運行時

4所示。R的R過程:管理特性

每種語言都有自己的怪癖,現在我們將注意力轉向R本身。認為你UDF的函數可以應用數百或數千倍的工人,所以是值得注意R是如何使用資源。

調優場上得分函數中我們確定了加載模型對象和命令觸發R<一個target="_blank" href="https://adv-r.hadley.nz/names-values.html" rel="noopener noreferrer">copy-on-modify工作行為作為潛在的瓶頸。這是怎麼回事?讓我們更仔細地檢查這兩個,開始加載模型對象。

如果你曾與R的時間足夠長,你可能知道很多R包<一個target="_blank" href="https://stackoverflow.com/questions/6543999/why-is-caret-train-taking-up-so-much-memory" rel="noopener noreferrer">包括訓練數據模型對象的一部分。這是好當數據很小,但當數據增長它可以成為一個重大的問題——一些球場的模型幾乎是2 gb大小的!在我們的案例中相關的開銷從內存加載和刪除這些模型在每個執行UDF有限規模3億行,離3000億年3個數量級。

此外,我們發現該模型被保存為培訓的一部分功能:

trainmodel無意中儲蓄這樣一個模型可以拖一個大量其他對象R環境,導致它的大小。作為結果在那裏幾個選項削減這些模型將加載它們UDF高效而離開自己的能力預測完好無損。根據你R包使用培訓,您可以使用名為<一個目標=“平等”href=“https://github.com/tidymodels/butcher”rel=“noopener noreferrer”>屠夫</一個>直接削減現有的模型屬性要求預測。你自己也可以這樣做- - - - - -我們嚐試- - - - - -但看到混合的結果發現自己挖在深層嵌套的R對象模型包源代碼在哪裏膨脹是來自。推薦!更好的使用一個包設計處理這個問題可能的。另一種方式減少模型的大小進行再培訓一個子集最初的特性。如果你這可能是值得的交易一點點精度巨大的規模。鑒於我們有分別保存模型對象培訓函數無論如何,這樣做是有意義的重新培訓。保存模型外一個函數再培訓一個子集特性減少了瀝青模型結果的大小2GB93年MB,94年%減少。一個詞序列化R磁盤:盡管saveRDS ()readRDS ()標準的R序列化對象存儲,我們發現速度增強快速保存()qload ()<一個目標=“平等”href=“https://github.com/traversc/qs”rel=“noopener noreferrer”>qs</一個>(快速序列化)包。此外,讀取當地的存儲一般的速度比雲存儲。利用讀取當地的存儲,添加下麵的邏輯你的UDF:<精準醫療>結果最後,消除複製- - - - - -- - - - - -修改行為,我們使用了<一個目標=“平等”href=“https://cran.r-project.org/web/packages/data.table/”rel=“noopener noreferrer”>data.table</一個>模型推理在UDF。循環分配的預測一個我們的輸出dataframe創建多個副本,data.frame模型對象本身,消耗額外的內存導致R過程窒息。值得慶幸的是data.tablewe were able修改的地方:<精準醫療>data_table [pitch_outcome:=預測(模型、data_table類型=“響應”)]

通過有效地模型對象加載到內存和消除copy-on-modify行為我們精簡R UDF的執行。

測試、調試和監控

了解如何開始測試、調試和監控UDF是了解係統的工作方式一樣重要。這裏有一些建議開始。

用於測試,最好開始通過簡單地計算行每組或分區的數據,確保每一行流經UDF。然後將輸入數據的一個子集,開始慢慢地引入額外的邏輯,結果你回來,直到你得到一個工作原型。最後,慢慢地擴大執行通過添加更多的行,直到你提交的所有人。

有些時候,你幾乎肯定會打擊錯誤和需要調試。如果錯誤是你會看到火花的控製台驅動程序的堆棧跟蹤。如果錯誤是R代碼在UDF,這將在驅動程序的輸出。你需要打開火花UI和檢查stderr從工人日誌看到R的堆棧跟蹤的過程。

一旦你有一個錯誤自由UDF,您可以監視執行使用<一個target="_blank" href="//m.eheci.com/docs/clusters/clusters-manage.html" rel="noopener noreferrer">Ganglia指標。Ganglia包含詳細信息,集群是如何被利用,可以提供有價值的線索,一個瓶頸可能說謊。對於場上得分管道,我們使用Ganglia幫助診斷多個問題——我們看到CPU空閑時間爆炸模型對象太大時,和消除交換內存利用率,當我們增加了工人的大小。

把它放在一起

在這一點上我們已經討論了I / O和資源管理當讀到火花,在工人之間的火花,R, R。讓我們學到了什麼和比較非架構是什麼樣子與一個優化:

沒有優化,典型的UDF架構火花和R是固有的低效率,顯著增加運行仿真的時間和成本。

的技術和三角洲湖一樣,Apache箭頭和磚運行時,您可以優化每個主要組件的UDF火花和R架構,從而增加任何模擬的性能和容量。

結論

在這篇文章的開始,我們說的關鍵製作一個UDF與火花和R是結構的工作,火花可以線性擴展。在本帖裏,我們了解了如何擴展模型得分UDF通過優化每個主要組件體係結構的三角洲湖——從存儲數據,數據傳輸的火花,R和火花之間的數據傳輸,最後R過程本身。盡管所有udf有所不同,R開發人員現在應該相信自己的能力在這個模式,避免重大障礙。

免費試著磚
看到所有beplay体育app下载地址 的帖子
Baidu
map