三角洲湖快速入門

湖三角洲快速入門的概述與三角洲湖的基礎知識。快速入門演示了如何將數據加載到一個三角洲表,修改表,看表,顯示表的曆史,和優化表。

的演示本文中描述的一些特性(以及更多),看這個YouTube視頻(9分鍾)。

您可以運行本文中的示例代碼中筆記本附加到一個磚集群

對現有數據磚筆記本,證明這些特性,明白了介紹性的筆記本

創建一個表

創建一個增量表,您可以使用Apache火花SQL代碼,改變現有的編寫格式拚花,csv,json等等,δ

對於所有類型的文件,你的文件讀入DataFrame使用相應的輸入格式(例如,拚花,csv,json等等),然後寫出三角洲中的數據格式。在這個代碼示例中,輸入文件已經在三角洲格式和位於樣本數據集(databricks-datasets)。這段代碼在三角洲格式保存數據磚文件係統(DBFS)在指定的位置save_path

#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path=“/ tmp /δ/ people-10m”table_name=“default.people10m”#從源頭裝載數據。=火花\\格式(read_format)\負載(load_path)#寫數據到其目標。\格式(write_format)\保存(save_path)#創建表。火花sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)
圖書館(SparkR)sparkR.session()#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path=“/ tmp /δ/ people-10m /”table_name=“default.people10m”#從源頭裝載數據。=read.df(load_path,=read_format)#寫數據到其目標。write.df(,=write_format,路徑=save_path)#創建表。sql(粘貼(“CREATE TABLE”,table_name,“用δ位置”,save_path,“”,9月=”“))
/ /定義輸入和輸出格式和路徑和表名。瓦爾read_format=“δ”瓦爾write_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾table_name=“default.people10m”/ /加載數據從其來源。瓦爾=火花格式(read_format)負載(load_path)/ /寫數據到其目標。格式(write_format)保存(save_path)/ /創建表。火花sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)
——位置必須已經存在的路徑,必須是三角洲格式。創建默認的people10m使用δ位置“/ tmp /δ/ people-10m”

前麵的操作創建一個新的非托管表通過從數據推斷的模式。對於非托管的表,你控製的位置數據。磚跟蹤表的名稱和它的位置。信息可用選項創建一個增量表時,看到的創建一個表寫一個表

如果你的拚花格式的源文件,您可以使用轉換為δ聲明轉換文件。如果非托管相應的表,表轉換後仍非托管:

轉換δ拚花' /tmp/δ/- - - - - -10

創建一個新的管理表,你可以使用創建表語句指定表名,然後你可以數據加載到表中。或者你可以使用saveAsTable與Python方法,R,或Scala。例如:

的表=“people10m”sourceType=“δ”loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”=火花\\格式(sourceType)\負載(loadPath)\格式(sourceType)\saveAsTable(的表)顯示(火花sql(“SELECT * FROM”+的表))
圖書館(SparkR)sparkR.session()的表=“people10m”sourceType=“δ”loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”=read.df(路徑=loadPath,=sourceType)saveAsTable(df=,=sourceType,的表=的表)顯示(sql(粘貼(“SELECT * FROM”,的表,9月=”“)))
瓦爾的表=“people10m”瓦爾sourceType=“δ”瓦爾loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾=火花格式(sourceType)負載(loadPath)格式(sourceType)saveAsTable(的表)顯示(火花sql(“SELECT * FROM”+的表))
創建people10m使用δ作為選擇*δ' /- - - - - -數據集/學習- - - - - -火花- - - - - -v2//- - - - - -10δ;選擇*people10m;

如果你的源文件是三角洲湖支持的格式,您可以使用以下速記直接讀取一個文件,通過使用的位置說明符<文件類型>。<位置>的“:

#定義輸入格式和路徑。read_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”#從源頭裝載數據。=火花sql(“SELECT * FROM”+read_format+”。”+load_path+“”)
圖書館(SparkR)sparkR.session()#定義輸入格式和路徑。read_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”#從源頭裝載數據。=sql(粘貼(“SELECT * FROM”,read_format,”。”,load_path,“”,9月=”“))
/ /定義輸入格式和路徑。瓦爾read_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”/ /加載數據從其來源。瓦爾=火花sql(“SELECT * FROM”+read_format+”。”+load_path+“”)
選擇*δ' /- - - - - -數據集/學習- - - - - -火花- - - - - -v2//- - - - - -10δ

對於管理表,磚決定數據的位置。的位置,您可以使用描述的細節語句,例如:

顯示(火花sql(“描述細節people10m”))
顯示(sql(“描述細節people10m”))
顯示(火花sql(“描述細節people10m”))
描述細節people10m;

另請參閱創建一個表控製數據的位置

對數據進行分區

