最佳實踐:結構化流運動

本文描述了使用運動時的最佳實踐與三角洲湖流源和Apache火花結構化流。

亞馬遜運動數據流(KDS)是一個大規模可伸縮的和持久的實時數據流媒體服務。KDS不斷捕捉g的數據每秒從成千上萬的來源網站點擊流等數據庫事件流、金融交易、社交媒體提要,日誌,定位跟蹤事件。

KDS AWS流數據服務是一個流行的選擇,由於其易用性和serverless設置。運動數據流由個人吞吐量單位,稱為碎片,是基於shard-hours以及把負載單元。每個切分攝取能力估計有1000條記錄/秒,或1 mb /秒,輸出2 mb /秒的速度。

在KDS收集數據之後,您可以使用Apache火花的深度集成結構化與三角洲湖流媒體等應用程序日誌分析,點擊流分析和實時指標。你可以連續處理數據並將其存儲到三角洲表中。下圖描述了這些用例:一個典型的體係結構

運動三角洲架構圖

磚結構運動的流源

磚運行時包含一個開箱即用的運動的來源。這是一個專有的連接器在開源KDS,不可用。這個連接器不是基於動作的客戶端庫(氯化鉀)。運動源架構圖所示:

運動源架構圖

關鍵技術因素和最佳實踐

本節包括最佳實踐和故障排除信息使用與三角洲湖運動。

優化預取

運動源引發工作在一個後台線程運行預取定期運動數據和緩存的內存的火花執行人。緩存數據流查詢處理每個預取步驟完成之後,使數據進行處理。預取步驟顯著影響觀察到的端到端延時和吞吐量。您可以使用的選項控製性能在這一節中描述。

的默認設置shardsPerTask配置參數是5。在規模,然而,這可能需要大量的CPU核,所以設置10個可能是一個不錯的起點。然後根據流媒體工作負載的複雜性和數據量,你可以調整這個值基於集群的神經節指標(CPU、內存、網絡等等)。例如,中央處理器受限集群可能需要一個更小的值與更多的內核進行補償。

優化最小查詢延遲和最大的資源使用情況,使用以下計算:

數量CPU集群(在所有執行人)> =數量運動碎片/shardsPerTask

參數用於確定讀取的數據量/預取從運動中所描述的這個表。

選項

價值

默認的

描述

maxRecordsPerFetch

整數

10000年

獲取每個數量的記錄getRecordsAPI調用。

shardFetchInterval

持續時間字符串(2 m = 2分鍾)

1

要等多久才能更新的列表碎片(這是係統如何知道流已調整)。

minFetchPeriod

持續時間字符串

400毫秒

多長時間等之間的連續獲取的嚐試。這個設置可以幫助避免運動節流。200 ms是最低的,因為運動服務限製是5獲取/秒。

maxFetchRate

小數

1.0

最大的預取數據切分速度MB /秒。這個利率限製存取和避免運動節流。運動所允許的最大速度為2.0 MB /秒。

maxFetchDuration

持續時間字符串

十年代

多久之前緩衝預取的新的數據使其可用於處理

fetchBufferSize

字節字符串

20 gb

多少數據緩衝區為下一個觸發器。這是一個軟限製由於其作為一個停止條件,所以更多的數據可能是緩衝。

shardsPerTask

整數

5

有多少碎片每任務並行預取。

重要的

minFetchPeriod可以創建多個GetRecords API調用動作碎片,直到它擊中ReadProvisionedThroughputExceeded。如果發生異常,表明一個問題作為運動的連接器最大化利用碎片。

避免太多的速率限製錯誤造成的減速

連接器減少讀取的數據量運動一半每次遇到速度限製錯誤和記錄這一事件在日誌信息:“打極限。睡覺5秒。”

通常認為這些錯誤流被抓住了,但之後,你再也不應該看到這些錯誤。如果你這樣做,你可能需要調整從運動(通過增加容量)或調整預取選項

