Launch Plans#

A launch plan is a template for a workflow invocation. It combines:

  • A reference to a workflow

  • A (possibly partial) set of inputs required to initiate that workflow

  • Optionally, notifications and schedules

When invoked, the launch plan starts the workflow, passing the inputs as parameters. If the launch plan does not contain the entire set of required workflow inputs, additional input arguments must be provided at execution time.

Every workflow automatically comes with a default launch plan. This launch plan does not define any default inputs, so they must all be provided at execution time. A default launch plan always has the same name as its workflow.

Like tasks and workflows, launch plans are versioned. A launch plan can be updated to change, for example, the set of inputs, the schedule, or the notifications. Each update creates a new version of the launch plan.

Additional launch plans, other than the default one, can be defined for any workflow. In general, a given workflow can be associated with multiple launch plans, but a given launch plan is always associated with exactly one workflow.

Defining a launch plan#

A launch plan is defined using the flytekit.LaunchPlan class.

For example:

from flytekit import task, workflow, LaunchPlan

@task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5}
)

In this example, we define a task my_task and a workflow my_workflow. We then define a launch plan for my_workflow. The launch plan is declared by calling LaunchPlan.get_or_create with the workflow name, launch plan name, and the desired default and fixed inputs.

Registering a launch plan#

Registering a launch plan on the command line with pyflyte or uctl#

In most cases, launch plans are defined alongside the workflows and tasks in your project code and registered as a bundle with the other entities using pyflyte or uctl( See Registering workflows).

Registering a launch plan in Python with UnionRemote#

As with all Union command line actions, you can also perform registration of launch plans programmatically with UnionRemote, specifically, UnionRemote.register_launch_plan.

Results of registration#

When the code above is registered to Union, it results in the creation of four objects:

  • The task workflows.launch_plan_example.my_task

  • The workflow workflows.launch_plan_example.my_workflow

  • The default launch plan workflows.launch_plan_example.my_workflow (notice that it has the same name as the workflow)

  • The custom launch plan my_workflow_custom_lp (this is the one we defined in the code above)

Changing a launch plan#

Launch plans are changed by altering their definition in code and re-registering. When a launch plan with the same project, domain, and name as a preexisting one is re-registered, a new version of that launch plan is created.

Launch plan inputs#

A launch plan can be defined with:

  • Some (or all) workflow inputs unspecified. These must be provided at execution time. In the UI they appear in the Launch Workflow dialog as empty input fields.

  • Some (or all) workflow inputs specified as default, with values. These may be provided as overrides at execution time, but if not, the default values are used. In the UI they appear in the Launch Workflow dialog as pre-filled input fields that can be changed.

  • Some (or all) workflow inputs specified as fixed, with values. These cannot be overridden at execution time. They are not shown in the Launch Workflow dialog.

Notifications#

A launch plan may also be associated with one or more notifications. When the workflow is completed, the notifications are triggered.

There are three types of notifications:

  • Email: Sends an email to the specified recipients.

  • PagerDuty: Sends a PagerDuty notification to the PagerDuty service (with recipients specified). PagerDuty then forwards the notification as per your PagerDuty configuration.

  • Slack: Sends a Slack notification to the email address of a specified channel. This requires that you configure your Slack account to accept notifications.

Separate notifications can be sent depending on the specific end state of the workflow. The options are:

  • WorkflowExecutionPhase.ABORTED

  • WorkflowExecutionPhase.FAILED

  • WorkflowExecutionPhase.SUCCEEDED

  • WorkflowExecutionPhase.TIMED_OUT

For example:

from flytekit import (
    task,
    workflow,
    LaunchPlan,
    WorkflowExecutionPhase,
    Email,
    PagerDuty,
    Slack
)
from datetime import timedelta

@task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime ) -> str:
    return f"sum: {my_task(a=a, b=b, c=c)} at {kickoff_time}"

LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED],
            recipients_email=["[email protected]", "[email protected]"],
        ),
        PagerDuty(
            phases=[WorkflowExecutionPhase.SUCCEEDED],
            recipients_email=["[email protected]"],
        ),
        Slack(
            phases=[
                WorkflowExecutionPhase.SUCCEEDED,
                WorkflowExecutionPhase.ABORTED,
                WorkflowExecutionPhase.TIMED_OUT,
            ],
            recipients_email=["your_slack_channel_email"],
        ),
    ],
)

Schedules#

Launch plans also let you schedule the invocation of your workflows. A launch plan can be associated with one or more schedules, where at most one schedule is active at any one time. If a schedule is activated on a launch plan, the workflow will be invoked automatically by the system at the scheduled time with the inputs provided by the launch plan.

To add a schedule to a launch plan, add a schedule object to the launch plan, like this:

from flytekit import task, workflow, LaunchPlan, FixedRate
from datetime import timedelta

@task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=FixedRate(
        duration=timedelta(minutes=10)
    )
)

Here we specify a FixedRate schedule that will invoke the workflow every 10 minutes. Fixed rate schedules can also be defined using days or hours.

Alternatively, you can specify a CronSchedule that uses the Unix standard cron format(See crontab guru for a handy helper for cron expressions):

from flytekit import task, workflow, LaunchPlan, CronSchedule

@task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=CronSchedule(
        schedule="*/10 * * * *"
    )
)

kickoff_time_input_arg#

Both FixedRate and CronSchedule can take an optional parameter called kickoff_time_input_arg

This parameter is used to specify the name of a workflow input argument. Each time the system invokes the workflow via this schedule, the time of the invocation will be passed to the workflow through the specified parameter. For example:

from flytekit import task, workflow, LaunchPlan, FixedRate
from datetime import datetime, timedelta

@task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime ) -> str:
    return f"sum: {my_task(a=a, b=b, c=c)} at {kickoff_time}"

LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=FixedRate(
        duration=timedelta(minutes=10),
        kickoff_time_input_arg="kickoff_time"
    )
)

Here, each time the schedule calls my_workflow, the invocation time is passed in the kickoff_time argument.