Apache火花™教程:開始使用Apache火花磚
結構化流概述
傳感器、物聯網設備、社交網絡和在線交易所有生成的數據需要不斷監控,迅速采取行動。因此,需要大規模、實時流處理比以往任何時候都更加突出。本教程模塊介紹了結構化流媒體,主要的模型來處理流數據集在Apache火花。在結構化流,數據流被視為一個不斷附加的表。這導致的流處理模型非常類似於批處理模式。你表達你的流計算標準批查詢在一個靜態的表,但火花運行作為一個增量查詢無界輸入表。
考慮輸入數據流輸入表。每個數據項都是到達流就像一個新行被添加到輸入表。
輸入的查詢生成一個結果表。在每一個觸發間隔(說,每1秒),新行添加到輸入表,最終更新結果表。每當更新結果表,改變結果行寫入外部下沉。被定義為輸出寫入到外部存儲。輸出可以配置在不同的模式:
- 完整的模式:整個結果表寫入外部存儲更新。由存儲連接器來決定如何處理整個表的編寫。
- Append模式在結果表:隻有新行附加自上次觸發被寫入外部存儲。這隻適用於現有的查詢結果表中的行是不會改變的。
- 更新模式:隻有結果表中的行,更新自上次觸發被寫入外部存儲。這不同於完整的模式,更新模式輸出的行改變了自從上次觸發。如果查詢不包含聚合,它相當於Append模式。
模塊在本教程中,您將學習如何:
- 加載示例數據
- 初始化一個流
- 開始流工作
- 查詢一個流
我們也提供一個樣的筆記本你可以導入訪問和運行的所有代碼示例包含在模塊。
加載示例數據
開始使用結構化流的最簡單方法是使用一個磚中可用數據集的例子/ databricks-datasets
在磚工作區文件夾訪問。磚示例事件數據文件/ / databricks-datasets / structured-streaming /事件
使用構建結構化的流媒體應用程序。讓我們看一看這個目錄的內容。
文件中的每一行包含一個JSON記錄兩個字段:時間
和行動
。
{“時間”:1469501675,“行動”:“開放”}{“時間”:1469501678,“行動”:“關閉”}{“時間”:1469501680,“行動”:“開放”}{“時間”:1469501685,“行動”:“開放”}{“時間”:1469501686,“行動”:“開放”}{“時間”:1469501689,“行動”:“開放”}{“時間”:1469501691,“行動”:“開放”}{“時間”:1469501694,“行動”:“開放”}{“時間”:1469501696,“行動”:“關閉”}{“時間”:1469501702,“行動”:“開放”}{“時間”:1469501703,“行動”:“開放”}{“時間”:1469501704,“行動”:“開放”}
初始化流
由於樣本數據隻是一個靜態的文件集,您可以模擬流從他們通過讀取一個文件,他們創建的順序。
inputPath =“/ databricks-datasets / structured-streaming /事件/”#定義模式加快處理jsonSchema = StructType ([StructField (“時間”、TimestampType ()真正的),StructField (“行動”、StringType ()真正的)))
streamingInputDF = (火花.readStream. schema (jsonSchema)#設置JSON數據的模式.option (“maxFilesPerTrigger”,1)#治療一係列的文件作為流一次通過選擇一個文件. json (inputPath))
streamingCountsDF = (streamingInputDF.groupBy (streamingInputDF.action,窗口(streamingInputDF.time,“1小時”)).count ())
開始流媒體工作
你開始流計算通過定義一個水槽和啟動它。在我們的例子中,交互查詢項,設置完整的組1小時數在內存中的表。
查詢= (streamingCountsDF.writeStream。格式(“記憶”)#內存=存儲內存表(僅用於測試).queryName (“計數”)# =內存表的名稱.outputMode (“完整的”)#完成=表中所有重要的應該.start ())
查詢
是一個處理流查詢命名計數
這是在後台運行。這個查詢不斷拿起文件和更新窗口的數量。命令窗口流的狀態報告:
當你擴張計數
,你會得到一個儀表板的數量記錄處理,批統計數據,和聚合的狀態:
交互式查詢流
計數
聚合:%sql選擇行動,date_format(窗口。最終,“MMM-dd HH: mm”)作為時間,數從計數訂單通過時間,行動
從這一係列的截圖可以看到,每次執行查詢變化以反映行動計算基於數據的輸入流。
我們也提供一個樣的筆記本你可以導入訪問和運行的所有代碼示例包含在模塊。