Abort and cancel actions
When running complex workflows, you may need to stop actions that are no longer needed. This can happen when one branch of your workflow makes others redundant, when a task fails and its siblings should not continue, or when you need to manually intervene in a running workflow.
Flyte provides three mechanisms for stopping actions:
- Automatic cleanup: When a root action completes, all its in-progress descendant actions are automatically aborted.
- Programmatic cancellation: Cancel specific asyncio tasks from within your workflow code.
- External abort: Stop individual actions via the CLI, the UI, or the API.
For background on runs and actions, see Runs and actions.
Action lifetime
The lifetime of all actions in a run is tied to the lifetime of the root action (the first task that was invoked). When the root action exits—whether it succeeds, fails, or returns early—all in-progress descendant actions are automatically aborted and no new actions can be enqueued.
This means you don’t need to manually clean up child actions. Flyte handles it for you.
Consider this example where main exits after 10 seconds, but it has spawned a sleep_for action that is set to run for 30 seconds:
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "seconds = 30"
# ///

import asyncio

import flyte

env = flyte.TaskEnvironment(name="action_lifetime")


@env.task
async def do_something():
    print("Doing something")
    await asyncio.sleep(5)
    print("Finished doing something")


@env.task
async def sleep_for(seconds: int):
    print(f"Sleeping for {seconds} seconds")
    try:
        await asyncio.sleep(seconds)
        await do_something()
    except asyncio.CancelledError:
        # Raised when the action is aborted, e.g. because the root action exited.
        print("sleep_for was cancelled")
        return
    print(f"Finished sleeping for {seconds} seconds")


@env.task
async def main(seconds: int):
    print("Starting main")
    # Start sleep_for in the background without awaiting it. Keeping a
    # reference prevents the asyncio task from being garbage-collected.
    background = asyncio.create_task(sleep_for(seconds))
    await asyncio.sleep(10)
    # main returns here while sleep_for is still in progress.
    print("Main finished")


if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, seconds=30)
    print(run.url)
    run.wait()
When main returns after 10 seconds, the sleep_for action (which still has 20 seconds remaining) is automatically aborted.
The sleep_for task receives an asyncio.CancelledError, giving it a chance to handle the cancellation gracefully.
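To make the cancellation handling concrete, here is a minimal sketch that uses plain asyncio only, so it runs without a Flyte backend. The names worker, parent, and events are hypothetical, and the explicit task.cancel() call stands in for the automatic abort that Flyte triggers when the root action exits. The sketch cleans up and then re-raises, which is a common asyncio convention; the example above returns instead.

```python
import asyncio

events: list[str] = []  # records what happened, in order


async def worker(seconds: float) -> None:
    """Stand-in for a long-running child (plain asyncio, not a Flyte task)."""
    events.append("worker started")
    try:
        await asyncio.sleep(seconds)
        events.append("worker finished")
    except asyncio.CancelledError:
        # Release resources here (close files, flush logs), then re-raise
        # so the awaiting code sees the task as cancelled.
        events.append("worker cleaned up")
        raise


async def parent() -> bool:
    task = asyncio.create_task(worker(30))
    await asyncio.sleep(0.1)  # the parent is done long before the worker
    task.cancel()  # explicit here; assumed to mirror Flyte's automatic abort
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


worker_was_cancelled = asyncio.run(parent())
print(events, worker_was_cancelled)
```

The worker never reaches "worker finished": the cancellation interrupts the sleep, the except block runs the cleanup, and the task ends in the cancelled state.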
Canceling actions programmatically
As a workflow author, you can cancel specific in-progress actions by canceling their corresponding asyncio tasks.
This is useful in scenarios like hyperparameter optimization (HPO), where one action converges to the desired result and the remaining actions can be stopped to save compute.
To cancel actions programmatically:
- Launch actions using asyncio.create_task() and retain references to the returned task objects.
- When the desired condition is met, call .cancel() on the tasks you want to stop.
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n = 30, f = 10.0"
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment("cancel")


@env.task
async def sleepers(f: float, n: int):
    await asyncio.sleep(f)


@env.task
async def failing_task(f: float):
    raise ValueError("I will fail!")


@env.task
async def main(n: int, f: float):
    # Launch n sleepers actions and keep the task objects so they can be cancelled later.
    sleeping_tasks = []
    for i in range(n):
        sleeping_tasks.append(asyncio.create_task(sleepers(f, i)))

    await asyncio.sleep(f)
    try:
        await failing_task(f)
        await asyncio.gather(*sleeping_tasks)
    except flyte.errors.RuntimeUserError as e:
        # The ValueError raised in failing_task arrives wrapped in RuntimeUserError.
        if e.code == "ValueError":
            print(f"Received ValueError, canceling {len(sleeping_tasks)} sleeping tasks")
            for t in sleeping_tasks:
                t.cancel()
            return


if __name__ == "__main__":
    flyte.init_from_config()
    print(flyte.run(main, 30, 10.0))
In this code:
- The main task launches 30 sleepers actions in parallel using asyncio.create_task().
- It then calls failing_task, which raises a ValueError.
- The error is caught as a flyte.errors.RuntimeUserError (since user-raised exceptions are wrapped by Flyte).
- On catching the error, main cancels all sleeping tasks by calling .cancel() on each one, freeing their compute resources.
This pattern lets you react to runtime conditions and stop unnecessary work. For more on handling errors within workflows, see Error handling.
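The HPO scenario mentioned earlier, where the first action to converge makes the others redundant, follows the same two steps. The sketch below is one possible shape for it, written with plain asyncio coroutines so it runs anywhere. The names trial and first_to_converge and the durations are hypothetical; in a Flyte workflow you would presumably pass task calls to asyncio.create_task() as in the example above.

```python
import asyncio


async def trial(trial_id: int, duration: float) -> int:
    """Hypothetical stand-in for one HPO trial; returns its id once it 'converges'."""
    await asyncio.sleep(duration)
    return trial_id


async def first_to_converge() -> tuple[int, int]:
    # Trial 2 has by far the shortest duration, so it completes first.
    durations = {0: 5.0, 1: 5.0, 2: 0.05, 3: 5.0}
    tasks = [asyncio.create_task(trial(i, d)) for i, d in durations.items()]

    # Block until at least one trial has completed.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel the trials that are still running, then wait for them to unwind.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    winner = done.pop().result()
    return winner, len(pending)


winner, cancelled_count = asyncio.run(first_to_converge())
print(f"trial {winner} converged; cancelled {cancelled_count} others")
```

Using asyncio.wait with FIRST_COMPLETED avoids polling: control returns as soon as any task finishes, and the pending set is exactly the list of tasks to cancel.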
External abort
Sometimes you need to stop an action manually, outside the workflow code itself. You can abort individual actions using the CLI, the UI, or the API.
When an action is externally aborted, the parent action that awaits it receives a flyte.errors.ActionAbortedError. You can catch this error to handle the abort gracefully.
Aborting via the CLI
To abort a specific action:
flyte abort <run-name> <action-name>
Handling external aborts
When using asyncio.gather() with return_exceptions=True, externally aborted actions return an ActionAbortedError instead of raising it. This lets you inspect results and handle aborts on a per-action basis:
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n = 10, sleep_for = 30.0"
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment("external_abort")


@env.task
async def long_sleeper(sleep_for: float):
    await asyncio.sleep(sleep_for)


@env.task
async def main(n: int, sleep_for: float) -> str:
    coros = [long_sleeper(sleep_for) for _ in range(n)]
    # With return_exceptions=True, an aborted action shows up as an
    # exception object in results instead of being raised.
    results = await asyncio.gather(*coros, return_exceptions=True)
    for i, r in enumerate(results):
        if isinstance(r, flyte.errors.ActionAbortedError):
            print(f"Action [{i}] was externally aborted")
    return "Hello World!"


if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, 10, 30.0)
    print(run.url)
    run.wait()
In this code:
- The main task launches 10 long_sleeper actions in parallel.
- If any action is externally aborted (via the CLI, the UI, or the API) while running, asyncio.gather captures the ActionAbortedError as a result instead of propagating it.
- The main task iterates over the results and logs which actions were aborted.
- Because the abort is handled, main can continue executing and return its result normally.
Without return_exceptions=True, an external abort would raise ActionAbortedError directly, which you can handle with a standard try...except block.
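A minimal sketch of that try...except shape follows. To keep it runnable without Flyte installed, it defines a local stand-in exception named ActionAbortedError and simulates the abort by raising it from a plain coroutine; both are assumptions made for illustration. In a real workflow you would catch flyte.errors.ActionAbortedError around the awaited task call instead.

```python
import asyncio


class ActionAbortedError(Exception):
    """Local stand-in for flyte.errors.ActionAbortedError (assumption: defined
    here only so the sketch runs without Flyte)."""


async def long_sleeper(sleep_for: float) -> None:
    # Simulate an external abort surfacing in the awaiting parent.
    await asyncio.sleep(0.01)
    raise ActionAbortedError("aborted externally")


async def main() -> str:
    try:
        await long_sleeper(30.0)
    except ActionAbortedError:
        # Handle the abort and fall back to a default result.
        return "aborted"
    return "completed"


result = asyncio.run(main())
print(result)
```

Catching the error directly suits a single awaited action; for many parallel actions, the return_exceptions=True approach shown earlier lets the remaining results be collected as well.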