太多數據導致寫入磁盤

如果你有一個突然上升運動流,分配緩衝區容量可能填滿緩衝區不夠快速清空添加新數據。

在這種情況下,引發泄漏從緩衝塊磁盤和減緩處理,影響流性能。這個事件日誌中出現這樣的信息:

。/ log4j.txt: 879546:20/03/0217:15:04信息BlockManagerInfo:更新kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c磁盤上10.0.208.13:43458(電流大小:88年。4 MB,原始大小:00 B)

為了解決這個問題,試著增加集群內存容量(添加更多的節點或增加內存每個節點),或調整配置參數fetchBufferSize

使S3 VPC端點

確保所有S3交通AWS網絡上的路由,磚建議您啟用S3 VPC的端點。

在S3編寫任務

掛任務會導致流批處理持續時間長,從而導致難以跟上輸入流。在這種情況下,磚建議啟用引發猜測。確保任務不太積極,終止優化分位數和乘數仔細設置。一個好的起點是設置spark.speculation.multiplier3spark.speculation.quantile0.95

延遲問題而管理狀態RocksDB由於緩慢S3寫道

一個常見的場景在維護狀態操作在你流查詢大型垃圾收集停頓時間,進而引入延遲,導致批量執行時間延長。這通常發生在保持數百萬鍵的狀態。在這些情況下,而不是在JVM內存維護狀態,考慮使用RocksDB作為狀態存儲在本機內存或磁盤。狀態更改自動傳播到結構化流檢查站。然而,你可以觀察延遲當RocksDB寫道這些檢查點S3由於潛在的S3節流。盡量減少spark.sql.shuffle.partitions(默認200)文件寫的數量降到最低。你也可以嚐試優化多部分上傳閾值(spark.hadoop.fs.s3a.multipart.size默認1048576000字節),減少並發S3寫道。

監視流媒體應用程序

監控流媒體應用,磚建議使用引發的流查詢監聽器實現。

可觀測的指標是名為任意聚合函數,可以定義在一個查詢(DataFrame)。一旦執行DataFrame達到完成點(即完成批量查詢或達到流媒體時代)命名事件包含的指標數據處理自上次完成點。

你可以觀察這些指標通過附加一個偵聽器火花會話。偵聽器依賴於執行模式:

  • 批處理模式:使用QueryExecutionListener

    QueryExecutionListener查詢完成時被調用。訪問指標使用QueryExecution.observedMetrics地圖。

  • 流,或者micro-batch:使用StreamingQueryListener

    StreamingQueryListener被稱為流查詢完成後一個時代。訪問指標使用StreamingQueryProgress.observedMetrics地圖。磚不支持連續執行流。

例如:

/ /觀察行數(rc)和錯誤行數(erc)的流數據集瓦爾observed_ds=ds觀察(“my_event”,(點燃(1))。作為(“鋼筋混凝土”),(美元“錯誤”)。作為(“倫理委員會”))observed_dswriteStream格式(“…”)。開始()/ /使用偵聽器監控指標火花addListener(StreamingQueryListener(){覆蓋defonQueryProgress(事件:QueryProgressEvent):單位={事件進步observedMetrics得到(“my_event”)。foreach{= >/ /觸發如果錯誤的數量超過5%瓦爾num_rows=木屐()(“鋼筋混凝土”)瓦爾num_error_rows=木屐()(“倫理委員會”)瓦爾=num_error_rowstoDouble/num_rows如果(>0.05){/ /觸發警報}}}})

你也可以通過UI監控指標。如果您使用的是磚運行時7.0或以上,使用流選項卡中火花UI

刪除和重新創建一個流

如果你刪除,然後重新創建一個流,您必須使用一個新的檢查點位置和目錄。

重新切分

結構化流支持重新切分。在這種情況下,增加碎片的數量就足夠了。你不需要換流或創建臨時交通流轉移。

了解更多

亞馬遜運動