Ingest data into Delta Lake
Databricks offers a variety of ways to help you ingest data into Delta Lake.
Upload CSV files
You can securely create tables from CSV files by using the create table UI in Databricks SQL.
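Partner integrations
Databricks partner integrations let you easily load data into Databricks. These integrations provide low-code, easy-to-implement, and scalable data ingestion from a wide variety of sources into Databricks. See the Databricks integrations.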
COPY INTO SQL command
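The COPY INTO SQL command lets you load data from a file location into a Delta table. This is a re-triable and idempotent operation; files in the source location that have already been loaded are skipped.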
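To make the "re-triable and idempotent" property concrete, here is a minimal pure-Python sketch of the idea. It is a toy model only, not how Databricks implements COPY INTO: the class name `IdempotentLoader`, its fields, and the in-memory "table" are all hypothetical names introduced for illustration.

```python
from typing import Dict, Iterable, List, Set


class IdempotentLoader:
    """Toy model of a retriable, idempotent file load (hypothetical, not Databricks code)."""

    def __init__(self) -> None:
        self.loaded_files: Set[str] = set()  # record of files already ingested
        self.rows: List[str] = []            # stand-in for the target table

    def copy_into(self, source: Dict[str, Iterable[str]]) -> int:
        """Load every file not seen before; return how many files were loaded."""
        loaded_now = 0
        for path in sorted(source):
            if path in self.loaded_files:
                continue  # already loaded: skip it, so a retry adds no duplicates
            self.rows.extend(source[path])
            self.loaded_files.add(path)
            loaded_now += 1
        return loaded_now


# A source location modelled as {file name: rows in that file}.
source = {"a.parquet": ["row1", "row2"], "b.parquet": ["row3"]}
loader = IdempotentLoader()

first = loader.copy_into(source)   # both files are new
second = loader.copy_into(source)  # retry: every file is skipped
source["c.parquet"] = ["row4"]
third = loader.copy_into(source)   # only the newly arrived file is loaded
```

Because the loader remembers which files it has ingested, running the same load again leaves the table unchanged, which is what makes retrying after a failure safe.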
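Use the COPY INTO SQL command instead of Auto Loader when:
You want to load data from a file location that contains files in the order of thousands or fewer.
Your data schema is not expected to evolve frequently.
You plan to load subsets of previously uploaded files.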
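For a brief overview and demonstration of the COPY INTO SQL command, as well as Auto Loader later in this article, watch this YouTube video (2 minutes).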
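The following example shows how to create a Delta table and then use the COPY INTO SQL command to load sample data from the Sample datasets (databricks-datasets) into the table. You can run the example code from a notebook attached to a Databricks cluster.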
Python:
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

# Start from a clean slate, then create the target Delta table.
spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

# Load the sample file into the table.
spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    |            |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    |            |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
R:
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

# Start from a clean slate, then create the target Delta table.
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

# Load the sample file into the table.
sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    |            |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    |            |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
Scala:
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

// Start from a clean slate, then create the target Delta table.
spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

// Load the sample file into the table.
spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    |            |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    |            |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/
SQL:
DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    |            |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    |            |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...
To clean up, run the following code, which deletes the table:
Python:
spark.sql("DROP TABLE " + table_name)

R:
sql(paste("DROP TABLE ", table_name, sep = ""))

Scala:
spark.sql("DROP TABLE " + table_name)

SQL:
DROP TABLE default.loan_risks_upload
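The examples above assemble the COPY INTO statement by string concatenation, where a missing space or quote is easy to overlook. As a hedged illustration, the following Python sketch builds the same statement in one place and rejects obviously malformed inputs. The helper `build_copy_into` and its validation rules are hypothetical and are not part of any Databricks API; the returned string is meant to be passed to `spark.sql(...)` in a notebook.

```python
import re

# Hypothetical helper for illustration; not a Databricks API.
# Accepts names such as "default.loan_risks_upload" (dot-separated identifiers).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


def build_copy_into(table_name: str, source_path: str, file_format: str) -> str:
    """Return a COPY INTO statement for the given table, source path, and format."""
    if not _IDENTIFIER.fullmatch(table_name):
        raise ValueError(f"unexpected table name: {table_name!r}")
    if "'" in source_path:
        raise ValueError("source path must not contain a single quote")
    if not file_format.isalpha():
        raise ValueError(f"unexpected file format: {file_format!r}")
    # The path is quoted; the table name and format are emitted bare.
    return (
        f"COPY INTO {table_name} "
        f"FROM '{source_path}' "
        f"FILEFORMAT = {file_format.upper()}"
    )


statement = build_copy_into(
    "default.loan_risks_upload",
    "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet",
    "parquet",
)
```

Keeping the statement construction in a single function makes the quoting of the source path explicit and gives one place to tighten validation if the inputs ever come from users.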
For more examples and details, see the COPY INTO documentation.
Auto Loader
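Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage, without any additional setup. Auto Loader provides a new Structured Streaming source called cloudFiles. Given an input directory path on cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.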
Use Auto Loader instead of the COPY INTO SQL command when:
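You want to load data from a file location that contains files in the order of millions or higher. Auto Loader can discover files more efficiently than the COPY INTO SQL command and can split file processing into multiple batches.
Your data schema evolves frequently. Auto Loader provides better support for schema inference and evolution. See Configuring schema inference and evolution in Auto Loader.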
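You do not plan to load subsets of previously uploaded files. With Auto Loader, it can be more difficult to reprocess subsets of files. However, you can use the COPY INTO SQL command to reload subsets of files while an Auto Loader stream is running at the same time.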
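For a brief overview and demonstration of Auto Loader, as well as the COPY INTO SQL command earlier in this article, watch this YouTube video (2 minutes).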
For a longer overview and demonstration of Auto Loader, watch this YouTube video (59 minutes).
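The following code example demonstrates how Auto Loader detects new data files as they arrive in cloud storage. You can run the example code from a notebook attached to a Databricks cluster.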
1. Create the file upload directory, for example:
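Python:
user_dir = '<my-name>@<my-organization.com>'
upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"

# Create the upload directory, including any missing parent directories.
dbutils.fs.mkdirs(upload_path)

Scala:
val user_dir = "<my-name>@<my-organization.com>"
val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"

// Create the upload directory, including any missing parent directories.
dbutils.fs.mkdirs(upload_path)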
2. Create the following sample CSV files, and then upload them to the file upload directory by using the DBFS file browser:
WA.csv:
city,year,population
Seattle metro,2019,3406000
Seattle metro,2020,3433000

OR.csv:
city,year,population
Portland metro,2019,2127000
Portland metro,2020,2151000
3. Run the following code to start Auto Loader.
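Python:
checkpoint_path = '/tmp/delta/population_data/_checkpoints'
write_path = '/tmp/delta/population_data'

# Set up the stream to begin reading incoming files from the
# upload_path location.
df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'csv') \
  .option('header', 'true') \
  .schema('city string, year int, population long') \
  .load(upload_path)

# Start the stream.
# Use the checkpoint_path location to keep a record of all files that
# have already been uploaded to the upload_path location.
# For those that have been uploaded since the last check,
# write the newly uploaded files' data to the write_path location.
df.writeStream.format('delta') \
  .option('checkpointLocation', checkpoint_path) \
  .start(write_path)

Scala:
val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
val write_path = "/tmp/delta/population_data"

// Set up the stream to begin reading incoming files from the
// upload_path location.
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema("city string, year int, population long")
  .load(upload_path)

// Start the stream.
// Use the checkpoint_path location to keep a record of all files that
// have already been uploaded to the upload_path location.
// For those that have been uploaded since the last check,
// write the newly uploaded files' data to the write_path location.
df.writeStream.format("delta")
  .option("checkpointLocation", checkpoint_path)
  .start(write_path)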
4. With the code from step 3 still running, run the following code to query the data in the write directory:
Python:
df_population = spark.read.format('delta').load(write_path)

display(df_population)

'''
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
'''

Scala:
val df_population = spark.read.format("delta").load(write_path)

display(df_population)

/*
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
*/
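5. With the code from step 3 still running, create the following additional CSV files, and then upload them to the upload directory by using the DBFS file browser: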
ID.csv:
city,year,population
Boise,2019,438000
Boise,2020,447000

MT.csv:
city,year,population
Helena,2019,81653
Helena,2020,82590

Misc.csv:
city,year,population
Seattle metro,2021,3461000
Portland metro,2021,2174000
Boise,2021,455000
Helena,2021,81653
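6. With the code from step 3 still running, run the following code to query the existing data in the write directory, together with the new data from the files that Auto Loader has detected in the upload directory and then written to the write directory: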
Python:
df_population = spark.read.format('delta').load(write_path)

display(df_population)

'''
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Helena         | 2019 | 81653      |
+----------------+------+------------+
| Helena         | 2020 | 82590      |
+----------------+------+------------+
| Boise          | 2019 | 438000     |
+----------------+------+------------+
| Boise          | 2020 | 447000     |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
| Seattle metro  | 2021 | 3461000    |
+----------------+------+------------+
| Portland metro | 2021 | 2174000    |
+----------------+------+------------+
| Boise          | 2021 | 455000     |
+----------------+------+------------+
| Helena         | 2021 | 81653      |
+----------------+------+------------+
'''

Scala:
val df_population = spark.read.format("delta").load(write_path)

display(df_population)

/*
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Helena         | 2019 | 81653      |
+----------------+------+------------+
| Helena         | 2020 | 82590      |
+----------------+------+------------+
| Boise          | 2019 | 438000     |
+----------------+------+------------+
| Boise          | 2020 | 447000     |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
| Seattle metro  | 2021 | 3461000    |
+----------------+------+------------+
| Portland metro | 2021 | 2174000    |
+----------------+------+------------+
| Boise          | 2021 | 455000     |
+----------------+------+------------+
| Helena         | 2021 | 81653      |
+----------------+------+------------+
*/
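7. To clean up, cancel the running code in step 3, and then run the following code, which deletes the upload, checkpoint, and write directories: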
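Python:
# The checkpoint directory is inside write_path, so it is removed as well.
dbutils.fs.rm(write_path, True)
dbutils.fs.rm(upload_path, True)

Scala:
// The checkpoint directory is inside write_path, so it is removed as well.
dbutils.fs.rm(write_path, true)
dbutils.fs.rm(upload_path, true)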
For more details, see Auto Loader.
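The walkthrough relies on the checkpoint to remember which uploaded files have already been processed. The following Python sketch models that behaviour locally with the standard library only, so it can be run without a cluster. It is a simplified illustration under stated assumptions, not Auto Loader's actual mechanism: the function `process_new_files`, the JSON checkpoint file, and the in-memory `table` list are hypothetical stand-ins, and the temporary directory replaces the DBFS upload path.

```python
import csv
import json
import tempfile
from pathlib import Path
from typing import List, Tuple

Row = Tuple[str, int, int]  # (city, year, population), matching the example schema


def process_new_files(upload_dir: Path, checkpoint: Path, table: List[Row]) -> List[str]:
    """Append rows from files not yet recorded in the checkpoint; return their names."""
    seen = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()
    new_files = sorted(p.name for p in upload_dir.glob("*.csv") if p.name not in seen)
    for name in new_files:
        with open(upload_dir / name, newline="") as f:
            for rec in csv.DictReader(f):
                table.append((rec["city"], int(rec["year"]), int(rec["population"])))
    # Record the processed files so the next call skips them.
    checkpoint.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    upload_dir = Path(tmp) / "population_data_upload"
    upload_dir.mkdir()
    checkpoint = Path(tmp) / "_checkpoint.json"
    table: List[Row] = []

    (upload_dir / "WA.csv").write_text(
        "city,year,population\nSeattle metro,2019,3406000\nSeattle metro,2020,3433000\n"
    )
    first_batch = process_new_files(upload_dir, checkpoint, table)

    (upload_dir / "OR.csv").write_text(
        "city,year,population\nPortland metro,2019,2127000\nPortland metro,2020,2151000\n"
    )
    second_batch = process_new_files(upload_dir, checkpoint, table)

    # No new uploads since the last check, so nothing is processed.
    third_batch = process_new_files(upload_dir, checkpoint, table)
```

Each call picks up only the files that arrived since the previous call, which mirrors why the query in step 6 shows the earlier rows plus the rows from the newly uploaded files, with no duplicates.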