Scope

Strictly speaking, Airflow only loads DAGs that appear in globals(). In the example below, only dag_1 is discovered; dag_2 exists only in my_function's local scope and is never loaded:

from airflow.models import DAG

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    # dag_2 never reaches the module's global namespace
    dag_2 = DAG('but_this_dag_will_not')

my_function()

Context Manager

Added in Airflow 1.8. A DAG can be used as a context manager; any operator created inside the with block is automatically assigned to it:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator(task_id='op')

op.dag is dag  # True

DAG Assignment

Added in Airflow 1.8.

An operator does not have to be assigned to a DAG right away, but once assigned, it can neither be transferred to another DAG nor unassigned:

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)

# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

Bitshift Composition

Added in Airflow 1.8.

Traditionally, operator relationships are set with set_upstream() and set_downstream(). In Airflow 1.8 the same relationships can also be composed with the Python bitshift operators >> and <<; each pair below is equivalent:

op1 >> op2
op1.set_downstream(op2)

op2 << op1
op2.set_upstream(op1)

Relationships can also be chained in a single expression:

op1 >> op2 >> op3 << op4

Bitshift composition even works with a DAG object; composing a DAG with an operator assigns the operator to that DAG:

dag >> op1 >> op2

Putting it together, bitshift composition inside a pipeline definition:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )

Pools

Airflow pools limit execution parallelism for arbitrary sets of tasks. A pool is created in the UI (Menu -> Admin -> Pools) with a name and a number of worker slots; a task joins a pool through its pool parameter:

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
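
Tasks that do not fit into the pool's open slots are queued until slots free up; among queued tasks, the priority_weight parameter determines which runs first.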

Connections
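
Connection information for external systems (host, login, password, extra parameters) is stored in the Airflow metadata database and managed in the UI (Menu -> Admin -> Connections); operators and hooks refer to a connection by its conn_id. As a small sketch, a connection can also be looked up from code ('my_conn_id' is an illustrative id):

from airflow.hooks.base_hook import BaseHook

# fetches the Connection object stored under the given conn_id
conn = BaseHook.get_connection('my_conn_id')
print(conn.host, conn.login)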

Queue

When the CeleryExecutor is used, tasks are sent to Celery queues. queue is an attribute of BaseOperator, so every task can be routed to a specific queue.

The default queue is defined by celery -> default_queue in airflow.cfg.
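
A minimal sketch of routing a task to a dedicated queue (the task name, command, and queue name are illustrative, and a dag object as in the earlier examples is assumed); only workers started with airflow worker -q spark will pick it up:

from airflow.operators.bash_operator import BashOperator

spark_task = BashOperator(
    task_id='spark_task',
    bash_command='spark-submit my_job.py',
    queue='spark',  # route this task to the 'spark' queue
    dag=dag)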

XComs

XComs let tasks exchange small messages. A task pushes a value explicitly with xcom_push() or implicitly by returning it from its callable; downstream tasks retrieve it with xcom_pull():

# inside a PythonOperator called 'pushing_task'
def push_function():
    # the return value is automatically pushed as an XCom
    return 'some_value'

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')
    return value
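
A minimal sketch wiring the two callables into a DAG (task names are illustrative, and a dag object as in the earlier examples is assumed):

from airflow.operators.python_operator import PythonOperator

push_task = PythonOperator(
    task_id='pushing_task',
    python_callable=push_function,
    dag=dag)

pull_task = PythonOperator(
    task_id='pulling_task',
    python_callable=pull_function,
    provide_context=True,  # exposes task_instance via **context
    dag=dag)

push_task >> pull_task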

Variables
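
Variables are a generic key/value store, managed in the UI (Menu -> Admin -> Variables), via the CLI, or from code; values can optionally be stored and retrieved as JSON: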

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
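
A short sketch of the other direction and of safe lookups (keys and values are illustrative):

from airflow.models import Variable

Variable.set("foo", "some_value")
Variable.set("bar", {"a": 1}, serialize_json=True)  # stored as a JSON string

# returns the fallback instead of raising when the key does not exist
baz = Variable.get("baz", default_var="fallback")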

Branching

Branching is implemented with the BranchPythonOperator: its python_callable returns the task_id of the downstream branch to follow, and all other directly downstream paths are skipped.

If some tasks should be skipped, keep in mind that a branch cannot have an empty path; create a dummy task for that branch instead, as in the sketch below.
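
A minimal branching sketch (the dag id and task names are illustrative):

import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG('branch_example', schedule_interval='@daily',
          start_date=dt.datetime(2016, 1, 1))

def choose_branch():
    # return the task_id of the branch to follow; the other one is skipped
    return 'branch_a'

branching = BranchPythonOperator(task_id='branching',
                                 python_callable=choose_branch, dag=dag)
branch_a = DummyOperator(task_id='branch_a', dag=dag)  # the real work
branch_b = DummyOperator(task_id='branch_b', dag=dag)  # the "do nothing" path
branching >> branch_a
branching >> branch_b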

SubDAGs

Sets of repeating or parallel tasks can be consolidated into a SubDAG, which the parent DAG runs through a SubDagOperator.

The subdag file:

# dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator


# the DAG object is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    # a SubDAG's dag_id must be '<parent_dag_id>.<child_task_id>'
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    dummy_operator = DummyOperator(
        task_id='dummy_task',
        dag=dag,
    )

    return dag

The main DAG file:

# main_dag.py
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from dags.subdag import sub_dag


PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

main_dag = DAG(
    dag_id=PARENT_DAG_NAME,
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2016, 1, 1)
)

# use a distinct name so the imported sub_dag factory is not shadowed
sub_dag_task = SubDagOperator(
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
                   main_dag.schedule_interval),
    task_id=CHILD_DAG_NAME,
    dag=main_dag,
)

Trigger Rules

Airflow supports dependency rules beyond "all parents have succeeded". Every operator takes a trigger_rule argument that defines when the task fires: all_success (the default), all_failed, all_done, one_success, one_failed, or dummy. task4 in the example below uses all_done.

Latest Run Only
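
Standard behavior runs tasks for every past schedule interval, e.g. during a backfill. The LatestOnlyOperator (added in Airflow 1.8) skips all of its downstream tasks whenever the run is not the most recent scheduled one or is externally triggered: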

#dags/latest_only_with_trigger.py
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule


dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)

task2 = DummyOperator(task_id='task2', dag=dag)

task3 = DummyOperator(task_id='task3', dag=dag)
task3.set_upstream([task1, task2])

task4 = DummyOperator(task_id='task4', dag=dag,
                      trigger_rule=TriggerRule.ALL_DONE)
task4.set_upstream([task1, task2])
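
In this DAG, task1 and task3 are skipped for all runs except the latest: task1 is directly downstream of latest_only, and task3's default all_success rule propagates the skip. task2 always runs because it is independent of latest_only, and task4 runs once task1 and task2 are done in any state, thanks to its ALL_DONE trigger rule.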

Zombies & Undeads

A zombie task lacks a heartbeat but still has a running state in the database. Zombies typically occur when a worker node dies or loses its database connection, and the scheduler kills them periodically.

An undead process is the opposite: the process exists and emits heartbeats, but the Airflow database does not consider the task to be running. Tasks verify their state as part of the heartbeat routine and terminate themselves when they detect this mismatch.

Packaged dags

While DAGs usually live in a single .py file, multiple DAGs and the modules they depend on can also be combined into one zip package, with the DAG files at the root and extra modules in subdirectories:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
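
When this tree is zipped and placed in the DAGs folder, Airflow scans only the files at the root of the archive for DAG definitions; subdirectories such as package1 are treated as packages that my_dag1.py and my_dag2.py can import.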
