Actors#

Actors allow you to reuse a container and environment between tasks that need to maintain state. To create an actor, instantiate the ActorEnvironment class, then add the instance as a decorator to the task that requires that environment.

ActorEnvironment parameters#

  • backlog_length: The number of tasks to keep in the worker queue on the backend. Setting backlog_length ensures that the worker executing actor tasks immediately executes the next task after completing the previous one instead of waiting for the scheduler to complete other operations before scheduling the next task.

  • container_image: The container image to use for the task. Defaults to cr.flyte.org/flyteorg/flytekit:py3.9-latest.

  • environment: Environment variables as key, value pairs in a Python dictionary.

  • limits: Compute resource limits.

  • parallelism: The number of tasks that can execute in parallel, per worker.

  • replica_count: The number of workers to provision that are able to accept tasks.

  • requests: Compute resource requests per task.

  • secret_requests: Keys (ideally descriptive) that can identify the secrets supplied at runtime. For more information, see Managing secrets.

  • ttl_seconds: How long to keep the Actor alive while no tasks are being run.

Examples#

Hello world#

The following example shows how to create a basic ActorEnvironment and use it for one task:

hello_world.py#
from flytekit import workflow, Resources
from unionai.actor import ActorEnvironment

actor = ActorEnvironment(
    name="my_actor",
    replica_count=1,
    parallelism=1,
    backlog_length=10,
    ttl_seconds=30,
    requests=Resources(
        cpu="2",
        mem="300Mi",
    ),
)


@actor.task
def say_hello() -> str:
    return "hello"


@workflow
def wf():
    say_hello()

Multiple instances of the same task#

In this example, the actor.task-decorated task is invoked multiple times in one workflow, and will use the same ActorEnvironment on each invocation:

plus_one.py#
from flytekit import workflow, Resources
from unionai.actor import ActorEnvironment

actor = ActorEnvironment(
    name="my_actor",
    replica_count=1,
    parallelism=1,
    backlog_length=50,
    ttl_seconds=300,
    requests=Resources(cpu="2", mem="500Mi"),
)


@actor.task
def plus_one(input: int) -> int:
    return input + 1


@workflow
def wf(input: int = 0) -> int:
    a = plus_one(input=input)
    b = plus_one(input=a)
    c = plus_one(input=b)
    return plus_one(input=c)

Multiple tasks#

Every task execution in the following example will execute in the same ActorEnvironment. You can use the same environment for multiple tasks in the same workflow and tasks across workflow definitions, using both subworkflows and launchplans:

multiple_entities.py#
from flytekit import current_context, workflow, LaunchPlan, Resources
from unionai.actor import ActorEnvironment

actor = ActorEnvironment(
    name="my_actor",
    replica_count=1,
    parallelism=1,
    backlog_length=50,
    ttl_seconds=30,
    requests=Resources(cpu="1", mem="450Mi")
)


@actor.task
def say_hello(name: str) -> str:
    return f"hello {name}"


@actor.task
def scream_hello(name: str) -> str:
    return f"HELLO {name}"


@workflow
def my_child_wf(name: str) -> str:
    return scream_hello(name=name)


my_child_wf_lp = LaunchPlan.get_default_launch_plan(current_context(),
                                                    my_child_wf)


@workflow
def my_parent_wf(name: str) -> str:
    a = say_hello(name=name)
    b = my_child_wf(name=a)
    return my_child_wf_lp(name=b)