跳轉到主要內容
Beplay体育安卓版本平台的博客

特性深潛水:水印在Apache火花結構化流

2022年8月22日 產品

分享這篇文章

關鍵的外賣

  • 水印幫助火花理解基於事件的處理進展,當窗口的總量和削減的聚合狀態
  • 加入數據流時,火花,默認情況下,使用一個單一的全球水印就清除狀態基於最小事件時間看到整個輸入流
  • 可以利用RocksDB集群內存和GC暫停來減少壓力
  • StreamingQueryProgressStateOperatorProgress對象包含關鍵信息水印如何影響你的流

介紹

在構建實時管道,一個團隊必須與現實是分布式數據攝入本質上是無序的。此外,在有狀態的背景下流媒體操作,團隊需要能夠正確跟蹤事件時間進展的數據流他們攝取適當的時間窗口的計算聚合和其他有狀態操作。我們可以求出所有這些使用結構化的流。

例如,讓我們說我們是一個團隊致力於建立一個管道來幫助我們公司做積極維護我們的采礦機租賃給我們的客戶。beplay体育app下载地址這些機器都需要運行在最佳狀態,所以我們在實時監控它們。我們需要在流數據上執行狀態聚合機器理解和識別問題。

這是我們需要利用結構化流和水印產生必要的有狀態聚合將幫助決定在預測為這些機器維護和更多。

水印是什麼?

一般來說,在處理實時流數據會有延誤事件時間和處理時間之間由於數據是如何吸收和整個應用程序的經驗是否停機等問題。由於這些潛在變量延遲,您所使用的引擎來處理這些數據需要有一些機製來決定何時關閉總windows和產生聚合的結果。

雖然自然傾向來彌補這些問題可能是使用一個固定的延遲掛鍾時間的基礎上,我們將展示在這個即將到來的例子為什麼這不是最好的解決方案。

解釋這個視覺讓我們看一個場景,我們接收數據在不同時期各地的專機→11:20我。我們在創建10分鍾暴跌窗口計算的溫度和壓力讀數的平均值,在窗口的時期。

在第一個圖片,我們有暴跌windows上午11:00觸發,十一10點到11點導致結果表顯示在相應的時間。第二批數據大約上午11時數據的事件時間10:53是這個被納入上午11的平均溫度和壓力計算→窗口,關閉在周日上午11點,不給出正確的結果。

可視化表示結構化流管道攝取批次的溫度和壓力數據

骨料,以確保得到正確的結果我們要生產,我們需要定義一個水印讓火花了解什麼時候關閉的總窗口並生成正確的結果。

在結構化的流媒體應用程序中,我們可以確保所有相關數據收集的我們要計算的聚合是使用一個功能叫做水印。從最基本的意義上說,通過定義一個水印火花結構化流然後知道當它吸收所有數據到一段時間,T(基於一組遲到的期望),以便它可以關閉和生產視窗化聚合時間戳T

第二視覺顯示了10分鍾的實現水印的效果和使用Append模式在火花結構化流。

10分鍾的水印效果的可視化表示當應用於結構化流管道。

與第一個場景:火花發射窗口的聚合前十分鍾每十分鍾(即發出的上午11上午11→窗口十一10點),火花現在等待關閉和輸出窗口的聚合的最大事件時間-指定的水印大於窗口的上界。

換句話說,火花需要等到看到最新活動的數據點的時候看到大於上午11 - 10分鍾發出的專機是→上午11總窗口。上午11時,沒有看到這個,所以它隻初始化引發的聚合計算的內部狀態。上午11時,仍不滿足此條件,但是我們有一個新的數據點10:53是內部狀態更新,而已沒有發出。最後通過11:20我引發了數據點的事件時間11:15既然11:15 - 10分鍾是上午11:05比上午11晚的專機是→上午11窗口可以釋放結果表。

這產生正確的結果通過適當地將數據基於定義的預期遲到水印。一旦結果發出相應的狀態從狀態存儲中刪除。

將水印納入你的管道

了解如何將這些水印到我們的結構化流管道,我們將探討這個場景步行通過一個實際的代碼示例基於我們的用例說明這個博客的引言部分。

假設我們從卡夫卡攝取所有的傳感器數據集群的雲,我們要計算溫度和壓力平均每十分鍾預計扭曲時間的十分鍾。結構化流管道與水印看起來像這樣:

PySpark

sensorStreamDF=火花\.readStream \.format \(“卡夫卡”).option (“kafka.bootstrap。服務器”、“host1:端口1,\ host2:端口2”).option(“訂閱”、“tempAndPressureReadings”) \.load ()sensorStreamDF=sensorStreamDF \.withWatermark (“eventTimestamp”、“十分鍾”)\.groupBy (窗口(sensorStreamDF。eventTimestamp, \“十分鍾”))avg(sensorStreamDF.temperaturesensorStreamDF.pressure)
              sensorStreamDF.writeStream.format(“δ”).outputMode(“追加”).option (“checkpointLocation”、“/δ/事件/ _checkpoints / temp_pressure_job /”)開始(“/δ/ temperatureAndPressureAverages”)

