Examples#

Hello world#

The following example shows how to create a basic ActorEnvironment and use it for one task:

hello_world.py#
import os

from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment

# Image for the actor; the registry is read from DOCKER_REGISTRY and is None
# when that variable is not set.
image = ImageSpec(
    packages=["union"],
    registry=os.environ.get("DOCKER_REGISTRY"),
)

# Actor environment the task below is attached to via ``@actor.task``.
actor = ActorEnvironment(
    name="my-actor",
    container_image=image,
    requests=Resources(cpu="2", mem="300Mi"),
    replica_count=1,
    ttl_seconds=30,
)


@actor.task
def say_hello() -> str:
    """Return the fixed greeting ``"hello"``."""
    greeting = "hello"
    return greeting


@workflow
def wf():
    """Invoke ``say_hello`` once; the workflow itself returns nothing."""
    say_hello()

Multiple instances of the same task#

In this example, the task decorated with @actor.task is invoked multiple times in one workflow and uses the same ActorEnvironment on every invocation:

plus_one.py#
import os

from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment

# Image for the actor; the registry is read from DOCKER_REGISTRY and is None
# when that variable is not set.
image = ImageSpec(
    packages=["union"],
    registry=os.environ.get("DOCKER_REGISTRY"),
)

# Actor environment shared by every ``plus_one`` invocation below.
actor = ActorEnvironment(
    name="my-actor",
    container_image=image,
    requests=Resources(
        cpu="2",
        mem="500Mi",
    ),
    replica_count=1,
    ttl_seconds=300,
)


@actor.task
def plus_one(input: int) -> int:
    """Return ``input`` incremented by one."""
    incremented = input + 1
    return incremented


@workflow
def wf(input: int = 0) -> int:
    """Chain four ``plus_one`` calls, feeding each result into the next.

    Starts from ``input`` (default 0) and returns the output of the last call.
    """
    first = plus_one(input=input)
    second = plus_one(input=first)
    third = plus_one(input=second)
    fourth = plus_one(input=third)
    return fourth

Multiple tasks#

Every task execution in the following example will execute in the same ActorEnvironment. You can use the same environment for multiple tasks in the same workflow and for tasks across workflow definitions, using both subworkflows and launch plans:

multiple_tasks.py#
import os

from flytekit import current_context, workflow, LaunchPlan, Resources, ImageSpec
from union.actor import ActorEnvironment

# Image for the actor; the registry is read from DOCKER_REGISTRY and is None
# when that variable is not set.
image = ImageSpec(
    packages=["union"],
    registry=os.environ.get("DOCKER_REGISTRY"),
)

# Single actor environment used by both tasks defined below.
actor = ActorEnvironment(
    name="my-actor",
    container_image=image,
    requests=Resources(
        cpu="1",
        mem="450Mi",
    ),
    replica_count=1,
    ttl_seconds=30,
)


@actor.task
def say_hello(name: str) -> str:
    """Return ``name`` prefixed with a lowercase ``hello``."""
    greeting = f"hello {name}"
    return greeting


@actor.task
def scream_hello(name: str) -> str:
    """Return ``name`` prefixed with an uppercase ``HELLO``."""
    shouted = f"HELLO {name}"
    return shouted


@workflow
def my_child_wf(name: str) -> str:
    """Pass ``name`` to ``scream_hello`` and return that task's output."""
    shouted = scream_hello(name=name)
    return shouted


# Default launch plan for ``my_child_wf``.
# ``LaunchPlan.get_default_launch_plan`` expects flytekit's internal
# FlyteContext, not the execution parameters returned by ``current_context()``.
# Calling ``get_or_create`` without a name yields the same default launch plan
# and looks up the correct context itself.
my_child_wf_lp = LaunchPlan.get_or_create(workflow=my_child_wf)


@workflow
def my_parent_wf(name: str) -> str:
    """Greet ``name``, then run the child workflow twice on the result.

    The output of ``say_hello`` goes through ``my_child_wf`` called directly
    and then through ``my_child_wf_lp``; the final output is returned.
    """
    greeting = say_hello(name=name)
    via_subworkflow = my_child_wf(name=greeting)
    via_launch_plan = my_child_wf_lp(name=via_subworkflow)
    return via_launch_plan

Custom PodTemplates#

Both tasks in the following example will be executed in the same ActorEnvironment, which is created with a PodTemplate for additional configuration:

pod_template.py#
import os
from kubernetes.client.models import (
    V1Container,
    V1PodSpec,
    V1ResourceRequirements,
    V1EnvVar,
)
from flytekit import workflow, ImageSpec, PodTemplate
from union.actor import ActorEnvironment

