亞馬遜運動

結構化流的運動連接器是包含在磚運行時。

驗證與亞馬遜運動

與運動進行身份驗證,我們使用亞馬遜的默認憑據提供程序鏈默認情況下。我們建議啟動你的磚集群實例配置文件,可以訪問動作。如果你想使用鍵訪問,您可以使用選項提供awsAccessKeyawsSecretKey

你也可以假設一個我的角色使用roleArn選擇。您可以選擇指定外部IDroleExternalId和一個會話名稱roleSessionName。為了承擔的角色,您可以啟動集群與權限承擔角色或通過提供訪問密鑰awsAccessKeyawsSecretKey。cross-account認證,我們推薦使用roleArn假定的角色,然後可以通過你的磚AWS帳戶。cross-account認證的更多信息,請參閱委托銀行登錄在AWS帳戶使用我的角色

請注意

運動源需要ListShards,GetRecords,GetShardIterator權限。如果你遇到亞馬遜:訪問否認異常,檢查你的用戶或概要文件有這些權限。看到控製訪問亞馬遜運動數據流資源使用我為更多的細節。

模式

記錄的模式是:

類型

partitionKey

字符串

數據

二進製

字符串

shardId

字符串

sequenceNumber

字符串

approximateArrivalTimestamp

時間戳

使用DataFrame操作(鑄造(“字符串”)udf)顯式地反序列化數據列。

快速入門

讓我們先從一個簡單的例子:WordCount。以下筆記本演示了如何使用結構化運行WordCount流運動。

與結構化流運動WordCount筆記本

在新標簽頁打開筆記本

配置

警告

由於利率限製由運動和限製運動的API,一旦觸發執行(Trigger.Once ()與運動不支持)。

這裏有最重要的配置指定讀取的數據。

選項

價值

默認的

描述

streamName

一個以逗號分隔的名稱。

(沒有一個所需的參數)

訂閱流的名字。

地區

指定流區域。

在本地解決地區

流中定義的區域。

端點

地區的運動數據流。

在本地解決地區

運動區域端點數據流。

initialPosition

最新、trim_horizon最早trim_horizon(化名),at_timestamp

最新的

從哪裏開始閱讀的流。

生產注意事項,審查關鍵技術因素和最佳實踐

在時間點上開始閱讀

請注意

這個特性可以在磚運行時7.3 LTS及以上。

