跳轉到主要內容
工程的博客

處理複雜的數據格式與結構化流在Apache 2.1火花

@磚可伸縮的第2部分數據
分享這篇文章

第1部分本係列的結構化流媒體博客,我們演示了如何簡單寫一個端到端的流媒體ETL管道使用結構化流將JSON CloudTrail日誌轉化為拚花表。博客強調,構建此類管道的主要挑戰之一是閱讀和轉換來自各種數據源的數據和複雜的格式。在這篇文章中,我們將進一步詳細地檢查這個問題,和展示Apache火花可以使用SQL的內置函數來解決你所有的數據轉換的挑戰。

具體地說,我們將討論如下:

  • 不同的數據格式和他們的權衡是什麼
  • 如何與他們合作容易使用火花SQL
  • 如何選擇適合你的用例的最終格式嗎

數據來源和格式

數據可用在各種不同的格式。電子表格可以用XML表示、CSV、TSV;可以編寫應用程序指標在原始文本或JSON。每個用例都有一個特定的數據格式為。在大數據的世界中,我們經常遇到格式像拚花,獸人,Avro, JSON、CSV、SQL和NoSQL數據來源,和純文本文件。我們可以大致將這些數據格式為三個類別:結構化、半結構化和非結構化數據。讓我們試著去理解每個類別的好處和缺點。

圖顯示的各種類型的數據來源和格式

結構化數據

結構化數據源定義一個模式的數據。通過這個額外的信息基礎數據,結構化數據源提供有效的存儲和性能。例如,柱狀格式如鑲木地板和獸人讓你更容易提取值列的一個子集。一行一行地閱讀每條記錄,然後提取感興趣的特定列的值可以讀取更多的數據比是必要的查詢時隻對一小部分感興趣的列。基於行的有效存儲格式如Avro序列化和存儲數據提供存儲的好處。然而,這些優勢通常為代價的靈活性。例如,由於結構的剛度,發展模式是很有挑戰性的。

非結構化數據

相比之下,非結構化數據源通常是自由格式的文本或二進製對象不包含標記,或元數據(例如,逗號在CSV文件),定義數據的組織。報紙文章、醫療記錄、圖像斑點、應用程序日誌通常是作為非結構化數據。這些資源通常需要上下文數據解析。也就是說,你需要知道文件是一個圖像或一篇報紙文章。大多數的非結構化數據來源。成本的非結構化格式是它變得繁瑣的提取價值這些數據源轉換和需要解釋這些特征提取技術數據集

半結構化數據

半結構化數據源的結構化/記錄但不一定有明確的全球模式生成所有記錄。因此,每個數據記錄是增強模式信息。JSON和XML是受歡迎的例子。半結構化數據格式的好處是,他們提供最靈活地表達你的數據,每個記錄是自描述的。這些格式是非常普遍的在許多應用程序中盡可能多的輕量級的解析器來處理這些記錄存在,他們也有造福人類可讀的。然而,這些格式的主要缺點是他們承擔額外的解析開銷,並不是特別特別查詢。

交換的數據格式與火花SQL

在我們的以前的博文,我們討論了如何將從JSON Cloudtrail日誌轉換為拚花縮短我們的特別的查詢的運行時的10倍。火花SQL允許用戶從這些類攝取數據的數據源,在批處理和流媒體查詢。本機支持讀寫數據拚花,獸人,JSON、CSV、文本格式和大量的存在於其他連接器火花包。你也可以使用JDBC數據源連接到SQL數據庫。

Apache火花可以用來交換的數據格式一樣容易:

=引發事件。readStream \格式(“json”)\#或鑲木地板,卡夫卡,獸人…….option () \#格式特定選項. schema (my_schema) \#需要.load (“路徑/ /數據”)輸出=…#執行你的轉換輸出。writeStream \#寫出您的數據格式(“鋪”)\.start (“路徑/ /寫”)

