Flyte webhook
FlyteWebhookAppEnvironment is a pre-built FastAPIAppEnvironment that exposes
HTTP endpoints for common Union.ai operations. Instead of writing
your own FastAPI routes to interact with the control plane, you get a
ready-to-deploy webhook service with a single constructor call.
Available endpoints
The webhook provides endpoints for the following operations:
| Group | Endpoints | Description |
|---|---|---|
| core | GET /health, GET /me |
Health check and authenticated user info |
| task | POST /run-task/{domain}/{project}/{name}, GET /task/{domain}/{project}/{name} |
Run tasks and retrieve task metadata |
| run | GET /run/{name}, GET /run/{name}/io, POST /run/{name}/abort |
Get run status, inputs/outputs, and abort runs |
| app | GET /app/{name}, POST /app/{name}/activate, POST /app/{name}/deactivate, POST /app/{name}/call |
Manage apps and call other app endpoints |
| trigger | POST /trigger/{task_name}/{trigger_name}/activate, POST /trigger/{task_name}/{trigger_name}/deactivate |
Activate and deactivate triggers |
| build | POST /build-image |
Build container images |
| prefetch | POST /prefetch/hf-model, GET /prefetch/hf-model/{run_name}, GET /prefetch/hf-model/{run_name}/io, POST /prefetch/hf-model/{run_name}/abort |
Prefetch HuggingFace models |
All endpoints except /health, /docs, and /openapi.json use passthrough
authentication, forwarding the caller’s credentials to the
Union.ai control plane.
Basic usage
Create a webhook with all endpoints enabled:
webhook_env = FlyteWebhookAppEnvironment(
name="my-webhook",
title="My Flyte Webhook",
description="A pre-built webhook service for Flyte operations",
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=True,
scaling=flyte.app.Scaling(replicas=1),
)
Deploy and activate it:
if __name__ == "__main__":
import os
import httpx
flyte.init_from_config(log_level=logging.DEBUG)
served_app = flyte.serve(webhook_env)
url = served_app.url
endpoint = served_app.endpoint
print(f"Webhook is served on {url}")
print(f"OpenAPI docs available at: {endpoint}/docs")
served_app.activate(wait=True)
Once running, the webhook exposes OpenAPI docs at {endpoint}/docs (Swagger UI)
and {endpoint}/redoc.
Filtering endpoints
You can restrict which endpoints the webhook exposes using either endpoint groups or individual endpoints.
Endpoint groups
Enable groups of related endpoints with endpoint_groups:
task_runner_webhook = FlyteWebhookAppEnvironment(
name="task-runner-webhook",
title="Task Runner Webhook",
endpoint_groups=["core", "task", "run"],
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=True,
)
Available groups: all, core, task, run, app, trigger, build, prefetch.
Individual endpoints
For finer control, specify exact endpoints with endpoints:
minimal_webhook = FlyteWebhookAppEnvironment(
name="minimal-webhook",
title="Minimal Webhook",
endpoints=["health", "run_task", "get_run"],
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=True,
)
You cannot specify both endpoint_groups and endpoints at the same time. Use
one or the other.
Allow-listing
Restrict which resources the webhook can access using allow-lists.
Task allow-list
Limit which tasks can be run or queried through the webhook:
restricted_webhook = FlyteWebhookAppEnvironment(
name="restricted-webhook",
title="Restricted Webhook",
endpoint_groups=["core", "task", "run"],
task_allowlist=[
"production/my-project/allowed-task",
"my-project/another-task",
"any-domain-task",
],
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=True,
)
Task identifiers support three formats:
domain/project/name— exact matchproject/name— matches any domainname— matches any domain and project
App allow-list
Limit which apps can be managed through the webhook:
app_manager_webhook = FlyteWebhookAppEnvironment(
name="app-manager-webhook",
title="App Manager Webhook",
endpoint_groups=["core", "app"],
app_allowlist=["my-app", "another-app"],
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=True,
)
Trigger allow-list
Limit which triggers can be activated or deactivated:
trigger_manager_webhook = FlyteWebhookAppEnvironment(
name="trigger-manager-webhook",
title="Trigger Manager Webhook",
endpoint_groups=["core", "trigger"],
trigger_allowlist=["my-task/my-trigger", "another-trigger"],
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=True,
)
Trigger identifiers support two formats:
task_name/trigger_name— exact matchtrigger_name— matches any task
Calling the webhook
Authenticate requests with a Union.ai API key passed as a Bearer token:
token = os.getenv("FLYTE_API_KEY")
if not token:
raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")
headers = {
"Authorization": f"Bearer {token}",
"User-Agent": "flyte-webhook-client/1.0",
}
with httpx.Client(headers=headers) as client:
# Health check (no auth required)
health = client.get(f"{endpoint}/health")
print(f"/health: {health.json()}")
# Get current user info (requires auth)
me = client.get(f"{endpoint}/me")
print(f"/me: {me.json()}")
# Run a task
resp = client.post(
f"{endpoint}/run-task/development/my-project/my-task",
json={"x": 42, "y": "hello"},
)
result = resp.json()
print(f"Run task: {result}")
# Check run status
run_name = result["name"]
run = client.get(f"{endpoint}/run/{run_name}")
print(f"Run status: {run.json()}")
Authentication
FlyteWebhookAppEnvironment uses FastAPIPassthroughAuthMiddleware, which
extracts the caller’s auth token from the Authorization header and sets up
a Union.ai context so that every control-plane call (e.g.
remote.Task.get, flyte.run) runs with the caller’s identity.
The /health, /docs, /openapi.json, and /redoc endpoints are excluded
from authentication.
Self-reference protection
App endpoints (get_app, activate_app, deactivate_app, call_app) prevent
the webhook from targeting itself. Attempting to activate, deactivate, or call
the webhook’s own name returns a 400 Bad Request error.