Actors#
Actors allow you to reuse a container and environment between tasks that need to maintain state. To create an actor, instantiate the ActorEnvironment class, then decorate each task that requires that environment with the instance's task decorator (for example, @actor.task).
ActorEnvironment parameters#
container_image: The container image to use for the task. Defaults to cr.flyte.org/flyteorg/flytekit:py3.11-latest.
environment: Environment variables as key, value pairs in a Python dictionary.
limits: Compute resource limits.
replica_count: The number of workers to provision that are able to accept tasks.
requests: Compute resource requests per task.
secret_requests: Keys (ideally descriptive) that can identify the secrets supplied at runtime. For more information, see Managing secrets.
ttl_seconds: How long to keep the Actor alive while no tasks are being run.
Examples#
Hello world#
The following example shows how to create a basic ActorEnvironment and use it for one task:
import os

from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment

image = ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=Resources(
        cpu="2",
        mem="300Mi",
    ),
    container_image=image,
)


@actor.task
def say_hello() -> str:
    return "hello"


@workflow
def wf():
    say_hello()
Multiple instances of the same task#
In this example, the actor.task-decorated task is invoked multiple times in one workflow and will use the same ActorEnvironment on each invocation:
import os

from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment

image = ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=300,
    requests=Resources(cpu="2", mem="500Mi"),
    container_image=image,
)


@actor.task
def plus_one(input: int) -> int:
    return input + 1


@workflow
def wf(input: int = 0) -> int:
    a = plus_one(input=input)
    b = plus_one(input=a)
    c = plus_one(input=b)
    return plus_one(input=c)
Multiple tasks#
Every task execution in the following example runs in the same ActorEnvironment. You can use the same environment for multiple tasks in the same workflow and for tasks across workflow definitions, using both subworkflows and launch plans:
import os

from flytekit import current_context, workflow, LaunchPlan, Resources, ImageSpec
from union.actor import ActorEnvironment

image = ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=Resources(cpu="1", mem="450Mi"),
    container_image=image,
)


@actor.task
def say_hello(name: str) -> str:
    return f"hello {name}"


@actor.task
def scream_hello(name: str) -> str:
    return f"HELLO {name}"


@workflow
def my_child_wf(name: str) -> str:
    return scream_hello(name=name)


my_child_wf_lp = LaunchPlan.get_default_launch_plan(current_context(), my_child_wf)


@workflow
def my_parent_wf(name: str) -> str:
    a = say_hello(name=name)
    b = my_child_wf(name=a)
    return my_child_wf_lp(name=b)
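To trace the data flow through the parent workflow, the same functions can be run as plain Python with the decorators and the launch plan removed. This sketch only models the return values, not where the tasks execute. The launch plan call is modeled as a direct call to my_child_wf, and "world" is an arbitrary input chosen for illustration.

```python
def say_hello(name: str) -> str:
    return f"hello {name}"


def scream_hello(name: str) -> str:
    return f"HELLO {name}"


def my_child_wf(name: str) -> str:
    return scream_hello(name=name)


def my_parent_wf(name: str) -> str:
    a = say_hello(name=name)      # "hello world"
    b = my_child_wf(name=a)       # subworkflow call
    # The launch plan runs my_child_wf again; modeled here as a direct call.
    return my_child_wf(name=b)


result = my_parent_wf(name="world")
print(result)  # HELLO HELLO hello world
```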