這裏我們簡單地讀取卡夫卡,運用我們的轉換和聚合,然後寫出三角洲湖表將可視化和監控在磚SQL。輸出寫入表特定示例的數據是這樣的:

輸出流中定義查詢PySpark上麵的代碼示例

將水印我們首先需要確定兩個項目:

  1. 列代表傳感器的事件時間閱讀
  2. 數據的估計預期的時間傾斜

從上一個示例中,我們可以看到定義的水印.withWatermark ()方法與eventTimestamp列用作事件時間列和10分鍾來代表我們預計的時間傾斜。

PySpark

sensorStreamDF=sensorStreamDF \.withWatermark (“eventTimestamp”、“十分鍾”)\.groupBy (窗口(sensorStreamDF。eventTimestamp, \“十分鍾”))avg(sensorStreamDF.temperaturesensorStreamDF.pressure)

現在我們知道如何實現水印在結構化流管道,這將是重要的理解其他物品如流連接操作和管理狀態是影響水印。此外,當我們規模管道會有關鍵指標數據工程師需要了解和監控避免性能問題。我們將探討所有這些為我們深入研究水印。

水印在不同的輸出模式

在我們深入了解之前,重要的是要了解你所選擇的輸出模式影響水印設置的行為。

水印時隻能使用流媒體應用程序正在運行附加更新輸出模式。還有第三個輸出模式,完整的模式,在整個結果表寫入存儲。這種模式不能使用,因為它需要保存所有聚合數據,因此不能使用水印中間狀態。

這些輸出模式的含義在窗口的背景下聚合和水印是在“附加”模式聚合隻能生產一次,不能被更新。因此,一旦產生聚合,引擎可以刪除聚合的狀態,從而保持整個聚合狀態有界的。遲到的記錄——那些近似水印啟發式不適用(他們比水印延遲周期),因此必須下降了必要性——總一直在生產和總刪除狀態。

相反,“更新”模式下,總可以反複從第一條記錄開始和在每個接收記錄,因此水印是可選的。水印隻用於削減國家一次一些的引擎知道不再記錄總可以收到。一旦刪除狀態,再次任何遲到記錄必須被丟棄的合計價值已經丟失,不能被更新。

重要的是要了解狀態,後麵到達記錄,和不同的輸出模式可能導致不同的行為的應用程序運行在火花。這裏主要是添加和更新模式,一旦收到水印顯示所有的數據聚合時間窗,引擎可以將窗口狀態。append模式的總生產隻有在時間窗口的關閉加水印延遲在更新模式產生在每個更新窗口。

最後,通過增加水印延遲窗口你將導致管道等更長的時間數據和潛在下降較少的數據——更高的精度,而且更高的延遲生產總量。另一方麵,小水印延遲會導致較低的精度也較低的延遲生產總量。

窗口延遲長度 精度 延遲
時間延遲窗口 更高的精度 更高的延遲
更短的延遲窗口 低精度 更低的延遲

深入探討了水印

連接和水印

有幾個注意事項需要注意的在流媒體應用程序連接操作時,特別是當連接兩個流。假設我們的用例中,我們想要加入流數據集關於溫度和壓力讀數與附加價值被其他傳感器在機器。

有三個主要類型的stream-stream連接,可以實現結構化流:內、外,半連接。在流媒體應用程序連接的主要問題,你可能有一個完整的圖片的一側連接。給火花的理解在沒有未來的比賽預計類似於聚合,引發的問題需要理解早些時候沒有新行合並到在發射之前計算聚合。

允許火花處理這件事,我們可以利用水印和事件時間約束的聯接條件stream-stream加入。這種組合允許火花過濾後期記錄和修剪連接操作的狀態通過一個時間範圍條件的加入。我們證明這下麵的例子:

PySpark

sensorStreamDF=spark.readStream.format(“δ”)。(“sensorData”)tempAndPressStreamDF=spark.readStream.format(“δ”)。(“tempPressData”)sensorStreamDF_wtmrk=sensorStreamDF。withWatermark(“時間戳”、“5分鍾”)tempAndPressStreamDF_wtmrk=tempAndPressStreamDF。withWatermark(“時間戳”、“5分鍾”)joinedDF=tempAndPressStreamDF_wtmrk.alias (“t”)。加入(sensorStreamDF_wtmrk.alias (“s”),expr (“””年代。sensor_id = = t。sensor_id和年代。時間戳> = t。時間戳和年代。時間戳< = t。時間戳+間隔5分鍾”“”),joinType=“內心”)。withColumn (“sensorMeasure坳(“Sensor1”)+坳(“Sensor2”)) \.groupBy (窗口(坳(“t.timestamp”)、“十分鍾”))\.agg (avg(坳(“sensorMeasure”)) .alias (“avg_sensor_measure”),avg(坳(“溫度”)).alias (“avg_temperature”),avg(坳(“壓力”)).alias \ (“avg_pressure”))選擇(“窗口”、“avg_sensor_measure”、“avg_temperature”、“avg_pressure”)
              joinedDF.writeStream.format \(“δ”).outputMode \(“追加”).option (“checkpointLocation”、“/ /文件/檢查站”)\.toTable (“output_table”)

