編排與Apache氣流磚工作
這篇文章展示了一個示例的編排與Apache數據管道氣流磚工作。您還將了解如何設置氣流與磚的集成。編製管理工作複雜任務之間的依賴關係。
工作編製的數據管道
開發和部署一個數據處理管道通常需要管理複雜任務之間的依賴關係。例如,管道可能讀取數據從源,清理數據,將清洗數據,轉換後的數據寫入一個目標。您需要測試、進度和解決數據管道時實施。
工作流係統應對這些挑戰,允許您定義任務之間的依賴關係,安排當管道運行和監控工作流。Apache氣流是一個開源的解決方案來管理和調度的數據管道。氣流代表數據管道有向無環圖(無進取心的人)的操作。你定義一個工作流在Python文件和氣流管理調度和執行。氣流磚連接允許您利用優化的火花引擎提供的磚與氣流的調度功能。
安裝氣流磚集成
安裝氣流磚集成,打開終端並運行下列命令。一定要用您的用戶名和電子郵件在最後一行:
mkdir氣流cd氣流pipenv——python38 pipenv殼出口AIRFLOW_HOME=$ (鬆材線蟲病)pipenv安裝apache-airflow= =2.1.0 pipenv安裝apache-airflow-providers-databricks mkdir熟練的技藝氣流db init氣流用戶創建用戶名admin——firstname < firstname >——lastname < lastname >——角色管理——電子郵件your.com
當你複製和運行腳本,您執行這些步驟:
創建一個目錄命名
氣流
和更改到該目錄。使用
pipenv
創建和產卵Python虛擬環境。磚建議使用Python的虛擬環境隔離包版本和代碼依賴環境。這種隔離有助於減少意外包版本不匹配和代碼依賴碰撞。初始化一個環境變量命名
AIRFLOW_HOME
設置的路徑氣流
目錄中。安裝氣流,氣流磚提供者包。
創建一個
氣流/無進取心的人
目錄中。氣流使用熟練的技藝
目錄存儲DAG定義。初始化一個氣流SQLite數據庫用來跟蹤的元數據。在生產氣流部署,您將配置氣流與一個標準的數據庫。氣流的SQLite數據庫和默認配置部署中初始化
氣流
目錄中。創建管理員用戶對氣流。
安裝臨時演員例如,芹菜
,s3
,密碼
運行:
pip安裝“apache-airflow(磚、芹菜、s3、密碼)”
啟動氣流web服務器和調度器
氣流web服務器需要查看氣流UI。啟動web服務器,打開終端並運行下列命令:
氣流網絡服務器
調度器是氣流組件,日程安排熟練的技藝。要運行它,打開一個新的終端和運行以下命令:
pipenv殼出口AIRFLOW_HOME=$ (鬆材線蟲病)氣流調度器
氣流安裝進行測試
驗證氣流安裝,您可以運行一個例子與氣流無進取心的人包括:
在一個瀏覽器窗口,打開http://localhost: 8080 / home。氣流熟練的技藝屏幕上出現了。
單擊暫停/ Unpause DAG切換到unpause裝飾邊的一個例子,例如,
example_python_operator
。點擊觸發DAG的例子開始按鈕。
單擊DAG名稱查看詳細信息,包括DAG的運行狀態。
氣流運營商的磚
氣流磚集成提供了兩個不同的操作觸發的工作:
的DatabricksRunNowOperator需要一個現有的磚的工作和使用引發新工作運行(
帖子/工作/運行
)API請求觸發運行。磚推薦使用DatabricksRunNowOperator
因為它可以減少重複工作的定義和工作運行觸發這個操作符很容易找到的工作界麵。的DatabricksSubmitRunOperator不需要工作存在在磚和使用創建並觸發一次運行(
帖子/ /運行/提交工作
)API請求提交作業規範和觸發運行。
磚氣流操作符寫作業運行氣流日誌每一個頁麵的URLpolling_period_seconds
(默認是30秒)。有關更多信息,請參見apache-airflow-providers-databricks包在氣流的網站頁麵。
運行一個磚氣流的工作
下麵的例子演示了如何創建一個簡單的氣流部署在本地機器上運行和部署一個例子DAG觸發運行在磚。對於本例,您:
創建一個新的筆記本和添加代碼來打印一個問候根據配置參數。
創建一個磚工作運行筆記本的一個任務。
配置一個氣流聯係磚工作區。
創建一個氣流DAG觸發筆記本工作。你在一個Python腳本使用定義DAG
DatabricksRunNowOperator
。利用氣流UI觸發DAG並查看運行狀態。
創建一個筆記本
這個例子使用一個筆記本,其中包含兩個單元:
第一個單元格包含一個磚實用程序的文本小部件定義一個變量命名
問候
設置為默認值世界
。第二個單元格打印的值
問候
變量的前綴你好
。
創建筆記本:
去你的磚著陸頁麵並選擇創建空白筆記本,或點擊新在側邊欄並選擇筆記本。的創建筆記本對話框出現了。
在創建筆記本對話框中,給你的筆記本一個名字,例如你好氣流。集默認的語言來Python。離開集群設置為默認值。您將配置集群創建任務時使用這個筆記本。
點擊創建。
複製下麵的Python代碼粘貼到第一個單元格的筆記本。
dbutils。小部件。文本(“問候”,“世界”,“問候”)問候=dbutils。小部件。得到(“問候”)
添加一個新的細胞低於第一個單元格和下麵的Python代碼複製並粘貼到新的細胞:
打印(“你好{}”。格式(問候))
創建一個工作
點擊工作流在側邊欄。
點擊。
的任務選項卡顯示了創建任務對話框。
取代添加一個名稱為你的工作…對你的工作名稱。
在任務名稱字段中,輸入任務的名稱,例如,greeting-task。
在類型下拉,選擇筆記本。
使用文件瀏覽器來找到您創建的筆記本,點擊筆記本名稱,點擊確認。
點擊添加下參數。在關鍵字段中,輸入
問候
。在價值字段中,輸入氣流用戶
。點擊創建任務。
創建一個磚個人訪問令牌的氣流
氣流連接使用磚磚個人訪問令牌(PAT)。看到個人訪問令牌為創建一個帕特的說明。
配置一個磚連接
磚的氣流安裝包含一個默認的連接。更新連接連接到您的工作空間中使用上麵創建的個人訪問令牌:
在一個瀏覽器窗口,打開http://localhost: 8080 /聯係/清單/。
下康涅狄格州ID,定位databricks_default並單擊編輯記錄按鈕。
取代的價值主機場的工作區實例名你的磚部署。
在額外的字段中,輸入下列值:
{“令牌”:“PERSONAL_ACCESS_TOKEN”}
取代
PERSONAL_ACCESS_TOKEN
與你的磚個人訪問令牌。
創建一個新的氣流DAG
你在一個Python文件定義一個氣流DAG。創建一個DAG觸發的例子筆記本工作:
在文本編輯器中或者IDE,創建一個新文件命名
databricks_dag.py
用下麵的內容:從氣流進口DAG從airflow.providers.databricks.operators.databricks進口DatabricksRunNowOperator從airflow.utils.dates進口days_agodefault_args={“主人”:“氣流”}與DAG(“databricks_dag”,start_date=days_ago(2),schedule_interval=沒有一個,default_args=default_args)作為dag:opr_run_now=DatabricksRunNowOperator(task_id=“run_now”,databricks_conn_id=“databricks_default”,job_id=JOB_ID)
取代
JOB_ID
早些時候與工作ID的值保存。保存文件
氣流/無進取心的人
目錄中。氣流自動讀取和安裝DAG文件存儲在氣流/無進取心的人
。
安裝和驗證DAG的氣流
觸發並驗證的DAG氣流界麵:
在一個瀏覽器窗口,打開http://localhost: 8080 / home。氣流熟練的技藝屏幕上出現了。
定位
databricks_dag
並單擊暫停/ Unpause DAG切換unpause DAG。觸發DAG點擊開始按鈕。
點擊運行運行列視圖狀態和運行的細節。