開始
加載和管理數據
處理數據
政府
參考和資源
2023年2月9日更新
給我們反饋
本教程向您展示如何為Databricks湖屋設置端到端分析管道。
重要的
本教程使用交互式筆記本在Unity Catalog啟用的集群上用Python完成常見的ETL任務。如果您沒有使用Unity Catalog,請參閱在Databricks上運行您的第一個ETL工作負載.
在本文結束時,你會覺得很舒服:
啟動一個支持Unity Catalog的計算集群.
創建Databricks筆記本.
從Unity Catalog外部位置寫入和讀取數據.
使用Auto Loader配置增量數據攝取到Unity Catalog表.
執行筆記本單元格來處理、查詢和預覽數據.
將筆記本調度為Databricks作業.
從Databricks SQL查詢Unity Catalog表
Databricks提供了一套生產就緒工具,允許數據專業人員快速開發和部署提取、轉換和加載(ETL)管道。Unity Catalog允許數據管理員為整個組織的用戶配置和保護存儲憑證、外部位置和數據庫對象。Databricks SQL允許分析師對生產ETL工作負載中使用的相同表運行SQL查詢,允許大規模的實時商業智能
您已經登錄到Databricks,並且處於Data Science & Engineering工作區。有關更多信息,請參見開始:免費試用和安裝.
請注意
如果您沒有集群控製權限,隻要您有,您仍然可以完成下麵的大部分步驟訪問集群.
如果您隻能訪問Databricks SQL工作區,請參見:開始Databricks SQL.
要進行探索性數據分析和數據工程,可以創建一個集群來提供執行命令所需的計算資源。
點擊計算在側欄中。
在“計算池”頁麵,單擊創建集群.這將打開New Cluster頁麵。
為集群指定唯一的名稱。
選擇單獨的節點單選按鈕。
選擇單用戶從訪問模式下拉。
確保您的電子郵件地址在單用戶接入字段。
選擇所需的Databricks運行時版本, 11.1或以上版本使用Unity Catalog。
點擊創建集群.
要了解有關Databricks集群的更多信息,請參見集群.
要開始在Databricks上編寫和執行交互式代碼,請創建一個筆記本。
點擊新在側欄中,然後單擊筆記本.
在“創建筆記本”界麵:
為筆記本指定唯一的名稱。
確保默認語言設置為Python.
選擇步驟1中創建的集群集群下拉。
點擊創建.
筆記本打開時頂部有一個空單元格。
有關創建和管理筆記本電腦的詳細信息,請參見管理筆記本.
Databricks推薦使用自動加載程序用於增量數據攝取。Auto Loader自動檢測和處理新文件,因為他們到達雲對象存儲。
您可以使用Unity Catalog來管理外部位置的安全訪問。的用戶或服務主體讀文件外部位置上的權限可以使用自動加載程序來攝取數據。
讀文件
通常,由於來自其他係統的寫入,數據將到達外部位置。在這個演示中,您可以通過將JSON文件寫入外部位置來模擬數據到達。
將下麵的代碼複製到一個筆記本單元格中。替換字符串值for目錄的目錄名稱創建目錄而且使用目錄權限。替換字符串值forexternal_location的外部位置的路徑讀文件,寫文件,創建外部表格權限。
目錄
創建目錄
使用目錄
external_location
寫文件
創建外部表格
外部位置可以定義為整個存儲容器,但通常指向容器中嵌套的目錄。
外部位置路徑的正確格式是“s3: / / bucket名/道路/ / external_location”.
“s3: / / bucket名/道路/ / external_location”
external_location=“< your_external_location >”目錄=“< your_catalog >”dbutils.fs.把(f"{external_location}/ filename.txt”,“Hello world !”,真正的)顯示(dbutils.fs.頭(f"{external_location}/ filename.txt”))dbutils.fs.rm(f"{external_location}/ filename.txt”)顯示(火花.sql(f顯示模式{目錄}"))
執行這個單元格應該打印一行寫了12字節,打印字符串“Hello world!”,並顯示所提供目錄中的所有數據庫。如果您無法讓這個單元格運行,請確認您是在一個支持Unity Catalog的工作空間中,並從您的工作空間管理員請求適當的權限來完成本教程。
下麵的Python代碼使用您的電子郵件地址在提供的目錄中創建唯一的數據庫,並在提供的外部位置中創建唯一的存儲位置。執行此單元格將刪除與本教程相關的所有數據,從而允許您以冪等方式執行此示例。定義並實例化了一個類,您將使用它來模擬從連接的係統到達源外部位置的批量數據。
將此代碼複製到筆記本中的新單元格,並執行它來配置環境。
這段代碼中定義的變量應該允許您安全地執行它,而不會有與現有工作區資產或其他用戶衝突的風險。在執行此代碼時,受限製的網絡或存儲權限將引發錯誤;請與工作區管理員聯係以排除這些限製。
從pyspark.sql.functions進口上校#設置工作區隔離參數並重置演示用戶名=火花.sql(的“選擇regexp_replace (current_user (), [^ a-zA-Z0-9)”,“_”)”).第一個() (0]數據庫=f"{目錄}.e2e_lakehouse_{用戶名}_db”源=f"{external_location}/ e2e-lakehouse-source”表格=f"{數據庫}.target_table”checkpoint_path=f"{external_location}/ _checkpoint / e2e-lakehouse-demo”火花.sql(f“c.username = '{用戶名}’”)火花.sql(f“設置c.database ={數據庫}")火花.sql(f“c.source = '{源}’”)火花.sql(drop database if exists $ .{c.database}級聯”)火花.sql(創建數據庫{c.database}")火花.sql(“使用美元{c.database}")清除之前演示執行的數據dbutils.fs.rm(源,真正的)dbutils.fs.rm(checkpoint_path,真正的)定義一個類來加載批量數據到源代碼類LoadData:def__init__(自我,源):自我.源=源defget_date(自我):試一試:df=火花.讀.格式(“json”).負載(源)除了:返回“2016-01-01”batch_date=df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1天").第一個() (0]如果batch_date.月= =3.:提高異常(“源數據耗盡”)返回batch_datedefget_batch(自我,batch_date):返回(火花.表格(“samples.nyctaxi.trips”).過濾器(上校(“tpep_pickup_datetime”).投(“日期”)= =batch_date))defwrite_batch(自我,批處理):批處理.寫.格式(“json”).模式(“添加”).保存(自我.源)defland_batch(自我):batch_date=自我.get_date()批處理=自我.get_batch(batch_date)自我.write_batch(批處理)RawData=LoadData(源)
現在,您可以通過將以下代碼複製到單元格中並執行它來獲取一批數據。您可以手動執行此單元格最多60次以觸發新數據到達。
RawData.land_batch()
Databricks建議使用三角洲湖.Delta Lake是一個開源存儲層,它提供ACID事務並支持數據湖屋。Delta Lake是在Databricks中創建的表的默認格式。
要配置Auto Loader以攝取數據到Unity Catalog表中,請將以下代碼複製並粘貼到筆記本中的空單元格中:
#導入函數從pyspark.sql.functions進口input_file_name,current_timestamp#配置自動加載器攝取JSON數據到Delta表(火花.readStream.格式(“cloudFiles”).選項(“cloudFiles.format”,“json”).選項(“cloudFiles.schemaLocation”,checkpoint_path).負載(file_path).選擇(“*”,input_file_name().別名(“source_file”),current_timestamp().別名(“processing_time”)).writeStream.選項(“checkpointLocation”,checkpoint_path).觸發(availableNow=真正的).選項(“mergeSchema”,“真正的”).toTable(表格))
要了解有關自動裝載機的更多信息,請參見什麼是自動加載器?.
要了解有關使用Unity目錄的結構化流媒體的更多信息,請參見使用Unity目錄與結構化流.
筆記本逐個單元執行邏輯。使用以下步驟在單元格中執行邏輯:
要運行在上一步中完成的單元格,選擇單元格並按下SHIFT + ENTER.
若要查詢剛剛創建的表,請將以下代碼複製並粘貼到空單元格中,然後按SHIFT + ENTER運行單元格。
df=火花.讀.表格(table_name)
要預覽DataFrame中的數據,請將以下代碼複製並粘貼到空單元格中,然後按SHIFT + ENTER運行單元格。
顯示(df)
有關可視化數據的交互選項的詳細信息,請參見可視化.
通過將Databricks筆記本添加為Databricks作業中的任務,可以將它們作為生產腳本運行。在這一步中,您將創建一個可以手動觸發的新作業。
把你的筆記本安排為一項任務:
點擊時間表在標題欄的右側。
屬性的惟一名稱作業名.
點擊手冊.
在集群下拉菜單,選擇步驟1中創建的集群。
在出現的窗口中,單擊現在運行.
要查看作業運行結果,請單擊圖標旁邊的最後一次運行時間戳。
有關作業的更多信息,請參見創建、運行和管理Databricks作業.
任何有使用目錄對當前目錄的權限使用模式當前模式的權限選擇表上的權限可以從他們首選的Databricks API查詢表的內容。
使用模式
選擇
可以使用上麵的角色切換器切換到Databricks SQL UI+在屏幕的左上方。選擇SQL從下拉菜單。
+
在Databricks SQL中執行查詢需要訪問正在運行的SQL倉庫。
在本教程前麵創建的表具有此名稱target_table.您可以使用在第一個單元格中提供的目錄和帶有模式的數據庫進行查詢e2e_lakehouse_ < your_username >.您可以使用數據瀏覽來查找您創建的數據對象。
target_table
e2e_lakehouse_ < your_username >
使用Databricks了解更多關於數據工程集成和工具的信息:
連接您最喜歡的IDE
使用dbt和Databricks
了解Databricks CLI (Command Line Interface)
了解Databricks Terraform Provider