On dbx community edition, Auto Loader works using an s3 mount. The s3 mount for Auto Loader:
dbutils.fs.mount(f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", f"/mnt/{mount_name}")
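For context, this is roughly how I build that mount. A minimal sketch with placeholder values (access_key, secret_key, aws_bucket_name, and mount_name below are illustrative, not my real credentials); the secret is URL-encoded because AWS secret keys can contain "/", which would otherwise break the s3a URI:

from urllib.parse import quote

# Placeholder credentials for illustration only.
access_key = "AKIA..."           # AWS access key ID (placeholder)
secret_key = "wJalr...K7MDENG"   # AWS secret access key (placeholder)
aws_bucket_name = "my-bucket"    # placeholder bucket name
mount_name = "s3-mnt"

# URL-encode the secret so any "/" in it does not corrupt the s3a URI.
encoded_secret_key = quote(secret_key, safe="")

dbutils.fs.mount(
    f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}",
    f"/mnt/{mount_name}",
)

After mounting, the bucket contents show up under dbfs:/mnt/s3-mnt/..., which is what the paths below point at.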
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

source_directory = "dbfs:/mnt/s3-mnt/logs/$aws/things/device/data"
destination_directory = "dbfs:/mnt/s3-mnt/data/davis/delta/data"
checkpoint_path = "dbfs:/mnt/s3-mnt/data/davis/delta/data_checkpoint"

# switched to data_schema2 at s3 timestamp object 1682389110770
# added ac.Timestamp to the capture schema
schema = data_schema2

streaming_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    # .option("source", "s3://joe-open/")
    # .option("cloudFiles.schemaLocation", checkpoint_path)
    .schema(schema)
    .option("rescuedDataColumn", "_rescued_data")
    .load(source_directory)
    .writeStream
    .format("delta")
    .option("path", destination_directory)
    .option("checkpointLocation", checkpoint_path)
    .option("cloudFiles.schemaEvolutionMode", "true")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .start()
)

streaming_query.awaitTermination()
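In case it matters, data_schema2 is defined earlier in the notebook. A minimal sketch of its shape, with hypothetical field names except for the Timestamp mentioned in the comment above; my real schema has more fields:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical sketch only; the real data_schema2 is wider.
# The "ac" struct carries the Timestamp added at s3 object 1682389110770.
data_schema2 = StructType([
    StructField("device", StringType(), True),   # hypothetical field
    StructField("state", StringType(), True),    # hypothetical field
    StructField("ac", StructType([
        StructField("Timestamp", LongType(), True),
    ]), True),
])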
On a premium trial workspace, it fails:
---------------------------------------------------------------------------
StreamingQueryException                   Traceback (most recent call last)
File <command-3092456776679220>:38
     15 schema = data_schema2
     17 streaming_query = (spark.readStream
     18     .format("cloudFiles")
     19     .option("cloudFiles.format", "json")
   (...)
     35     .start()
     36 )
---> 38 streaming_query.awaitTermination()

File /databricks/spark/python/pyspark/sql/streaming/query.py:201, in StreamingQuery.awaitTermination(self, timeout)
    199     return self._jsq.awaitTermination(int(timeout * 1000))
    200 else:
--> 201     return self._jsq.awaitTermination()

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:168, in capture_sql_exception.<locals>.deco(*a, **kw)
    164 converted = convert_exception(e.java_exception)
    165 if not isinstance(converted, UnknownException):
    166     # Hide where the exception came from that shows a non-Pythonic
    167     # JVM exception message.
--> 168     raise converted from None
    169 else:
    170     raise

StreamingQueryException: [STREAM_FAILED] Query [id = ba24256e-c098-4c9c-9672-a96898104770, runId = b9037af2-98b8-4669-944f-7559adac1b57] terminated with exception: The bucket in the file event `{"backfill":{"bucket":"dbfsv1-files","key":"mnt/s3-mnt/logs/$aws/things/device/data/1682993996652","size":12304,"eventTime":1682993997000}}` is different from expected by the source: `[s3 bucket name]`.
...

NOTE: [s3 bucket name] is my scrubbing of the s3 bucket name.
What does this mean, and how do I recover the Auto Loader stream from this on dbx community edition?