加快查詢謂詞包括分區的列,可以對數據進行分區。下麵的代碼示例是類似的創建一個表,但本例中分區的數據。

#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by=“性別”save_path=“/ tmp /δ/ people-10m”table_name=“default.people10m”#從源頭裝載數據。=火花\\格式(read_format)\負載(load_path)#寫數據到其目標。\partitionBy(partition_by)\格式(write_format)\保存(save_path)#創建表。火花sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)

如果你已經運行Python中的代碼範例創建一個表,您必須先刪除現有表和保存數據:

#定義表名和輸出路徑。table_name=“default.people10m”save_path=“/ tmp /δ/ people-10m”#刪除表。火花sql(“刪除表”+table_name)#刪除保存的數據。dbutilsfsrm(save_path,真正的)
圖書館(SparkR)sparkR.session()#定義輸入和輸出格式和路徑和表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by=“性別”save_path=“/ tmp /δ/ people-10m /”table_name=“default.people10m”#從源頭裝載數據。=read.df(load_path,=read_format)#寫數據到其目標。write.df(,=write_format,partitionBy=partition_by,路徑=save_path)#創建表。sql(粘貼(“CREATE TABLE”,table_name,“用δ位置”,save_path,“”,9月=”“))

如果你已經跑R中的代碼範例創建一個表,您必須先刪除現有表和保存數據:

圖書館(SparkR)sparkR.session()#定義表名和輸出路徑。table_name=“default.people10m”save_path=“/ tmp /δ/ people-10m”#刪除表。sql(粘貼(“刪除表”,table_name,9月=”“))#刪除保存的數據。dbutils.fs.rm(save_path,真正的)
/ /定義輸入和輸出格式和路徑和表名。瓦爾read_format=“δ”瓦爾write_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾partition_by=“性別”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾table_name=“default.people10m”/ /加載數據從其來源。瓦爾=火花格式(read_format)負載(load_path)/ /寫數據到其目標。partitionBy(partition_by)格式(write_format)保存(save_path)/ /創建表。火花sql(“CREATE TABLE”+table_name+“用δ位置”+save_path+“”)

如果你已經跑Scala中的代碼範例創建一個表,您必須先刪除現有表和保存數據:

/ /定義表名和輸出路徑。瓦爾table_name=“default.people10m”瓦爾save_path=“/ tmp /δ/ people-10m”/ /刪除表。火花sql(“刪除表”+table_name)/ /刪除保存的數據。dbutilsfsrm(save_path,真正的)

當你創建一個增量表對數據進行分區使用SQL,指定分區通過列。

