三角洲湖快速入門
湖三角洲快速入門的概述與三角洲湖的基礎知識。快速入門演示了如何將數據加載到一個三角洲表,修改表,看表,顯示表的曆史,和優化表。
的演示本文中描述的一些特性(以及更多),看這個YouTube視頻(9分鍾)。
對現有數據磚筆記本,證明這些特性,明白了介紹性的筆記本。
創建一個表
創建一個增量表,您可以使用Apache火花SQL代碼,改變現有的編寫格式拚花
,csv
,json
等等,δ
。
對於所有類型的文件,你的文件讀入DataFrame使用相應的輸入格式(例如,拚花
,csv
,json
等等),然後寫出三角洲中的數據格式。在這個代碼示例中,輸入文件已經在三角洲格式和位於樣本數據集(databricks-datasets)。這段代碼在三角洲格式保存數據磚文件係統(DBFS)在指定的位置save_path
。
#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path=“/ tmp /δ/ people-10m”table_name=“default.people10m”#從源頭裝載數據。人=火花\。讀\。格式(read_format)\。負載(load_path)#寫數據到其目標。人。寫\。格式(write_format)\。保存(save_path)#創建表。火花。sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)
圖書館(SparkR)sparkR.session()#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path=“/ tmp /δ/ people-10m /”table_name=“default.people10m”#從源頭裝載數據。人=read.df(load_path,源=read_format)#寫數據到其目標。write.df(人,源=write_format,路徑=save_path)#創建表。sql(粘貼(“CREATE TABLE”,table_name,“用δ位置”,save_path,“”,9月=”“))
/ /定義輸入和輸出格式和路徑和表名。瓦爾read_format=“δ”瓦爾write_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾table_name=“default.people10m”/ /加載數據從其來源。瓦爾人=火花。讀。格式(read_format)。負載(load_path)/ /寫數據到其目標。人。寫。格式(write_format)。保存(save_path)/ /創建表。火花。sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)
——位置必須已經存在的路徑,必須是三角洲格式。創建表默認的。people10m使用δ位置“/ tmp /δ/ people-10m”
前麵的操作創建一個新的非托管表通過從數據推斷的模式。對於非托管的表,你控製的位置數據。磚跟蹤表的名稱和它的位置。信息可用選項創建一個增量表時,看到的創建一個表和寫一個表。
如果你的拚花格式的源文件,您可以使用轉換為δ聲明轉換文件。如果非托管相應的表,表轉換後仍非托管:
轉換來δ拚花。' /tmp/δ/人- - - - - -10米”
創建一個新的管理表,你可以使用創建表語句指定表名,然後你可以數據加載到表中。或者你可以使用saveAsTable
與Python方法,R,或Scala。例如:
的表=“people10m”sourceType=“δ”loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”人=火花\。讀\。格式(sourceType)\。負載(loadPath)人。寫\。格式(sourceType)\。saveAsTable(的表)顯示(火花。sql(“SELECT * FROM”+的表))
圖書館(SparkR)sparkR.session()的表=“people10m”sourceType=“δ”loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”人=read.df(路徑=loadPath,源=sourceType)saveAsTable(df=人,源=sourceType,的表=的表)顯示(sql(粘貼(“SELECT * FROM”,的表,9月=”“)))
瓦爾的表=“people10m”瓦爾sourceType=“δ”瓦爾loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾人=火花。讀。格式(sourceType)。負載(loadPath)人。寫。格式(sourceType)。saveAsTable(的表)顯示(火花。sql(“SELECT * FROM”+的表))
創建表people10m使用δ作為選擇*從δ。' /磚- - - - - -數據集/學習- - - - - -火花- - - - - -v2/人/人- - - - - -10米。δ”;選擇*從people10m;
如果你的源文件是三角洲湖支持的格式,您可以使用以下速記直接讀取一個文件,通過使用的位置說明符<文件類型>。<位置>的“:
#定義輸入格式和路徑。read_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”#從源頭裝載數據。人=火花。sql(“SELECT * FROM”+read_format+”。”+load_path+“”)
圖書館(SparkR)sparkR.session()#定義輸入格式和路徑。read_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”#從源頭裝載數據。人=sql(粘貼(“SELECT * FROM”,read_format,”。”,load_path,“”,9月=”“))
/ /定義輸入格式和路徑。瓦爾read_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”/ /加載數據從其來源。瓦爾人=火花。sql(“SELECT * FROM”+read_format+”。”+load_path+“”)
選擇*從δ。' /磚- - - - - -數據集/學習- - - - - -火花- - - - - -v2/人/人- - - - - -10米。δ”
對於管理表,磚決定數據的位置。的位置,您可以使用描述的細節語句,例如:
顯示(火花。sql(“描述細節people10m”))
顯示(sql(“描述細節people10m”))
顯示(火花。sql(“描述細節people10m”))
描述細節people10m;
對數據進行分區
加快查詢謂詞包括分區的列,可以對數據進行分區。下麵的代碼示例是類似的創建一個表,但本例中分區的數據。
#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by=“性別”save_path=“/ tmp /δ/ people-10m”table_name=“default.people10m”#從源頭裝載數據。人=火花\。讀\。格式(read_format)\。負載(load_path)#寫數據到其目標。人。寫\。partitionBy(partition_by)\。格式(write_format)\。保存(save_path)#創建表。火花。sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)
如果你已經運行Python中的代碼範例創建一個表,您必須先刪除現有表和保存數據:
#定義表名和輸出路徑。table_name=“default.people10m”save_path=“/ tmp /δ/ people-10m”#刪除表。火花。sql(“刪除表”+table_name)#刪除保存的數據。dbutils。fs。rm(save_path,真正的)
圖書館(SparkR)sparkR.session()#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by=“性別”save_path=“/ tmp /δ/ people-10m /”table_name=“default.people10m”#從源頭裝載數據。人=read.df(load_path,源=read_format)#寫數據到其目標。write.df(人,源=write_format,partitionBy=partition_by,路徑=save_path)#創建表。sql(粘貼(“CREATE TABLE”,table_name,“用δ位置”,save_path,“”,9月=”“))
如果你已經跑R中的代碼範例創建一個表,您必須先刪除現有表和保存數據:
圖書館(SparkR)sparkR.session()#定義表名和輸出路徑。table_name=“default.people10m”save_path=“/ tmp /δ/ people-10m”#刪除表。sql(粘貼(“刪除表”,table_name,9月=”“))#刪除保存的數據。dbutils.fs.rm(save_path,真正的)
/ /定義輸入和輸出格式和路徑和表名。瓦爾read_format=“δ”瓦爾write_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾partition_by=“性別”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾table_name=“default.people10m”/ /加載數據從其來源。瓦爾人=火花。讀。格式(read_format)。負載(load_path)/ /寫數據到其目標。人。寫。partitionBy(partition_by)。格式(write_format)。保存(save_path)/ /創建表。火花。sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)
如果你已經跑Scala中的代碼範例創建一個表,您必須先刪除現有表和保存數據:
/ /定義表名和輸出路徑。瓦爾table_name=“default.people10m”瓦爾save_path=“/ tmp /δ/ people-10m”/ /刪除表。火花。sql(“刪除表”+table_name)/ /刪除保存的數據。dbutils。fs。rm(save_path,真正的)
當你創建一個增量表對數據進行分區使用SQL,指定分區通過
列。
——位置必須已經存在的路徑,必須是三角洲格式。創建表默認的。people10m(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ分區通過(性別)位置“/ tmp /δ/ people-10m”
如果你已經跑SQL中的代碼範例創建一個表,您必須先刪除現有表:
下降表默認的。people10m
修改一個表
三角洲湖支持一組豐富的操作來修改表。
流寫入一個表
你可以寫數據到三角洲表中使用結構化的流。三角洲湖事務日誌保證隻有一次處理,即使還有其他流對表或批量查詢並發運行的情況。默認情況下,流在附加模式下運行,向表添加新記錄。
以下代碼示例開始結構化流。它監視DBFS中指定的位置json_read_path
JSON文件,掃描上傳到這個位置。結構化流通知文件上傳,它試圖將數據寫入DBFS中指定的位置save_path
通過使用模式中指定read_schema
。結構化流持續監測上傳文件到代碼停止。結構化流使用DBFS中指定的位置checkpoint_path
確保上傳的文件隻計算一次。
#定義模式和輸入、檢查點和輸出路徑。read_schema=(“int id”,+“firstName字符串”,+“middleName弦。”+“姓字符串”,+“性別字符串,”+“生日時間戳”,+“ssn弦。”+“工資int”)json_read_path=' / FileStore / streaming-uploads / people-10m 'checkpoint_path=“/ tmp /δ/ people-10m /檢查站”save_path=“/ tmp /δ/ people-10m”people_stream=(火花。readStream。模式(read_schema)。選項(“maxFilesPerTrigger”,1)。選項(“多行”,真正的)。格式(“json”)。負載(json_read_path))(people_stream。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,checkpoint_path)。開始(save_path))
圖書館(SparkR)sparkR.session()#定義模式和輸入、檢查點和輸出路徑。read_schema=“id int, firstName字符串,middleName字符串,lastName字符串,字符串性別,出生年月日時間戳,ssn字符串,工資int”json_read_path=“/ FileStore / streaming-uploads / people-10m”checkpoint_path=“/ tmp /δ/ people-10m /檢查站”save_path=“/ tmp /δ/ people-10m”people_stream=read.stream(“json”,路徑=json_read_path,模式=read_schema,多行=真正的,maxFilesPerTrigger=1)write.stream(people_stream,路徑=save_path,模式=“添加”,checkpointLocation=checkpoint_path)
/ /定義模式和輸入,檢查點和輸出路徑。瓦爾read_schema=(“int id”,+“firstName字符串”,+“middleName弦。”+“姓字符串”,+“性別字符串,”+“生日時間戳”,+“ssn弦。”+“工資int”)瓦爾json_read_path=“/ FileStore / streaming-uploads / people-10m”瓦爾checkpoint_path=“/ tmp /δ/ people-10m /檢查站”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾people_stream=(火花。readStream。模式(read_schema)。選項(“maxFilesPerTrigger”,1)。選項(“多行”,真正的)。格式(“json”)。負載(json_read_path))people_stream。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,checkpoint_path)。開始(save_path)
為了測試這種行為,這是一個符合JSON文件,你可以上傳到指定的位置json_read_path
,然後查詢的位置save_path
看寫的結構化數據流。
({“id”:10000021,“firstName”:“喬”,“middleName”:“亞曆山大”,“姓”:“史密斯”,“性別”:“M”,“生日”:188712000,“ssn”:“123-45-6789”,“工資”:50000年},{“id”:10000022,“firstName”:“瑪麗”,“middleName”:“簡”,“姓”:“母鹿”,“性別”:“F”,“生日”:“1968 - 10 - 27 t04:00:00.000 + 000”,“ssn”:“234-56-7890”,“工資”:75500年}]
關於三角洲湖與結構化集成流的更多信息,見表流讀取和寫入和結構化的生產流。看到也結構化流編程指南在Apache火花網站上。
批量插入
合並一組更新和插入到現有δ表,您使用合並成聲明。例如,下麵的語句將數據從源表合並成目標三角洲表。當兩個表中有一個匹配的行,三角洲湖更新數據列使用給定的表達式。當沒有匹配的行,三角洲湖添加一個新行。該操作被稱為一個插入。
合並成默認的。people10m使用默認的。people10m_upload在默認的。people10m。id=默認的。people10m_upload。id當匹配然後更新集*當不匹配然後插入*
如果您指定*
,這更新或插入目標表中的所有列。這假設源表具有相同的列的目標表,否則查詢將拋出一個錯誤分析。
你必須為每一列指定值表當您執行一個插入
操作(例如,當現有的數據集)中的沒有匹配的行。然而,你不需要更新所有的值。
測試前麵的示例中,創建一個源表如下:
創建表默認的。people10m_upload(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ
測試當匹配
條款,源表填入下麵的行,然後運行前合並成
聲明。因為兩個表有匹配的行在
條款,目標表的匹配行更新。
插入成默認的。people10m_upload值(9999998,“比利”,“湯米·”,“Luppitt”,“米”,1992 - 09 - 17 t04:00:00.000 + 0000的,“953-38-9452”,55250年),(9999999,“伊萊亞斯”,“西裏爾”,“利百特”,“米”,1984 - 05 - 22 t04:00:00.000 + 0000的,“906-51-2137”,48500年),(10000000,“約書亞”,‘底盤’,“Broggio”,“米”,1968 - 07 - 22 t04:00:00.000 + 0000的,“988-61-6247”,90000年)
看到結果,查詢該表。
選擇*從默認的。people10m在哪裏id之間的9999998和10000000排序通過idASC
測試當不匹配
條款,源表填入下麵的行,然後運行前合並成
聲明。因為目標表沒有以下行,這些行被添加到目標表。
插入成默認的。people10m_upload值(20000001,“約翰。”,”,“母鹿”,“米”,1978 - 01 - 14 - t04:00:00.000 + 000的,“345-67-8901”,55500年),(20000002,“瑪麗”,”,“史密斯”,“F”,1982 - 10 - 29 t01:00:00.000 + 000,“456-78-9012”,98250年),(20000003,“簡”,”,“母鹿”,“F”,1981 - 06 - 25 - t04:00:00.000 + 000的,“567-89-0123”,89900年)
看到結果,查詢該表。
選擇*從默認的。people10m在哪裏id之間的20000001和20000003排序通過idASC
運行前的任何SQL語句在Python中,R,或Scala,通過聲明作為一個字符串參數spark.sql
在Python中,函數或Scalasql
函數R。
讀一個表
在本節中:
您訪問三角洲表中的數據通過指定路徑DBFS (“/ tmp /δ/ people-10m”
)或表名(“default.people10m”
):
人=火花。讀。格式(“δ”)。負載(“/ tmp /δ/ people-10m”)顯示(人)
或
人=火花。表(“default.people10m”)顯示(人)
圖書館(SparkR)sparkR.session()人=read.df(路徑=“/ tmp /δ/ people-10m”,源=“δ”)顯示(人)
或
圖書館(SparkR)sparkR.session()人=tableToDF(“default.people10m”)顯示(人)
瓦爾人=火花。讀。格式(“δ”)。負載(“/ tmp /δ/ people-10m”)顯示(人)
或
瓦爾人=火花。表(“default.people10m”)顯示(人)
選擇*從δ。' /tmp/δ/人- - - - - -10米”
或
選擇*從默認的。people10m
查詢的一個早期版本表(時間旅行)
三角洲湖時間旅行允許您查詢一個年長的三角洲表的快照。
查詢一個舊版本的表,指定一個版本或時間戳選擇
聲明。例如,要從曆史查詢版本0以上,使用:
火花。sql(“從默認選擇*。people10m版本作為的0')
或
火花。sql(“SELECT *從違約。people10m時間戳作為的“2019-01-29 00:37:58”")
圖書館(SparkR)sparkR.session()sql(“SELECT *從違約。people10m版本作為的0")
或
圖書館(SparkR)sparkR.session()sql(“SELECT *從違約。people10m時間戳作為的“2019-01-29 00:37:58”")
火花。sql(“SELECT *從違約。people10m版本作為的0")
或
火花。sql(“SELECT *從違約。people10m時間戳作為的“2019-01-29 00:37:58”")
選擇*從默認的。people10m版本作為的0
或
選擇*從默認的。people10m時間戳作為的“2019-01-29 00:37:58”
時間戳,隻接受日期或時間戳字符串,例如,“2019-01-01”
和“2019 - 01 - 01 - 00:00:00.000Z”
。
請注意
因為版本1是在時間戳“2019-01-2900:38:10”
,查詢版本0您可以使用任何時間戳的範圍“2019-01-2900:37:58”
來“2019-01-2900:38:09”
包容性。
DataFrameReader選項允許你創建一個從三角洲DataFrame表是固定到一個特定版本的表,例如在Python中:
df1=火花。讀。格式(“δ”)。選項(“timestampAsOf”,“2019-01-01”)。負載(“/ tmp /δ/ people-10m”)顯示(df1)
或
df2=火花。讀。格式(“δ”)。選項(“versionAsOf”,2)。負載(“/ tmp /δ/ people-10m”)顯示(df2)
有關詳細信息,請參見查詢一個表(舊的快照時間旅行)。
優化一個表
一旦完成多個更改一個表,你可能會有很多小文件。提高閱讀的速度查詢,您可以使用優化
崩潰的小文件到較大的:
火花。sql(“優化delta. / tmp /δ/ people-10m”)
或
火花。sql(“優化default.people10m”)
圖書館(SparkR)sparkR.session()sql(“優化delta. / tmp /δ/ people-10m”)
或
圖書館(SparkR)sparkR.session()sql(“優化default.people10m”)
火花。sql(“優化delta. / tmp /δ/ people-10m”)
或
火花。sql(“優化default.people10m”)
優化δ。' /tmp/δ/人- - - - - -10米”
或
優化默認的。people10m
z值的列
為了進一步提高讀取性能,您可以在同一組共同部署相關信息由z值的文件。自動使用這個co-locality三角洲湖data-skipping算法極大地減少了需要讀取的數據量。z值數據,在指定的列順序ZORDER通過
條款。例如,在同一個地點協同工作性別
運行:
火花。sql(“優化三角洲。' /tmp/δ/人- - - - - -10米”ZORDER通過(性別)")
或
火花。sql(“優化默認。people10mZORDER通過(性別)')
圖書館(SparkR)sparkR.session()sql(“優化三角洲。' /tmp/δ/人- - - - - -10米”ZORDER通過(性別)")
或
圖書館(SparkR)sparkR.session()sql(“優化違約。people10mZORDER通過(性別)")
火花。sql(“優化三角洲。' /tmp/δ/人- - - - - -10米”ZORDER通過(性別)")
或
火花。sql(“優化違約。people10mZORDER通過(性別)")
優化δ。' /tmp/δ/人- - - - - -10米”ZORDER通過(性別)
或
優化默認的。people10mZORDER通過(性別)
全套的運行時選項優化
,請參閱壓實(裝箱)。
清理快照
三角洲湖為閱讀提供了快照隔離,這意味著它是安全的優化
盡管其他用戶或工作表查詢。然而,最終你應該清理舊的快照。你可以通過運行真空
命令:
火花。sql(“真空default.people10m”)
圖書館(SparkR)sparkR.session()sql(“真空default.people10m”)
火花。sql(“真空default.people10m”)
真空默認的。people10m
你控製的年齡最新保留快照使用保留< N >小時
選擇:
火花。sql(“真空違約。people10m保留24小時')
圖書館(SparkR)sparkR.session()sql(“真空違約。people10m保留24小時")
火花。sql(“真空違約。people10m保留24小時")
真空默認的。people10m保留24小時
有關使用真空
有效地,看刪除文件不再由三角洲表引用。