# Image for the actor; the registry is read from DOCKER_REGISTRY and is None
# when that variable is not set.
image = ImageSpec(
    packages=["union", "flytekitplugins-pod"],
    registry=os.environ.get("DOCKER_REGISTRY"),
)

# Pod template with a single container, "primary", which is also declared as
# the primary container.
pod_template = PodTemplate(
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                image=image,
                # Requests and limits carry the same cpu/memory values.
                resources=V1ResourceRequirements(
                    requests={"cpu": "1", "memory": "1Gi"},
                    limits={"cpu": "1", "memory": "1Gi"},
                ),
                # Environment variable defined on the container spec itself.
                env=[V1EnvVar(name="COMP_KEY_EX", value="compile_time")],
            ),
        ],
    ),
    primary_container_name="primary",
)

# Actor environment configured through the pod template instead of
# ``requests`` / ``container_image`` arguments.
actor = ActorEnvironment(
    name="my-actor",
    pod_template=pod_template,
    replica_count=1,
    ttl_seconds=30,
)

@actor.task
def get_and_set() -> str:
    """Set ``RUN_KEY_EX`` in the process environment and return ``COMP_KEY_EX``.

    The returned value is whatever ``COMP_KEY_EX`` holds in the environment,
    or None if it is not set.
    """
    os.environ["RUN_KEY_EX"] = "run_time"
    compile_time_value = os.environ.get("COMP_KEY_EX")
    return compile_time_value


@actor.task
def check_set() -> str:
    """Return the current value of ``RUN_KEY_EX`` (None if it is not set)."""
    run_time_value = os.environ.get("RUN_KEY_EX")
    return run_time_value


@workflow
def wf() -> tuple[str, str]:
    """Run ``get_and_set`` and then ``check_set``, returning both outputs.

    ``check_set`` reads an environment variable that ``get_and_set`` writes,
    but no data flows between the two tasks, so the ordering is declared
    explicitly.

    Returns:
        The output of ``get_and_set`` followed by the output of ``check_set``.
    """
    compile_time_value = get_and_set()
    run_time_value = check_set()
    # Without this edge the two nodes are independent and could be scheduled
    # in either order, letting ``check_set`` run before the variable is set.
    compile_time_value >> run_time_value
    return compile_time_value, run_time_value

Refactoring from Regular Tasks to Actors#

Notice that converting a non-actor workflow to use actors is as simple as replacing the @flytekit.task decorator with the @actor_env.task decorator. Additionally, task decorator arguments can be moved either to the actor environment or the actor task decorator, depending on whether they apply to the entire environment (e.g., resource specifications) or to a single task execution (e.g., caching arguments).

import flytekit as fl
+from union.actor import ActorEnvironment
+
+actor_env = ActorEnvironment(
+    name = "myenv",
+    replica_count = 10,
+    ttl_seconds = 120,
+    requests = fl.Resources(mem="1Gi"),
+    container_image = "myrepo/myimage-with-scipy:latest",
+)
+
- @fl.task(requests=fl.Resources(mem="1Gi"))
+ @actor_env.task
def add_numbers(a: float, b: float) -> float:
    """Return the sum of ``a`` and ``b``."""
    return a + b

- @fl.task(container_image="myrepo/myimage-with-scipy:latest")
+ @actor_env.task
def calculate_distance(point_a: list[int], point_b: list[int]) -> float:
    """Return scipy's ``euclidean`` distance between ``point_a`` and ``point_b``."""
    # Imported in the function body rather than at module level; presumably so
    # scipy is only needed where the task runs -- TODO confirm.
    from scipy.spatial.distance import euclidean
    return euclidean(point_a, point_b)

- @fl.task(cache=True, cache_version="v1")
+ @actor_env.task(cache=True, cache_version="v1")
def is_even(number: int) -> bool:
    """Return True when ``number`` is divisible by two."""
    return number % 2 == 0

@fl.workflow
def distance_add_wf(point_a: list[int], point_b: list[int]) -> float:
    """Compute the distance between the two points and return it plus 1.5."""
    distance = calculate_distance(point_a=point_a, point_b=point_b)
    return add_numbers(a=distance, b=1.5)

@fl.workflow
def is_even_wf(point_a: list[int]) -> list[bool]:
    """Apply ``is_even`` to each element of ``point_a`` via ``fl.map_task``."""
    return fl.map_task(is_even)(number=point_a)