從結構化流查詢故障中恢複

結構化流提供容錯和數據流查詢的一致性;使用磚工作流,您可以很容易地配置您的結構化流查詢自動重新啟動失敗。通過使檢查點流查詢,您可以重新啟動後查詢失敗。重新啟動查詢繼續失敗的一個。

使檢查點結構化流查詢

磚建議你總是指定checkpointLocation選擇一個雲存儲路徑在你開始查詢。例如:

streamingDataFramewriteStream格式(“鋪”)選項(“路徑”,“/道路/ /表”)選項(“checkpointLocation”,“/道路/ /表/ _checkpoint”)開始()

這個檢查點位置保存的所有基本信息,識別一個查詢。每個查詢都必須有一個不同的檢查點位置。多個查詢不應該相同的位置。有關更多信息,請參見結構化流編程指南

請注意

checkpointLocation大多數類型的輸出需要下沉,下沉,如內存,可以自動生成一個臨時檢查點位置當你不提供checkpointLocation。這些臨時檢查點位置不保證任何容錯或數據一致性保證和可能無法清理幹淨。避免潛在的缺陷總是指定一個checkpointLocation

配置結構化流工作重新啟動流查詢失敗

您可以創建一個磚工作你的筆記本或JAR流查詢和配置:

  • 總是使用一個新的集群。

  • 總是在失敗重試。

工作有緊密集成結構化流api和可以監控所有流查詢活躍在跑步。這個配置確保如果任何查詢的一部分失敗,工作自動終止運行查詢(以及其他所有)和啟動一個新的運行在一個新的集群。這種消遣筆記本或JAR代碼並重新啟動所有的查詢。這是最安全的方式返回到一個好的狀態。

請注意

  • 失敗的活動流查詢導致活動運行失敗,終止所有其他流媒體查詢。

  • 您不需要使用streamingQuery.awaitTermination ()spark.streams.awaitAnyTermination ()在你的筆記本。工作時自動防止運行完成流媒體查詢是活躍的。

  • 磚建議用工作代替運行%dbutils.notebook.run ()當策劃結構化流筆記本。看到運行一個磚筆記本從另一個筆記本

以下是推薦工作配置的一個示例。

  • 集群:設置這個總是使用一個新的集群和使用最新的火花版本(或至少2.1版本)。查詢開始引發2.1及以上的查詢和火花版本升級後可恢複。

  • 通知:設置這個如果你希望電子郵件通知失敗。

  • 時間表:不設置一個時間表

  • 超時:不設置一個超時。流媒體查詢無限期地為一個長的時間。

  • 最大並發運行:設置為1。同時隻能有一個實例,每個查詢活躍。

  • 重試:設置為無限的

看到創建和運行數據磚的工作理解這些配置。

更改後的恢複以結構化查詢流

有什麼變化限製在流查詢允許重啟之間相同的檢查點位置。這裏有一些種類的變化,要麼是不允許的,或者改變的影響並不明確。所有人:

  • 這個詞允許意味著你能做指定的改變,但是其效果的語義是否明確定義取決於查詢和改變。

  • 這個詞不允許意味著你不應該做指定的變化重新查詢與不可預測的錯誤可能會失敗。

  • 自衛隊代表一個流DataFrame /數據集生成sparkSession.readStream

類型的結構化查詢流的變化

  • 數量和類型的變化(即不同的源)的輸入源:這是不允許的。

  • 輸入源的參數的變化:這是否允許,是否定義良好的語義變化取決於源和查詢。這裏有一些例子。

    • 添加、刪除和修改的速度限製是允許的:

      火花readStream格式(“卡夫卡”)。選項(“訂閱”,“文章”)

      火花readStream格式(“卡夫卡”)。選項(“訂閱”,“文章”)。選項(“maxOffsetsPerTrigger”,…)
    • 修改訂閱的文章和文件通常不允許結果是不可預測的:spark.readStream.format(“卡夫卡”).option(“訂閱”,“文章”)spark.readStream.format(“卡夫卡”).option(“訂閱”,“newarticle”)

  • 改變輸出的類型:一些特定組合的水槽之間的變化是允許的。這需要在個案基礎上進行驗證。這裏有一些例子。

    • 文件沉到卡夫卡水槽是被允許的。卡夫卡隻會看到新的數據。

    • 卡夫卡水槽,水槽文件是不允許的。

    • 卡夫卡水槽改為foreach,反之亦然。

  • 參數的變化輸出下沉:這是否允許,是否定義良好的語義變化取決於水槽和查詢。這裏有一些例子。

    • 更改文件的輸出目錄水槽是不允許的:sdf.writeStream.format(“鋪”).option(“路徑”,“/ somePath”)sdf.writeStream.format(“鋪”).option(“路徑”,“/ anotherPath”)

    • 修改輸出主題是允許的:sdf.writeStream.format(“卡夫卡”).option(“主題”,“人類”)sdf.writeStream.format(“卡夫卡”).option(“主題”,“話題二”)

    • 更改用戶定義的foreach水槽(即ForeachWriter代碼)是允許的,但改變依賴於代碼的語義。

  • 投影的變化/過濾器/類似操作:有些情況下是允許的。例如:

    • 添加/刪除過濾器是允許的:sdf.selectExpr (“”)sdf.where (…) .selectExpr (“a”) .filter (…)

    • 與相同的輸出模式允許變化的預測:sdf.selectExpr (“stringColumn作為json) .writeStreamsdf.select (to_json (…)。as (json)) .writeStream

    • 預測的變化與不同的輸出模式是有條件地允許:sdf.selectExpr .writeStream (“a”)sdf.selectExpr .writeStream (b)是隻允許如果輸出水槽允許模式改變的“一個”“b”

  • 變化有狀態操作:一些流媒體業務查詢需要維護狀態數據,以不斷更新的結果。結構化流自動檢查點狀態數據容錯存儲(例如,DBFS AWS S3, Azure Blob存儲)和恢複後重新啟動。然而,這種假設狀態數據的模式仍然在重啟時相同。這意味著任何更改(添加、刪除或修改模式)的有狀態操作流查詢之間不允許重啟。這是有狀態的列表操作的模式重新啟動之間不應該被改變,以確保國家恢複:

    • 流媒體聚合:例如,sdf.groupBy (“a”) .agg (…)。任何變化在數量或類型的分組關鍵字或聚合是不允許的。

    • 流媒體重複數據刪除:例如,sdf.dropDuplicates (“”)。任何變化在數量或類型的分組關鍵字或聚合是不允許的。

    • Stream-stream加入:例如,sdf1.join (sdf2…)(即兩個輸入生成sparkSession.readStream)。變化的模式或等值連接列是不允許的。加入的變化類型(外部或內部)不允許的。其他的變化加入條件是不明確的。

    • 任意的有狀態操作:例如,sdf.groupByKey (…) .mapGroupsWithState (…)sdf.groupByKey (…) .flatMapGroupsWithState (…)。任何改變的模式定義的狀態和超時的類型是不允許的。任何改變在用戶定義的狀態映照函數是允許的,但語義效應的變化取決於用戶定義的邏輯。如果你真的想支持模式變化狀態,那麼您可以顯式編碼/解碼複雜狀態數據結構到字節使用一種編碼/解碼方案,支持模式遷移。舉個例子,如果你保存狀態Avro-encoded字節,那麼你可以改變查詢之間的Avro-state-schema重啟恢複二進製狀態。