你好,
我有設置一個流過程,消費者從HDFS文件暫存目錄並寫入到目標位置。從另一個進程輸入目錄continuesouly得到文件。
假設文件生產商生產500萬條記錄發送到hdfs暫存目錄在5分鍾winodw 50個不同的文件。流過程中拿起文件(處理觸發設置為60秒)和處理他們所有人,但問題是它缺少2 - 3%記錄處理後輸入目錄的所有文件。
我檢查了檢查點源目錄,它顯示所有的文件和處理。
不知道還有什麼檢查。
@Prakash Chockalingam你能請幫助嗎?
數據集<行> dataDF = spark.readStream () .option(“9”,分隔符).option .option(“頭”,“假”)(“inferSchema”,“假”).option (“latestFirst”,“假”). schema (appConfig.getSchema ()) . csv (appConfig.getHdfsSourceDir ());
StreamingQuery查詢= dataDF.writeStream () .trigger (ProcessingTime。創建(60,TimeUnit.SECONDS)) .format(“鋪”).outputMode (OutputMode.Append ()) .partitionBy (appConfig.getPartitionedBy () .split (", ")) .option (“checkpointLocation appConfig.getHdfsNNUri appConfig.getHdfsOutputDir() +() + /檢查站).option(“壓縮”,appConfig.getSparkCompressType ()) .start (appConfig.getHdfsNNUri appConfig.getHdfsOutputDir () + ());query.awaitTermination ();
你好,
你是否檢查了micro-batch度量日誌嗎?這些指標將幫助您識別過程的記錄你的號碼和時間來處理數據。本文介紹的度量捕獲和如何使用它們知道你流工作。鏈接https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structur..。
謝謝你!
你好,
你是否檢查了micro-batch度量日誌嗎?這些指標將幫助您識別過程的記錄你的號碼和時間來處理數據。本文介紹的度量捕獲和如何使用它們知道你流工作。鏈接https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structur..。
謝謝你!