airflow调度管理所有的tasks和DAGs。它监控并与DAG目录保持同步,并定期查看active tasks是否被trigger。

我们注意到DAG的schedule_interval 参数,这里设置为1day。如果我们指定它在2016-01-01 运行,它将会在

2016-01-01T23:59 被trigger。

scheduler会启动airflow.cfg 配置的executor,如果它是LocalExector,任务会作为子进程运行。如果是CeleryExecutor和

MesosExecutor,任务会远程执行。

运行scheduler,只需要

airflow scheduler

DAG Runs

DAG Run是某时DAG的实例化。DAG的schedule_interval 可以是crob表达式字符串,或者是datetime.timedelta对象,或者是预置的crob参数:None、@once、@hourly、@daily、@weekly、@monthly、@yearly

DAG run的运行状态包括:running、failed、success

Backfill与Catchup

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'schedule_interval': '@hourly',
}

dag = DAG('tutorial', catchup=False, default_args=default_args)

上面的示例,如果DAG在2016-01-02 6 .a.m被scheduler获取,那么DAG Run会被创建,execution_date 是2016-01-01.

当在2016-01-03凌晨执行第2个时,执行时间是2016-01-02.

如果dag.catchup 设置为True,则会创建2015-12-01到2016-01-02所有时间间隔的DAG Run。

外部触发

DAG Runs同样可以通过airflow trigger_dag 创建。

注意,第一个DAG Run 会根据DAG中start_date最小值创建。

而接下来的DAG Runs会通过scheduler进程,依据DAG的scheduler_interval 创建。

results matching ""

    No results matching ""