跳轉到主要內容
公司博客上

Apache火花的結構化流與亞馬遜運動磚

2017年8月9日 公司博客上

分享這篇文章

2017年7月11日,我們宣布的一般可用性Apache火花2.2.0作為的一部分磚3.0運行時(DBR)統一的分析平台Beplay体育安卓版本。增加DBR結構流的範圍,我們的支持AWS運動連接器來源(從)讀取流,給開發者的自由做三件事。

首先,你可以選擇Apache卡夫卡或亞馬遜的動作讀流數據。其次,你不是被束縛在運動分析進行分析,但可以使用火花SQL和結構化api。最後,您可以使用Apache的火花統一數據磚平台Beplay体育安卓版本和其他工作負載,寫你的端到端連續應用程序

在這個博客中,我們將討論四個方麵動作連接器的年代tructured流,這樣您就可以開始迅速在磚上,並以最小的更改,您可以切換到其他流來源你的選擇。

  1. 運動數據模式
  2. 配置參數
  3. 身份驗證與AWS運動
  4. 動作的解剖學結構流媒體應用程序

運動數據模式

知道運動記錄你從流讀取和理解這些記錄如何映射到一個定義的模式讓開發人員的生活變得更簡單。如果一個更好動作記錄映射到Apache火花的DataFrames與命名列及其相關類型。然後你可以選擇所需的有效載荷的運動記錄,通過訪問的列DataFrame和雇傭DataFrame api操作。

假設您發送JSON氣泡運動作為你的記錄。二進製數據的訪問負載,這是你的JSON編碼的數據,您可以使用DataFrame API方法鑄像JsonData(數據作為字符串)將二進製負載數據反序列化到一個JSON字符串。此外,一旦轉換為一個JSON字符串,然後您可以使用from_json ()SQL效用函數爆炸成各自DataFrame列。

因此,了解運動模式,以及它如何映射到DataFrame使事情容易做流媒體ETL,無論您的數據很簡單,比如單詞,或結構化的和複雜的,如嵌套JSON。

動作配置

一樣重要的理解及其模式是知道正確的動作記錄配置參數和選項供應在你運動連接器代碼。雖然選項很多,很少有進口的值得注意:

詳細選擇閱讀動作配置文檔

現在我們知道我們DataFrame來源於的格式動作記錄和理解選項我們可以供應讀流運動的連接器,我們可以編寫代碼,如下所示的解剖學運動流媒體的應用程序。但首先,我們必須通過AWS用於身份驗證的安全把關。

身份驗證與AWS運動

默認情況下,運動連接器度假村亞馬遜的默認憑據提供程序鏈,所以如果您已經創建了一個我的角色磚集群,包括訪問動作然後訪問將自動獲得。此外,根據您訪問我的作用,相同的默認憑證,即可獲得AWS S3 bucket的寫作。

或者,您可以顯式地提供憑證作為“選項”運動的一部分,連接器。當提供明確的密鑰,使用兩個“選擇”參數:awsAccessKeyawsSecretKey。然而,我們建議使用AWS我角色而不是提供密鑰在生產。

動作的解剖學結構流媒體應用程序

到目前為止,我們引進了三個概念,使我們編寫結構化流媒體應用程序使用運動連接器。結構化的流媒體應用程序有一個獨特的解剖學,連續步驟,不管你的流來源。讓我們學習每一步。

步驟1:定義數據的模式

雖然運動連接器可以閱讀任何編碼數據包括JSON, Avro,字節,隻要你會解碼接收火花代碼,這個博客我們假定我們動作流的設備數據編碼為一個JSON字符串,使用以下模式。

(code_tabs)

val jsonSchema=StructType ()添加(“battery_level LongType)添加(“c02_level LongType)添加(“cca3 StringType)添加(“cn”StringType)添加(“device_id LongType)添加(“device_type StringType)添加(“信號”,LongType)添加(“知識產權”,StringType)添加(“臨時”LongType)添加(“時間戳”,TimestampType)
pyspark.sql。類型的進口*pythonSchema=StructType () \添加(“battery_level”, LongType ()) \添加(“c02_level”, LongType ()) \添加(“cca3”, StringType ()) \添加(“cn”, StringType ()) \添加(“device_id”, LongType ()) \添加(“device_type”, StringType ()) \添加(“信號”,LongType ()) \添加(“知識產權”,StringType ()) \添加(“臨時”,LongType ()) \添加(“時間戳”,TimestampType ())

[/ code_tabs]

步驟2:閱讀源代碼

一個你已經定義了模式,下一步是閱讀你的流,使用運動連接器。隻有指定源代碼格式,即“運動”,磚將自動使用運動連接器進行閱讀。它將處理所有方麵的碎片讀取和記錄的所有元數據。你不需要擔心。

