Use the code below:
from pyspark.sql.functions import col, from_json, json_tuple

# conf should hold the Event Hubs connection settings (empty here for brevity).
conf = {}

# Read the raw stream from Azure Event Hubs.
df = spark.readStream.format("eventhubs").options(**conf).load()

# The payload arrives as binary; cast it to a string.
dataDF = df.select(col("body").cast("string"))

# Extract the top-level JSON fields from the body.
data = dataDF.select(json_tuple(col("body"), "table", "op_type", "records", "op_ts")) \
    .toDF("table", "op_type", "records", "op_ts")

# Parse the nested "records" JSON with the expected schema, then flatten it.
final_data = data.withColumn("records_json", from_json(col("records"), reqSchema))
final_data = final_data.select(
    *[col("records_json." + field).alias(field) for field in reqSchema.fieldNames()],
    col("op_type"),
    col("op_ts"))

# Note: sorting is unsupported on streaming DataFrames, and this result is
# discarded, so the line has no effect on the query started below.
final_data.orderBy(col("op_ts").desc())

# Deduplicate on the primary key; dropDuplicates keeps an arbitrary row per
# key, not necessarily the latest one by op_ts.
final_data = final_data.dropDuplicates([primaryKey])
final_data = final_data.distinct()

# Drop the bookkeeping columns before writing.
final_data = final_data.drop(final_data.op_ts)
final_data = final_data.drop(final_data.op_type)

# Write out as Parquet and stop after a single trigger.
final_data.coalesce(1).writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", checkPoint_url) \
    .trigger(once=True) \
    .start(rawFilePath_url) \
    .awaitTermination()
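The snippet assumes that conf, reqSchema, primaryKey, checkPoint_url, and rawFilePath_url are defined elsewhere. A minimal sketch of what those definitions might look like follows; every name and value here is illustrative, so substitute your own connection string, schema, key column, and storage paths:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical connection string; replace with your Event Hubs endpoint.
connection_string = "Endpoint=sb://<namespace>.servicebus.windows.net/;..."
conf = {
    # The azure-eventhubs-spark connector expects the connection string
    # to be encrypted via its EventHubsUtils helper.
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
}

# Illustrative schema for the nested "records" payload.
reqSchema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
])

primaryKey = "id"  # column used for deduplication
checkPoint_url = "abfss://checkpoints@<account>.dfs.core.windows.net/raw"  # hypothetical
rawFilePath_url = "abfss://data@<account>.dfs.core.windows.net/raw"        # hypothetical

One caveat on the design: if the intent of the orderBy/dropDuplicates pair is to keep only the latest record per key, this code does not guarantee that, since dropDuplicates retains an arbitrary row per key. Applying the keep-latest logic inside a foreachBatch sink (where each micro-batch is an ordinary DataFrame that can be sorted) is one way to get that behavior.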