跳轉到主要內容
工程的博客

結構化流:一年審查

2022年2月7日, 工程數據

分享這篇文章

,當我們進入2022年,我們想花點時間反思的很大進步流媒體在磚和Apache的火花™!2021年,工程團隊和開源貢獻者大量進步了三個目標:

  1. 更低的延遲和改進狀態流處理
  2. 提高磚的可觀察性和火花結構化流工作負載
  3. 提高資源配置和可伸縮性

最終,這些目標背後的動機是為了讓更多的團隊運行流負載在磚和火花,方便客戶操作關鍵任務同時生產流媒體應用程序數據磚和優化成本效率和資源使用。beplay体育app下载地址

目標# 1:更低的延遲和改進狀態處理

有兩個新的關鍵特性,專門針對降低延遲和有狀態操作,以及改善狀態api。第一個大型狀態是異步的檢查點操作,使在曆史上同步設計和更高的延遲。

異步的檢查點

新磚的異步檢查點”功能目標降低延遲很大有狀態操作。

在這個模型中,狀態更新寫入到雲存儲檢查點位置下microbatch開始之前。的優勢是,如果一個有狀態的流媒體查詢失敗,我們可以很容易的重新啟動查詢通過使用信息從最後一批成功完成。在異步模型,下一個microbatch不必等待狀態更新,改善整體的端到端延時microbatch執行。

在異步模型,下一個microbatch不必等待狀態更新,改善整體的端到端延時microbatch執行。

您可以了解更多關於該特性在即將到來的深潛博客,並嚐試在磚運行時10.3及以上。

任意狀態算子的改進

在更之前的帖子結構中,我們介紹了任意狀態處理流媒體(平麵)MapGroupsWithState。這些運營商提供了很大的靈活性,使聚合之外更高級的有狀態操作。我們已經介紹了改進這些操作符:

  • 允許初始狀態,避免了需要再處理你所有的流數據。
  • 使簡單邏輯測試提供了一個新的TestGroupState接口,允許用戶創建的實例GroupState和訪問內部所設定的值,簡化單元測試的狀態轉換函數。

允許初始狀態

讓我們先從以下flatMapGroupswithState算子:

def flatMapGroupsWithState [S:編碼器,U:編碼器)(outputMode: outputMode,timeoutConf:GroupStateTimeout,initialState:KeyValueGroupedDataset [K S]) (函數:(K,迭代器[V], GroupState [S])= >迭代器(U))

這個自定義狀態函數保持運行計數的水果。

val fruitCountFunc =(關鍵:字符串價值觀:迭代器(字符串),狀態:GroupState [RunningCount])= >{val數= state.getOption.map (_.count) .getOrElse l (0) + valList.sizestate.update (RunningCount(計數)迭代器(關鍵,count.toString))}

在這個示例中,我們指定該操作符的初始狀態為某些水果:通過設置起始值

val fruitCountInitialDS:數據集(字符串RunningCount)] = > ((“蘋果”,RunningCount (1)),(“橙色”,RunningCount (2)),(“芒果”,RunningCount (5)),).toDS ()val fruitCountInitial = initialState.groupByKey (x= >x._1) .mapValues (_._2)
              fruitStream.groupByKey (x= >x).flatMapGroupsWithState(更新GroupStateTimeout。NoTimeout fruitCountInitial) (fruitCountFunc)

簡單的邏輯測試

你也可以現在測試狀態更新使用TestGroupState API。

進口org.apache.spark.sql.streaming._進口org.apache.spark.api.java.Optional測試(“flatMapGroupsWithState狀態更新函數”){varprevState = TestGroupState.create [UserStatus] (optionalState = Optional.empty [UserStatus],timeoutConf = GroupStateTimeout.EventTimeTimeout,batchProcessingTimeMs = 1 l,eventTimeWatermarkMs = Optional.of(1升),hasTimedOut =)val標識:字符串=……val行動:迭代器(UserAction) =…
              斷言(prevState.hasUpdated !)
              updateState(標識、行動、prevState)
              斷言(prevState.hasUpdated)
              }

你可以找到這些,更多的例子磚的文檔

本機支持會話窗口

結構化流媒體介紹了能力基於聚合在事件時間窗口使用滾動或滑動窗口,這兩個是固定長度的窗戶。在火花3.2中,我們介紹的概念會話窗口,允許動態窗口長度。這曆史上需要自定義狀態使用flatMapGroupsWithState運營商。

使用動態間隙的一個例子:

#定義會話窗口動態基於差距持續時間eventTypesession_window expr=session_window(事件。時間戳,\(events.eventType==類型1、5秒”)\(events.eventType==“type2”、“20秒”)\.otherwise(5分鍾)#集團的數據通過會話窗口用戶標識,計算出數量每一個集團windowedCountsDF=事件\.withWatermark(“時間戳”,“十分鍾”)\.groupBy(事件。userID, session_window_expr) \()

