你好,
我用joblib多處理在我們的一個過程。日誌工作好(除了怪異py4j錯誤我抑製)除了在多處理的時候。還如何抑製其他錯誤,我總是收到DB——也許有一些指導嗎?謝謝
2023-04-17 11:12:32[信息]——開始多處理2023-04-17 11:12:33[信息]——異常而發送命令。回溯(最近調用最後):文件“/磚/火花/ python / lib / py4j-0.10.9.5-src.zip / py4j / clientserver。py”, 503行,在send_command self.socket.sendall (command.encode (“utf - 8”)) ConnectionResetError: [Errno 104]連接重置同行在處理上述例外,另一個例外發生:回溯(最近的電話最後):文件”/磚/火花/ python / lib / py4j-0.10.9.5-src.zip / py4j / java_gateway。py”, 1038行,send_command反應= connection.send_command(命令)文件“/磚/火花/ python / lib / py4j-0.10.9.5-src.zip / py4j / clientserver。py”, 506行,send_command提高Py4JNetworkError (py4j.protocol。Py4JNetworkError:當發送廣場時發生錯誤:[1、4、9、16、25)
導入日誌從joblib平行進口,進口操作係統從進口睡眠時間延遲def setup_logging (): logging.basicConfig(=日誌級別。信息,格式= " % (asctime) s [% (levelname) s]——%(消息)年代”,datefmt =“Y % - % - % d % H: % m: % s”) logging.getLogger () .setLevel (logging.INFO) pyspark_log = logging.getLogger (pyspark) pyspark_log.setLevel (logging.WARNING) logging.getLogger (py4j) .setLevel (logging.WARNING) def calculate_square(數量):睡眠(1)#模擬一項非常耗時的任務結果=數量* * 2 logging.INFO (f”廣場{號碼}{結果}”)返回結果setup_logging () logging.INFO (f“開始多處理”)#數字計算廣場數字列表=[1、2、3、4、5)#使用joblib多處理n_jobs = os.cpu_count =()結果平行(n_jobs = n_jobs)(延遲(calculate_square) (num)在數字num)打印(f“廣場:{結果}”)
@Sam G:
似乎相關的問題是py4j圖書館使用的火花,而不是相關joblib或多處理。錯誤消息顯示網絡錯誤,而Python之間發送一個命令過程和運行的Java虛擬機(JVM)火花。
抑製錯誤消息,您可以將以下代碼行添加到您的setup_logging功能:
logging.getLogger (py4j) .setLevel (logging.ERROR) logging.getLogger (py4j.java_gateway) .setLevel (logging.ERROR)
這將設置日誌級別的py4j py4j。java_gateway模塊錯誤,這將會抑製他們的日誌消息。
在多處理有關問題日誌,你可以嚐試使用QueueHandler和QueueListener從子進程發送日誌消息回到父進程,可以正常登錄。這裏有一個例子:
導入日誌從joblib平行進口,進口操作係統在多處理進口隊列延遲current_process從日誌記錄。處理程序導入QueueHandler, QueueListener進口睡眠def setup_logging (): logging.basicConfig(=日誌級別。信息,格式= " % (asctime) s [% (levelname) s]——%(消息)年代”,datefmt =“Y % - % - % d % H: % m: % s”) logging.getLogger () .setLevel (logging.INFO) pyspark_log = logging.getLogger (pyspark) pyspark_log.setLevel (logging.WARNING) logging.getLogger (py4j) .setLevel (logging.ERROR) logging.getLogger (py4j.java_gateway) .setLevel (logging.ERROR) #創建一個隊列處理程序和偵聽器登錄子進程隊列(1)queue_handler log_queue = = QueueHandler (log_queue) queue_listener = QueueListener (log_queue, logging.getLogger()) #啟動隊列監聽器在一個單獨的線程queue_listener.start () def calculate_square(數字,log_queue):睡眠(1)#模擬一項非常耗時的任務結果=數量* * 2 logging.INFO (f”[{current_process () . name}]廣場{號碼}{結果}”)log_queue.put (f”[{current_process () . name}]廣場{號碼}{結果}”)返回結果setup_logging () logging.INFO (f“開始多處理”)#數字計算廣場數字列表=[1、2、3、4、5)#使用joblib多處理n_jobs = os.cpu_count() #為登錄子進程創建一個隊列log_queue =隊列(1)# log_queue創建延遲函數作為參數def delayed_func (num):返回延遲(calculate_square) (num log_queue)結果=平行(n_jobs = n_jobs) (delayed_func (num)在數字num) #停止隊列監聽器一旦所有子進程已經完成了日誌queue_listener.stop()打印(f“廣場:{結果}”)
這段代碼創建了一個隊列處理程序和偵聽器登錄子進程,並通過隊列calculate_square函數作為參數。日誌功能平方計算當地的記錄器和隊列,然後處理的偵聽器中運行的主要過程。
注意,隊列偵聽器應該停止一旦所有子進程已經完成了日誌記錄,這是在示例代碼使用queue_listener.stop()方法。