Here is my code that writes data from a Delta table to Azure Event Hubs (from where the consumer team will consume the data):
import org.apache.spark.eventhubs._
import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.util.Properties
import com.microsoft.azure.eventhubs.{EventData, PartitionSender}
import org.apache.spark.eventhubs.EventHubsConf
import io.delta.tables._
import org.apache.spark.sql.streaming.Trigger
import java.io.PrintWriter
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import scala.concurrent.duration._
import java.nio.file.{Paths, Files}

// Azure Event Hubs details
val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"
val eventHubNameOut = "hvc-prodstats-output"
val sasKeyNameOut = "writer"
val sasKeyOut = dbutils.secrets.get(scope = "deskvscope", key = "des-ent-prod-cus-stream-eventhub-001-writer")

// Checkpoint and bad-data paths
val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

// Path of the manual timestamp checkpoint file
val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"

// Other parameters
val MaxEvents = 5000

// Read the last checkpoint timestamp
val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)

// Parse the last checkpoint timestamp
val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"
val formatter = DateTimeFormatter.ofPattern(time_format)
val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)

// Build the Event Hubs connection
val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
  .setNamespaceName(namespaceNameOut)
  .setEventHubName(eventHubNameOut)
  .setSasKeyName(sasKeyNameOut)
  .setSasKey(sasKeyOut)
val ehWriteConf = EventHubsConf(connStrOut.toString())

// Create a streaming DataFrame from the Delta table's change data feed
val InputStreamingDF =
  spark
    .readStream
    .option("maxFilesPerTrigger", 1)
    .option("startingTimestamp", last_checkpoint_string)
    .option("readChangeFeed", "true")
    .table("wencohvc.equip_status_history_table")

// Drop pre-images and map change types to operation codes
val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")
val operationTransform = dropPreTransform.withColumn("operation",
  when($"_change_type" === "insert", 2)
    .otherwise(when($"_change_type" === "update_postimage", 4)))

val transformedDF = operationTransform.withColumn("DeletedIndicator",
  when($"_change_type" === "delete", "Y").otherwise("N"))

val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")

// Write to Event Hubs with retries and checkpointing
var retry = true
var retryCount = 0
val maxRetries = 3

while (retry && retryCount < maxRetries) {
  try {
    val stream = finalDF
      .select(to_json(struct(/* column list */)).alias("body"))
      .writeStream
      .format("eventhubs")
      .options(ehWriteConf.toMap)
      .option("checkpointLocation", checkpoint_dir_path)
      .trigger(Trigger.AvailableNow)
      .start()

    stream.awaitTermination()
    retry = false
  } catch {
    case e: Exception =>
      retryCount += 1
      if (retryCount < maxRetries) {
        val delay = 2.seconds * retryCount
        println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")
        Thread.sleep(delay.toMillis)
      }
  }
}

// Write the new checkpoint timestamp for the next run
val emptyDF = Seq(1).toDF("seq")
val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp())
  .first().getTimestamp(1) + "+00:00"
dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)
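For clarity, the _change_type mapping above is meant to behave like this on a tiny static DataFrame (an illustrative sketch only: the rows are made up, and spark.implicits._ is assumed to be in scope as it is in a Databricks notebook):

import org.apache.spark.sql.functions._

// Toy change-feed rows: one insert, one update pre/post-image pair, one delete
val sample = Seq(
  ("A", "insert"),
  ("B", "update_preimage"),
  ("B", "update_postimage"),
  ("C", "delete")
).toDF("id", "_change_type")

val mapped = sample
  .filter($"_change_type" =!= "update_preimage")                    // pre-images are dropped
  .withColumn("operation",
    when($"_change_type" === "insert", 2)
      .otherwise(when($"_change_type" === "update_postimage", 4)))  // deletes get a null operation code
  .withColumn("DeletedIndicator",
    when($"_change_type" === "delete", "Y").otherwise("N"))
  .drop("_change_type")

mapped.show()
// Expected: A -> operation 2, DeletedIndicator N; B -> operation 4, N; C -> operation null, Y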
The problem is that the command times out before the last statement, so the final checkpoint command never runs. I have tried the retry mechanism, but it still times out. The data volume from the source is huge, and I don't want the stream to push duplicate data by running the notebook again and again without a proper checkpoint being written. How do I solve this? I want the job to run correctly and store the last checkpoint timestamp so the next run picks up from it, but it times out before the last command.
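What I am after at the end of a run is essentially the ordering below. This is only a rough sketch reusing finalDF, ehWriteConf, checkpoint_dir_path and tbl_version_timestamp_path from the code above; streamSucceeded is a placeholder flag introduced purely for illustration, not something in my current notebook. The idea is that the timestamp file should only move forward after the streaming query has actually finished, so a failed or timed-out run leaves the previous checkpoint in place and the next run resumes from it via startingTimestamp.

// Sketch: only advance the manual timestamp checkpoint after a clean finish
var streamSucceeded = false  // placeholder flag for illustration

try {
  val stream = finalDF
    .select(to_json(struct(/* column list */)).alias("body"))
    .writeStream
    .format("eventhubs")
    .options(ehWriteConf.toMap)
    .option("checkpointLocation", checkpoint_dir_path)
    .trigger(Trigger.AvailableNow)
    .start()

  stream.awaitTermination()  // returns once the AvailableNow run has processed the backlog
  streamSucceeded = true
} catch {
  case e: Exception => println(s"Stream failed: ${e.getMessage}")
}

if (streamSucceeded) {
  // Same timestamp construction as above, but only written when the query completed
  val checkpoint_timestamp = Seq(1).toDF("seq")
    .withColumn("current_timestamp", current_timestamp())
    .first().getTimestamp(1) + "+00:00"
  dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp, true)
}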
The error is:

ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds