Experimental features
Warning
The constructs below are experimental and the API is subject to breaking changes.
- @flytekit.experimental.eager(_fn=None, *, remote=None, client_secret_group=None, client_secret_key=None, timeout=None, poll_interval=None, local_entrypoint=False, client_secret_env_var=None, **kwargs)
Eager workflow decorator.
- Parameters:
remote (FlyteRemote | None) – A FlyteRemote object to use for executing Flyte entities.
client_secret_group (str | None) – The client secret group to use for this workflow.
client_secret_key (str | None) – The client secret key to use for this workflow.
timeout (timedelta | None) – The timeout duration specifying how long to wait for a task/workflow execution within the eager workflow to complete or terminate. By default, the eager workflow will wait indefinitely until complete.
poll_interval (timedelta | None) – The poll interval for checking if a task/workflow execution within the eager workflow has finished. If not specified, the default poll interval is 6 seconds.
local_entrypoint (bool) – If True, the eager workflow can be executed locally but uses the provided FlyteRemote object to create task/workflow executions. This is useful for local testing against a remote Flyte cluster.
client_secret_env_var (str | None) – If specified, binds the client secret to the specified environment variable for remote authentication.
kwargs – Keyword arguments forwarded to task().
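To make the timeout and poll_interval semantics more concrete, here is a minimal standard-library sketch of a polling wait loop. It is not flytekit's implementation: wait_for_completion, is_done, and make_fake_execution are hypothetical names introduced only for illustration, and the 6-second default simply mirrors the documented default poll interval.

```python
import asyncio
import time
from datetime import timedelta
from typing import Callable, Optional


async def wait_for_completion(
    is_done: Callable[[], bool],
    timeout: Optional[timedelta] = None,
    poll_interval: timedelta = timedelta(seconds=6),
) -> bool:
    """Poll is_done() until it returns True or the timeout elapses.

    Returns True if the (hypothetical) execution finished, False on timeout.
    With timeout=None the loop waits indefinitely.
    """
    deadline = None if timeout is None else time.monotonic() + timeout.total_seconds()
    while not is_done():
        if deadline is not None and time.monotonic() >= deadline:
            return False
        # Yield to the event loop between status checks.
        await asyncio.sleep(poll_interval.total_seconds())
    return True


def make_fake_execution(polls_needed: int) -> Callable[[], bool]:
    """Build a status check that reports completion on the Nth poll."""
    state = {"polls": 0}

    def is_done() -> bool:
        state["polls"] += 1
        return state["polls"] >= polls_needed

    return is_done


# An execution that completes on the third status check.
finished = asyncio.run(
    wait_for_completion(
        make_fake_execution(3),
        timeout=timedelta(seconds=1),
        poll_interval=timedelta(milliseconds=10),
    )
)

# An execution that never completes, so the wait gives up at the timeout.
timed_out = asyncio.run(
    wait_for_completion(
        lambda: False,
        timeout=timedelta(milliseconds=50),
        poll_interval=timedelta(milliseconds=10),
    )
)
```

In this sketch a shorter poll interval notices completion sooner at the cost of more status checks, while the timeout bounds the total wait.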
This type of workflow executes all Flyte entities within it eagerly, meaning that all Python constructs can be used inside an @eager-decorated function. This is because eager workflows use a FlyteRemote object to kick off executions when a Flyte entity needs to produce a value.
For example:
    from flytekit import task
    from flytekit.experimental import eager

    @task
    def add_one(x: int) -> int:
        return x + 1

    @task
    def double(x: int) -> int:
        return x * 2

    @eager
    async def eager_workflow(x: int) -> int:
        out = await add_one(x=x)
        return await double(x=out)

    # run locally with asyncio
    if __name__ == "__main__":
        import asyncio

        result = asyncio.run(eager_workflow(x=1))
        print(f"Result: {result}")  # "Result: 4"
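The awaits in the example above run one after another, because the second call depends on the output of the first. As a rough illustration of the underlying asyncio behavior, the sketch below uses plain coroutines as stand-ins for tasks (it does not use flytekit, and the sleeps only simulate execution latency) to contrast a sequential chain with two independent calls awaited together via asyncio.gather.

```python
import asyncio
from typing import List


async def add_one(x: int) -> int:
    # Plain coroutine standing in for a task execution.
    await asyncio.sleep(0.01)
    return x + 1


async def double(x: int) -> int:
    # Plain coroutine standing in for a task execution.
    await asyncio.sleep(0.01)
    return x * 2


async def sequential(x: int) -> int:
    # The second call needs the first result, so the awaits are chained.
    out = await add_one(x=x)
    return await double(x=out)


async def concurrent(x: int) -> List[int]:
    # The two calls are independent, so they can be awaited together.
    # gather returns the results in the order the awaitables were passed.
    return await asyncio.gather(add_one(x=x), double(x=x))


sequential_result = asyncio.run(sequential(x=1))
concurrent_result = asyncio.run(concurrent(x=1))
```

Calling a coroutine function without await only creates the awaitable; no result is produced until it is awaited or gathered.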
Unlike dynamic workflows, eager workflows are not compiled into a workflow spec, but use Python's async capabilities to execute Flyte entities.
Note
Eager workflows only support @task, @workflow, and @eager entities. Dynamic workflows and launchplans are currently not supported.
Note that the @eager function is an async function. Under the hood, tasks and workflows called inside an @eager workflow are executed asynchronously. This means that task and workflow calls return an awaitable, which needs to be awaited.
Important
A client_secret_group and client_secret_key are needed for authenticating via FlyteRemote using the client_credentials authentication, which is configured via PlatformConfig.

    from flytekit.configuration import Config
    from flytekit.experimental import eager
    from flytekit.remote import FlyteRemote

    # add_one and double are the tasks defined in the earlier example.
    @eager(
        remote=FlyteRemote(config=Config.auto(config_file="config.yaml")),
        client_secret_group="my_client_secret_group",
        client_secret_key="my_client_secret_key",
    )
    async def eager_workflow(x: int) -> int:
        out = await add_one(x=x)
        return await double(x=out)
Where config.yaml is a flytectl-compatible config file. For more details, see here.
When using a sandbox cluster started with flytectl demo start, however, the client_secret_group and client_secret_key are not needed:

    @eager(remote=FlyteRemote(config=Config.for_sandbox()))
    async def eager_workflow(x: int) -> int:
        ...
Important
When using local_entrypoint=True you also need to specify the remote argument. In this case, the eager workflow runtime will be local, but all task/subworkflow invocations will occur on the specified Flyte cluster. This argument is primarily used for testing and debugging eager workflow logic locally.
- class flytekit.experimental.EagerException
Raised when a node in an eager workflow encounters an error.
This exception should be used in an @eager workflow function to catch exceptions that are raised by tasks or subworkflows.

    from flytekit import task
    from flytekit.experimental import eager, EagerException

    @task
    def add_one(x: int) -> int:
        if x < 0:
            raise ValueError("x must be positive")
        return x + 1

    @task
    def double(x: int) -> int:
        return x * 2

    @eager
    async def eager_workflow(x: int) -> int:
        try:
            out = await add_one(x=x)
        except EagerException:
            # The ValueError is caught and raised as an EagerException
            raise
        return await double(x=out)
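The handler above simply re-raises. A handler could also recover, for instance by substituting a fallback value. The sketch below shows that pattern using only the standard library: NodeError is a hypothetical stand-in for EagerException, and add_one is a plain coroutine rather than a Flyte task, so this illustrates the control flow only and makes no claim about flytekit's behavior.

```python
import asyncio


class NodeError(Exception):
    """Hypothetical stand-in for EagerException in this sketch."""


async def add_one(x: int) -> int:
    # Plain coroutine standing in for a task; it fails on negative input.
    if x < 0:
        raise NodeError("x must be positive")
    return x + 1


async def workflow_with_fallback(x: int, default: int = 0) -> int:
    try:
        out = await add_one(x=x)
    except NodeError:
        # Recover with a fallback value instead of re-raising.
        out = default
    return out * 2


ok_result = asyncio.run(workflow_with_fallback(x=1))
fallback_result = asyncio.run(workflow_with_fallback(x=-1))
```

Whether to recover or re-raise depends on whether downstream steps can produce a meaningful result without the failed node's output.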