是否批量或流媒體數據,我們知道如何讀和寫不同的數據源和格式,但不同來源的支持不同的模式和數據類型。傳統數據庫隻支持原始數據類型,而格式像JSON對象允許用戶窩在列,有一個數組的值,或者代表一組鍵值對。用戶通常會去中間這些數據類型有效地存儲和代表他們的數據。幸運的是,火花SQL便於處理原始的和複雜的數據類型。現在讓我們進入一個快速概述如何從複雜數據類型的基本數據類型和vice-a-versa。

將複雜數據類型

各種不同類型的數據的例子

是常見的複雜數據類型,如結構、地圖和數組在處理半結構化的格式。例如,您可能是您的web服務器日誌API請求。這個API請求將包含HTTP頭,這將是一個string-string地圖。請求負載可能包含JSON格式的形式,它可以包含嵌套的字段或數組。一些來源或格式可能或可能不支持複雜的數據類型。一些格式可以提供性能優勢時將數據存儲在一個特定的數據類型。例如,當使用拚花,所有結構列將收到相同的治療頂級列。因此,如果你有一個嵌套過濾領域,你將得到相同的利益作為一個頂級列。然而,地圖被視為兩個數組列,因此你不會得到高效過濾語義。

讓我們看看一些例子關於火花SQL允許你形狀數據隨意一些數據轉換技術。

選擇從嵌套列

點()可以用來訪問嵌套列結構和地圖。

/ /輸入{“一個”:{“b”:1}}Python:events.select (“a.b”)Scala:events.select (“a.b”)SQL:選擇a.b事件/ /輸出{“b”:1}

扁平化結構

一個明星(*)可用於選擇所有的分支結構。

/ /輸入{“一個”:{“b”:1,“c”:2}}Python:events.select (“。*”)Scala:events.select (“。*”)SQL:選擇一個。*事件/ /輸出{“b”:1,“c”:2}

嵌套列

結構體函數或括號在SQL可以用來創建一個新的結構。

/ /輸入{“一個”:1,“b”:2,“c”:3}Python:events.select (struct(坳(“一個”).alias (“y”).alias (“x”))Scala:events.select(結構體(”為“y)作為“x)SQL:選擇named_struct (“y”)作為x事件/ /輸出{" x ": {“y”: 1}}

嵌套所有列

明星(*)也可以用於包含所有列在一個嵌套的結構體。

/ /輸入{“一個”:1,“b”:2}Python:events.select(結構體(“*”).alias (“x”))Scala:events.select(結構體(“*”)作為“x)SQL:選擇結構(*)x事件/ /輸出{" x ": {“a”: 1、“b”: 2}}

選擇一個數組或地圖元素

getItem ()或方括號(即。[])可以用來選擇一個元素的數組或地圖。

/ /輸入{“一個”:【1,2]}Python:events.select(坳(“一個”).getItem (0).alias (“x”))Scala:events.select (“a.getItem (0) 'x)SQL:選擇一個0]作為x事件/ /輸出{“x”:1}
/ /輸入{“一個”:{“b”:1}}Python:events.select(坳(“一個”).getItem (“b”).alias (“x”))Scala:events.select (“a.getItem (b)為“x)SQL:選擇一個“b”]作為x事件/ /輸出{“x”:1}

為每個數組或地圖創建一行元素

爆炸()可用於創建一個新行數組中每個元素或每一個鍵-值對。這是類似於HiveQL側視爆炸。

/ /輸入{“一個”:【1,2]}Python:events.select(爆炸(“一個”).alias (“x”))Scala:events.select(爆炸(“)”x)SQL:選擇爆炸(a)作為x事件/ /輸出[{“x”:1},{“x”:2})
/ /輸入{“一個”:{“b”:1,“c”:2}}Python:events.select(爆炸(“一個”).alias (“x”,“y”))Scala:events.select(爆炸(”)作為Seq (" x "、" y "))SQL:選擇爆炸(一)(x, y)事件/ /輸出[{“x”:“b”,“y”: 1}, {“x”:“c”、“y”: 2}]

