亞馬遜運動
結構化流的運動連接器是包含在磚運行時。
驗證與亞馬遜運動
與運動進行身份驗證,我們使用亞馬遜的默認憑據提供程序鏈默認情況下。我們建議啟動你的磚集群實例配置文件,可以訪問動作。如果你想使用鍵訪問,您可以使用選項提供awsAccessKey
和awsSecretKey
。
你也可以假設一個我的角色使用roleArn
選擇。您可以選擇指定外部IDroleExternalId
和一個會話名稱roleSessionName
。為了承擔的角色,您可以啟動集群與權限承擔角色或通過提供訪問密鑰awsAccessKey
和awsSecretKey
。cross-account認證,我們推薦使用roleArn
假定的角色,然後可以通過你的磚AWS帳戶。cross-account認證的更多信息,請參閱委托銀行登錄在AWS帳戶使用我的角色。
請注意
運動源需要ListShards
,GetRecords
,GetShardIterator
權限。如果你遇到亞馬遜:訪問否認
異常,檢查你的用戶或概要文件有這些權限。看到控製訪問亞馬遜運動數據流資源使用我為更多的細節。
模式
記錄的模式是:
列 |
類型 |
---|---|
partitionKey |
字符串 |
數據 |
二進製 |
流 |
字符串 |
shardId |
字符串 |
sequenceNumber |
字符串 |
approximateArrivalTimestamp |
時間戳 |
使用DataFrame操作(鑄造(“字符串”)
udf)顯式地反序列化數據
列。
配置
重要的
在磚運行時13.1及以上的,你可以使用Trigger.AvailableNow
與運動。看到攝取動作記錄作為增量的批處理。
這裏有最重要的配置指定讀取的數據。
選項 |
價值 |
默認的 |
描述 |
---|---|---|---|
streamName |
一個以逗號分隔的名稱。 |
(沒有一個所需的參數) |
訂閱流的名字。 |
地區 |
指定流區域。 |
在本地解決地區 |
流中定義的區域。 |
端點 |
地區的運動數據流。 |
在本地解決地區 |
運動區域端點數據流。 |
initialPosition |
最新、trim_horizon最早trim_horizon(化名), |
最新的 |
從哪裏開始閱讀的流。 |
生產注意事項,審查關鍵技術因素和最佳實踐。
在時間點上開始閱讀
請注意
這個特性可以在磚運行時7.3 LTS及以上。
開始閱讀的時候,你可以使用一個at_timestamp
值initialPosition
選擇。您指定的值作為一個JSON字符串,如{“at_timestamp”:“06/25/202010:23:45PDT "}
。流查詢將讀取所有更改或在給定的時間戳(包容)。它使用Java的默認格式解析時間戳。您可以顯式地指定格式通過提供JSON字符串中的一個額外字段,例如:
(火花。readStream。格式(“運動”)。選項(“streamName”,kinesisStreamName)。選項(“地區”,kinesisRegion)。選項(“initialPosition”,”{at_timestamp”:“06/25/2020 10:23:45 PDT”、“格式”:“MM / dd / yyyy HH: MM: ss ZZZ”}”)。選項(“awsAccessKey”,awsAccessKeyId)。選項(“awsSecretKey”,awsSecretKey)。負載())
此外,還有配置從運動控製閱讀的吞吐量和延遲。運動源在一個後台線程運行引發工作定期預抓取動作的內存和緩存的數據引發執行人。緩存數據流查詢處理每個預取步驟完成後,使數據進行處理。因此,這個預取步驟決定了許多觀察到的端到端延時和吞吐量。你可以使用以下選項控製性能。
選項 |
價值 |
默認的 |
描述 |
---|---|---|---|
maxRecordsPerFetch |
一個正整數。 |
10000年 |
有多少記錄讀取/ API請求動作。返回的記錄數可能是高取決於sub-records聚合到單個記錄使用運動生產商庫。 |
maxFetchRate |
積極的十進製表示數據率MB / s。 |
1.0 (max = 2.0) |
每切分速度預取數據。這是限製獲取速度,避免運動調節。2.0 MB / s是動作允許的最大速率。 |
minFetchPeriod |
例如,一個時間字符串 |
400 ms (min = 200毫秒) |
多長時間等之間的連續預取的嚐試。這是限製獲取的頻率,避免運動節流。200 ms是最低動作最多允許5獲取/秒。 |
maxFetchDuration |
例如,一個時間字符串 |
十年代 |
多久之前緩衝預取的新的數據使其可用於處理。 |
fetchBufferSize |
一個字節的字符串,例如, |
20 gb |
多少數據緩衝區為下一個觸發器。這是作為一個停止條件,而不是一個嚴格的上限,因此更多的數據可能比指定的緩衝這個值。 |
shardsPerTask |
一個正整數。 |
5 |
有多少運動碎片從每火花任務並行預取。在理想的情況下 |
shardFetchInterval |
例如,一個時間字符串 |
1 |
為重新切分調查運動的頻率。 |
awsAccessKey |
字符串 |
沒有違約。 |
AWS訪問密鑰。 |
awsSecretKey |
字符串 |
沒有違約。 |
AWS秘密訪問密鑰對應的訪問密鑰。 |
roleArn |
字符串 |
沒有違約。 |
亞馬遜資源名(攻擊)的角色承擔當訪問動作。 |
roleExternalId |
字符串 |
沒有違約。 |
可以使用一個可選值,當授權訪問AWS帳戶。看到如何使用外部ID嗎。 |
roleSessionName |
字符串 |
沒有違約。 |
一個標識符假設角色的會話,惟一地標識一個會話時相同的角色是由不同的主體或不同的原因。 |
coalesceThresholdBlockSize |
一個正整數。 |
10000000年 |
的閾值自動合並發生。如果平均塊大小小於此值,預取塊合並向 |
coalesceBinSize |
一個正整數。 |
128000000年 |
合並後的近似的塊大小。 |
請注意
默認值已被選定的選項,這樣兩個讀者(火花或其他)可以同時使用一個運動流沒有觸及運動速度限製。如果你有更多的消費者,你必須相應地調整選項。例如,您可能會減少maxFetchRate
,增加minFetchPeriod
。
這裏有一些建議配置特定的用例。
ETL從運動到S3
當你執行ETL長期存儲,你寧願有少量的大文件。在這種情況下,您可能希望設置一個大型流觸發間隔,例如,5 - 10分鍾。此外,您可能想要增加你的maxFetchDuration
這樣你緩衝大塊將要寫入處理期間,和增加fetchBufferSize
所以你不要停止抓取過早在觸發器之間,並開始落後在你流。
低延遲監控和報警
當你有一個報警用例,你會想要更低的延遲。實現:
確保隻有一個消費者(也就是說,隻有你流查詢和沒有人)的運動流,這樣我們就可以優化你的隻有流查詢獲取盡可能快沒有跑到運動速度限製。
設置選項
maxFetchDuration
小值(說,200毫秒
)開始盡可能快地處理獲取數據。設置選項
minFetchPeriod
來210毫秒
獲取盡可能經常。設置選項
shardsPerTask
或配置集群等#核在集群> =2*(#運動碎片)/shardsPerTask
。這將確保背景預取和流媒體查詢任務可以並行執行任務。
如果你看到,你查詢接收數據每5秒,那麼很可能你擊球動作速度限製。回顧你的配置。
警告
如果你刪除並重新創建一個動作流,你不能重用任何現有的檢查點目錄重新啟動流查詢。你必須刪除目錄和檢查站這些查詢從頭開始。
指標
請注意
在磚運行時8.1及以上。
運動報道的毫秒數消費者已經落後於流的開始為每個工作區。你可以得到平均、最小和最大的毫秒數在所有的工作區流查詢過程中(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html reading-metrics-interactively)作為avgMsBehindLatest
,maxMsBehindLatest
,minMsBehindLatest
指標。如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據流查詢進展儀表板選項卡:
{“源”:({“描述”:“KinesisV2(流)”,“指標”:{“avgMsBehindLatest”:“32000.0”,“maxMsBehindLatest”:“32000”,“minMsBehindLatest”:“32000”},}]}
攝取動作記錄作為增量的批處理
在磚運行時的13.1及以上,磚支持使用Trigger.AvailableNow
與運動數據源增量批語義。下麵描述了基本配置:
現在micro-batch讀觸發時可用模式,當前時間記錄的磚客戶機。
磚民調的源係統之間的所有記錄時間戳記錄時間和前一個檢查點。
磚加載這些記錄使用
Trigger.AvailableNow
語義。
請注意
磚是最好的嚐試使用所有記錄在消息隊列資源存在閱讀時閱讀。因為小的潛在差異在時間戳和缺乏保證數據源的排序,一些記錄可能不包括在批觸發。省略了記錄下觸發micro-batch加工的一部分。
看到配置增量的批處理。