開始閱讀的時候,你可以使用一個at_timestampinitialPosition選擇。您指定的值作為一個JSON字符串,如{“at_timestamp”:“06/25/202010:23:45PDT "}。流查詢將讀取所有更改或在給定的時間戳(包容)。它使用Java的默認格式解析時間戳。您可以顯式地指定格式通過提供JSON字符串中的一個額外字段,例如:

(火花readStream格式(“運動”)選項(“streamName”,kinesisStreamName)選項(“地區”,kinesisRegion)選項(“initialPosition”,”{at_timestamp”:“06/25/2020 10:23:45 PDT”、“格式”:“MM / dd / yyyy HH: MM: ss ZZZ”}”)選項(“awsAccessKey”,awsAccessKeyId)選項(“awsSecretKey”,awsSecretKey)負載())

此外,還有配置從運動控製閱讀的吞吐量和延遲。運動源在一個後台線程運行引發工作定期預抓取動作的內存和緩存的數據引發執行人。緩存數據流查詢處理每個預取步驟完成後,使數據進行處理。因此,這個預取步驟決定了許多觀察到的端到端延時和吞吐量。你可以使用以下選項控製性能。

選項

價值

默認的

描述

maxRecordsPerFetch

一個正整數。

10000年

有多少記錄讀取/ API請求動作。返回的記錄數可能是高取決於sub-records聚合到單個記錄使用運動生產商庫。

maxFetchRate

積極的十進製表示數據率MB / s。

1.0 (max = 2.0)

每切分速度預取數據。這是限製獲取速度,避免運動調節。2.0 MB / s是動作允許的最大速率。

minFetchPeriod

例如,一個時間字符串11秒。

400 ms (min = 200毫秒)

多長時間等之間的連續預取的嚐試。這是限製獲取的頻率,避免運動節流。200 ms是最低動作最多允許5獲取/秒。

maxFetchDuration

例如,一個時間字符串1米1分鍾。

十年代

多久之前緩衝預取的新的數據使其可用於處理。

fetchBufferSize

一個字節的字符串,例如,2 gb10 mb

20 gb

多少數據緩衝區為下一個觸發器。這是作為一個停止條件,而不是一個嚴格的上限,因此更多的數據可能比指定的緩衝這個值。

shardsPerTask

一個正整數。

5

有多少運動碎片從每火花任務並行預取。在理想的情況下#集群> =#運動碎片/shardsPerTask最小查詢延遲和最大資源使用情況。

shardFetchInterval

例如,一個時間字符串2米為2分鍾。

1

為重新切分調查運動的頻率。

awsAccessKey

字符串

沒有違約。

AWS訪問密鑰。

awsSecretKey

字符串

沒有違約。

AWS秘密訪問密鑰對應的訪問密鑰。

roleArn

字符串

沒有違約。

亞馬遜資源名(攻擊)的角色承擔當訪問動作。

roleExternalId

字符串

沒有違約。

可以使用一個可選值,當授權訪問AWS帳戶。看到如何使用外部ID嗎

roleSessionName

字符串

沒有違約。

一個標識符假設角色的會話,惟一地標識一個會話時相同的角色是由不同的主體或不同的原因。

coalesceThresholdBlockSize

一個正整數。

10000000年

的閾值自動合並發生。如果平均塊大小小於此值,預取塊合並向coalesceBinSize

coalesceBinSize

一個正整數。

128000000年

合並後的近似的塊大小。

請注意

默認值已被選定的選項,這樣兩個讀者(火花或其他)可以同時使用一個運動流沒有觸及運動速度限製。如果你有更多的消費者,你必須相應地調整選項。例如,您可能會減少maxFetchRate,增加minFetchPeriod

這裏有一些建議配置特定的用例。

ETL從運動到S3

當你執行ETL長期存儲,你寧願有少量的大文件。在這種情況下,您可能希望設置一個大型流觸發間隔,例如,5 - 10分鍾。此外,您可能想要增加你的maxFetchDuration這樣你緩衝大塊將要寫入處理期間,和增加fetchBufferSize所以你不要停止抓取過早在觸發器之間,並開始落後在你流。

低延遲監控和報警

當你有一個報警用例,你會想要更低的延遲。實現:

  • 確保隻有一個消費者(也就是說,隻有你流查詢和沒有人)的運動流,這樣我們就可以優化你的隻有流查詢獲取盡可能快沒有跑到運動速度限製。

  • 設置選項maxFetchDuration小值(說,200毫秒)開始盡可能快地處理獲取數據。

  • 設置選項minFetchPeriod210毫秒獲取盡可能經常。

  • 設置選項shardsPerTask或配置集群等#集群> =2*(#運動碎片)/shardsPerTask。這將確保背景預取和流媒體查詢任務可以並行執行任務。

如果你看到,你查詢接收數據每5秒,那麼很可能你擊球動作速度限製。回顧你的配置。

警告

如果你刪除並重新創建一個動作流,你不能重用任何現有的檢查點目錄重新啟動流查詢。你必須刪除目錄和檢查站這些查詢從頭開始。

指標

請注意

在磚運行時8.1及以上。

運動報道的毫秒數消費者已經落後於流的開始為每個工作區。你可以得到平均、最小和最大的毫秒數在所有的工作區流查詢過程中(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html reading-metrics-interactively)作為avgMsBehindLatest,maxMsBehindLatest,minMsBehindLatest指標。如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據流查詢進展儀表板選項卡:

{“源”:({“描述”:“KinesisV2(流)”,“指標”:{“avgMsBehindLatest”:“32000.0”,“maxMsBehindLatest”:“32000”,“minMsBehindLatest”:“32000”},}]}

寫信給運動

下麵的代碼片段可以用作ForeachSink寫數據來運動。它需要一個數據集[(字符串,數組(字節)))

請注意

下麵的代碼片段至少一次語義,不是一次。

運動Foreach水槽筆記本

在新標簽頁打開筆記本