這裏要注意的是如果我的東西除了“運動”,我隻會改變這一指示“卡夫卡”或“套接字”,放AWS憑證。

(code_tabs)

/ /讀取數據流從動作使用連接器val kinesisDF = spark.readStream.format (“運動”).option (“streamName”,“設備”).option (“initialPosition”,“最早”).option (“地區”,“us-west-2”).option (“awsAccessKey”awsAccessKey).option (“awsSecretKey”awsSecretKey).load ()
kinesisDF =火花\.readStream \格式(“運動”)\.option (“streamName”,“設備”)\.option (“initialPosition”,“最早”)\.option (“地區”,“us-west-2”)\.option (“awsAccessKey”awsAccessKey) \.option (“awsSecretKey”awsSecretKey) \.load ()

[/ code_tabs]

步驟3:探索或轉換流

一旦我們有數據加載和運動記錄現在已經映射到DataFrames,我們可以使用SQL和DataFrames /數據API來處理。和底層流引擎將確保隻有一次語義和容錯。了解更多關於如何火花流達到這一至關重要的功能在結構化流,看來我們深潛水火花峰會會議

(code_tabs)

//提取數據的有效載荷使用轉換//你的分析val dataDevicesDF=kinesisDF.selectExpr(“鑄(數據作為字符串)jsonData”))選擇(from_json (“jsonData jsonSchema)。作為(“設備”))//爆炸其等效DataFrame的名字選擇(“設備。*”)//過濾器一些設備某些屬性過濾器(美元“devices.temp”>10美元“devices.signal”>15)
#從有效負載中提取數據,並使用轉換你的分析dataDevicesDF = kinesisDF \.selectExpr (“鑄(數據作為字符串)jsonData”)\.select (from_json (“jsonData”pythonSchema) .alias (“設備”))\.select (“設備。*”)\過濾器(”設備。臨時> 10和設備。信號> 15”)

[/ code_tabs]

完成這一步是大多數你的分析,你來自可行的見解。用火花的結構化api在這個步驟中,你得到的所有優點火花從鎢SQL性能和緊湊的代碼生成,不使用另一個SQL引擎或編程進行ETL或在一個單獨的SDK流分析

第四步:保存轉換流

最後,您可以編寫轉換流拚花S3 bucket文件在指定位置,分區的“約會”或“時間戳。“通知火花確保容錯,您可以指定一個選項參數“checkpointLocation,“和底層引擎將保持狀態。

(code_tabs)

val dataDevicesQuery=kinesisDF.selectExpr(“鑄(數據作為字符串)jsonData”))選擇(from_json (“jsonData jsonSchema)。作為(“設備”))//爆炸其等效DataFrame的名字選擇(“設備。*”)//過濾器一些設備某些屬性過濾器(美元“devices.temp”>10美元“devices.signal”>15).writeStream//鑲花的文件.partitionBy(“時間戳”).format(“鋪”)//指定檢查點的位置.option (“checkpointLocation”、“/ parquetCheckpoint”)//位置在哪裏拚花分區文件將被寫開始(“/ parquetDeviceTable”)
#從有效負載中提取數據,並使用轉換你的分析dataDevicesDF = kinesisDF \.selectExpr (“鑄(數據作為字符串)jsonData”)\.select (from_json (“jsonData”pythonSchema) .alias (“設備”))\.select (“設備。*”)\過濾器(”設備。臨時> 10和設備。信號> 15”)\#寫拚花文件.writeStream \.partitionBy (“時間戳”)\格式(“鋪”)\#指定檢查點的位置.option (“checkpointLocation”,“/ parquetCheckpoint”)\#位置存儲鋪分區文件.start (“/ parquetDeviceTable”)

[/ code_tabs]

這四個基本步驟封裝一個典型的解剖學結構流媒體的應用程序。源是否動作或卡夫卡或套接字或本地文件係統,您可以遵循這些指導方針和結構結構化流計算。

如果你想寫你的轉換流,例如,自己的水槽,如運動或NoSQL,現不支持火花的結構化流。你可以寫你自己的水槽功能的實現ForeachSink接口數據寫入動作

接下來是什麼

而不是這個博客充斥著一個完整的代碼示例顯示運動連接器流媒體應用程序,我將參考你檢查代碼和探索分布式計算的典型的“Hello World”WordCount在運動。更好的是,您可以導入WordCount筆記本和供應你的AWS憑證和相處。

沒有必要為你安裝或附加任何動作庫。不需要訪問外部動作SDK。您可以簡單地編寫您的結構化流磚3.0運行時。我們將做其餘的。

如果你沒有一個帳戶在磚上,得到一個今天

閱讀更多

我們有一係列的結構化流博客闡述它的許多特性,
你可以谘詢我們運動連接器文檔和
結構化流編程指南對於一些身臨其境的閱讀。

免費試著磚
看到所有公司博客上的帖子
Baidu
map