Rerunning tasks or full DAGs in Airflow is a common workflow. Maybe one of your tasks failed due to an issue in an external system, and after fixing the problem you want to rerun that particular task instance.

Running DAGs whenever you want is one of the most powerful and flexible features of Airflow. Scheduling DAGs can ensure future DAG runs happen at the right time, but you also have options for running DAGs in the past, for example when you need to process data for a period before the DAG was deployed.

One workaround for backfilling is to deploy a copy of the DAG with a new name and a start date equal to the date you want to backfill to. Airflow will consider this a separate DAG, so you won't see all the DAG runs and task instances in the same place, but it does accomplish running the DAG for data in the desired time period.
You can also use the Airflow CLI to clear task statuses:
airflow tasks clear [-h] [-R] [-d] [-e END_DATE] [-X] [-x] [-f] [-r]
                    [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [-u] [-y]
                    dag_id
Catchup can be controlled by setting the catchup parameter in your DAG's arguments (by default, catchup is set to True). For example, this DAG would not make use of catchup:
from datetime import datetime, timedelta

from airflow import DAG

# UnevenIntervalsTimetable is a custom timetable defined elsewhere
with DAG(
    dag_id="example_dag",
    start_date=datetime(2021, 10, 9),
    max_active_runs=1,
    timetable=UnevenIntervalsTimetable(),
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
    },
    catchup=False,
) as dag:
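To make catchup concrete, here is a small, Airflow-free sketch (the helper name and dates are illustrative, not part of Airflow's API) of the completed intervals the scheduler considers for a daily schedule:

```python
from datetime import datetime, timedelta

def completed_intervals(start_date, now, interval=timedelta(days=1)):
    """List (interval_start, interval_end) pairs that have fully elapsed.

    With catchup=True the scheduler creates one DAG run per completed
    interval that has no run yet; with catchup=False it only creates a
    run for the most recent completed interval.
    """
    intervals = []
    interval_start = start_date
    while interval_start + interval <= now:
        intervals.append((interval_start, interval_start + interval))
        interval_start += interval
    return intervals

runs = completed_intervals(datetime(2021, 10, 9), datetime(2021, 10, 12))
print(len(runs))   # 3 completed daily intervals
print(runs[0][0])  # 2021-10-09 00:00:00
```

With catchup=False, only the last of those three intervals would get a run when the DAG is first enabled.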
Backfilling can be accomplished in Airflow using the CLI. You simply specify the DAG ID, as well as the start date and end date for the backfill period. This command runs the DAG for all intervals between the start date and end date. DAGs in your backfill interval are still rerun even if they already have DAG runs.
airflow dags backfill [-h] [-c CONF] [--delay-on-limit DELAY_ON_LIMIT] [-x]
                      [-n] [-e END_DATE] [-i] [-I] [-l] [-m] [--pool POOL]
                      [--rerun-failed-tasks] [--reset-dagruns] [-B]
                      [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [-v] [-y]
                      dag_id
The backfill command will re-run all the instances of the dag_id for all the intervals within the start date and end date.

For the specified dag_id and time interval, the clear command clears all instances of the tasks matching the regex. For more options, you can check the help of the clear command.

When clearing tasks, you can also limit the scope:

Future - All the instances of the task in the runs after the DAG's most recent data interval
Past - All the instances of the task in the runs before the DAG's most recent data interval
"" " Code that goes along with the Airflow tutorial located at: https: //github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py "" " from airflow.models.dag import DAG from airflow.operators.bash import BashOperator import datetime import pendulum dag = DAG( "tutorial", default_args = { "depends_on_past": True, "retries": 1, "retry_delay": datetime.timedelta(minutes = 3), }, start_date = pendulum.datetime(2015, 12, 1, tz = "UTC"), description = "A simple tutorial DAG", schedule_interval = "@daily", catchup = False, )
airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id
airflow tasks clear dag_id \
    --task-regex task_regex \
    --start-date START_DATE \
    --end-date END_DATE
airflow tasks clear --help
airflow dags trigger --exec-date logical_date run_id
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
    "example_parameterized_dag",
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)
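To exercise the templated task above, you would trigger the DAG with a conf payload. The command below is a sketch (the JSON key matches the template; a running Airflow environment and the value `value1` are assumptions), followed by the bash command the template would render to:

```shell
# Trigger the DAG, passing conf (requires a running Airflow environment):
#   airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
#
# Inside that run, the templated bash_command renders to:
echo "value: value1"
```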
You can make use of the Airflow CLI to run the backfill command:

airflow dags backfill <dag_id> --start-date START_DATE --end-date END_DATE
11 Dec 2019
docker exec -it container_id /bin/sh
airflow backfill -s 2019-01-01 -e 2019-01-04 test_dag
airflow backfill -s 2019-01-01 -e 2019-01-04 --reset_dagruns test_dag
An Airflow DAG with a start_date, possibly an end_date, and a schedule_interval defines a series of intervals which the scheduler turns into individual DAG runs and executes. A key capability of Airflow is that these DAG runs are atomic, idempotent items, and the scheduler, by default, will examine the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG run for any interval that has not been run (or has been cleared). This concept is called Catchup.

When clearing a set of tasks' state in the hope of getting them to re-run, it is important to keep the DAG run's state in mind too, as it defines whether the scheduler should look into triggering tasks for that run.

In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM (or from the command line), a single DAG run will be created with an execution_date of 2016-01-01, and the next one will be created just after midnight on the morning of 2016-01-03 with an execution_date of 2016-01-02.

Note that DAG runs can also be created manually through the CLI by running an airflow trigger_dag command, where you can define a specific run_id. The DAG runs created externally to the scheduler get associated with the trigger's timestamp and will be displayed in the UI alongside scheduled DAG runs.
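The interval arithmetic described above can be sketched in plain Python (illustrative only, not Airflow's actual scheduler code):

```python
from datetime import datetime, timedelta

def latest_execution_date(now, start_date, interval=timedelta(days=1)):
    """execution_date labels the *start* of the most recently completed
    interval, so a run is always stamped one full interval behind the
    wall clock at the moment it is created."""
    completed = (now - start_date) // interval  # fully elapsed intervals
    return start_date + (completed - 1) * interval

# Picked up on 2016-01-02 at 6 AM -> run stamped 2016-01-01
print(latest_execution_date(datetime(2016, 1, 2, 6), datetime(2016, 1, 1)))
# Just after midnight on 2016-01-03 -> run stamped 2016-01-02
print(latest_execution_date(datetime(2016, 1, 3, 0, 1), datetime(2016, 1, 1)))
```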
airflow scheduler
"" " Code that goes along with the Airflow tutorial located at: https: //github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py "" " from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'Airflow', 'depends_on_past': False, 'start_date': datetime(2015, 12, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes = 5), 'schedule_interval': '@daily', } dag = DAG('tutorial', catchup = False, default_args = default_args)