你好,
我一個批處理過程中配置工作流失敗由於jdbc Postgres數據庫超時。
我檢查了JDBC連接配置和似乎工作當我查詢一個表和做df.show()在這個過程中,它顯示所獲取的數據。看來這個問題並非來自那裏。
我試著幾個配置集群級別,但還是同樣的問題。
conf我嚐試:
火花。掌握當地spark.databricks.cluster (* 4)。概要singleNode spark.executor。4000年代3600年代heartbeatInterval spark.network.timeout
知道,在同一過程中,還有另一個連接mysql數據庫上似乎沒有明顯的工作問題。
DB是GCP雲托管在一個sql和我們的磚平台是豐富的。Beplay体育安卓版本
告訴我如果你有暗示配置水平數據磚,此刻也知道這個流程運行在另一個地方pyspark VM。
Herafter加我:
Py4JJavaError:調用o1829.checkpoint時發生一個錯誤。:org.apache.spark。SparkException:工作階段失敗而終止:任務33階段58.0失敗了4次,最近的失敗:在舞台上失去了任務33.3 58.0 (TID 673)(司機- 656749566 d - lxcst執行人司機):org.postgresql.util。PSQLException:連接請求失敗。org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl (ConnectionFactoryImpl.java: 331)在org.postgresql.core.ConnectionFactory.openConnection org.postgresql.jdbc.PgConnection (ConnectionFactory.java: 49)。< init > (PgConnection.java: 223) org.postgresql.Driver.makeConnection (Driver.java: 400) org.postgresql.Driver.connect (Driver.java: 259) org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection (BasicConnectionProvider.scala: 49) org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create (ConnectionProvider.scala: 102) org.apache.spark.sql.jdbc.JdbcDialect。anonfun createConnectionFactory美元1美元(JdbcDialects.scala: 123) org.apache.spark.sql.jdbc.JdbcDialect。anonfun createConnectionFactory美元$ 1 $改編(JdbcDialects.scala: 119) org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute (JDBCRDD.scala: 277) org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 372) org.apache.spark.rdd.RDD.iterator (RDD.scala: 336) org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 60) org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 372) org.apache.spark.rdd.RDD.iterator (RDD.scala: 336) o
嗨@Rama克裏希納N,
我使用一個類的方法get_table_from_db遵循:
類ExtractData: def __init__(自我,db:枚舉,* * kwargs):自我。db_name = db.DB_NAME。自我價值。db =自我。kwargs = kwargs自我。火花= init_spark_session (kwargs (“app_name”), * * kwargs)自我。loaded_tables = {} def get_table_from_db(自我,table_name):火花= init_spark_session(自我。kwargs (“app_name”), * * self.kwargs)返回spark.read.format (jdbc) .options (* * self.kwargs [f " {self.db_name} _options "]) \ .option(“數據表”,table_name) .load ()
kwargs包含連接憑證
init_spark_session是效仿
def init_spark_session (spark_app_name, * * kwargs):““SQL火花實例化:param spark_app_name:: param kwargs::返回:“”“called_method = SparkSession。配置= SparkConf builder()為關鍵,kwargs價值(“spark_config”) . items (): conf.set(關鍵。取代('“',”),值)返回called_method.appName (spark_app_name) . config(參看= conf) .enableHiveSupport () .getOrCreate ()
連接參數:
mysql_options:{主機= $ = $ {VSA_DATABASE} {VSA_HOST}數據庫url = " jdbc: mysql: / / $ {VSA_options.host} " / " $ {VSA_options.database}”?zeroDateTimeBehavior = convertToNull com.mysql.jdbc“司機=”。司機“用戶密碼= $ {VSA_USER} = $ {VSA_PASSWORD}} post_options:{主機= ${主機}端口= $ = ${數據庫}{港口}數據庫url = " jdbc: postgresql: / / $ {CDB_options.host}”:“$ {CDB_options.port} " / " $ {CDB_options。數據庫}司機= " org.postgresql。司機“用戶密碼= $ {user} = ${密碼}}
嗨@Fred Foucart,
上麵的代碼看起來不錯。你可以嚐試使用以下代碼。
spark.read \
.format (jdbc) \
.option (" url " f " jdbc: postgresql: / /{主機}/{數據庫}”)\
.option(“司機”,“org.postgresql.Driver”) \
.option \(“用戶”,用戶名)
.option \(“密碼”,密碼)
.option(“數據表”,<表>)\
.option \ (“fetchsize”, 5000)
.load ()
在桌子上是巨大的情況下,你可以試著與並行讀取。
#參數
table_name = " <表名稱> "
partitionColumn = " <主鍵數值列> "
下界= 1
upperBound = 10000 <行總數> #我們的表包含十億行! ! !
fetchsize = 1000
num_partitions = 20 #做一些數學多少片應該分區的數據
和閱讀(總記錄/分區)即10000/20 = 500行每個線程
#讀
source_df = spark.read \
.format (jdbc) \
.option (" url " f " jdbc: postgresql: / /{主機}/{數據庫}”)\
.option(“司機”,“org.postgresql.Driver”) \
.option \(“用戶”,用戶名)
.option \(“密碼”,密碼)
.option(“數據表”,source_table) \
.option (“partitionColumn”, partitionColumn) \
.option(“下界”,下界)\
.option (“upperBound”, upperBound) \
.option \ (“numPartitions”,分區)
.option (“fetchsize fetchsize) \
.load ()