Scope
Strictly speaking, a DAG is only discovered by Airflow if it appears in globals(). In the example below, only dag_1 is loaded; the second DAG exists only in a local scope:
dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    # dag_2 only exists in the function's local scope, so Airflow will not discover it
    dag_2 = DAG('but_this_dag_will_not')

my_function()
Context Manager
Added in Airflow 1.8: a DAG can be used as a context manager, and new operators created inside the block are automatically assigned to that DAG.
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator('op')

op.dag is dag  # True
DAG Assignment
Added in Airflow 1.8. Operators do not have to be assigned to a DAG immediately, but once an operator is assigned to a DAG it can no longer be transferred or unassigned.
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)
# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
Bitshift Composition
Added in Airflow 1.8. Traditionally, operator relationships are set with set_upstream() and set_downstream(). With bitshift composition, each pair below is equivalent:
op1 >> op2  # equivalent to:
op1.set_downstream(op2)

op2 << op1  # equivalent to:
op2.set_upstream(op1)
Chained relationships are also possible:
op1 >> op2 >> op3 << op4
The bitshift operators can even be used with a DAG object:
dag >> op1 >> op2
Using bitshift composition inside a pipeline:
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )
Pools
Airflow pools are used to limit the execution parallelism of arbitrary sets of tasks; a task is placed into a pool via its pool parameter:
aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
Connections
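Connection information for external systems (host, port, login, password, schema, and so on) is stored in the Airflow metadata database and referenced by a conn_id; hooks use this information to talk to those systems. A minimal sketch, assuming a connection with the illustrative conn_id 'my_postgres_conn' has been created under Admin -> Connections:

from airflow.hooks.base_hook import BaseHook

# 'my_postgres_conn' is an illustrative conn_id defined in Admin -> Connections
conn = BaseHook.get_connection('my_postgres_conn')
print(conn.host, conn.port, conn.login, conn.schema)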
Queue
When using the CeleryExecutor, tasks are sent to Celery queues. queue is an attribute of BaseOperator, so every task can be assigned its own queue. The default queue is defined in airflow.cfg under celery -> default_queue.
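For example, a task can be pinned to a specific queue (the queue name below is illustrative); only workers listening on that queue, e.g. started with airflow worker -q spark, will pick it up:

# this task only runs on workers that listen to the 'spark' queue
t = BashOperator(
    task_id='run_on_spark_queue',
    bash_command='echo "running on the spark queue"',
    queue='spark',
    dag=dag)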
XComs
XComs let tasks exchange small messages: a value returned from a PythonOperator callable is automatically pushed as an XCom, and other tasks can pull it.
# inside a PythonOperator called 'pushing_task'
def push_function():
    # the return value is automatically pushed to XCom
    return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')
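XComs can also be pushed explicitly under a key instead of relying on the return value; a small sketch (the key and value below are illustrative):

# inside a PythonOperator where provide_context=True
def explicit_push_function(**context):
    # push a value under an explicit key
    context['task_instance'].xcom_push(key='record_count', value=42)

def explicit_pull_function(**context):
    count = context['task_instance'].xcom_pull(task_ids='pushing_task', key='record_count')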
Variables
from airflow.models import Variable

# plain string value
foo = Variable.get("foo")
# JSON value, automatically deserialized
bar = Variable.get("bar", deserialize_json=True)
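Variables can also be used in templated fields; a small sketch reusing the placeholder names above:

# 'foo' is rendered as plain text, 'bar' is deserialized from JSON
templated_command = 'echo "{{ var.value.foo }}" && echo "{{ var.json.bar }}"'
t = BashOperator(
    task_id='echo_variables',
    bash_command=templated_command,
    dag=dag)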
Branching
Branching is implemented with the BranchPythonOperator: its python_callable returns the task_id of the downstream task to follow, and all other direct downstream tasks are skipped. When a branch should simply skip some work, do not leave that path empty; add a dummy task for it instead, as in the sketch below.
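A minimal sketch, where the callable picks a branch based on the execution date (the task ids and the condition are illustrative); the branch that is not returned gets skipped:

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def choose_branch(**context):
    # return the task_id of the downstream task that should run
    if context['execution_date'].day % 2 == 0:
        return 'even_day_task'
    return 'odd_day_task'

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag)

even_day_task = DummyOperator(task_id='even_day_task', dag=dag)
even_day_task.set_upstream(branching)

odd_day_task = DummyOperator(task_id='odd_day_task', dag=dag)
odd_day_task.set_upstream(branching)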
SubDAGs
SubDAGs are useful for repeating patterns: a set of parallel tasks can be collapsed into a single SubDagOperator node in the main DAG.
The subdag file:
# dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator


# the DAG is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    dummy_operator = DummyOperator(
        task_id='dummy_task',
        dag=dag,
    )

    return dag
The main DAG file:
# main_dag.py
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator

from dags.subdag import sub_dag


PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

main_dag = DAG(
    dag_id=PARENT_DAG_NAME,
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2016, 1, 1)
)

sub_dag = SubDagOperator(
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
                   main_dag.schedule_interval),
    task_id=CHILD_DAG_NAME,
    dag=main_dag,
)
Trigger Rules
Airflow supports more complex dependency configuration. Every operator has a trigger_rule argument that defines the rule by which the task is triggered; the default is all_success (run once all upstream tasks have succeeded), and other values include all_failed, all_done, one_failed, one_success and dummy.
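For example, a join task that should run once all of its upstream tasks have finished, regardless of whether they succeeded (the task id is illustrative):

# runs as soon as every upstream task is done, whether it succeeded, failed or was skipped
join = DummyOperator(task_id='join', trigger_rule='all_done', dag=dag)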
Latest Run Only
The LatestOnlyOperator skips all of its downstream tasks for every scheduled run except the most recent one, so backfills and catch-up runs do not repeat work that only matters for the latest interval:
# dags/latest_only_with_trigger.py
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=dt.datetime(2016, 9, 20),
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)

task2 = DummyOperator(task_id='task2', dag=dag)

task3 = DummyOperator(task_id='task3', dag=dag)
task3.set_upstream([task1, task2])

task4 = DummyOperator(task_id='task4', dag=dag,
                      trigger_rule=TriggerRule.ALL_DONE)
task4.set_upstream([task1, task2])
Zombies & Undeads
A zombie task is one whose process has died and no longer sends a heartbeat, yet the task is still marked as running in the database. An undead process is the opposite: the process exists and is sending heartbeats, but Airflow is no longer aware of it (for example, because the task instance was deleted from the UI).
Packaged dags
While a DAG is usually defined in a single .py file, several DAGs and their dependencies can also be combined and deployed together as a packaged (zipped) set of files, for example:
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py