取消
顯示的結果
而不是尋找
你的意思是:

幫助集成流管道與AWS服務S3和λ

matty_f
新的貢獻者二世

大家好,我是試圖建立一個三角洲生活表管道吸入gzip壓縮檔案作為他們上傳到S3。檔案包含2專有格式的文件,和一個需要確定如何解析。一旦文件內容解析,字典產生和數組寫入表中

我設法把這件工作使用UDF(使用BytesIO、tarfile和自定義解析庫包裝主機二進製)。我不確定我應該執行這樣的沉重的計算在一個UDF,尤其是接觸UDF和執行shell命令。如果有更合適的解決方案,請讓我知道下麵(當前代碼)

一旦檔案內容被解析並轉換為行,他們被寫入三角洲住表。從那裏,有多個表,所有讀取和過濾數據。當出現新的匹配行,數據需要轉發Lambda函數進行進一步處理,另一塊將非結構化數據轉換為結構化的專有技術

我不能確定如何實現這一目標。我的第一個想法是用“foreachBatch”寫入SQS或運動基礎設施,我自己必須提供。從那裏,我還將配置Lambda函數來處理數據,然後把結果返回到另一個水槽,磚就可以讀取。

的這一切,我不知道如何管理版本控製ETL作業,這樣如果我部署新代碼ETL、數據磚知道哪些下遊資產過時,可能需要重新生成過程中保留下來,但我可能會保存這個新主題一旦我有管道運行

提前謝謝!

代碼壓縮和解析zipfiles

(注:這是大多數人工作當我做一個定製的運行在一個筆記本,但是我還沒有試過添加筆記本管道)

row_schema = ArrayType (StructType ([StructField (“foo”, StringType ()), StructField(“酒吧”,IntegerType ()),))) @udf (returnType = row_schema) def extract_rows(內容):#解壓存檔檔案= tarfile.open (fileobj = io.BytesIO(內容))#解析存檔內容……#提取變量的行數從解析內容……@dlt。表def my_table():文件= (spark.readStream.format .option (“cloudFiles (“cloudFiles”)。格式”、“binaryFile”) .load (s3: / /…))返回文件。選擇(extract_rows (files.content) .alias(“行”)).select(爆炸(“行”)).select(“上校*”)

0回答0
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map