目標2:提高流媒體工作負載的可觀測性

StreamingQueryListener異步API允許您監控中查詢SparkSession和自定義回調函數查詢狀態,進展,和終止事件,理解背壓和推理的瓶頸在microbatch仍具有挑戰性。磚8.1運行時,StreamingQueryProgress對象數據源特定的背壓指標報告卡夫卡,運動,三角洲湖自動加載程序流源。

度量提供了卡夫卡的一個例子:

{“源”:[{“描述”:“KafkaV2訂閱(主題)”,“指標”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},})}

磚運行時8.3引入了實時度量來幫助理解的表現RocksDB狀態存儲和調試狀態操作的性能。這些也可以幫助識別目標的工作負載為異步檢查點。

一個新的狀態存儲度量的例子:

{“id”:“6774075 e - 8869 - 454 b - ad51 - 513 be86cfd43”,“runId”:“3 d08104d-d1d4-4d1a-b21e-0b2e1fb871c5”,“batchId”:7,“stateOperators”:[{“numRowsTotal”:20000000,“numRowsUpdated”:20000000,“memoryUsedBytes”:31005397,“numRowsDroppedByWatermark”:0,“customMetrics”:{“rocksdbBytesCopied”:141037747,“rocksdbCommitCheckpointLatency”:2,“rocksdbCommitCompactLatency”:22061年,“rocksdbCommitFileSyncLatencyMs”:1710年,“rocksdbCommitFlushLatency”:19032年,“rocksdbCommitPauseLatency”:0,“rocksdbCommitWriteBatchLatency”:56155年,“rocksdbFilesCopied”:2,“rocksdbFilesReused”:0,“rocksdbGetCount”:40000000,“rocksdbGetLatency”:21834年,“rocksdbPutCount”:1,“rocksdbPutLatency”:56155599000,“rocksdbReadBlockCacheHitCount”:1988年,“rocksdbReadBlockCacheMissCount”:40341617,“rocksdbSstFileSize”:141037747,“rocksdbTotalBytesReadByCompaction”:336853375,“rocksdbTotalBytesReadByGet”:680000000,“rocksdbTotalBytesReadThroughIterator”:0,“rocksdbTotalBytesWrittenByCompaction”:141037747,“rocksdbTotalBytesWrittenByPut”:740000012,“rocksdbTotalCompactionLatencyMs”:21949695000,“rocksdbWriterStallLatencyMs”:0,“rocksdbZipFileBytesUncompressed”:7038年}}),“源”:[{}),“沉”:{}}

目標# 3:提高資源配置和可伸縮性

流自動定量與達美住表(DLT)

去年數據+人工智能峰會上,我們宣布三角洲生活表以聲明的方式,這是一個框架,該框架允許您構建和協調數據管道,和主要摘要需要配置集群和節點類型。我們把這進一步引入一個智能自動定量流管道,改進了現有的解決方案磚優化的自動定量。這些好處包括:

新算法利用新背壓指標調整集群大小來更好地處理場景中有流工作負載的波動,最終導致更好地使用集群。

雖然現有的自動定量解決退休節點隻有在空閑,新的DLT Autoscaler將主動關閉時選定的節點利用率低,同時保證不會有失敗的任務由於關閉。

  • 更好地利用集群:
  • 主動的優雅的工人停工:

目前正在編寫,此功能私人預覽。有關更多信息,請伸出您的帳戶的團隊。

Trigger.AvailableNow

在結構化流、觸發器允許一個用戶定義的時間流查詢的數據處理。這些觸發類型可以micro-batch(默認),固定間隔micro-batch (Trigger.ProcessingTime (“ ”),一次性micro-batch (Trigger.Once)和連續(Trigger.Continuous)。

磚10.1運行時介紹了一種新型的觸發;Trigger.AvailableNowthat is similar to Trigger.Once but provides better scalability. Like Trigger Once, all available data will be processed before the query is stopped, but in multiple batches instead of one. This is supported for Delta Lake and Auto Loader streaming sources.

例子:

spark.readStream.format(“δ”).option (“maxFilesPerTrigger”,“1”).load (inputDir)之類.writeStream觸發(Trigger.AvailableNow)checkpointDir .option (“checkpointLocation”)開始()

總結

在2022年即將來臨之際,我們將繼續加快創新結構化流,進一步提高性能,減少延遲和實現新的和令人興奮的功能。全年的更多信息,請繼續關注!

免費試著磚

相關的帖子

看到所有工程數據的帖子
Baidu
map