Actors#
Actors allow you to reuse a container and environment between tasks, avoiding the cost of starting a new container for each task. This can be useful when you have a task that requires a lot of setup or has a long startup time.
To create an actor, instantiate the ActorEnvironment
class, then add the instance as a decorator to the task that requires that environment.
ActorEnvironment
parameters#
container_image: The container image to use for the task. Defaults to
cr.flyte.org/flyteorg/flytekit:py3.11-latest
.environment: Environment variables as key, value pairs in a Python dictionary.
limits: Compute resource limits.
replica_count: The number of workers to provision that are able to accept tasks.
requests: Compute resource requests per task.
secret_requests: Keys (ideally descriptive) that can identify the secrets supplied at runtime. For more information, see Managing secrets.
ttl_seconds: How long to keep the Actor alive while no tasks are being run.
Examples#
Hello world#
The following example shows how to create a basic ActorEnvironment
and use it for one task:
import os
from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment
image = ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)
actor = ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=30,
requests=Resources(
cpu="2",
mem="300Mi",
),
container_image=image,
)
@actor.task
def say_hello() -> str:
return "hello"
@workflow
def wf():
say_hello()
Multiple instances of the same task#
In this example, the actor.task
-decorated task is invoked multiple times in one workflow, and will use the same ActorEnvironment
on each invocation:
import os
from flytekit import workflow, Resources, ImageSpec
from union.actor import ActorEnvironment
image = ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)
actor = ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=300,
requests=Resources(cpu="2", mem="500Mi"),
container_image=image,
)
@actor.task
def plus_one(input: int) -> int:
return input + 1
@workflow
def wf(input: int = 0) -> int:
a = plus_one(input=input)
b = plus_one(input=a)
c = plus_one(input=b)
return plus_one(input=c)
Multiple tasks#
Every task execution in the following example will execute in the same ActorEnvironment
. You can use the same environment for multiple tasks in the same workflow and tasks across workflow definitions, using both subworkflows and launchplans:
import os
from flytekit import current_context, workflow, LaunchPlan, Resources, ImageSpec
from union.actor import ActorEnvironment
image = ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union"],
)
actor = ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=30,
requests=Resources(cpu="1", mem="450Mi"),
container_image=image,
)
@actor.task
def say_hello(name: str) -> str:
return f"hello {name}"
@actor.task
def scream_hello(name: str) -> str:
return f"HELLO {name}"
@workflow
def my_child_wf(name: str) -> str:
return scream_hello(name=name)
my_child_wf_lp = LaunchPlan.get_default_launch_plan(current_context(),
my_child_wf)
@workflow
def my_parent_wf(name: str) -> str:
a = say_hello(name=name)
b = my_child_wf(name=a)
return my_child_wf_lp(name=b)
Custom PodTemplates#
Both tasks in the following example will be executed in the same ActorEnvironment
, which is created with a PodTemplate
for additional configuration.
import os
from kubernetes.client.models import (
V1Container,
V1PodSpec,
V1ResourceRequirements,
V1EnvVar,
)
from flytekit import workflow, ImageSpec, PodTemplate
from union.actor import ActorEnvironment
image = ImageSpec(
registry=os.environ.get("DOCKER_REGISTRY", None),
packages=["union", "flytekitplugins-pod"],
)
pod_template = PodTemplate(
primary_container_name="primary",
pod_spec=V1PodSpec(
containers=[
V1Container(
name="primary",
image=image,
resources=V1ResourceRequirements(
requests={
"cpu": "1",
"memory": "1Gi",
},
limits={
"cpu": "1",
"memory": "1Gi",
},
),
env=[V1EnvVar(name="COMP_KEY_EX", value="compile_time")],
),
],
),
)
actor = ActorEnvironment(
name="my-actor",
replica_count=1,
ttl_seconds=30,
pod_template=pod_template,
)
@actor.task
def get_and_set() -> str:
os.environ["RUN_KEY_EX"] = "run_time"
return os.getenv("COMP_KEY_EX")
@actor.task
def check_set() -> str:
return os.getenv("RUN_KEY_EX")
@workflow
def wf() -> tuple[str,str]:
return get_and_set(), check_set()