我運行一個龐大的曆史大約250 gb ~ 6毫升電話錄音文本(json讀入原始文本)從原始- >銅管道使用pyspark Azure磚。
源安裝存儲和不斷有文件添加和我們不刪除/檔案的來源。
我現在使用自動裝卸機和觸發可用。(參見下麵的代碼)
這是比例很好,我能處理所有的文件在當前配置在2小時。
觸發器可以現在打破了大量曆史批我的集群大小。
我開始遇到問題後,我又開始流曆史已經完成。
microbatch執行日誌狀態,我latestOffset我引發的一部分大約需要4140000或69分鍾獲得補償。
一旦瘋狂抵消時間完成addBatch隻需要幾秒鍾附加到目標。
基於集群和配置我可以處理大約1300 rec /秒,直到曆史(~ 6毫升文件)完成但是一旦我開始第二批流得到了閱讀最新的抵消和我在小於1 rec /秒的過程。
我已經嚐試了多種配置漫無目的地是否解決了問題,但無濟於事,似乎沒有人在這個問題上我的最後是張貼在這裏。
有一點要注意的是,基於數據我沒有平衡列分區,不需要一個為下遊轉換解決方案有或沒有人會為我工作。
這是當前配置的讀寫流....
#流讀取函數def read_stream_raw(火花:SparkSession rawPath: str) - > DataFrame:““從指定路徑讀取流參數- - - - - - - - - - -火花:SparkSession火花會話rawPath: str路徑目錄的文件返回- - - - - - - DataFrame DataFrame與一列“價值”類型str為每一行包含原始數據在原始文件中“”“kafka_schema =“字符串值”返回(火花.readStream .format .option (“cloudFiles (“cloudFiles”)。格式”、“文本”).option .option (“wholetext”、“true”) (“cloudFiles。maxBytesPerTrigger”、“10 g”) . schema(“字符串值”).load (rawPath)) rawDF = read_stream_raw(火花,rawPath = landingPath)
#轉換def transform_raw_to_bronze(火花:SparkSession,青銅:DataFrame) - > DataFrame:““從指定路徑讀取流和標記一些元數據參數- - - - - - - - - - -火花:SparkSession火花會話青銅:DataFrame火花df的回報- - - - - - - DataFrame DataFrame與額外的列標簽的更多信息”“df =(青銅.select(點燃(“/我的/雲存儲”).alias(“數據源”),current_timestamp () .alias (“ingesttime”),“價值”,current_timestamp () .cast .alias(“日期”)(“ingestdate”)) .withColumn (“input_filename input_file_name()))返回df bronzeDF = transform_raw_to_bronze(火花,rawDF)
def create_stream_writer (dataframe: dataframe檢查點:str,名字:str, partition_column: str = None,模式:str =“追加”)- > DataStreamWriter:““流寫入指定路徑參數- - - - - - - - - - - dataframe: dataframe火花dataframe檢查點:str獨特檢查點位置名稱:str流partition_column唯一的識別名稱:str =沒有列到分區的流模式:str = "附加“文稿模式流的回報- - - - - - - StreamWriter積極流”“stream_writer = (dataframe .writeStream .format(“δ”).outputMode(模式).option (“checkpointLocation檢查點).queryName(名字).trigger (availableNow = True))如果partition_column不是沒有:返回stream_writer.partitionBy (partition_column)返回stream_writer rawToBronzeWriter = create_stream_writer (dataframe = bronzeDF檢查點= bronzeCheckpoint模式=“追加”,name = " rawToBronze ")流= rawToBronzeWriter.start (bronzePath)
@Drew林格,這裏發生的事情是,目錄太大,做一個完整的掃描後,第二批處理需要時間,應在DBR 9.1 +並行。我認為你需要的是IncrementalListing在你的目錄。如果你沒有看到它,這部分我們的文檔應該幫助:https://docs.m.eheci.com/spark/latest/structured-streaming/auto-loader.html incremental-listing-1
我可以嚐試cloudFiles。useIncrementalListing“真實的”,然後手動指定回填間隔,如“一天”。
你的假設是正確的。文件目錄是巨大的和隻會變得更大。使用選項(“cloudFiles。useNotifications”、“真正的”)我必須配置Azure事件隊列存儲網格和基於文檔在這裏對嗎?
嗨@Drew林格,我不熟悉Azure。但是的看著下麵的好文章。我認為一些配置需要設置。
這可能是人們從Azure更好的評論
謝謝,
RK