Mapping over launch plans
You can map over launch plans the same way you can map over tasks to execute workflows in parallel across a series of inputs.
You can either map over a LaunchPlan object defined in one of your Python modules or a reference launch plan that points to a previously registered launch plan.
Launch plan defined in your code
Here we define a workflow called interest_workflow that we want to parallelize, along with a launch plan called interest_workflow_lp, in a file we'll call map_interest_wf.py.
We then write a separate workflow, map_interest_wf, that uses a map_task to parallelize interest_workflow over a list of inputs.
from flytekit import task, workflow, LaunchPlan, map_task


# Task to calculate monthly interest payment on a loan
@task
def calculate_interest(principal: int, rate: float, time: int) -> float:
    return (principal * rate * time) / 12


# Workflow using the calculate_interest task
@workflow
def interest_workflow(principal: int, rate: float, time: int) -> float:
    return calculate_interest(principal=principal, rate=rate, time=time)


# Create LaunchPlan for interest_workflow
lp = LaunchPlan.get_or_create(
    workflow=interest_workflow,
    name="interest_workflow_lp",
)


# Mapping over the launch plan to calculate interest for multiple loans
@workflow
def map_interest_wf() -> list[float]:
    principal = [1000, 5000, 10000]
    rate = [0.05, 0.04, 0.03]  # Different interest rates for each loan
    time = [12, 24, 36]  # Loan periods in months
    return map_task(lp)(principal=principal, rate=rate, time=time)
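To make the mapping semantics concrete, here is a plain-Python sketch (no Flyte involved) of what map_interest_wf is expected to compute, assuming the mapping pairs the i-th element of each input list. The helper names interest and mapped_interest are hypothetical stand-ins and are not part of flytekit.

```python
# Plain-Python stand-in for the calculate_interest task (no Flyte involved).
def interest(principal: int, rate: float, time: int) -> float:
    return (principal * rate * time) / 12


# Hypothetical helper: call `interest` once per position across the input
# lists, which is the element-wise pairing assumed for the mapped launch plan.
def mapped_interest(
    principal: list[int], rate: list[float], time: list[int]
) -> list[float]:
    return [interest(p, r, t) for p, r, t in zip(principal, rate, time)]


results = mapped_interest(
    principal=[1000, 5000, 10000],
    rate=[0.05, 0.04, 0.03],
    time=[12, 24, 36],
)
print(results)  # one result per loan, in input order
```

Under this assumption the three loans yield roughly 50, 400, and 900, one value per position in the input lists.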
You can run the map_interest_wf workflow locally:
union run map_interest_wf.py map_interest_wf
You can also run the map_interest_wf workflow remotely on Union:
union run --remote map_interest_wf.py map_interest_wf
Previously registered launch plan
To demonstrate mapping over a previously registered launch plan, this example maps over the simple_wf launch plan from the basic workflow example in the Flytesnacks repository.
Recall that when a workflow is registered, an associated launch plan is created automatically. One of these launch plans will be leveraged in this example, though custom launch plans can also be used.
Clone the Flytesnacks repository:
git clone git@github.com:flyteorg/flytesnacks.git
Navigate to the basics directory:
cd flytesnacks/examples/basics
Register the simple_wf workflow:
union register --project flytesnacks --domain development --version v1 basics/workflow.py
Note that the simple_wf workflow is defined as follows:
@workflow
def simple_wf(x: list[int], y: list[int]) -> float:
    slope_value = slope(x=x, y=y)
    intercept_value = intercept(x=x, y=y, slope=slope_value)
    return intercept_value
Create a file called map_simple_wf.py and copy the following code into it:
from flytekit import reference_launch_plan, workflow, map_task


@reference_launch_plan(
    project="flytesnacks",
    domain="development",
    name="basics.workflow.simple_wf",
    version="v1",
)
def simple_wf_lp(
    x: list[int], y: list[int]
) -> float:
    pass


@workflow
def map_simple_wf() -> list[float]:
    x = [[-3, 0, 3], [-8, 2, 4], [7, 3, 1]]
    y = [[7, 4, -2], [-2, 4, 7], [3, 6, 4]]
    return map_task(simple_wf_lp)(x=x, y=y)
Note that the reference launch plan's interface corresponds exactly to that of the registered simple_wf launch plan we wish to map over.
Register the map_simple_wf workflow. Reference launch plans cannot be run locally, so we will register the map_simple_wf workflow to Union and run it remotely.
union register map_simple_wf.py
In the Union UI, run the map_simple_wf workflow.
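The nested lists in map_simple_wf can look surprising next to the list[int] interface of simple_wf_lp. The following plain-Python sketch (no Flyte involved) illustrates the assumed element-wise split: each mapped execution receives one inner list for x and one for y. The variable name per_execution_inputs is hypothetical and used only for illustration.

```python
# Inputs as written in map_simple_wf: one inner list per mapped execution.
x = [[-3, 0, 3], [-8, 2, 4], [7, 3, 1]]
y = [[7, 4, -2], [-2, 4, 7], [3, 6, 4]]

# Assumption: the i-th execution of the launch plan gets x[i] and y[i],
# each of which matches the list[int] type in the launch plan interface.
per_execution_inputs = [{"x": xi, "y": yi} for xi, yi in zip(x, y)]

for index, inputs in enumerate(per_execution_inputs):
    print(index, inputs)
```

Under this assumption there are three executions, and the list[float] returned by map_simple_wf holds one float per execution.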