嗨,夥計們,我要拚花(GCS位置)數據加載到Postgres數據庫(穀歌雲)。我們使用批量上傳數據到PG (spark-postgres庫)
前必要的——使用圖書館,我上傳jar集群
1 -https://mvnrepository.com/artifact/io.frama.parisni/spark-postgres/0.0.1
2 -https://mvnrepository.com/artifact/org.postgresql/postgresql
% scala
val數據= spark.read.format (“postgres”)
.option (“url”、“postgres jdbc:: / / IP端口/數據庫?用戶= USERNAME¤tSchema = SCHEMANAME”)
.option(“密碼”,“密碼”)
.option(“查詢”、“select * from表”)
.option(“分區”,4)
.option (“numSplits”, 5)
.option(“多行”,真的)
.load
但這是給錯誤- - - - - -
ClassNotFoundException:沒有找到數據來源:postgres。請找到包http://spark.apache.org/third-party-projects.html引起的:ClassNotFoundException: postgres.DefaultSource
P。S -因為我的數據是巨大的,我需要使用批量加載操作以來通過jdbc連接並通過光標插入批不是最佳的方法。
讓我知道如果有任何其他方法用於批量插入。
數據大小~ 40 GB的拚花
問候,
嗨@Kaniz Fatma, @Daniel薩哈-
從我身邊一些更新。
很多支安打,試驗後,psycopg2在我的例子中。
我們可以處理200 + GB數據與10節點集群(n2-highmem-4 32 GB內存4芯)和司機32 GB內存,與Runtime10.4.x-scala2.12 4核
花了接近100分鍾將整個數據加載到PG。
雖然我們需要改變鋪到csv和csv加載順序讀入PG
就像下麵的代碼片段
進口psycopg2
#創建一個連接到PostgreSQL數據庫
反對= psycopg2.connect(數據庫= " dbname”,用戶=“user-nm”,密碼=“密碼”,主機=“知識產權”,端口= " 5432 ")
#創建一個遊標對象
壞蛋= con.cursor ()
#打開CSV文件
files_to_load =dbutils.fs.ls(“dbfs: / dir /”)
而files_to_load:
file_path = files_to_load.pop .path (0)
如果file_path.endswith (. csv):
file_path = file_path.replace (“dbfs:”、“/ dbfs”)
打印(file_path)
張開f (file_path,“r”):
#使用copy_from函數來將數據裝載到PostgreSQL表
cur.copy_from (f,“pg_table_name”, 9 = "、")
#提交更改
con.commit ()
#關閉遊標和連接
con.close ()
如果你可以幫助一些問題
1 -有什麼方法我們可以調整代碼讀取鋪數據而不是. csv數據上麵的代碼(我們有數據最初拚花格式(磚三角洲的表),如果我們可以使用相同的原生格式,我們可以減少額外的處理)
2 -有什麼方法我們可以平行運行順序相反。
3 -其他技巧可以提高性能並采取更少的時間。
問候,