取消
顯示的結果
而不是尋找
你的意思是:

閱讀從AWS運動監測日誌

哈裏森
新的因素

如果你有AWS CloudWatch訂閱日誌寫出AWS運動,運動流是base64編碼和監測日誌GZIP壓縮。我們麵臨的挑戰是如何解決,在pyspark能夠讀取數據。

我們可以創建一個UDF處理數據是在使用火花結構化流

進口 zlib

# #創建UDF來壓縮數據

def decompress_func (x):
試一試 :
返回 zlib.decompress (x, zlib。MAX_WBITS | 32 ).decode ( “utf - 8” )
除了 異常 作為 艾凡:
返回 str (e) #返回錯誤消息,而不是沒有

#注冊UDF
udf_decompress = udf (decompress_func)

DataFrame從運動

運動=火花。readStream \
格式 ( “運動” )\
.option ( “streamName” stream_name) \
.option ( “地區” , 地區 )\
.option ( “roleArn” ,角色)\
.option ( “initialPosition” , “最早” )\
.load ()

#應用減壓
運動= kinesis.withColumn ( “uncompressed_data” ,udf_decompress(坳( “數據” ))).drop ( “partitionKey” , “數據” , “shardId” , “sequenceNumber” ).withColumn ( “LOG_DATE” to_date ( “approximateArrivalTimestamp” ))
請確認取代stream_name、地區和角色代碼中實際值特定於您的AWS環境。提供的代碼使用火花從AWS運動結構化流中讀取數據,解壓縮使用decompress_func UDF GZIP壓縮數據,並添加一個新列命名uncompressed_data DataFrame包含解壓數據。不必要的列是下降,添加一個新列LOG_DATE approximateArrivalTimestamp通過提取日期
0回答0
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map