嘿,社區成員
我新磚,並構建一個簡單的DLT pipleine加載數據從S3和運行一個隔離森林預測來檢測異常。該模型已經被存儲在注冊中心模型。管道的代碼:
@dlt.table
def trucklocation ():
回報(
spark.readStream
.format (“cloudFiles”)
.option (“cloudFiles。格式”、“json”)
.option (“cloudFiles。在ferColumnTypes", True)
.load (f“{來源}/ trucklocation”)
.select (
F.current_timestamp () .alias (“processing_time”),
“*”
)
)
loaded_model_udf = mlflow.pyfunc。spark_udf(火花,model_uri = model_uri)
@dlt.table
def velocity_predictions ():
回報(
dlt.read (“trucklocation”)
.withColumn(“預測”,loaded_model_udf (struct(*地圖(坳(“速度”)))))
)
管道與以下錯誤:錯誤
org.apache.spark。SparkException:工作階段失敗而終止:任務0階段804.0失敗了4次,最近的失敗:在舞台上失去了任務0.3 804.0 (TID 1285)(10.55.136.232執行人0):org.apache.spark.api.python。PythonException:“AttributeError:“IsolationForest對象沒有屬性n_features_”。完整回溯如下:
回溯(最近的電話):
文件“/ local_disk0 / .ephemeral_nfs / env / pythonenv - a35783aa d900 - 4 - f51 - 9233 f8eb37babc87 / lib / python3.9 /網站/ mlflow / pyfunc / __init__。py”, 1293行,udf
os.kill (scoring_server_proc。pid signal.SIGTERM)
文件“/ local_disk0 / .ephemeral_nfs / env / pythonenv - a35783aa d900 - 4 - f51 - 9233 f8eb37babc87 / lib / python3.9 /網站/ mlflow / pyfunc / __init__。在_predict_row_batch py”, 1080行
結果= predict_fn (pdf)
文件“/ local_disk0 / .ephemeral_nfs / env / pythonenv - a35783aa d900 - 4 - f51 - 9233 f8eb37babc87 / lib / python3.9 /網站/ mlflow / pyfunc / __init__。在batch_predict_fn py”, 1274行
返回loaded_model.predict (pdf)
文件“/ local_disk0 / .ephemeral_nfs / env / pythonenv - a35783aa d900 - 4 - f51 - 9233 f8eb37babc87 / lib / python3.9 /網站/ mlflow / pyfunc / __init__。py”, 427行,在預測
返回self._predict_fn(數據)
文件“/磚/ python / lib / python3.9 /網站/ sklearn /合奏/ _iforest。py”, 314行,在預測
is_inlier [self.decision_function (X) < 0) = 1
文件“/磚/ python / lib / python3.9 /網站/ sklearn /合奏/ _iforest。在decision_function py”, 347行
返回self.score_samples self.offset_ (X)
文件“/磚/ python / lib / python3.9 /網站/ sklearn /合奏/ _iforest。在score_samples py”, 379行
如果自我。n_features_ ! = X.shape [1]:
AttributeError:“IsolationForest”對象沒有屬性“n_features_”
我試著運行直接預測這種方式和它工作得很好:
loaded_model_udf = mlflow.pyfunc。spark_udf(火花,model_uri = model_uri)
df_location = spark.read.format (json)。選項(“inferSchema”,“真正的”).load (s3path)
df = df_location。withColumn(“預測”,loaded_model_udf (struct(*地圖(坳(“速度”)))))
任何幫助管道將失敗的原因是感激。