你好,
我一個批處理過程中配置工作流失敗由於jdbc Postgres數據庫超時。
我檢查了JDBC連接配置和似乎工作當我查詢一個表和做df.show()在這個過程中,它顯示所獲取的數據。看來這個問題並非來自那裏。
我試著幾個配置集群級別,但還是同樣的問題。
conf我嚐試:
火花。掌握當地spark.databricks.cluster (* 4)。概要singleNode spark.executor。4000年代3600年代heartbeatInterval spark.network.timeout
知道,在同一過程中,還有另一個連接mysql數據庫上似乎沒有明顯的工作問題。
DB是GCP雲托管在一個sql和我們的磚平台是豐富的。Beplay体育安卓版本
告訴我如果你有暗示配置水平數據磚,此刻也知道這個流程運行在另一個地方pyspark VM。
Herafter加我:
Py4JJavaError:調用o1829.checkpoint時發生一個錯誤。:org.apache.spark。SparkException:工作階段失敗而終止:任務33階段58.0失敗了4次,最近的失敗:在舞台上失去了任務33.3 58.0 (TID 673)(司機- 656749566 d - lxcst執行人司機):org.postgresql.util。PSQLException:連接請求失敗。org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl (ConnectionFactoryImpl.java: 331)在org.postgresql.core.ConnectionFactory.openConnection org.postgresql.jdbc.PgConnection (ConnectionFactory.java: 49)。< init > (PgConnection.java: 223) org.postgresql.Driver.makeConnection (Driver.java: 400) org.postgresql.Driver.connect (Driver.java: 259) org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection (BasicConnectionProvider.scala: 49) org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create (ConnectionProvider.scala: 102) org.apache.spark.sql.jdbc.JdbcDialect。anonfun createConnectionFactory美元1美元(JdbcDialects.scala: 123) org.apache.spark.sql.jdbc.JdbcDialect。anonfun createConnectionFactory美元$ 1 $改編(JdbcDialects.scala: 119) org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute (JDBCRDD.scala: 277) org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 372) org.apache.spark.rdd.RDD.iterator (RDD.scala: 336) org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 60) org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 372) org.apache.spark.rdd.RDD.iterator (RDD.scala: 336) o
你好,
我終於應用以下方法,添加一個持續從DB dataframe當抓取數據:
的數據量小,它的工作原理。
def get_table_from_db(自我,table_name) - > pyspark.sql。DataFrame: df = self.spark.read.format (jdbc)。選項(“badRecordsPath”、“/ tmp / badRecordsPath”)。選項(* * self.kwargs [f " {self.db_name} _options "]) \ .option(“數據表”,table_name) .load () .persist df (StorageLevel.DISK_ONLY)回報