I created a bronze table with CDF enabled using the following steps:
```python
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.schemaLocation", "<schema_loc>") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("cloudFiles.includeExistingFiles", "true") \
    .load()

df.writeStream \
    .format("delta") \
    .trigger(once=True) \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", )
```
```sql
CREATE TABLE bronze.mytable USING DELTA LOCATION '<file location>';
ALTER TABLE bronze.mytable SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
```
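For reference, the same result can be achieved in a single statement by setting the table property at creation time instead of a separate ALTER TABLE (a sketch reusing the same table name and location placeholder):

```sql
-- Enable the change data feed when the table is created
CREATE TABLE bronze.mytable
USING DELTA
LOCATION '<file location>'
TBLPROPERTIES (delta.enableChangeDataFeed = true);
```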
```python
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.schemaLocation", schema_loc) \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("cloudFiles.includeExistingFiles", "true") \
    .load()

df.createOrReplaceTempView("bronze_company_info_dataset")
sql_query = "INSERT INTO bronze.mytable TABLE bronze_dataset"
spark.sql(sql_query)
```
Running the INSERT fails with this traceback:

```
/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46         start = time.perf_counter()
     47         try:
---> 48             res = func(*args, **kwargs)
     49             logger.log_success(
     50                 module_name, class_name, function_name, time.perf_counter() - start, signature

/databricks/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
   1117         sqlQuery = formatter.format(sqlQuery, **kwargs)
   1118         try:
-> 1119             return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1120         finally:
   1121             if len(kwargs) > 0:

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
```