但是,與上麵的示例中,將會有,每個流可能需要不同的時間扭曲的水印。在這個場景中,火花有一個處理多個水印策略定義。火花維護一個全球水印基於最慢的流,以確保安全的最高金額時不丟失的數據。

開發人員確實有能力改變這種行為改變spark.sql.streaming.multipleWatermarkPolicy馬克思;然而,這意味著慢流的數據會被刪除。

看到全方位的連接操作,需要或者可以利用水印檢查本節火花的文檔。

監視和管理流與水印

當管理一個流媒體查詢,火花可能需要管理數以百萬計的密鑰和保持狀態,默認狀態存儲,數據磚集群可能不是有效的。你可能會開始看到內存利用率高,然後再垃圾收集停頓時間。這些都將阻礙結構化流媒體應用程序的性能和可伸縮性。

這是RocksDB進來的地方。您可以利用RocksDB在磚通過使它像火花配置:

spark.conf。(“spark.sql.streaming.stateStore.providerClass”,“com.databricks.sql.streaming.state.RocksDBStateStoreProvider”)

這將允許集群運行結構的流媒體應用程序利用RocksDB本機內存中可以更有效地管理國家和利用本地磁盤/ SSD,而不是保持在內存中所有狀態。

跟蹤內存使用量和垃圾收集指標之外,還有其他關鍵指標和指標應收集和跟蹤處理水印和結構化流。您可以查看訪問這些指標StreamingQueryProgressStateOperatorProgress對象。看看我們如何使用這些文檔的例子在這裏

StreamingQueryProgress對象中,有一個方法叫做“eventTime”,可以調用將返回馬克斯,最小值,avg,水印時間戳。前三個是最大值、最小值、平均事件時間在觸發。最後一個觸發器中使用的水印。

縮寫StreamingQueryProgress對象的例子

{“id”:“f4311acb - 15 - da - 4 - dc3 - 80 - b2 - acae4a0b6c11”,“eventTime”:{“平均”:“2021 - 02 - 14 - t10:56:06.000z”,“馬克斯”:“2021 - 02 - 14 - t11:01:06.000z”,“最小值”:“2021 - 02 - 14 - t10:51:06.000z”,“水印”:“2021 - 02 - 14 - t10:41:06.000z”},“stateOperators”:[{“operatorName”:“stateStoreSave”,“numRowsTotal”:7,“numRowsUpdated”:0,“allUpdatesTimeMs”:205年,“numRowsRemoved”:0,“allRemovalsTimeMs”:233年,“commitTimeMs”:15182年,“memoryUsedBytes”:91504年,“numRowsDroppedByWatermark”:0,“numShufflePartitions”:200年,“numStateStoreInstances”:200年,“customMetrics”:{“loadedMapCacheHitCount”:4800年,“loadedMapCacheMissCount”:0,“stateOnCurrentVersionSizeBytes”:25680年}}}

這些信息可以用來調和的結果表中的數據流查詢輸出,也可用於驗證所使用的水印是為了eventTime時間戳。這可以成為重要當你加入的數據流在一起。

在有StateOperatorProgress對象numRowsDroppedByWatermark指標。這個指標會顯示多少行被認為太遲是包含在有狀態的聚合。注意,這個指標是測量行了事後的而不是原始輸入行,所以數量不精確,但可以表明有後期數據被刪除。這一點,結合從StreamingQueryProgress對象的信息,可以幫助開發人員判斷水印是否正確配置。

多個聚合、流媒體和水印

結構化流查詢剩餘的限製之一是鏈接多個狀態運營商(如聚合流連接的)在一個流媒體查詢。這種限製的一個單一的全球水印有狀態聚合在磚是我們正在研究解決方案,並將在未來幾個月釋放更多信息。看看我們的博客上項目光速了解更多:項目光速:更快、更簡單的流處理與Apache火花(m.eheci.com)

結論

與結構化流和水印在磚上,組織,像上麵描述的用例,可以構建彈性實時應用程序,確保指標由實時聚合被準確計算即使數據不是正確命令或準時。更多地了解如何構建實時應用程序數據磚,磚代表聯係。

免費試著磚
看到所有產品的帖子
Baidu
map