Dag,Directed Acyclic Graph,是以一定方式反映任务关系和依赖的,所有任务的集合。
DAG的实例叫做Dag Run,Task的实例叫做Task Instance。DAG会在python文件中定义,并放置在DAG_FOLDER .
下面以Airflow的Github的tutorial示例说明DAG。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
DAG脚本只是定义任务的执行流程,并不涉及真正的任务内容,因此在脚本中通常不能在不同task之间通信,如果需要通信则需要使用XCom。DAG脚本需要快速执行,因为scheduler通常会周期性的执行DAG脚本。
引入Modules
Airflow的pipeline就是python脚本,因此可以引入所需要的任何python类库。
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
默认参数
当创建DAG和tasks时,允许为构造器传参,可以采用字典的形式,这很方便为不同的环境设置不同的字典。
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
初始化DAG
DAG对象需要嵌入tasks,首先定义DAG对象
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(1))
Tasks
当初始化Operator对象时,会生成Tasks。
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
task设置参数的优先级:
直接传参
default_args字典设置operator的默认参数值
Jinja模板
Airflow充分利用Jinja模板,提供预置参数及宏表达式。Airflow提供hooks,用来自定义参数、宏和模板。
在tutorial实例中简单的说明Jinja模板的使用
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
依赖
在创建的任务之间,建立依赖。
t2.set_upstream(t1)
# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)
t3.set_upstream(t1)
# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
运行脚本
在运行之前,先验证pipeline可以解析正确。首先将定义的tutorial.py存放在~/airflow/dags
python ~/airflow/dags/tutorial.py
验证脚本的Metadata
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
在某个特定时间点运行Task实例,此时设置的日期是execution_date ,它会让scheduler在特定的时间+time运行task或dag
# command layout: command subcommand dag_id task_id date
# testing print_date
airflow test tutorial print_date 2015-06-01
# testing sleep
airflow test tutorial sleep 2015-06-01
渲染命令模板,并运行
# testing templated
airflow test tutorial templated 2015-06-01
airflow test 命令是在本地运行Task实例,并打印日志到stdout,它并不会影响依赖,也不会与database交流运行状态。
当Dag流程都准备完毕,可以运行backfill。backfill会考虑依赖,输出日志到文件,并通过database记录运行状态。当启动webserer时,可以追踪运行进度。
当depends_on_past=True,只有前一个Task实例运行成功,才能运行当前实例。
但是如果Task设定start_date ,则依赖关系被忽略。
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07