——位置必須已經存在的路徑,必須是三角洲格式。創建默認的people10m(idINT,firstName字符串,middleName字符串,字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ分區通過(性別)位置“/ tmp /δ/ people-10m”

如果你已經跑SQL中的代碼範例創建一個表,您必須先刪除現有表:

下降默認的people10m

修改一個表

三角洲湖支持一組豐富的操作來修改表。

流寫入一個表

你可以寫數據到三角洲表中使用結構化的流。三角洲湖事務日誌保證隻有一次處理,即使還有其他流對表或批量查詢並發運行的情況。默認情況下,流在附加模式下運行,向表添加新記錄。

以下代碼示例開始結構化流。它監視DBFS中指定的位置json_read_pathJSON文件,掃描上傳到這個位置。結構化流通知文件上傳,它試圖將數據寫入DBFS中指定的位置save_path通過使用模式中指定read_schema。結構化流持續監測上傳文件到代碼停止。結構化流使用DBFS中指定的位置checkpoint_path確保上傳的文件隻計算一次。

#定義模式和輸入、檢查點和輸出路徑。read_schema=(“int id”,+“firstName字符串”,+“middleName弦。”+“姓字符串”,+“性別字符串,”+“生日時間戳”,+“ssn弦。”+“工資int”)json_read_path=' / FileStore / streaming-uploads / people-10m 'checkpoint_path=“/ tmp /δ/ people-10m /檢查站”save_path=“/ tmp /δ/ people-10m”people_stream=(火花readStream模式(read_schema)選項(“maxFilesPerTrigger”,1)選項(“多行”,真正的)格式(“json”)負載(json_read_path))(people_streamwriteStream格式(“δ”)outputMode(“添加”)選項(“checkpointLocation”,checkpoint_path)開始(save_path))
圖書館(SparkR)sparkR.session()#定義模式和輸入、檢查點和輸出路徑。read_schema=“id int, firstName字符串,middleName字符串,lastName字符串,字符串性別,出生年月日時間戳,ssn字符串,工資int”json_read_path=“/ FileStore / streaming-uploads / people-10m”checkpoint_path=“/ tmp /δ/ people-10m /檢查站”save_path=“/ tmp /δ/ people-10m”people_stream=read.stream(“json”,路徑=json_read_path,模式=read_schema,多行=真正的,maxFilesPerTrigger=1)write.stream(people_stream,路徑=save_path,模式=“添加”,checkpointLocation=checkpoint_path)
/ /定義模式和輸入,檢查點和輸出路徑。瓦爾read_schema=(“int id”,+“firstName字符串”,+“middleName弦。”+“姓字符串”,+“性別字符串,”+“生日時間戳”,+“ssn弦。”+“工資int”)瓦爾json_read_path=“/ FileStore / streaming-uploads / people-10m”瓦爾checkpoint_path=“/ tmp /δ/ people-10m /檢查站”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾people_stream=(火花readStream模式(read_schema)選項(“maxFilesPerTrigger”,1)選項(“多行”,真正的)格式(“json”)負載(json_read_path))people_streamwriteStream格式(“δ”)outputMode(“添加”)選項(“checkpointLocation”,checkpoint_path)開始(save_path)

為了測試這種行為,這是一個符合JSON文件,你可以上傳到指定的位置json_read_path,然後查詢的位置save_path看寫的結構化數據流。

({“id”:10000021,“firstName”:“喬”,“middleName”:“亞曆山大”,“姓”:“史密斯”,“性別”:“M”,“生日”:188712000,“ssn”:“123-45-6789”,“工資”:50000年},{“id”:10000022,“firstName”:“瑪麗”,“middleName”:“簡”,“姓”:“母鹿”,“性別”:“F”,“生日”:“1968 - 10 - 27 t04:00:00.000 + 000”,“ssn”:“234-56-7890”,“工資”:75500年}]

關於三角洲湖與結構化集成流的更多信息,見表流讀取和寫入結構化的生產流。看到也結構化流編程指南在Apache火花網站上。

批量插入

合並一組更新和插入到現有δ表,您使用合並成聲明。例如,下麵的語句將數據從源表合並成目標三角洲表。當兩個表中有一個匹配的行,三角洲湖更新數據列使用給定的表達式。當沒有匹配的行,三角洲湖添加一個新行。該操作被稱為一個插入

合並默認的people10m使用默認的people10m_upload默認的people10mid=默認的people10m_uploadid匹配然後更新*匹配然後插入*

如果您指定*,這更新或插入目標表中的所有列。這假設源表具有相同的列的目標表,否則查詢將拋出一個錯誤分析。

你必須為每一列指定值表當您執行一個插入操作(例如,當現有的數據集)中的沒有匹配的行。然而,你不需要更新所有的值。

測試前麵的示例中,創建一個源表如下:

創建默認的people10m_upload(idINT,firstName字符串,middleName字符串,字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ

測試匹配條款,源表填入下麵的行,然後運行前合並聲明。因為兩個表有匹配的行條款,目標表的匹配行更新。

插入默認的people10m_upload(9999998,“比利”,“湯米·”,“Luppitt”,“米”,1992 - 09 - 17 t04:00:00.000 + 0000的,“953-38-9452”,55250年),(9999999,“伊萊亞斯”,“西裏爾”,“利百特”,“米”,1984 - 05 - 22 t04:00:00.000 + 0000的,“906-51-2137”,48500年),(10000000,“約書亞”,‘底盤’,“Broggio”,“米”,1968 - 07 - 22 t04:00:00.000 + 0000的,“988-61-6247”,90000年)

看到結果,查詢該表。

選擇*默認的people10m在哪裏id之間的999999810000000排序通過idASC

測試匹配條款,源表填入下麵的行,然後運行前合並聲明。因為目標表沒有以下行,這些行被添加到目標表。

插入默認的people10m_upload(20000001,“約翰。”,,“母鹿”,“米”,1978 - 01 - 14 - t04:00:00.000 + 000的,“345-67-8901”,55500年),(20000002,“瑪麗”,,“史密斯”,“F”,1982 - 10 - 29 t01:00:00.000 + 000,“456-78-9012”,98250年),(20000003,“簡”,,“母鹿”,“F”,1981 - 06 - 25 - t04:00:00.000 + 000的,“567-89-0123”,89900年)

看到結果,查詢該表。

選擇*默認的people10m在哪裏id之間的2000000120000003排序通過idASC

運行前的任何SQL語句在Python中,R,或Scala,通過聲明作為一個字符串參數spark.sql在Python中,函數或Scalasql函數R。

讀一個表

您訪問三角洲表中的數據通過指定路徑DBFS (“/ tmp /δ/ people-10m”)或表名(“default.people10m”):

=火花格式(“δ”)負載(“/ tmp /δ/ people-10m”)顯示()

=火花(“default.people10m”)顯示()
圖書館(SparkR)sparkR.session()=read.df(路徑=“/ tmp /δ/ people-10m”,=“δ”)顯示()

圖書館(SparkR)sparkR.session()=tableToDF(“default.people10m”)顯示()
瓦爾=火花格式(“δ”)。負載(“/ tmp /δ/ people-10m”)顯示()

瓦爾=火花(“default.people10m”)顯示()
選擇*δ' /tmp/δ/- - - - - -10

選擇*默認的people10m

顯示表的曆史

查看表的曆史,使用描述曆史表聲明,它提供了來源的信息,包括版本,操作,用戶,等等,每個寫一個表。

查詢的一個早期版本表(時間旅行)

三角洲湖時間旅行允許您查詢一個年長的三角洲表的快照。

查詢一個舊版本的表,指定一個版本或時間戳選擇聲明。例如,要從曆史查詢版本0以上,使用:

火花sql(“從默認選擇*。people10m版本作為的0')

火花sql(“SELECT *從違約。people10m時間戳作為的“2019-01-29 00:37:58”")
圖書館(SparkR)sparkR.session()sql(“SELECT *從違約。people10m版本作為的0")

圖書館(SparkR)sparkR.session()sql(“SELECT *從違約。people10m時間戳作為的“2019-01-29 00:37:58”")
火花sql(“SELECT *從違約。people10m版本作為的0")

火花sql(“SELECT *從違約。people10m時間戳作為的“2019-01-29 00:37:58”")
選擇*默認的people10m版本作為0

選擇*默認的people10m時間戳作為“2019-01-29 00:37:58”

時間戳,隻接受日期或時間戳字符串,例如,“2019-01-01”“2019 - 01 - 01 - 00:00:00.000Z”

請注意

因為版本1是在時間戳“2019-01-2900:38:10”,查詢版本0您可以使用任何時間戳的範圍“2019-01-2900:37:58”“2019-01-2900:38:09”包容性。

DataFrameReader選項允許你創建一個從三角洲DataFrame表是固定到一個特定版本的表,例如在Python中:

df1=火花格式(“δ”)選項(“timestampAsOf”,“2019-01-01”)負載(“/ tmp /δ/ people-10m”)顯示(df1)

df2=火花格式(“δ”)選項(“versionAsOf”,2)負載(“/ tmp /δ/ people-10m”)顯示(df2)

有關詳細信息,請參見查詢一個表(舊的快照時間旅行)

優化一個表

一旦完成多個更改一個表,你可能會有很多小文件。提高閱讀的速度查詢,您可以使用優化崩潰的小文件到較大的:

火花sql(“優化delta. / tmp /δ/ people-10m”)

火花sql(“優化default.people10m”)
圖書館(SparkR)sparkR.session()sql(“優化delta. / tmp /δ/ people-10m”)

圖書館(SparkR)sparkR.session()sql(“優化default.people10m”)
火花sql(“優化delta. / tmp /δ/ people-10m”)

火花sql(“優化default.people10m”)
優化δ' /tmp/δ/- - - - - -10

優化默認的people10m

z值的列

為了進一步提高讀取性能,您可以在同一組共同部署相關信息由z值的文件。自動使用這個co-locality三角洲湖data-skipping算法極大地減少了需要讀取的數據量。z值數據,在指定的列順序ZORDER通過條款。例如,在同一個地點協同工作性別運行:

火花sql(“優化三角洲。' /tmp/δ/人- - - - - -10米”ZORDER通過(性別)")

火花sql(“優化默認。people10mZORDER通過(性別)')
圖書館(SparkR)sparkR.session()sql(“優化三角洲。' /tmp/δ/人- - - - - -10米”ZORDER通過(性別)")

圖書館(SparkR)sparkR.session()sql(“優化違約。people10mZORDER通過(性別)")
火花sql(“優化三角洲。' /tmp/δ/人- - - - - -10米”ZORDER通過(性別)")

火花sql(“優化違約。people10mZORDER通過(性別)")
優化δ' /tmp/δ/- - - - - -10ZORDER通過(性別)

優化默認的people10mZORDER通過(性別)

全套的運行時選項優化,請參閱壓實(裝箱)

清理快照

三角洲湖為閱讀提供了快照隔離,這意味著它是安全的優化盡管其他用戶或工作表查詢。然而,最終你應該清理舊的快照。你可以通過運行真空命令:

火花sql(“真空default.people10m”)
圖書館(SparkR)sparkR.session()sql(“真空default.people10m”)
火花sql(“真空default.people10m”)
真空默認的people10m

你控製的年齡最新保留快照使用保留< N >小時選擇:

火花sql(“真空違約。people10m保留24小時')
圖書館(SparkR)sparkR.session()sql(“真空違約。people10m保留24小時")
火花sql(“真空違約。people10m保留24小時")
真空默認的people10m保留24小時

有關使用真空有效地,看刪除文件不再由三角洲表引用