處理數據在Apache卡夫卡與結構化流在Apache 2.2火花
2017年4月26日 在工程的博客
這是第三個帖子由多部分組成的係列如何你可以執行複雜的流分析使用Apache火花。
在這個博客中,我們將展示如何利用火花SQL api的使用和轉換複雜的數據流Apache卡夫卡。使用這些簡單的api,可以表達複雜的轉換僅一次事件時間聚合和各種係統的輸出結果。在一起,你可以使用Apache火花和Apache卡夫卡:
- 轉換和增加實時數據讀取Apache卡夫卡和處理批處理數據一樣使用相同的api。
- 集成數據讀卡夫卡的信息存儲在其他係統中包括S3, HDFS或MySQL。
- 自動從增量執行提供的好處催化劑優化器由鎢和隨後的高效代碼生成。
我們首先回顧卡夫卡的術語,然後現在的例子結構化流讀取數據的查詢和寫數據到Apache卡夫卡。最後,我們將探討一個端到端的實際用例。
Apache卡夫卡
卡夫卡是一個流行的分布式發布-訂閱消息傳遞係統攝取實時數據流並將其轉化為下遊消費者以並行和容錯的方式。這使得卡夫卡適合構建實時流數據管道可靠之間移動數據異構處理係統。在我們深入結構化流的卡夫卡支持的細節,讓我們回顧一些基本概念和術語。
卡夫卡被組織成的數據主題是分成分區並行性。每個分區是一個有序,不可變的序列記錄,可以被認為是一個結構化的提交日誌。生產商的尾部追加記錄這些日誌消費者讀日誌按照自己的節奏。多個用戶可以訂閱主題和接收傳入的記錄,因為他們的到來。新記錄到一個分區在卡夫卡的話題,他們被分配一個順序id號叫做抵消。卡夫卡集群保留所有records-whether發表或不被消耗在一個可配置的保存期,之後他們標記為刪除。
指定閱讀從卡夫卡的數據
卡夫卡的話題可以看作是無限流數據保留一個可配置的時間。自然的無限流意味著,當開始一個新的查詢,我們首先必須決定哪些數據及時閱讀,我們要開始了。在高級別上,有三個選擇:
- 最早的- - - - - -開始閱讀之初流。這不包括數據已被刪除從卡夫卡,因為它是年齡超過保存期(“歲”數據)。
- 最新的——從現在開始,隻處理新數據到來後,查詢已經開始。
- 每個分區分配——指定精確抵消開始從每一個分區,允許細粒度的控製處理應該開始。例如,如果我們想要接哪裏其他係統或查詢,那麼可以利用這個選項。
正如您將在下麵看到的,startingOffsets選擇接受上麵的三個選項之一,開始時隻使用一個查詢從一個新鮮的檢查點。如果你重啟一個查詢從現有的檢查站,那麼它將永遠恢複完全的地方重新開始,除非數據抵消年齡了。如果任何未處理的數據是年齡,查詢行為取決於設定的failOnDataLoss選項,該選項的描述卡夫卡集成指南。
現有用戶的KafkaConsumer會注意到結構化流提供了更細粒度的版本的配置選項,auto.offset.reset
。而不是一種選擇,我們把這些問題分成兩個不同的參數,一個說要做什麼當流第一次開始(startingOffsets),另一個處理做什麼如果查詢是無法從那裏離開,因為所需的數據已經歲了(failOnDataLoss)。
Apache卡夫卡支持結構化流
結構化流提供了一個統一的批處理和流API,使我們能夠發布到卡夫卡作為視圖數據DataFrame。當處理無限數據流的方式,我們使用相同的API和得到相同的數據一致性保證在批處理。係統確保端到端隻有一次容錯擔保,這樣用戶不需要理由的低級方麵流。
讓我們研究和探索讀取或寫入卡夫卡的例子,緊隨其後的是一個端到端的應用程序。
閱讀記錄從卡夫卡的話題
第一步是指定的位置我們卡夫卡集群和我們感興趣的話題。火花可以讀一個話題,一組特定的話題,一個正則表達式模式的話題,甚至一組特定的分區屬於一組主題。我們隻看一個例子從單個主題閱讀,另一個可能性是覆蓋著卡夫卡集成指南。
#構造一個流< a href = " //m.eheci.com/glossary/what-are-dataframes " > DataFrame < / >,從人類讀取df =火花\.readStream \。格式(“卡夫卡”)\.option (“kafka.bootstrap.servers”,“host1:端口1,host2:端口2”)\.option (“訂閱”,“人類”)\.option (“startingOffsets”,“最早”)\.load ()
上麵的DataFrame是流DataFrame訂閱“人類”。提供選項的配置設置DataStreamReader和所需的最小參數的位置kafka.bootstrap.servers(即。主持人:港口
),我們想要的主題訂閱出現。在這裏,我們也指定startingOffsets“最早”,閱讀主題中的所有數據的查詢。如果startingOffsets沒有指定選項,默認值的“最新”,隻有數據到達後開始查詢將被處理。
df.printSchema ()
揭示了我們DataFrame的模式。
根| - - -關鍵:二進製(可空=真正的)| - - -價值:二進製(可空=真正的)| - - -主題:字符串(可空=真正的)|——分區:整數(可空=真正的)|——抵消:長(可空=真正的)|——時間戳:時間戳(可空=真正的)|——timestampType:整數(可空=真正的)
返回DataFrame卡夫卡的包含所有熟悉的字段記錄和相關的元數據。我們現在可以使用所有的熟悉DataFrame或數據集操作轉換結果。通常,然而,我們首先解析二進製值出現在鍵和值列。如何解釋這些斑點是特定於應用程序的。幸運的是,火花SQL包含許多內置轉換為常見類型的序列化如下我們將展示。
數據存儲為UTF8字符串
如果卡夫卡記錄代表UTF8字符串的字節數,我們可以簡單地使用一個演員將二進製數據轉換為正確的類型。
df.selectExpr (“鑄(關鍵字符串)”,“鑄(值作為字符串)”)
數據存儲為JSON
JSON是另一種常見格式的數據寫入卡夫卡。在這種情況下,我們可以使用內置的from_json
函數以及期望的模式將一個二進製值轉換為SQL結構火花。
#價值模式:{“a”:1,“b”:“字符串”}模式=StructType ()。添加(“a”, IntegerType ())。添加(“b”, StringType ())df。選擇(\坳(“關鍵”)。投(“字符串”),from_json(坳(“價值”)。投(模式),“字符串”))
用戶定義的序列化器和反序列化器
在某些情況下,您可能已經實現了的代碼卡夫卡反序列化器接口。你可以利用這段代碼的包裝是一個用戶定義函數(UDF)使用Scala代碼如下所示。
對象MyDeserializerWrapper {新MyDeserializer val des =}spark.udf.register (“序列化”(主題:字符串,字節:數組(字節))= >MyDeserializerWrapper.deser.deserialize(主題,字節))df.selectExpr (”“反序列化(“人類”,值)作為消息“”“)
注意,上麵DataFrame代碼類似於指定value.deserializer
當消費者使用標準的卡夫卡。
用火花卡夫卡生產國
寫數據從任何火花支持數據源到卡夫卡調用一樣簡單writeStream
在任何DataFrame包含一列命名為“價值”,並選擇一個列命名為“關鍵”。如果沒有指定鍵列,那麼一個null值鍵列將自動添加。一個空的鍵列,在某些情況下,在卡夫卡導致不均勻的數據分區,應小心使用。
目的地的主題DataFrame可以是靜態指定的記錄作為一個選項DataStreamWriter每記錄的基礎上或作為列DataFrame命名的“主題”。
#寫鍵值數據從DataFrame卡夫卡主題中指定一個選項查詢= df \.selectExpr (“鑄(用戶id字符串)鍵”,“to_json (struct(*))價值”)\.writeStream \。格式(“卡夫卡”)\.option (“kafka.bootstrap.servers”,“host1:端口1,host2:端口2”)\.option (“主題”,“人類”)\.option (“checkpointLocation”,“/ HDFS dir /道路/”)\.start ()
上麵的查詢需要一個包含用戶信息,並將它寫到DataFrame卡夫卡。用戶標識序列化為一個字符串,並用作鍵。我們的所有列DataFrame和序列化為JSON字符串,將導致記錄的值。
卡夫卡的寫作兩個必需的選項kafka.bootstrap.servers和checkpointLocation。如在上麵的例子中,使用一個額外的主題選項可用於設置一個主題寫,這個選項將覆蓋“話題”專欄DataFrame如果它存在。
與巢設備的端到端示例
在本節中,我們將探討一個端到端的管道包括卡夫卡以及其他數據源和下沉。我們將使用一個數據集的集合巢設備日誌,JSON格式這裏描述。我們將專門研究數據從巢的相機,這看起來像下麵的JSON:
“設備”:{“相機”:{“device_id”:“awJo6rH……”,“last_event”:{“has_sound”:真正的,“has_motion”:真正的,“has_person”:真正的,“start_time”:“2016 - 12 - 29 t00:00:00.000z”,“end_time”:“2016 - 12 - 29 t18:42:00.000z”}}}
我們也會加入一個靜態數據集(稱為“device_locations”),其中包含的一個映射device_id
到zip_code
設備注冊的地方。
在高層,所需的工作流程如上圖。給定一個流的更新從巢相機,我們想使用火花來執行不同的任務:
- 創建一個高效、可查詢所有事件的曆史檔案采用柱狀的形式像拚花。
- 執行低延遲事件時間聚合並把結果返回給卡夫卡對其他消費者。
- 執行批處理報告的數據存儲在一個壓縮在卡夫卡的話題。
雖然這些可能聽起來像是完全不同的用例,您可以執行所有這些使用DataFrames和結構化流在一個端到端火花應用程序!在下麵幾節中,我們將逐步通過單獨的步驟,從攝取加工存儲聚合的結果。
從卡夫卡讀取巢設備日誌
我們的第一步是閱讀原始巢數據流從卡夫卡和項目相機我們感興趣的數據。我們首先從卡夫卡記錄解析JSON築巢,通過調用from_json
功能和提供預期的JSON模式和時間戳格式。然後,我們應用各種變換的數據和項目相關的列相機為了簡化數據處理中的數據部分。
預期的JSON數據模式
模式=StructType () \。添加(“元數據”,StructType () \。添加(“access_token”, StringType ()) \。添加(“client_version”, IntegerType ())) \。添加(“設備”,StructType () \。添加(“恒溫器”,MapType (StringType (), StructType ()。添加(…)))\。添加(“smoke_co_alarms MapType (StringType (), StructType ()。添加(…)))\。添加(“相機”,MapType (StringType (), StructType ()。添加(…)))\。添加(“公司名稱”,StructType ()。添加(…)))\。添加(“結構”,MapType (StringType (), StructType ()。添加(…)))nestTimestampFormat=“yyyy-MM-dd 'HH: mm: ss.sss 'Z’”
解析原始JSON
jsonOptions={“timestampFormat”: nestTimestampFormat}解析=火花\.readStream \.format \(“卡夫卡”).option (“kafka.bootstrap。服務器”、“localhost: 9092”) \.option(“訂閱”、“nest-logs”) \.load () \。選擇(from_json(坳(“價值”)。投(“字符串”)、模式jsonOptions) .alias (“parsed_value”))
項目相關的列
相機=解析\。選擇(爆炸(“parsed_value.devices.cameras”)) \。選擇(“價值。*”)目擊=相機\。選擇(“device_id”、“last_event。has_person”、“last_event.start_time”) \。在哪裏(坳(“has_person”)==真正的)
創建相機
DataFrame,我們首先unnest運算“相機”json領域頂級。由於“相機”MapType,每個結果行包含一個鍵-值對的地圖。所以,我們使用爆炸
函數來創建一個新行對於每一個鍵-值對,壓扁的數據。最後,我們使用明星()unnest運算“值”列。下麵是調用的結果camera.printSchema ()
根|——device_id:字符串(可空=真正的)|——software_version:字符串(可空=真正的)|——structure_id:字符串(可空=真正的)|——where_id:字符串(可空=真正的)|——where_name:字符串(可空=真正的)| - - -名稱:字符串(可空=真正的)|——name_long:字符串(可空=真正的)|——is_online:布爾(可空=真正的)|——is_streaming:布爾(可空=真正的)|——is_audio_input_enable:布爾(可空=真正的)|——last_is_online_change:時間戳(可空=真正的)|——is_video_history_enabled:布爾(可空=真正的)|——web_url:字符串(可空=真正的)|——app_url:字符串(可空=真正的)|——is_public_share_enabled:布爾(可空=真正的)|——activity_zones:數組(可空=真正的)| |——元素:結構(containsNull =真正的)| | | -名稱:字符串(可空=真正的)| | | - id:字符串(可空=真正的)|——public_share_url:字符串(可空=真正的)|——snapshot_url:字符串(可空=真正的)|——last_event:結構(可空=真正的)| |——has_sound:布爾(可空=真正的)| |——has_motion:布爾(可空=真正的)| |——has_person:布爾(可空=真正的)| |——start_time:時間戳(可空=真正的)| |——end_time:時間戳(可空=真正的)| |——urls_expire_time:時間戳(可空=真正的)| |——web_url:字符串(可空=真正的)| |——app_url:字符串(可空=真正的)| |——image_url:字符串(可空=真正的)| |——animated_image_url:字符串(可空=真正的)| |——activity_zone_ids:數組(可空=真正的)| | |——元素:字符串(containsNull =真正的)
聚合和寫回卡夫卡
現在我們將處理目擊
DataFrame,向每個看到它的位置。讓我們回想一下,我們有一些位置數據查找的郵政編碼設備的設備id。我們首先創建一個DataFrame代表這個位置數據,然後加入的目擊
DataFrame匹配設備id。我們所做的就是加入了流媒體DataFrame目擊
與一個靜態DataFrame位置!
添加位置數據
locationDF=spark.table (“device_locations”)。選擇(“device_id”、“zip_code”)sightingLoc=目擊事件。加入(locationDF“device_id”)
總統計和卡夫卡寫出來
現在,讓我們來生成一個流聚合計算攝像機的數量人目擊每個小時在每一個郵政編碼,並把它寫出來壓實卡夫卡的話題1稱為“nest-camera-stats”。
sightingLoc \.groupBy (“zip_code”,窗口(“start_time”,“1小時”))\。數()\。選擇(\to_json(結構體(“zip_code”、“窗口”)).alias(“關鍵”),坳(“計數”)。投(“字符串”).alias \(“價值”)).writeStream \.format \(“卡夫卡”).option (“kafka.bootstrap。服務器”、“localhost: 9092”) \.option(“主題”、“nest-camera-stats”) \.option (“checkpointLocation”、“/道路/ / HDFS dir”) \.outputMode \(“完整的”)。開始()
上麵的查詢將處理任何目擊事件發生和寫出的更新計數照準卡夫卡,鍵控的郵政編碼和小時窗口看到。隨著時間的推移,許多相同的密鑰更新將導致許多記錄關鍵,壓實和卡夫卡的話題將會刪除舊更新為新值到達關鍵。這種方式,壓實試圖確保最終,隻有最新的值保存為任何給定的鍵。
在持久存儲歸檔結果
除了寫出聚合結果卡夫卡,我們可能想保存原始的相機記錄持久性存儲供以後使用。下麵的例子寫出相機
DataFrame S3在地板上的格式。我們選擇拚花的壓縮和柱狀存儲,盡管許多不同的格式,如獸人,Avro、CSV、等支持定製不同的用例。
相機。寫Stream \。格式(“鋪”)\.option (“startingOffsets”,“最早”)\.option (“路徑”,“s3: / / nest-logs”)\.option (“checkpointLocation”,“/ HDFS dir /道路/”)\.start ()
注意,我們可以簡單地重用相同的相機
DataFrame多個流查詢開始。例如,我們可以查詢DataFrame離線的相機列表,並將通知發送到網絡運營中心的進一步調查。
批量查詢報告
我們的下一個例子將運行一個批處理查詢在卡夫卡“nest-camera-stats”壓實話題並生成一個報告顯示,與大量的目擊郵政編碼。
寫批處理查詢類似於流查詢除外,我們使用讀
方法,而不是readStream
方法和寫
而不是writeStream
。
批量讀取和格式化數據
報告=火花\.read \.format \(“卡夫卡”).option (“kafka.bootstrap。服務器”、“localhost: 9092”) \.option(“訂閱”、“nest-camera-stats”) \.load () \。選擇(\json_tuple(坳(“關鍵”)。投(“字符串”)、“zip_code”、“窗口”)。別名(“zip_code”、“窗口”),坳(“價值”)。投(“字符串”)。投(“整數”).alias \(“計數”))。在哪裏(數> 1000)\。選擇(“zip_code”、“窗口”)\。截然不同的()
這份報告DataFrame可用於報告或創建一個實時指示板顯示與極端的目擊事件。
結論
在這篇文章中,我們顯示的例子使用和將實時數據流從卡夫卡。我們實現了一個端到端的的例子連續應用程序,顯示了簡潔和易於編程和結構化流api,同時利用這些api提供了強大的僅一次語義。
未來的博客文章在本係列中,我們將討論更多關於:
- 流媒體應用程序監控
- 計算事件時間聚合與結構化流
如果你想了解更多關於結構化流,這裏有一些有用的鏈接:
- 以前的博客帖子解釋的動機和概念結構化流:
- 連續應用程序:發展流在Apache 2.0火花
- 結構化流在Apache火花
- 實時流ETL結構化流在Apache 2.1火花
- 處理複雜的結構化流在Apache 2.1火花
- 結構化流編程指南
- 火花峰會上討論2017年東——結構化流準備生產和未來的發展方向
嚐試結構化流在Apache 2.1火花,今天試著磚。