你好,
我隻是開始使用火花,我有一個中等大小的DataFrame S3創建從整理csv (88列,860 k行),似乎是在一個合理的時間(使用SaveMode.Append)插入Postgres。在一個天真的實現中,插入這DataFrame呈現的順序5個小時完成以下環境:
代碼:
進口org.myorg.model.MyType
進口org.apache.spark.sql。DataFrame進口org.apache.spark.sql.functions._
進口scala.util。{成功,嚐試}
val s3Path = s”s3a: / /道路/ / / * . csv文件”
/ /讀取組華禾投資統一DataFrame val csvFrame: DataFrame = sqlContext.read.format .option (“com.databricks.spark.csv”)(“頭”,“真正的”).load (s3Path)
/ /流程/ DataFrame清理數據抽樣(MyType)與一些行處理邏輯val processFn:行= >嚐試(MyType) = MyType.fromRow _ val tempRdd:抽樣(MyType) = csvFrame.map (processFn) .collect{成功案例(行)= >行}
/ /拚接幀與填料的元數據列val df = sqlContext.createDataFrame (tempRdd) .withColumn(“希”,點燃(“my_hash”)) .withColumn (“created_at”, current_timestamp)
val jdbcUrl = s " jdbc: postgresql: / / my-rds-host.rds.amazonaws.com: 5432 / my_db ?用戶= my_user&password = my_password&stringtype =未指明的“
val connectionProperties = new java.util.Properties ()
df。寫.mode (SaveMode.Append) .jdbc (url = jdbcUrl表= " my_schema。my_table”, connectionProperties = connectionProperties)
測試這個插入對有限樣本大小DataFrame(使用.limit (n)combinator)產生這些不科學的數量/不徹底,更大的樣本量似乎截然不同(但長)執行時間:
觀察的RDS實例監測顯示幾乎平均CPU利用率3%以上,平均13編寫運維/秒。
相反,寫作整個DataFrame回到CSV DBFS和spark-csv或引發表saveAsTable產生更容易:~ 40秒。
我還沒有嚐試優化目標RDS安裝(新鮮供應),但似乎這種情況下應該主要是盒子。我的其他替代方法可能是輸出到CSV和使用Postgres的副本,但會很好的為這個“工作”。
道歉如果這是一個信息分享太多了——我很困惑為什麼這可能是行為很奇怪的實現大多改編自文檔。我丟失的東西明顯之類可怕的隱性喜歡網絡約束嗎?
以防有人好奇我是如何在這工作,我最終下降到Postgres JDBC和使用直接從火花CopyManager複製行:
https://gist.github.com/longcao/bb61f1798ccbbfa4a0d7b76e49982f84
嗯,不知道為什麼片段可能並不適用於您的設置。有沒有可能你正在運行一個PostgreSQL版本不兼容的複製命令片段?是我跑PostgreSQL 9.5。在RDS x。
平,我認為有一個隱式轉換(https://github.com/scala/scala/blob/v2.10.5/src/library/scala/collection/TraversableOnce.scala L389,允許我把迭代器(數組(字節))迭代器(字節)。IDE可以抱怨,或者你可能有殘疾但工作在一個筆記本上引發1.6.1。