airflow调度管理所有的tasks和DAGs。它监控并与DAG目录保持同步,并定期查看active tasks是否被trigger。
我们注意到DAG的schedule_interval 参数,这里设置为1day。如果我们指定它在2016-01-01 运行,它将会在
2016-01-01T23:59 被trigger。
scheduler会启动airflow.cfg 配置的executor,如果它是LocalExector,任务会作为子进程运行。如果是CeleryExecutor和
MesosExecutor,任务会远程执行。
运行scheduler,只需要
airflow scheduler
DAG Runs
DAG Run是某时DAG的实例化。DAG的schedule_interval 可以是crob表达式字符串,或者是datetime.timedelta对象,或者是预置的crob参数:None、@once、@hourly、@daily、@weekly、@monthly、@yearly
DAG run的运行状态包括:running、failed、success
Backfill与Catchup
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@hourly',
}
dag = DAG('tutorial', catchup=False, default_args=default_args)
上面的示例,如果DAG在2016-01-02 6 .a.m被scheduler获取,那么DAG Run会被创建,execution_date 是2016-01-01.
当在2016-01-03凌晨执行第2个时,执行时间是2016-01-02.
如果dag.catchup 设置为True,则会创建2015-12-01到2016-01-02所有时间间隔的DAG Run。
外部触发
DAG Runs同样可以通过airflow trigger_dag 创建。
注意,第一个DAG Run 会根据DAG中start_date最小值创建。
而接下来的DAG Runs会通过scheduler进程,依据DAG的scheduler_interval 创建。