收集多個行到一個數組中

collect_list ()collect_set ()可用於聚合條目到一個數組中。

/ /輸入[{“x”:1},{“x”:2})Python:events.select (collect_list (“x”).alias (“x”))Scala:events.select (collect_list (“x)”x)SQL:選擇collect_list (x)作為x事件/ /輸出{“x”:【1,2]}
/ /輸入[{“x”:1,“y”:“一個”},{“x”:2,“y”:“b”})Python:events.groupBy (“y”).agg (collect_list (“x”).alias (“x”))Scala:events.groupBy (“y”).agg (collect_list (“x)”x)SQL:選擇y, collect_list (x)作為xy組的事件/ /輸出[{“y”:“一個”,“x”:【1},{“y”:“b”,“x”:【2]}]

選擇一個字段數組中的每一項

當你使用點符號,我們返回一個新的數組,數組字段從每個數組元素被選中。

/ /輸入{“一個”:【{“b”:1},{“b”:2}]}Python:events.select (“a.b”)Scala:events.select (“a.b”)SQL:選擇a.b事件/ /輸出{“b”:【1,2]}

的力量to_json()和from_json ()

如果你真的想保護你列的複雜結構但你需要編碼為字符串存儲嗎?你是命中注定的嗎?當然不是!火花SQL提供函數to_json ()編碼作為一個字符串和一個結構體from_json ()檢索結構作為一個複雜類型。使用JSON字符串作為列是有用的閱讀或者寫作的時候就像卡夫卡流來源。每個卡夫卡鍵-值記錄將增強一些元數據,如攝入時間戳到卡夫卡,卡夫卡的抵消,等等。如果“價值”字段包含JSON數據,您可以使用from_json ()提取數據,豐富,清潔它,然後把它下遊再次卡夫卡或寫出來到一個文件。

編碼作為json結構

to_json ()可用於結構轉化為JSON字符串。這個方法特別有用,當你想重新編碼多個列成一個一個寫數據時卡夫卡。該方法目前尚不支持的SQL。

/ /輸入{“一個”:{“b”:1}}Python:events.select (to_json (“一個”).alias (“c”))Scala:events.select (to_json (“)”c)/ /輸出{“c”:“{\ b \”: 1}”}

解碼json列作為一個結構體

from_json ()可以用來將一個字符串列與JSON數據結構。然後你可以平struct如上所述單個列。該方法目前尚不支持的SQL。

//輸入{“一”:“{\ b \”: 1}”}
              Python:模式=StructType ()。添加(“b”, IntegerType ())events.select (from_json (“一個”,模式)。alias("c"))Scala:薇爾模式=StructType ()。添加(“b”, IntegerType)events.select (from_json (的,模式)c)//輸出{" c ": {“b”:1}}

有時你可能想留下一個JSON字符串的一部分仍為JSON模式為了避免太多的複雜性。

//輸入{“一”:“{\ b \”: {\“x \”: 1, \“y \”: {\“z \”: 2}}}”}
              Python:模式=StructType ()。添加(“b”, StructType ()。添加(“x”, IntegerType ())添加(“y”, StringType ()))events.select (from_json (“一個”,模式)。alias("c"))Scala:薇爾模式=StructType ()。添加(“b”,StructType ()。添加(“x”, IntegerType)添加(“y”, StringType))events.select (from_json (的,模式)c)//輸出{" c ": {" b ": {“x”:1,“y”:“{\“z \”: 2}”}}}

解析一組字段的列包含JSON

json_tuple ()可以用來提供一個字符串提取字段列與JSON數據。

/ /輸入{“一個”:“{\ b \”: 1}”}Python:events.select (json_tuple (“一個”,“b”).alias (“c”))Scala:events.select (json_tuple (“一個,“b”)”c)SQL:選擇json_tuple (,“b”)作為c事件/ /輸出{“c”:1}

