Schedules#
Launch plans let you schedule the invocation of your workflows. A launch plan can be associated with one or more schedules, where at most one schedule is active at any one time. If a schedule is activated on a launch plan, the workflow will be invoked automatically by the system at the scheduled time with the inputs provided by the launch plan.
To add a schedule to a launch plan, add a schedule object to the launch plan, like this:
from flytekit import task, workflow, LaunchPlan, FixedRate
from datetime import timedelta
@task
def my_task(a: int, b: int, c: int) -> int:
return a + b + c
@workflow
def my_workflow(a: int, b: int, c: int) -> int:
return my_task(a=a, b=b, c=c)
LaunchPlan.get_or_create(
workflow=my_workflow,
name="my_workflow_custom_lp",
fixed_inputs={"a": 3},
default_inputs={"b": 4, "c": 5},
schedule=FixedRate(
duration=timedelta(minutes=10)
)
)
Here we specify a FixedRate schedule that will invoke the workflow every 10 minutes. Fixed rate schedules can also be defined using days or hours.
Alternatively, you can specify a CronSchedule that uses the Unix standard cron format(See crontab guru for a handy helper for cron expressions):
from flytekit import task, workflow, LaunchPlan, CronSchedule
@task
def my_task(a: int, b: int, c: int) -> int:
return a + b + c
@workflow
def my_workflow(a: int, b: int, c: int) -> int:
return my_task(a=a, b=b, c=c)
LaunchPlan.get_or_create(
workflow=my_workflow,
name="my_workflow_custom_lp",
fixed_inputs={"a": 3},
default_inputs={"b": 4, "c": 5},
schedule=CronSchedule(
schedule="*/10 * * * *"
)
)
kickoff_time_input_arg#
Both FixedRate
and CronSchedule
can take an optional parameter called kickoff_time_input_arg
This parameter is used to specify the name of a workflow input argument. Each time the system invokes the workflow via this schedule, the time of the invocation will be passed to the workflow through the specified parameter. For example:
from flytekit import task, workflow, LaunchPlan, FixedRate
from datetime import datetime, timedelta
@task
def my_task(a: int, b: int, c: int) -> int:
return a + b + c
@workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime ) -> str:
return f"sum: {my_task(a=a, b=b, c=c)} at {kickoff_time}"
LaunchPlan.get_or_create(
workflow=my_workflow,
name="my_workflow_custom_lp",
fixed_inputs={"a": 3},
default_inputs={"b": 4, "c": 5},
schedule=FixedRate(
duration=timedelta(minutes=10),
kickoff_time_input_arg="kickoff_time"
)
)
Here, each time the schedule calls my_workflow
, the invocation time is passed in the kickoff_time
argument.