Run context
Every Flyte run has a run context — a set of invocation-time parameters that control where the run executes, where its outputs are stored, how caching behaves, and more.
There are two sides to run context:
- Write side:
flyte.with_runcontext()— set run parameters before the run starts (programmatic) or via CLI flags. - Read side:
flyte.ctx()— access run parameters inside a running task.
Configuring a run with flyte.with_runcontext()
flyte.with_runcontext() returns a runner object. Call .run(task, ...) on it to start the run with the specified context:
import flyte
env = flyte.TaskEnvironment("run-context-example")
@env.task
async def process(n: int) -> int:
return n * 2
@env.task
async def root() -> int:
return await process(21)
if __name__ == "__main__":
flyte.init_from_config()
flyte.with_runcontext(
name="my-run",
project="my-project",
domain="development",
).run(root)
All parameters are optional. Unset parameters inherit from the configuration file (config.yaml) or system defaults.
Execution target
| Parameter | Type | Default | Description |
|---|---|---|---|
mode |
"local" | "remote" | "hybrid" |
from config | Where the run executes. "remote" runs on the Flyte backend; "local" runs in-process. |
project |
str |
from config | Project to run in. |
domain |
str |
from config | Domain to run in (e.g. "development", "production"). |
name |
str |
auto-generated | Custom name for the run, visible in the UI. |
version |
str |
from code bundle | Version string for the ephemeral task deployment. |
queue |
str |
from config | Cluster queue to schedule tasks on. |
interruptible |
bool |
per-task setting | Override the interruptible setting for all tasks in the run. True allows spot/preemptible instances; False forces non-interruptible instances. |
Storage
| Parameter | Type | Default | Description |
|---|---|---|---|
raw_data_path |
str |
from config | Storage prefix for offloaded data types (
Files,
Dirs,
DataFrames, checkpoints). Accepts s3://, gs://, or local paths. |
run_base_dir |
str |
auto-generated | Base directory for run metadata passed between tasks. Distinct from raw_data_path. |
To direct all task outputs to a specific bucket for a run:
if __name__ == "__main__":
flyte.init_from_config()
flyte.with_runcontext(
# Store all task outputs in a dedicated S3 prefix for this run
raw_data_path="s3://my-bucket/runs/experiment-42/",
).run(root)
The equivalent CLI flag is --raw-data-path. See
Run command options for CLI usage.
Caching
| Parameter | Type | Default | Description |
|---|---|---|---|
overwrite_cache |
bool |
False |
Re-execute all tasks even if a cached result exists, and overwrite the cache with new results. |
disable_run_cache |
bool |
False |
Skip cache lookups and writes entirely for this run. |
cache_lookup_scope |
"global" | … |
"global" |
Scope for cache lookups. |
Identity and resources
| Parameter | Type | Default | Description |
|---|---|---|---|
service_account |
str |
from config | Kubernetes service account for task pods. |
env_vars |
Dict[str, str] |
None |
Additional environment variables to inject into task containers. |
labels |
Dict[str, str] |
None |
Kubernetes labels to apply to task pods. |
annotations |
Dict[str, str] |
None |
Kubernetes annotations to apply to task pods. |
Logging
| Parameter | Type | Default | Description |
|---|---|---|---|
log_level |
int |
from config | Python log level (e.g. logging.DEBUG). |
log_format |
"console" | … |
"console" |
Log output format. |
reset_root_logger |
bool |
False |
If True, preserve the root logger unchanged. |
Code bundling
| Parameter | Type | Default | Description |
|---|---|---|---|
copy_style |
"loaded_modules" | "all" | "none" |
"loaded_modules" |
Code bundling strategy. See Run command options. |
dry_run |
bool |
False |
Build and upload the code bundle without executing the run. |
copy_bundle_to |
Path |
None |
When dry_run=True, copy the bundle to this local path. |
interactive_mode |
bool |
auto-detected | Override interactive mode detection (set automatically for Jupyter notebooks). |
preserve_original_types |
bool |
False |
Keep native DataFrame types (e.g. pd.DataFrame) rather than converting to flyte.io.DataFrame when deserializing outputs. |
Context propagation
| Parameter | Type | Default | Description |
|---|---|---|---|
custom_context |
Dict[str, str] |
None |
Metadata propagated through the entire task hierarchy. Readable inside any task via flyte.ctx().custom_context. See
Custom context. |
Reading context inside a task with flyte.ctx()
Inside a running task, flyte.ctx() returns a TaskContext object with information about the current execution. Outside of a task, it returns None.
@env.task
async def inspect_context() -> str:
ctx = flyte.ctx()
action = ctx.action
return (
f"run={action.run_name}, "
f"action={action.name}, "
f"mode={ctx.mode}, "
f"in_cluster={ctx.is_in_cluster()}"
)
TaskContext fields
| Field | Type | Description |
|---|---|---|
action |
ActionID |
Identity of this specific action (task invocation) within the run. |
mode |
"local" | "remote" | "hybrid" |
Execution mode of the current run. |
version |
str |
Version of the deployed task code bundle. |
raw_data_path |
str |
Storage prefix where offloaded outputs are written. |
run_base_dir |
str |
Base directory for run metadata. |
custom_context |
Dict[str, str] |
Propagated context metadata from with_runcontext(). |
disable_run_cache |
bool |
Whether run caching is disabled for this run. |
is_in_cluster() |
method | Returns True when mode == "remote". Useful for branching local/remote behavior. |
ActionID fields
The ctx.action object identifies this specific task invocation:
| Field | Type | Description |
|---|---|---|
name |
str |
Unique identifier for this action. |
run_name |
str |
Name of the parent run (defaults to name if not set). |
project |
str | None |
Project the action runs in. |
domain |
str | None |
Domain the action runs in. |
org |
str | None |
Organization. |
Naming external resources
ctx.action.run_name is useful for tying external tool runs (experiment trackers, dashboards) to the corresponding Flyte run:
import wandb # type: ignore[import]
@env.task
async def train_model(epochs: int) -> float:
ctx = flyte.ctx()
# Use run_name to tie the W&B run to this Flyte run
run = wandb.init(
project="my-project",
name=ctx.action.run_name,
config={"epochs": epochs},
)
# ... training logic ...
loss = 0.42
run.log({"loss": loss})
run.finish()
return loss
This ensures that when you look up a run in Weights & Biases (or any other tool), its name matches what you see in the Flyte UI.