Schedules
Launch plans let you schedule the invocation of your workflows.
A launch plan can be associated with one or more schedules, where at most one schedule is active at any one time.
If a schedule is activated on a launch plan, the workflow will be invoked automatically by the system at the scheduled time with the inputs provided by the launch plan. Schedules can be either fixed-rate or cron-based.
To set up a schedule, you can use the schedule parameter of the LaunchPlan.get_or_create() method.
Fixed-rate schedules
In the following example we add a FixedRate that will invoke the workflow every 10 minutes.
from datetime import timedelta

import flytekit as fl
from flytekit import FixedRate

@fl.task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

fl.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},  # always passed as-is
    default_inputs={"b": 4, "c": 5},  # used unless overridden at launch
    schedule=FixedRate(
        duration=timedelta(minutes=10)  # invoke every 10 minutes
    ),
)
Above, we defined the duration of the FixedRate schedule using minutes.
Fixed-rate schedules can also be defined using days or hours.
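To make the cadence concrete, the following standard-library sketch lists when a fixed-rate schedule would fire for durations given in minutes, hours, or days. It is only an illustration: the helper name fire_times and the activation time are hypothetical and not part of flytekit, and it assumes the first invocation happens one full duration after activation, which may not match how the real scheduler anchors its runs.

```python
from datetime import datetime, timedelta
from typing import List


def fire_times(start: datetime, duration: timedelta, count: int) -> List[datetime]:
    """Return the first `count` invocation times, spaced `duration` apart.

    Assumption: the first invocation is one full duration after `start`.
    """
    if duration <= timedelta(0):
        raise ValueError("duration must be positive")
    return [start + duration * i for i in range(1, count + 1)]


# Hypothetical activation time, chosen only for illustration.
activated = datetime(2024, 1, 1, 0, 0)

# The same timedelta-based approach covers minutes, hours, and days.
every_10_minutes = fire_times(activated, timedelta(minutes=10), 3)
every_2_hours = fire_times(activated, timedelta(hours=2), 3)
daily = fire_times(activated, timedelta(days=1), 3)
```

Because the duration is an ordinary timedelta, units can also be combined, for example timedelta(hours=1, minutes=30).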
Cron schedules
A CronSchedule allows you to specify a schedule using a cron expression:
import flytekit as fl
from flytekit import CronSchedule

@fl.task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

fl.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=CronSchedule(
        schedule="*/10 * * * *"  # every 10 minutes
    ),
)
Cron expression format
A cron expression is a string that defines a schedule using five space-separated fields, each representing a time unit.
The format of the string is:
minute hour day-of-month month day-of-week
Each field can contain values and special characters. The fields are defined as follows:
Field | Values | Special characters |
---|---|---|
minute | 0-59 | * / , - |
hour | 0-23 | * / , - |
day-of-month | 1-31 | * / , - ? |
month | 1-12 or JAN-DEC | * / , - |
day-of-week | 0-6 or SUN-SAT | * / , - ? |
- The month and day-of-week abbreviations are not case-sensitive.
- The , (comma) is used to specify multiple values. For example, in the month field, JAN,FEB,MAR means every January, February, and March.
- The - (dash) specifies a range of values. For example, in the day-of-month field, 1-15 means every day from 1 through 15 of the specified month.
- The * (asterisk) specifies all values of the field. For example, in the hour field, * means every hour (on the hour), from 0 to 23. You cannot use * in both the day-of-month and day-of-week fields in the same cron expression. If you use it in one, you must use ? in the other.
- The / (slash) specifies increments. For example, in the minute field, 1/10 means every tenth minute, starting at minute 1 of the hour (that is, minutes 1, 11, 21, 31, and so on).
- The ? (question mark) specifies any value of the field. For example, in the day-of-month field you could enter 7 and, if any day of the week was acceptable, you would enter ? in the day-of-week field.
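The special characters described above can be illustrated with a small standard-library sketch that expands a single cron field into the list of values it matches. This is not the scheduler's actual parser: the function expand_field and the MONTHS and DAYS tables are hypothetical names introduced here, and the sketch assumes that ? behaves like * within one field and that ranges never wrap around.

```python
from typing import Dict, List, Optional

MONTHS = {name: i for i, name in enumerate(
    ["JAN", "FEB", "MAR", "APR", "MAY", "JUN",
     "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"], start=1)}
DAYS = {name: i for i, name in enumerate(
    ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"])}


def expand_field(field: str, low: int, high: int,
                 names: Optional[Dict[str, int]] = None) -> List[int]:
    """Expand one cron field into the sorted list of values it matches."""
    names = names or {}

    def to_int(token: str) -> int:
        # Abbreviations are matched case-insensitively.
        token = token.upper()
        value = names[token] if token in names else int(token)
        if not low <= value <= high:
            raise ValueError(f"{token} is outside {low}-{high}")
        return value

    values = set()
    for part in field.split(","):           # "," separates multiple values
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step < 1:
            raise ValueError("step must be at least 1")
        if base in ("*", "?"):              # all values (assumption: "?" acts like "*")
            start, end = low, high
        elif "-" in base:                   # "-" gives an inclusive range
            first, last = base.split("-", 1)
            start, end = to_int(first), to_int(last)
            if start > end:
                raise ValueError("wrap-around ranges are not handled in this sketch")
        else:
            start = to_int(base)
            # With a step, "1/10" runs from 1 to the top of the range;
            # without one, the field is just the single value.
            end = high if step_text else start
        values.update(range(start, end + 1, step))
    return sorted(values)


minutes = expand_field("1/10", 0, 59)
months = expand_field("jan,FEB,Mar", 1, 12, MONTHS)
weekdays = expand_field("MON-FRI", 0, 6, DAYS)
```

Here minutes contains 1, 11, 21, 31, 41, and 51, months contains 1 through 3, and weekdays contains 1 through 5.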
Cron expression examples
Expression | Description |
---|---|
0 0 * * * | Midnight every day. |
0 12 * * MON-FRI | Noon every weekday. |
0 0 1 * * | Midnight on the first day of every month. |
0 0 * JAN,JUL * | Midnight every day in January and July. |
*/5 * * * * | Every five minutes. |
30 2 * * 1 | At 2:30 AM every Monday. |
0 0 15 * ? | Midnight on the 15th of every month. |
Cron aliases
The following aliases are also available.
An alias is used in place of an entire cron expression.
Alias | Description | Equivalent to |
---|---|---|
@yearly | Once a year at midnight at the start of 1 January. | 0 0 1 1 * |
@monthly | Once a month at midnight at the start of the first day of the month. | 0 0 1 * * |
@weekly | Once a week at midnight at the start of Sunday. | 0 0 * * 0 |
@daily | Once a day at midnight. | 0 0 * * * |
@hourly | Once an hour at the beginning of the hour. | 0 * * * * |
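The alias table can be read as a simple lookup from alias to five-field expression. The sketch below shows that mapping in plain Python; CRON_ALIASES and resolve_alias are hypothetical names used only for illustration, and the check for unknown aliases is an assumption about sensible behavior rather than a description of what the scheduler does.

```python
# Alias-to-expression mapping, copied from the table above.
CRON_ALIASES = {
    "@yearly": "0 0 1 1 *",
    "@monthly": "0 0 1 * *",
    "@weekly": "0 0 * * 0",
    "@daily": "0 0 * * *",
    "@hourly": "0 * * * *",
}


def resolve_alias(schedule: str) -> str:
    """Return the five-field expression for an alias, or the input unchanged."""
    schedule = schedule.strip()
    if schedule.startswith("@"):
        if schedule not in CRON_ALIASES:
            raise ValueError(f"unknown cron alias: {schedule}")
        return CRON_ALIASES[schedule]
    if len(schedule.split()) != 5:
        raise ValueError("a cron expression must have exactly five fields")
    return schedule


daily = resolve_alias("@daily")
custom = resolve_alias("*/5 * * * *")
```

In this sketch, daily resolves to 0 0 * * * and custom is returned unchanged.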
kickoff_time_input_arg
Both FixedRate and CronSchedule can take an optional parameter called kickoff_time_input_arg.
This parameter specifies the name of a workflow input argument. Each time the system invokes the workflow via this schedule, the time of the invocation is passed to the workflow through that argument. For example:
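from datetime import datetime, timedelta

import flytekit as fl
from flytekit import FixedRate

# Format the string inside the task: workflow inputs and task outputs are
# not concrete values inside the workflow body, so an f-string there would
# not see the real sum or kickoff time.
@fl.task
def my_task(a: int, b: int, c: int, kickoff_time: datetime) -> str:
    return f"sum: {a + b + c} at {kickoff_time}"

@fl.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime) -> str:
    return my_task(a=a, b=b, c=c, kickoff_time=kickoff_time)

fl.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=FixedRate(
        duration=timedelta(minutes=10),
        kickoff_time_input_arg="kickoff_time",  # name of the workflow input
    ),
)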
Here, each time the schedule calls my_workflow, the invocation time is passed in the kickoff_time argument.
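The effect on the workflow's inputs can be pictured with a plain-Python stand-in that runs without Flyte. It is a rough sketch, not the scheduler's implementation: build_inputs is a hypothetical helper, the my_workflow function here is an ordinary function rather than a Flyte workflow, and the UTC invocation time is an assumed value chosen for illustration.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_inputs(
    fixed_inputs: Dict[str, Any],
    default_inputs: Dict[str, Any],
    kickoff_time_input_arg: Optional[str],
    invocation_time: datetime,
) -> Dict[str, Any]:
    """Combine launch plan inputs and add the invocation time, if requested."""
    # Fixed inputs take precedence over defaults with the same name.
    inputs = {**default_inputs, **fixed_inputs}
    if kickoff_time_input_arg is not None:
        inputs[kickoff_time_input_arg] = invocation_time
    return inputs


def my_workflow(a: int, b: int, c: int, kickoff_time: datetime) -> str:
    # Ordinary Python function standing in for the scheduled workflow.
    return f"sum: {a + b + c} at {kickoff_time.isoformat()}"


# Assumed invocation time for one scheduled run.
invocation_time = datetime(2024, 1, 1, 0, 10, tzinfo=timezone.utc)

inputs = build_inputs(
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    kickoff_time_input_arg="kickoff_time",
    invocation_time=invocation_time,
)
result = my_workflow(**inputs)
```

The key point is that the value of kickoff_time_input_arg is the name of a workflow input, so that name must match a datetime parameter in the workflow signature.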