Actors

Actors allow you to reuse a container and environment between tasks, avoiding the cost of starting a new container for each task. This is useful for tasks that require expensive setup or have a long startup time.
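
As a rough, back-of-the-envelope model (an illustrative assumption, not a measurement of Union's scheduler), suppose each container takes startup_s seconds to start and each task runs for run_s seconds. Running n tasks one after another in fresh containers then costs about n·(startup_s + run_s), while a reused actor container pays the startup once, for about startup_s + n·run_s. The function name sequential_cost is hypothetical:

```python
def sequential_cost(n_tasks: int, startup_s: float, run_s: float, reuse_container: bool) -> float:
    """Approximate wall-clock time for n_tasks run one after another.

    Simplified model: every fresh container costs startup_s before its task
    can run, while a reused (actor) container pays startup_s only once.
    """
    if n_tasks <= 0:
        return 0.0
    startups = 1 if reuse_container else n_tasks
    return startups * startup_s + n_tasks * run_s


# Illustrative numbers only: 60 s of setup, 5 s of work, 4 chained tasks.
without_actor = sequential_cost(4, startup_s=60.0, run_s=5.0, reuse_container=False)
with_actor = sequential_cost(4, startup_s=60.0, run_s=5.0, reuse_container=True)
print(without_actor, with_actor)  # 260.0 80.0
```

The larger the setup cost relative to the work each task does, the more of the total time the reused container saves.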

To create an actor, instantiate the ActorEnvironment class, then decorate each task that should run in that environment with the instance's task decorator (@actor.task in the examples below).

ActorEnvironment parameters

  • container_image: The container image to use for the task. Defaults to cr.union.ai/union/unionai:py3.11-latest.

  • environment: Environment variables as key-value pairs in a Python dictionary.

  • limits: Compute resource limits.

  • name: A name for the actor environment.

  • pod_template: A PodTemplate that supplies additional pod configuration, such as the container image, resources, and environment variables (see Custom PodTemplates).

  • replica_count: The number of workers to provision that are able to accept tasks.

  • requests: Compute resource requests per task.

  • secret_requests: Keys (ideally descriptive) that can identify the secrets supplied at runtime. For more information, see Managing secrets.

  • ttl_seconds: How long, in seconds, to keep the actor alive while it is idle (no tasks running).
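
To make ttl_seconds concrete, the following sketch models a single replica (replica_count=1, as in the examples below) under simplifying assumptions: tasks finish instantly, and the replica shuts down once it has been idle for more than ttl_seconds. The function count_cold_starts is hypothetical and only approximates the behavior described above; it is not part of the union API.

```python
from typing import List, Optional


def count_cold_starts(arrival_times: List[float], ttl_seconds: float) -> int:
    """Count how many times a single replica must be (re)started.

    Hypothetical model: tasks complete instantly, and the replica stays
    alive for ttl_seconds after its last task before shutting down.
    """
    cold_starts = 0
    last_task_at: Optional[float] = None
    for t in sorted(arrival_times):
        if last_task_at is None or t - last_task_at > ttl_seconds:
            cold_starts += 1  # no live replica: a new container must start
        last_task_at = t  # the replica is warm as of time t
    return cold_starts


arrivals = [0, 10, 20, 100, 105]  # seconds at which tasks are submitted
print(count_cold_starts(arrivals, ttl_seconds=30))   # 2: the 80 s gap exceeds the TTL
print(count_cold_starts(arrivals, ttl_seconds=300))  # 1: the replica never idles long enough
```

A longer TTL trades idle resource usage for fewer cold starts when tasks arrive in bursts.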

Examples

Hello world

The following example shows how to create a basic ActorEnvironment and use it for one task:

hello_world.py
from flytekit import workflow, Resources
from union.actor import ActorEnvironment

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=Resources(
        cpu="2",
        mem="300Mi",
    ),
)


@actor.task
def say_hello() -> str:
    return "hello"


@workflow
def wf():
    say_hello()

Multiple instances of the same task

In this example, the @actor.task-decorated task is invoked multiple times in one workflow and uses the same ActorEnvironment for each invocation:

plus_one.py
from flytekit import workflow, Resources
from union.actor import ActorEnvironment

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=300,
    requests=Resources(cpu="2", mem="500Mi"),
)


@actor.task
def plus_one(input: int) -> int:
    return input + 1


@workflow
def wf(input: int = 0) -> int:
    a = plus_one(input=input)
    b = plus_one(input=a)
    c = plus_one(input=b)
    return plus_one(input=c)
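
Because the four plus_one calls above can be served by the same warm replica, it can pay to put expensive, reusable setup behind a process-level cache so that only the first invocation on a replica performs it. The sketch below shows the pattern in plain Python with the standard library's functools.lru_cache; load_lookup_table is a hypothetical stand-in for real setup work. The assumption that an actor runs successive tasks in the same Python process is inferred from the Custom PodTemplates example, where an environment variable set by one task is read by the next.

```python
import functools

setup_calls = 0  # counts how often the expensive setup actually runs


@functools.lru_cache(maxsize=None)
def load_lookup_table() -> dict:
    """Hypothetical expensive setup (e.g. reading a large file)."""
    global setup_calls
    setup_calls += 1
    return {"increment": 1}


def plus_one(value: int) -> int:
    # The first call in a process populates the cache; later calls reuse it.
    table = load_lookup_table()
    return value + table["increment"]


# Mirror the four chained calls from the workflow above, all in one process.
result = plus_one(plus_one(plus_one(plus_one(0))))
print(result, setup_calls)  # 4 1
```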

Multiple tasks

Every task execution in the following example runs in the same ActorEnvironment. You can share one environment across multiple tasks in the same workflow, and across workflow definitions through both subworkflows and launch plans:

multiple_tasks.py
from flytekit import current_context, workflow, LaunchPlan, Resources
from union.actor import ActorEnvironment

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=Resources(cpu="1", mem="450Mi"),
)


@actor.task
def say_hello(name: str) -> str:
    return f"hello {name}"


@actor.task
def scream_hello(name: str) -> str:
    return f"HELLO {name}"


@workflow
def my_child_wf(name: str) -> str:
    return scream_hello(name=name)


my_child_wf_lp = LaunchPlan.get_default_launch_plan(current_context(), my_child_wf)


@workflow
def my_parent_wf(name: str) -> str:
    a = say_hello(name=name)
    b = my_child_wf(name=a)
    return my_child_wf_lp(name=b)
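
To trace the data flow, the plain-Python sketch below reproduces the task bodies without the flytekit and union decorators, so it runs anywhere but does not exercise the actor itself; my_parent_wf_trace is a hypothetical helper. The input passes through say_hello once and through scream_hello twice, once via the subworkflow and once via the launch plan:

```python
def say_hello(name: str) -> str:
    return f"hello {name}"


def scream_hello(name: str) -> str:
    return f"HELLO {name}"


def my_parent_wf_trace(name: str) -> str:
    a = say_hello(name)     # task called directly in the parent workflow
    b = scream_hello(a)     # via the my_child_wf subworkflow
    return scream_hello(b)  # via the my_child_wf_lp launch plan


print(my_parent_wf_trace("world"))  # HELLO HELLO hello world
```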

Custom PodTemplates

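Both tasks in the following example run in the same ActorEnvironment, which is created with a PodTemplate for additional configuration. Because the container is reused, the environment variable that get_and_set sets at run time is still visible when check_set runs.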

pod_template.py
import os
from kubernetes.client.models import (
    V1Container,
    V1PodSpec,
    V1ResourceRequirements,
    V1EnvVar,
)
from flytekit import workflow, ImageSpec, PodTemplate
from union.actor import ActorEnvironment

image = ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union", "flytekitplugins-pod"],
)

pod_template = PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                image=image,
                resources=V1ResourceRequirements(
                    requests={
                        "cpu": "1",
                        "memory": "1Gi",
                    },
                    limits={
                        "cpu": "1",
                        "memory": "1Gi",
                    },
                ),
                env=[V1EnvVar(name="COMP_KEY_EX", value="compile_time")],
            ),
        ],
    ),
)

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    pod_template=pod_template,
)

@actor.task
def get_and_set() -> str:
    os.environ["RUN_KEY_EX"] = "run_time"
    return os.getenv("COMP_KEY_EX")


@actor.task
def check_set() -> str:
    return os.getenv("RUN_KEY_EX")


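@workflow
def wf() -> tuple[str, str]:
    a = get_and_set()
    b = check_set()
    # check_set reads a variable that get_and_set sets at run time, but the two
    # tasks share no data dependency, so make the execution order explicit.
    a >> b
    return a, b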
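
The example relies on the actor reusing one process: an environment variable set at run time by get_and_set is still present when check_set runs. The standard-library sketch below contrasts that with running each step in a fresh Python process, used here as a rough stand-in for a fresh container per task (an analogy, not how Union launches containers). The helper names run_python, fresh_process_per_task, and reused_process are hypothetical.

```python
import os
import subprocess
import sys

# The two task bodies from the example above, as code snippets.
SET_VAR = 'import os; os.environ["RUN_KEY_EX"] = "run_time"'
READ_VAR = 'import os; print(os.getenv("RUN_KEY_EX", "<unset>"))'


def run_python(code: str) -> str:
    """Run code in a brand-new interpreter and return its stripped stdout."""
    # Start from the current environment minus RUN_KEY_EX, so only the child can set it.
    env = {k: v for k, v in os.environ.items() if k != "RUN_KEY_EX"}
    proc = subprocess.run(
        [sys.executable, "-c", code], env=env, capture_output=True, text=True, check=True
    )
    return proc.stdout.strip()


def fresh_process_per_task() -> str:
    run_python(SET_VAR)          # first "task" sets the variable, then its process exits
    return run_python(READ_VAR)  # second "task" starts from a clean environment


def reused_process() -> str:
    # Both "tasks" run in one interpreter, like two tasks on the same actor replica.
    return run_python(SET_VAR + "\n" + READ_VAR)


print(fresh_process_per_task())  # <unset>
print(reused_process())          # run_time
```

The same reasoning applies to any in-memory state: it survives between tasks only when they run in the same process.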