將Apache氣流與磚
這篇文章是我們一係列的一部分內部工程博客磚平台,基礎設施管理、集成、工具、監視和配置。Beplay体育安卓版本
今天,我們很高興宣布本機磚整合Apache氣流一個流行的開源工作流調度程序。這篇文章展示了如何設置氣流和用它來觸發磚工作。
磚的一個非常受歡迎的特性”統一數據分析平台Beplay体育安卓版本(UAP)是數據科學筆記本直接轉化為生產能力的工作可以定期運行。雖然這個特性結合工作流從探索性數據科學生產數據工程,工程工作一些數據可以包含複雜的依賴關係,難以捕捉的筆記本。支持這些複雜的用例,我們提供REST api基於筆記本電腦和圖書館工作可以由外部係統。其中,最常見的一種調度器使用我們的客戶是氣流。beplay体育app下载地址我們樂於分享,我們也擴展支持磚的氣流。
氣流的基礎
氣流是一個通用的工作流調度程序依賴關係管理。除了安排定期的能力工作,氣流可以表達明確的數據管道中的不同階段之間的依賴關係。
每一個ETL管道被表示為一個有向無環圖(DAG)的任務(不要錯誤的火花的DAG調度程序和任務)。依賴關係編碼到DAG的邊緣——對於任何給定的邊緣,下遊任務隻安排如果上遊任務成功完成。下麵舉個例子,在這個例子中,DAG任務B和C後隻會觸發任務一個成功完成。任務D時將觸發任務B和C成功完成。
氣流的任務是“操作符”類的實例和實現小的Python腳本。因為它們是簡單的Python腳本,運營商在氣流可以執行許多任務:他們可以輪詢一些前提是真實的(也稱為一個傳感器)在成功之前,直接執行ETL,或觸發外部係統像磚。
在氣流的更多信息,請看看他們文檔。
本地數據磚集成在氣流
我們實現了一個氣流運營商DatabricksSubmitRunOperator,使氣流和磚之間的平滑集成。通過該操作,我們可以磚運行提交API端點,外部觸發一個運行jar, python腳本,或者筆記本。初始請求提交後,操作員將繼續調查的結果。當成功完成時,操作員將返回允許下遊任務運行。
我們的貢獻DatabricksSubmitRunOperator上遊氣流開源項目。然而,集成將不會被切成一個版本分支在氣流1.9.0發布之前。在那之前,使用這個操作符可以安裝磚的氣流的叉,本質上是氣流1.8.1與我們的版本DatabricksSubmitRunOperator補丁應用。
pip安裝,升級“git + git: / /github.com/databricks/(電子郵件保護)#蛋= apache-airflow(磚)”
氣流與磚教程
在本教程中,我們將設置一個玩具氣流1.8.1部署在本地機器上運行並部署一個例子DAG觸發運行在磚。
我們要做的第一件事就是初始化sqlite數據庫。氣流將使用它來跟蹤其他元數據。氣流在生產部署,你想編輯配置點氣流MySQL或Postgres數據庫但對玩具的案例中,我們將簡單地使用默認的sqlite數據庫。執行初始化運行:
氣流initdb
氣流的SQLite數據庫和默認配置部署將被初始化~ /氣流
。
在下一步,我們將編寫一個DAG運行兩個磚工作的一個線性依賴關係。第一個將觸發一個筆記本位於磚的工作/用戶/(電子郵件保護)/ PrepareData
,第二個將jar位於運行dbfs: / lib / etl-0.1.jar
。
基本上從一英裏高來看,腳本DAG結構兩種DatabricksSubmitRunOperator任務,然後設置依賴最後set_dowstream方法。一個框架版本的代碼是這樣的:
notebook_task = DatabricksSubmitRunOperator (task_id =“notebook_task”,…)
spark_jar_task = DatabricksSubmitRunOperator (task_id =“spark_jar_task”,…)notebook_task.set_downstream (spark_jar_task)
在現實中,還有一些其他的細節我們需要填寫工作DAG文件。第一步是設置一些默認參數將應用到每個任務在我們的DAG。
args = {“主人”:“氣流”,“電子郵件”:【”(電子郵件保護)”),“depends_on_past”:假,“start_date”:airflow.utils.dates.days_ago (2)}
這裏的兩個有趣的論點是depends_on_past和start_date。如果depends_on_past是真的,這信號氣流不應觸發一個任務,除非先前的實例任務成功完成。start_date參數決定當第一個任務實例才會安排。
下一節我們的DAG腳本實際實例化DAG。
dag = dag (dag_id =“example_databricks_operator”default_args =參數,schedule_interval =“@daily”)
在這十克,我們給它一個惟一的ID,附上我們之前宣布的默認參數,給它一個日程表。接下來,我們將指定集群運行的規範我們的任務。
new_cluster = {“spark_version”:“2.1.0-db3-scala2.11”,“node_type_id”:“r3.xlarge”,“aws_attributes”:{“可用性”:“ON_DEMAND”},“num_workers”:8}
該規範的模式匹配的新集群領域運行提交端點。DAG的例子,你可能想要減少工人的數量或改變實例規模較小。
最後,我們將實例化DatabricksSubmitRunOperator與我們的DAG並注冊它。
notebook_task_params = {“new_cluster”:new_cluster,“notebook_task”:{“notebook_path”:' /用戶/(電子郵件保護)/ PrepareData ',},}
#使用JSON參數初始化操作符的例子。notebook_task = DatabricksSubmitRunOperator (task_id =“notebook_task”,dag =十克,json = notebook_task_params)
在這段代碼中,JSON參數需要匹配的python字典運行提交端點。
添加另一個任務的下遊,我們實例化DatabricksSubmitRunOperator再使用特殊notebook_task set_downstream方法運算符實例注冊的依賴。
#使用DatabricksSubmitRunOperator的命名參數的例子#初始化操作符。spark_jar_task = DatabricksSubmitRunOperator (task_id =“spark_jar_task”,dag =十克,new_cluster = new_cluster,spark_jar_task = {“main_class_name”:“com.example.ProcessData”},庫= [{“罐子”:“dbfs: / lib / etl-0.1.jar”}])
notebook_task.set_downstream (spark_jar_task)
這個任務運行jar位於dbfs: / lib / etl-0.1.jar。
notebook_task注意到,我們使用了JSON參數指定的完整規範提交spark_jar_task端點和運行,我們夷為平地的頂級鍵提交端點跑進參數DatabricksSubmitRunOperator。雖然兩種方法實例化操作符是等價的,後一種方法不允許您使用任何新頂級域spark_python_task或spark_submit_task。有關的全部API的更多詳細信息DatabricksSubmitRunOperator,請看看文件在這裏。
現在我們有十克,在氣流中創建一個目錄來安裝它~ /氣流
被稱為~ /氣流/熟練的技藝
和DAG複製到該目錄。
在這一點上,氣流應該能夠撿起DAG。
$氣流list_dags [10:27:13](2017年-07年-06年10:27:23,868年]{__init__ . py:57}信息- - - - - -使用遺囑執行人SequentialExecutor(2017年-07年-06年10:27:24,238年]{models.py:168年}信息- - - - - -填補了DagBag從/用戶/安德魯/氣流/熟練的技藝
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -熟練的技藝- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -example_bash_operatorexample_branch_dop_operator_v3example_branch_operatorexample_databricks_operator
我們也可以可視化web UI的DAG。開始,氣流運行網絡服務器和連接到localhost: 8080。點擊進入“example_databricks_operator”,你會看到許多可視化DAG。下麵是例子:
在這一點上,一個認真的觀察者可能也注意到,我們不指定信息,如主機名、用戶名和密碼磚碎片在DAG。配置這個我們使用連接原始的氣流,使我們能夠從我們的DAG參考憑證存儲在數據庫中。默認情況下,所有DatabricksSubmitRunOperator設置databricks_conn_id參數“databricks_default”,所以對於我們的DAG,我們必須添加一個連接與ID“databricks_default。”
最簡單的方法是通過web UI。點擊進入“Admin”上麵,然後“連接”下拉會告訴你所有你當前的連接。對於我們的用例,我們將添加一個“databricks_default連接。“最後的連接應該是這樣的:
現在我們已經設置為我們的DAG的一切,是時候來測試每個任務。notebook_task我們會跑,氣流測試example_databricks_operator notebook_task 2017-07-01
和spark_jar_task
我們將會運行氣流測試example_databricks_operator spark_jar_task 2017-07-01。
的DAG調度運行,你會調用調度程序與命令氣流調度器守護進程。
如果一切順利,調度程序啟動後,您應該能夠看到回填的DAG開始運行在web UI。
下一個步驟
總之,這篇文章提供了一個簡單的例子,建立氣流與磚的集成。它演示了如何磚擴展與氣流通過磚允許訪問和集成運行提交API調用計算磚平台上。Beplay体育安卓版本更多詳細說明如何設置生產氣流部署,請看看官方的氣流的文檔。
同樣的,如果你想嚐試本教程在磚上,注冊一個今天的免費試用。