我需要一些解決以下問題。
我們組json文件不斷aws s3,這些文件包含一個屬性的詳細信息。請注意1在這個json文件屬性可以有10 - 12行。附件是示例json文件。
我們需要閱讀這些文件流,然後我們需要創建壓實視圖的屬性,這意味著所有財產和行合並,創建一個單一的行/屬性。一旦完成,我們可以寫出,δ/ documentDB / DynamoDB流。
我試過下麵
def mount_s3_bucket (access_key secret_key、bucket_name mount_folder):
ACCESS_KEY_ID = access_key
SECRET_ACCESS_KEY = secret_key
ENCODED_SECRET_KEY = SECRET_ACCESS_KEY。替換(“/”、“% 2 f”)
打印(“安裝”,bucket_name)
試一試:
#卸載數據,以防它已經安裝。
dbutils.fs。卸載(/ mnt / % s % mount_folder)
除了:
#如果不能卸載它最有可能沒有安裝在第一個地方
打印(“目錄不卸載:”,mount_folder)
最後:
#最後,我們鬥山。
dbutils.fs。山(“s3a: / / % s: % s@ % s % (ACCESS_KEY_ID, ENCODED_SECRET_KEY bucket_name)“/ mnt / % s”% mount_folder)
# dbutils.fs。山(“s3a: / /”+ ACCESS_KEY_ID +”:“+ ENCODED_SECRET_KEY +“@”+ bucket_name mount_folder)
print (bucket_name“鬥”,“安裝”,mount_folder,“\ n”)
#設置AWS編程訪問憑證
從pyspark。sql進口SparkSession
從pyspark.sql。功能導入*
從pyspark.sql。導入類型*
火花= SparkSession.builder.appName (Comparis-data-stream-app) .getOrCreate ()
打印(會話創建的)
# JSONschema = StructType ([
# StructField(“用戶名”,StringType(),真的),
# StructField(“貨幣”,StringType(),真的),
# StructField(“量”,LongType(),真的),
#))
JSONschema = StructType ([
StructField (“id”, StringType(),假),
StructField (“address1 StringType(),真的,也沒有),
StructField (“address2 StringType(),真的,也沒有),
StructField(“城”,StringType(),真的,也沒有),
StructField(“狀態”,StringType(),真的,沒有),
StructField (“postalCode StringType(),真的,沒有),
StructField(“價格”,IntegerType(),真的,也沒有),
StructField(“尺寸”,IntegerType(),真的,沒有),
StructField(“房間”,IntegerType(),真的,也沒有),
StructField(“緯度”,DecimalType(),真的,沒有),
StructField(“液化天然氣”,DecimalType(),真的,沒有),
StructField (“hash_lat_lng StringType(),真的,也沒有),
StructField(“源”,StringType(),真的,也沒有),
StructField (“source_premium StringType(),真的,也沒有),
StructField(“時間戳”,TimestampType(),真的,也沒有),
StructField(“屬性”,StructType ([
StructField(“類型”,StringType ()),
StructField(“提升”,StringType ()),
StructField(“花園”,StringType ()),
StructField(“加熱”,StringType ()),
StructField (“washing_machine StringType ()),
StructField(“地板”,StringType ()),
StructField (“year_of_construction IntegerType ())
)))))
ds = (spark.readStream
. schema (JSONschema)
.format (json)
.option (“maxFilesPerTrigger”, 1)
.load (“/ mnt / raj-zuk-comparis-poc / * . json消息”))
flattened_df = (ds。withColumn (“property_type expr (“attributes.type”))
.withColumn (“property_lift expr (“attributes.lift”))
.withColumn (“property_garden expr (“attributes.garden”))
.withColumn (“property_heating expr (“attributes.heating”))
.withColumn (“property_washing_machine expr (“attributes.washing_machine”))
.withColumn (“property_floor expr (“attributes.floor”))
.withColumn (“property_year_of_construction expr (“attributes.year_of_construction”))
)
tumbling_df = (df_a44ed14d77a096c5197933f1e02b7a47
.groupBy(窗口(坳(“時間戳”)、“4小時”),(“源”)上校,上校(“hash_lat_lng”))
.agg (max(“時間戳”).alias(“時間戳”),
第一個(“address1”) .alias (“address1”),
第一個(“address2”) .alias (“address2”),
第一個(“城”).alias(“城市”),
第一個(“狀態”).alias(“狀態”),
第一個(“postalCode”) .alias (“postalCode”),
第一個(“價格”).alias(“價格”),
第一個(“大小”).alias(“大小”),
第一個(“房間”).alias(“房間”),
第一個(“property_type”) .alias (“property_type”),
第一個(“property_lift”) .alias (“property_lift”),
第一個(“property_garden”) .alias (“property_garden”),
第一個(“property_heating”) .alias (“property_heating”),
第一個(“property_washing_machine”) .alias (“property_washing_machine”),
第一個(“property_floor”) .alias (“property_floor”),
第一個(“property_year_of_construction”) .alias (“property_year_of_construction”),
第一個(“source_premium”) .alias (“source_premium”))
.orderBy(坳(“window.start”))
)
這之後我不知道如何寫這個嗎?你能建議什麼應該編寫或解決這個問題的正確方法嗎?