Experimental features#

Warning

The constructs below are experimental and the API is subject to breaking changes.

@flytekit.experimental.eager(_fn=None, *, remote=None, client_secret_group=None, client_secret_key=None, timeout=None, poll_interval=None, local_entrypoint=False, **kwargs)#

Eager workflow decorator.

Parameters:
  • remote (FlyteRemote | None) – A FlyteRemote object to use for executing Flyte entities.

  • client_secret_group (str | None) – The client secret group to use for this workflow.

  • client_secret_key (str | None) – The client secret key to use for this workflow.

  • timeout (timedelta | None) – The timeout duration specifying how long to wait for a task/workflow execution within the eager workflow to complete or terminate. By default, the eager workflow will wait indefinitely until complete.

  • poll_interval (timedelta | None) – The poll interval for checking if a task/workflow execution within the eager workflow has finished. If not specified, the default poll interval is 6 seconds.

  • local_entrypoint (bool) – If True, the eager workflow will can be executed locally but use the provided FlyteRemote() object to create task/workflow executions. This is useful for local testing against a remote Flyte cluster.

  • kwargs – keyword-arguments forwarded to task().

This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be used inside of an @eager-decorated function. This is because eager workflows use a FlyteRemote object to kick off executions when a flyte entity needs to produce a value.

For example:

from flytekit import task
from flytekit.experimental import eager

@task
def add_one(x: int) -> int:
    return x + 1

@task
def double(x: int) -> int:
    return x * 2

@eager
async def eager_workflow(x: int) -> int:
    out = await add_one(x=x)
    return await double(x=out)

# run locally with asyncio
if __name__ == "__main__":
    import asyncio

    result = asyncio.run(eager_workflow(x=1))
    print(f"Result: {result}")  # "Result: 4"

Unlike dynamic workflows, eager workflows are not compiled into a workflow spec, but uses python’s async capabilities to execute flyte entities.

Note

Eager workflows only support @task, @workflow, and @eager entities. Dynamic workflows and launchplans are currently not supported.

Note that for the @eager function is an async function. Under the hood, tasks and workflows called inside an @eager workflow are executed asynchronously. This means that task and workflow calls will return an awaitable, which need to be awaited.

Important

A client_secret_group and client_secret_key is needed for authenticating via FlyteRemote using the client_credentials authentication, which is configured via PlatformConfig.

from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

@eager(
    remote=UnionRemote(config=Config.auto(config_file="config.yaml")),
    client_secret_group="my_client_secret_group",
    client_secret_key="my_client_secret_key",
)
async def eager_workflow(x: int) -> int:
    out = await add_one(x)
    return await double(one)

Where config.yaml contains is a flytectl-compatible config file. For more details, see here.

When using a sandbox cluster started with flytectl demo start, however, the client_secret_group and client_secret_key are not needed, :

@eager(remote=UnionRemote(config=Config.for_sandbox()))
async def eager_workflow(x: int) -> int:
    ...

Important

When using local_entrypoint=True you also need to specify the remote argument. In this case, the eager workflow runtime will be local, but all task/subworkflow invocations will occur on the specified Flyte cluster. This argument is primarily used for testing and debugging eager workflow logic locally.

class flytekit.experimental.EagerException#

Raised when a node in an eager workflow encounters an error.

This exception should be used in an @eager workflow function to catch exceptions that are raised by tasks or subworkflows.

from flytekit import task
from flytekit.experimental import eager, EagerException

@task
def add_one(x: int) -> int:
    if x < 0:
        raise ValueError("x must be positive")
    return x + 1

@task
def double(x: int) -> int:
    return x * 2

@eager
async def eager_workflow(x: int) -> int:
    try:
        out = await add_one(x=x)
    except EagerException:
        # The ValueError error is caught
        # and raised as an EagerException
        raise
    return await double(x=out)