有時一個字符串列可能不是自描述為JSON,但是可能還有一個格式良好的結構。例如,它可能是一個Log4j日誌消息生成使用特定的格式。引發這些字符串可以使用SQL結構為你輕鬆!

解析一個格式良好的字符串列

regexp_extract ()可以使用正則表達式來解析字符串。

/ /輸入[{“一個”:“x: 1”},{“一個”:“楊:2”})Python:events.select (regexp_extract (“一個”,”([a - z]):“,1).alias (“c”))Scala:events.select (regexp_extract (”,“([a - z]):“(1)”c)SQL:選擇regexp_extract (,”([a - z]):“,1)作為c事件/ /輸出[{“c”:“x”},{“c”:“y”})

這是一個許多轉換!現在讓我們看一些現實生活的用例將所有這些數據格式,和數據操縱功能良好的使用。

利用這種力量

在磚,我們從我們的服務和使用它們來執行收集日誌實時監控檢測問題,之前我們的客戶都受到影響。beplay体育app下载地址日誌文件非結構化文件,但它們可解析,因為他們有一個明確的Log4j格式。我們運行一個日誌收集器服務發送每個日誌條目和額外的元數據條目(如源)JSON運動。這些JSON文件記錄然後batch-uploaded S3。查詢這些JSON回答任何問題的日誌是乏味:這些文件包含重複,回答任何查詢,即使它涉及單個列,整個JSON記錄可能需要反序列化。

為了解決這個問題,我們運行一個管道,讀取這些JSON元數據記錄和執行重複數據刪除。現在我們隻剩下原來的日誌記錄,這可能是JSON格式或非結構化文本。如果我們處理JSON時,我們使用from_json ()和幾個上麵描述的轉換數據格式。如果是文本,我們使用方法等regexp_extract ()我們的Log4j格式解析成一個更結構化的形式。一旦我們完成我們所有的轉換和重組,我們在拚花分區按日期保存記錄。這給了我們10 - 100 x加速在回答這樣的問題“我們看到了多少錯誤消息10:00-10:30之間這個特定服務”?都可以歸因於:

  • 我們不再付出代價的反序列化JSON記錄
  • 我們不需要執行複雜的字符串比較原始的日誌消息
  • 我們隻需要提取兩列在我們的查詢:時間,日誌級別

下麵是幾個常見的用例,我們看到我們的客戶包括:beplay体育app下载地址

“我想和我的數據機器學習管道。我的數據已經預處理,和我將使用所有的功能在整個管道。”

Avro是個不錯的選擇,當你將訪問整個行數據。

“我有一個物聯網傳感器發送我的用例的事件。對於每個事件元數據,重要的是不同的。”

在這種情況下,您希望您的方案的靈活性,你可能會考慮使用JSON來存儲你的數據。

“我想訓練語音識別算法在報紙文章或情緒分析產品評論。”

在這種情況下,您的數據可能沒有一個固定的模式,也沒有一個固定的模式和結構,它可能會更容易將它存儲為純文本文件。你也可以有一個管道,進行特征提取這一非結構化數據並將其存儲為Avro準備你的機器學習管道。

結論

在這篇文章中,我們討論了如何引發SQL允許您使用數據外,還可以從許多別的來源和格式,且容易執行這些數據格式之間的轉換和交換。我們分享我們對數據的牧師在磚,和其他生產使用的情況下,您可能希望考慮用不同的方式來做事情。

火花SQL提供了必要的工具來訪問您的數據隻要可能,無論以什麼形式可能是在和準備為下遊應用程序與低延遲流數據或高吞吐量在舊曆史數據!

未來的博客文章在本係列中,我們將討論更多關於:

  • 流媒體應用程序監控
  • 將結構化流與Apache卡夫卡
  • 計算事件時間聚合與結構化流

如果你想了解更多關於結構化流,這裏有一些有用的鏈接。

最後,嚐試我們的示例筆記本演示將複雜數據類型在Python中,Scala或SQL:

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map