Dag,Directed Acyclic Graph,是以一定方式反映任务关系和依赖的,所有任务的集合。

DAG的实例叫做Dag Run,Task的实例叫做Task Instance。DAG会在python文件中定义,并放置在DAG_FOLDER .

下面以Airflow的Github的tutorial示例说明DAG。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

DAG脚本只是定义任务的执行流程,并不涉及真正的任务内容,因此在脚本中通常不能在不同task之间通信,如果需要通信则需要使用XCom。DAG脚本需要快速执行,因为scheduler通常会周期性的执行DAG脚本。

引入Modules

Airflow的pipeline就是python脚本,因此可以引入所需要的任何python类库。

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator

默认参数

当创建DAG和tasks时,允许为构造器传参,可以采用字典的形式,这很方便为不同的环境设置不同的字典。

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

初始化DAG

DAG对象需要嵌入tasks,首先定义DAG对象

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

Tasks

当初始化Operator对象时,会生成Tasks。

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

task设置参数的优先级:

  • 直接传参

  • default_args 字典设置

  • operator的默认参数值

Jinja模板

Airflow充分利用Jinja模板,提供预置参数及宏表达式。Airflow提供hooks,用来自定义参数、宏和模板。

在tutorial实例中简单的说明Jinja模板的使用

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

依赖

在创建的任务之间,建立依赖。

t2.set_upstream(t1)

# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')

运行脚本

在运行之前,先验证pipeline可以解析正确。首先将定义的tutorial.py存放在~/airflow/dags

python ~/airflow/dags/tutorial.py

验证脚本的Metadata

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

在某个特定时间点运行Task实例,此时设置的日期是execution_date ,它会让scheduler在特定的时间+time运行task或dag

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

渲染命令模板,并运行

# testing templated
airflow test tutorial templated 2015-06-01

airflow test 命令是在本地运行Task实例,并打印日志到stdout,它并不会影响依赖,也不会与database交流运行状态。

当Dag流程都准备完毕,可以运行backfill。backfill会考虑依赖,输出日志到文件,并通过database记录运行状态。当启动webserer时,可以追踪运行进度。

depends_on_past=True,只有前一个Task实例运行成功,才能运行当前实例。

但是如果Task设定start_date ,则依赖关系被忽略。

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

results matching ""

    No results matching ""