取消
顯示的結果
而不是尋找
你的意思是:

火花StreamingQuery不是處理所有數據從源目錄

RajaLakshmanan
新的因素

你好,

我有設置一個流過程,消費者從HDFS文件暫存目錄並寫入到目標位置。從另一個進程輸入目錄continuesouly得到文件。

假設文件生產商生產500萬條記錄發送到hdfs暫存目錄在5分鍾winodw 50個不同的文件。流過程中拿起文件(處理觸發設置為60秒)和處理他們所有人,但問題是它缺少2 - 3%記錄處理後輸入目錄的所有文件。

我檢查了檢查點源目錄,它顯示所有的文件和處理。

不知道還有什麼檢查。

@Prakash Chockalingam你能請幫助嗎?

數據集<行> dataDF = spark.readStream () .option(“9”,分隔符).option .option(“頭”,“假”)(“inferSchema”,“假”).option (“latestFirst”,“假”). schema (appConfig.getSchema ()) . csv (appConfig.getHdfsSourceDir ());

StreamingQuery查詢= dataDF.writeStream () .trigger (ProcessingTime。創建(60,TimeUnit.SECONDS)) .format(“鋪”).outputMode (OutputMode.Append ()) .partitionBy (appConfig.getPartitionedBy () .split (", ")) .option (“checkpointLocation appConfig.getHdfsNNUri appConfig.getHdfsOutputDir() +() + /檢查站).option(“壓縮”,appConfig.getSparkCompressType ()) .start (appConfig.getHdfsNNUri appConfig.getHdfsOutputDir () + ());query.awaitTermination ();

1接受解決方案

接受的解決方案

jose_gonzalez
主持人
主持人

你好,

你是否檢查了micro-batch度量日誌嗎?這些指標將幫助您識別過程的記錄你的號碼和時間來處理數據。本文介紹的度量捕獲和如何使用它們知道你流工作。鏈接https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structur..。

謝謝你!

在原帖子查看解決方案

3回複3

Kaniz
社區經理
社區經理

嗨@RajaLakshmanan !我的名字叫Kaniz,和我是一個技術主持人。很高興認識你,謝謝你的問題!看看你的同行在論壇上先回答你的問題。否則我們將很快跟進與回複。

jose_gonzalez
主持人
主持人

你好,

你是否檢查了micro-batch度量日誌嗎?這些指標將幫助您識別過程的記錄你的號碼和時間來處理數據。本文介紹的度量捕獲和如何使用它們知道你流工作。鏈接https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structur..。

謝謝你!

User16763506586
貢獻者

如果有幫助,你嚐試運行Left-Anti加入源和彙來確定任務記錄,是否提供的記錄是在匹配模式

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map