取消
顯示的結果
而不是尋找
你的意思是:

scala中的模式定義幫助筆記本在磚! ! ! ! ! ! ! 1

Ruby8376
貢獻者

我構建模式傳入avro文件(json消息)並創建一個最終dataframe。構建的模式看起來好按提供json示例消息但是我在所有字段空值。有人看這段代碼,能告訴我如果我做錯了什麼?

這是json消息:{
“schemaVersion”: 4
“時間戳”:“2021 - 10 - 05 - t08:39:03.201 + 05:30”,
“messageState”:“新”,
“eventId”: 28901年,
“eventTimeStamp”:“2021 - 10 - 05 - t08:39:03.174 + 05:30”,
" machineDetail ": {
:“serialNumber ERS00075”,
“名稱”:“TRK15”
},
" serviceMeterHours ": {
“價值”:754年,
“單位”:“人力資源”
},
" eventDetail ": {
“名稱”:“測試機維修事件激活2554”,
“描述”:“TEST-Jacket水機油溫度低的警告”,
“類型id”:“1”,
“typeDescription”:“低”,
“嚴重程度”:“1”,
“severityDescription”:“維護”
},
" failureModeDetail ": {
“id”: 21日
“描述”:“數據不穩定的,間歇性的或不正確的。”
},
“durationSeconds”: 18894.0,
“寬容”:{
“觸發”:{
“價值”:635.56,
“理由”:“高”,
“單位”:“t”
},
“最”:{
“價值”:433.94,
“理由”:“高”,
“單位”:“t”
}
},
" sourceDetails ": {
“id”: 279年,
“描述”:“報警模塊# 1”
},
" positionDetails ": {
"全球":{
“緯度”:33.200424,
“朗”:435.99,
“弱電”:69.0
}
}
}

我的代碼:

進口 org apache 火花 sql 類型 _

def buildSchema (): StructType = {
返回 StructType ()
閥門( “數據” , StructType ()
閥門( “schemaVersion” , IntegerType )
/ /閥門(“時間戳”,StringType)
/ /閥門(messageState StringType)
/ /閥門(eventId LongType)
/ /閥門(eventTimeStamp StringType)
/ /閥門(“machineDetail”,新的StructType ()
/ /閥門(serialNumber StringType)
/ /閥門(“名字”,StringType)
/ /)
/ /閥門(“serviceMeterHours”,新的StructType ()
/ /閥門(“價值”,IntegerType)
/ /閥門(“單位”,StringType)
/ /)
/ /閥門(“eventDetail”,新的StructType ()
/ /閥門(“名字”,StringType)
/ /閥門(“類型id”, StringType)
/ /閥門(typeDescription StringType)
/ /閥門(“嚴重程度”,StringType)
/ /閥門(“描述”,StringType)
/ /閥門(severityDescription StringType)
/ /)
/ /閥門(“failureModeDetail”,新的StructType ()
/ /閥門(“id”, IntegerType)
/ /閥門(“描述”,StringType)
/ /)
/ /閥門(“durationSeconds”,倍增式)
/ /閥門(“寬容”,新的StructType ()
/ /閥門(“觸發”,新的StructType ()
/ /閥門(“價值”,倍增式)
/ /閥門(“原因”,StringType)
/ /閥門(“單位”,StringType)
/ /)
/ /閥門(“壞”,新的StructType ()
/ /閥門(“價值”,倍增式)
/ /閥門(“原因”,StringType)
/ /閥門(“單位”,StringType)
/ /)
/ /)
/ /閥門(“sourceDetails”,新的StructType ()
/ /閥門(“id”, IntegerType)
/ /閥門(“描述”,StringType)
/ /)
/ /閥門(“positionDetails”,新的StructType ()
/ /閥門(“全球”,新的StructType ()
/ /閥門(“緯度”,倍增式)
/ /閥門(“朗”,倍增式)
/ /閥門(“弱電”,倍增式)
/ /)
)
)
}
進口 org apache 火花 sql DataFrame
進口 org apache 火花 sql 功能 {。 上校 , 爆炸 , from_json , 子字符串 }
進口 org apache 火花 sql 類型 _
進口 org apache 火花 sql 功能 {。 callUDF , from_json , 子字符串 }

瓦爾 mySchema = buildSchema ()

def healthdataeventStream ( : DataFrame , 模式 : StructType :disappointed_face: DataFrame = {
瓦爾 hdestream =流
.withColumn ( “分區” substring (input_file_name (), - 6 , 1 ))
.withColumn ( “身體” ,from_json(坳( “身體” ).cast ( “字符串” ),模式))
.withColumn ( “數據” 坳( “Body.data” ))

瓦爾 flattenedData = hdestream.select (
美元 “Data.schemaVersion” ,
美元 “Data.timeStamp” ,
美元 “Data.messageState” ,
美元 “Data.eventId” ,
美元 “Data.eventTimeStamp” ,
美元 “Data.machineDetail.serialNumber” ,
美元 “Data.machineDetail.name” ,
美元 “Data.serviceMeterHours.value” ,
美元 “Data.serviceMeterHours.unit” ,
美元 “Data.eventDetail.name” ,
美元 “Data.eventDetail.description” ,
美元 “Data.eventDetail.typeId” ,
美元 “Data.eventDetail.typeDescription” ,
美元 “Data.eventDetail.severity” ,
美元 “Data.eventDetail.severityDescription” ,
美元 “Data.failureModeDetail.id” ,
美元 “Data.failureModeDetail.description” ,
美元 “Data.durationSeconds” ,
美元 “Data.tolerance.trigger.value” 作為 ( “tolerance_trigger_value” ),
美元 “Data.tolerance.trigger.reason” 作為 ( “tolerance_trigger_reason” ),
美元 “Data.tolerance.trigger.unit” 作為 ( “tolerance_trigger_unit” ),
美元 “Data.tolerance.worst.value” 作為 ( “tolerance_worst_value” ),
美元 “Data.tolerance.worst.reason” 作為 ( “tolerance_worst_reason” ),
美元 “Data.tolerance.worst.unit” 作為 ( “tolerance_worst_unit” ),
美元 “Data.sourceDetails.id” ,
美元 “Data.sourceDetails.description” ,
美元 “Data.positionDetails.global.lat” ,
美元 “Data.positionDetails.global.lon” ,
美元 “Data.positionDetails.global.elv”
)

flattenedData
}

0回答0
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map