Airflow version: 1.10.10
Cron: "30 07,08,11,15 * * *"
code:
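import pendulum
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# Run at minute 30 of hours 07, 08, 11 and 15, every day.
CRON_SCHEDULER = "30 07,08,11,15 * * *"

with DAG(
    "cron_test_dag",
    schedule_interval=CRON_SCHEDULER,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    start_job = DummyOperator(task_id="Start_Job", dag=dag)
    bash_task1 = BashOperator(
        task_id="bash_task1",
        bash_command="echo hello world",
        dag=dag,
    )
    start_job >> bash_task1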
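To check which times of day the cron string above covers, here is a minimal standard-library sketch. The helpers expand_field and daily_fire_times are hypothetical names introduced for illustration, not part of Airflow, and the sketch only handles expressions whose day, month and weekday fields are all "*".

```python
from datetime import datetime


def expand_field(field, lo, hi):
    """Expand a cron field that is '*' or a comma-separated list of integers."""
    if field == "*":
        return list(range(lo, hi + 1))
    return sorted(int(part) for part in field.split(","))


def daily_fire_times(cron, day):
    """Return the fire times on `day` for a cron whose last three fields are '*'."""
    minute_f, hour_f, dom, month, dow = cron.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("this sketch only handles schedules that repeat every day")
    return [
        day.replace(hour=h, minute=m, second=0, microsecond=0)
        for h in expand_field(hour_f, 0, 23)
        for m in expand_field(minute_f, 0, 59)
    ]


times = daily_fire_times("30 07,08,11,15 * * *", datetime(2021, 1, 1))
print([t.strftime("%H:%M") for t in times])
# ['07:30', '08:30', '11:30', '15:30']
```

Note that these are schedule points in the DAG's timezone (UTC in the snippet above), not local wall-clock times.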
To start a scheduler, simply run the command:
airflow scheduler
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# default_args are passed on to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# schedule_interval is a DAG argument; it has no effect inside default_args.
dag = DAG(
    'tutorial',
    catchup=False,
    default_args=default_args,
    schedule_interval='@hourly',
)
Airflow can utilize cron presets for common, basic schedules.

For example, schedule_interval='@hourly' will schedule the DAG to run at the beginning of every hour. For the full list of presets, check out the Airflow documentation. If your DAG does not need to run on a schedule and will only be triggered manually or externally by another process, you can set schedule_interval=None.

For pipelines with basic schedules, you can define a schedule_interval in your DAG. For versions of Airflow prior to 2.2, this is the only mechanism for defining a DAG’s schedule.

You can pass any cron expression as a string to the schedule_interval parameter in your DAG. For example, if you want to schedule your DAG at 4:05 AM every day, you would use schedule_interval='5 4 * * *'.
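As a rough illustration of how presets relate to plain cron strings, the sketch below maps the common presets to the cron expressions listed in the Airflow documentation. The dictionary CRON_PRESETS and the helper resolve_schedule are hypothetical names used only for this example; Airflow does this translation internally.

```python
# Preset -> equivalent cron expression, as listed in the Airflow documentation.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def resolve_schedule(schedule_interval):
    """Return the cron string behind a preset, or the input unchanged.

    None is passed through: it means the DAG is only triggered manually
    or by an external process.
    """
    if schedule_interval is None:
        return None
    return CRON_PRESETS.get(schedule_interval, schedule_interval)


print(resolve_schedule("@hourly"))    # 0 * * * *
print(resolve_schedule("5 4 * * *"))  # 5 4 * * *  (04:05 every day)
print(resolve_schedule(None))         # None
```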
With that in mind, first we’ll define next_dagrun_info. This method provides Airflow with the logic to calculate the data interval for scheduled runs. It also contains logic to handle the DAG’s start_date, end_date, and catchup parameters. To implement the logic in this method, we use the Pendulum package, which makes dealing with dates and times simple. The method looks like this:
def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: Optional[DataInterval],
    restriction: TimeRestriction,
) -> Optional[DagRunInfo]:
    if last_automated_data_interval is not None:
        # There was a previous run on the regular schedule.
        last_start = last_automated_data_interval.start
        delta = timedelta(days=1)
        if last_start.hour == 6:
            # If previous period started at 6:00, next period will start
            # at 16:30 and end at 6:00 following day.
            next_start = last_start.set(hour=16, minute=30).replace(tzinfo=UTC)
            next_end = (last_start + delta).replace(tzinfo=UTC)
        else:
            # If previous period started at 16:30, next period will start
            # at 6:00 next day and end at 16:30.
            next_start = (last_start + delta).set(hour=6, minute=0).replace(tzinfo=UTC)
            next_end = (last_start + delta).replace(tzinfo=UTC)
    else:
        # This is the first ever run on the regular schedule. First data
        # interval will always start at 6:00 and end at 16:30.
        next_start = restriction.earliest
        if next_start is None:
            # No start_date. Don't schedule.
            return None
        if not restriction.catchup:
            # If the DAG has catchup=False, today is the earliest to consider.
            next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
        next_start = next_start.set(hour=6, minute=0).replace(tzinfo=UTC)
        next_end = next_start.set(hour=16, minute=30).replace(tzinfo=UTC)
    if restriction.latest is not None and next_start > restriction.latest:
        # Over the DAG's scheduled end; don't schedule.
        return None
    return DagRunInfo.interval(start=next_start, end=next_end)
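To see the alternation between the two interval lengths without running Airflow, here is a minimal sketch that mirrors only the "previous run exists" branch above, using the standard library instead of Pendulum. The function next_interval is a hypothetical stand-in, and it ignores timezones, start_date, end_date and catchup.

```python
from datetime import datetime, timedelta


def next_interval(last_start):
    """Given the start of the previous data interval, return the next (start, end)."""
    one_day = timedelta(days=1)
    if last_start.hour == 6:
        # Previous interval started at 06:00: next one runs 16:30 -> 06:00 next day.
        return last_start.replace(hour=16, minute=30), last_start + one_day
    # Previous interval started at 16:30: next one runs 06:00 -> 16:30 next day.
    next_start = (last_start + one_day).replace(hour=6, minute=0)
    return next_start, last_start + one_day


# Walk forward from a first interval that started at 06:00 on 2021-10-09.
start = datetime(2021, 10, 9, 6, 0)
intervals = []
for _ in range(3):
    start, end = next_interval(start)
    intervals.append((start, end))
    print(start.isoformat(), "->", end.isoformat())
```

Each new interval begins exactly where the previous one ended, so the 10.5-hour and 13.5-hour periods tile the timeline without gaps.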
Then we define the data interval for manually triggered DAG Runs by defining the infer_manual_data_interval method. The code looks like this:
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
    delta = timedelta(days=1)
    # If time is between 6:00 and 16:30, period ends at 6 am and starts at 16:30 previous day
    if run_after >= run_after.set(hour=6, minute=0) and run_after <= run_after.set(hour=16, minute=30):
        start = (run_after - delta).set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
        end = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
    # If time is after 16:30 but before midnight, period is between 6:00 and 16:30 the same day
    elif run_after >= run_after.set(hour=16, minute=30) and run_after.hour <= 23:
        start = run_after.set(hour=6, minute=0, second=0).replace(tzinfo=UTC)
        end = run_after.set(hour=16, minute=30, second=0).replace(tzinfo=UTC)
    # If time is after midnight but before 6:00, period is between 6:00 and 16:30 the previous day
    else:
        start = (run_after - delta).set(hour=6, minute=0).replace(tzinfo=UTC)
        end = (run_after - delta).set(hour=16, minute=30).replace(tzinfo=UTC)
    return DataInterval(start=start, end=end)
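The three branches above can be checked with a small standard-library sketch. The function manual_interval is a hypothetical stand-in for the method, working on naive datetime values rather than Pendulum objects, and it returns the most recent interval that has fully ended before the trigger time.

```python
from datetime import datetime, timedelta


def manual_interval(run_after):
    """Return (start, end) of the last completed interval before `run_after`."""
    one_day = timedelta(days=1)
    six = run_after.replace(hour=6, minute=0, second=0, microsecond=0)
    half_four = run_after.replace(hour=16, minute=30, second=0, microsecond=0)
    if six <= run_after <= half_four:
        # Between 06:00 and 16:30: the overnight interval has just finished.
        return half_four - one_day, six
    if run_after > half_four:
        # After 16:30: today's daytime interval has finished.
        return six, half_four
    # Before 06:00: the last finished interval is yesterday's daytime one.
    return six - one_day, half_four - one_day


for trigger in (
    datetime(2021, 10, 10, 12, 0),
    datetime(2021, 10, 10, 18, 0),
    datetime(2021, 10, 10, 3, 0),
):
    start, end = manual_interval(trigger)
    print(trigger.isoformat(), "=>", start.isoformat(), "->", end.isoformat())
```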
In the DAG, we can then import the custom timetable plugin and use it to schedule the DAG by setting the timetable parameter:
from datetime import datetime, timedelta

from airflow import DAG
from uneven_intervals_timetable import UnevenIntervalsTimetable

with DAG(
    dag_id="example_timetable_dag",
    start_date=datetime(2021, 10, 9),
    max_active_runs=1,
    timetable=UnevenIntervalsTimetable(),
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
    },
    catchup=True,
) as dag:
    ...  # task definitions go here
Note: The crontab.guru links were breaking so I wrapped them in code blocks.

The schedule interval was updated to the simpler 1 8 * * 1, which according to https://crontab.guru/#1_8_*_*_1 is "At 08:01 UTC (03:01 EST, 00:01 PST) on Monday".

However, this caused the DAG to trigger every day at 08:01 UTC; the Monday condition seemed to be ignored.

EDIT: The reason you do not see the run execute on the 18th is that you have catchup=False.

Here is the complete DAG/task definition (without imports or specific names):
dag = DAG(
    dag_id="dag",
    description="dag",
    # At 08:01 UTC (03:01 EST, 00:01 PST) on Monday
    # (https://crontab.guru/#1_8_*_*_1)
    schedule_interval="1 8 * * 1",
    catchup=False,
)

task = PythonOperator(
    task_id="handle",
    provide_context=True,
    python_callable=handle,
    dag=dag,
    retries=2,
    retry_delay=timedelta(minutes=15),
    start_date=datetime(2019, 2, 11, 0, 0, 0, 0, pytz.UTC),
)
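To reason about when a weekly schedule like this should fire, the standard-library sketch below lists the first few Monday 08:01 UTC schedule points after the start_date and pairs each with the time Airflow would start the run, on the assumption (documented Airflow behaviour) that a run starts once the period it covers has ended. The helper monday_0801_points is a hypothetical name; it does not parse cron and hard-codes this one schedule.

```python
from datetime import datetime, timedelta, timezone


def monday_0801_points(start, count):
    """Return the first `count` Monday 08:01 schedule points at or after `start`."""
    candidate = start.replace(hour=8, minute=1, second=0, microsecond=0)
    # isoweekday() % 7 matches cron numbering: 0 = Sunday, 1 = Monday.
    while candidate < start or candidate.isoweekday() % 7 != 1:
        candidate += timedelta(days=1)
    return [candidate + timedelta(weeks=i) for i in range(count)]


start_date = datetime(2019, 2, 11, tzinfo=timezone.utc)
points = monday_0801_points(start_date, 3)

# Each run is labelled with the start of its period and launched at its end.
for execution_date, run_at in zip(points, points[1:]):
    print("execution_date", execution_date.isoformat(), "runs at", run_at.isoformat())
```

Under that assumption, the run labelled 2019-02-11 would be launched on 2019-02-18, which is useful to keep in mind when comparing the scheduler's behaviour with the crontab.guru description.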