進口
org
。
apache
。
火花
。
sql
。
DataFrame
進口
org
。
apache
。
火花
。
sql
。
功能
{。
上校
,
爆炸
,
from_json
,
子字符串
}
進口
org
。
apache
。
火花
。
sql
。
類型
。
_
進口
org
。
apache
。
火花
。
sql
。
功能
{。
callUDF
,
from_json
,
子字符串
}
瓦爾
mySchema
= buildSchema ()
def
healthdataeventStream (
流
:
DataFrame
,
模式
:
StructType
DataFrame
= {
瓦爾
hdestream
=流
.withColumn (
“分區”
substring (input_file_name (), -
6
,
1
))
.withColumn (
“身體”
,from_json(坳(
“身體”
).cast (
“字符串”
),模式))
.withColumn (
“數據”
坳(
“Body.data”
))
瓦爾
flattenedData
= hdestream.select (
美元
“Data.schemaVersion”
,
美元
“Data.timeStamp”
,
美元
“Data.messageState”
,
美元
“Data.eventId”
,
美元
“Data.eventTimeStamp”
,
美元
“Data.machineDetail.serialNumber”
,
美元
“Data.machineDetail.name”
,
美元
“Data.serviceMeterHours.value”
,
美元
“Data.serviceMeterHours.unit”
,
美元
“Data.eventDetail.name”
,
美元
“Data.eventDetail.description”
,
美元
“Data.eventDetail.typeId”
,
美元
“Data.eventDetail.typeDescription”
,
美元
“Data.eventDetail.severity”
,
美元
“Data.eventDetail.severityDescription”
,
美元
“Data.failureModeDetail.id”
,
美元
“Data.failureModeDetail.description”
,
美元
“Data.durationSeconds”
,
美元
“Data.tolerance.trigger.value”
。
作為
(
“tolerance_trigger_value”
),
美元
“Data.tolerance.trigger.reason”
。
作為
(
“tolerance_trigger_reason”
),
美元
“Data.tolerance.trigger.unit”
。
作為
(
“tolerance_trigger_unit”
),
美元
“Data.tolerance.worst.value”
。
作為
(
“tolerance_worst_value”
),
美元
“Data.tolerance.worst.reason”
。
作為
(
“tolerance_worst_reason”
),
美元
“Data.tolerance.worst.unit”
。
作為
(
“tolerance_worst_unit”
),
美元
“Data.sourceDetails.id”
,
美元
“Data.sourceDetails.description”
,
美元
“Data.positionDetails.global.lat”
,
美元
“Data.positionDetails.global.lon”
,
美元
“Data.positionDetails.global.elv”
)
flattenedData
}