取消
顯示的結果
而不是尋找
你的意思是:

管道工作流老兄

apiury
新的貢獻者三世

這就跟你問聲好!我有一個問題。我使用一個自動裝卸機攝取數據從原始三角洲湖,但是當我的管道開始,我想管道隻適用於新的數據。數據自動裝卸機吸入到三角洲湖,但是現在,我怎麼能區分新數據從舊嗎?

1接受解決方案

接受的解決方案

我有問題想攝取自動裝卸機作為一個批處理成dataframe。它主要用於直接寫一個表或流。我認為最好的方法是自動裝載到青銅然後做一個火花。讀到dataframe變換,然後寫/插入與spark.sql表

在原帖子查看解決方案

9回複9

etsyal1e2r3
貢獻者

你可以添加一個列,給它一個價值的天日期競選新添加的數據與selectExpr在自動裝卸機()函數。Itd這個樣子……

從pyspark.sql。功能導入current_timestamp spark.readStream.format (cloudFiles) \ .option (“cloudFiles。形式at", "json") \ # The schema location directory keeps track of your data schema over time .option("cloudFiles.schemaLocation", "") \ .load("") \ .selectExpr( "*", "current_timestamp() as `Date_Pulled`", )

apiury
新的貢獻者三世

但是為什麼添加天天列?我認為自動裝卸機保持跟蹤的新文件。我的問題是,我怎麼能隻處理新的文件。我和二進製數據應用轉換,但我不想把它應用的所有數據。

自動裝卸機跟蹤文件所以它隻讀取一次,防止重複。如果你做一個計數之前和之後自動裝卸機每次你隻看到它添加新數據。現在你有@timestamp列?我不確定你的邏輯是什麼樣子的管道,但如果你有一個時間戳或date_pulled列可以過濾管道塔爾查詢的數據不存在在下表中管道通過檢查它在過去的時間戳/ date_pilled數據。但是如果你隻是抓住所有的數據到dataframe你可以做一個插入新表,更新現有記錄(如果你想)和插入新的。我隻能猜測你的邏輯是什麼樣子雖然沒有更多的信息:slightly_smiling_face:

apiury
新的貢獻者三世

檢查下表中的數據,還不退出並應用轉換不使用自動裝卸機是一樣的第一次再攝取嗎?例如,我有二進製數據(pcap文件格式),在銅層。我想將pcap轉換為csv格式和攝取銀層,但我不希望每次過程整個數據,所以隻有新文件的到來。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map