Flyte webhook

FlyteWebhookAppEnvironment is a pre-built FastAPIAppEnvironment that exposes HTTP endpoints for common Union.ai operations. Instead of writing your own FastAPI routes to interact with the control plane, you get a ready-to-deploy webhook service with a single constructor call.

Available endpoints

The webhook provides endpoints for the following operations:

Group Endpoints Description
core GET /health, GET /me Health check and authenticated user info
task POST /run-task/{domain}/{project}/{name}, GET /task/{domain}/{project}/{name} Run tasks and retrieve task metadata
run GET /run/{name}, GET /run/{name}/io, POST /run/{name}/abort Get run status, inputs/outputs, and abort runs
app GET /app/{name}, POST /app/{name}/activate, POST /app/{name}/deactivate, POST /app/{name}/call Manage apps and call other app endpoints
trigger POST /trigger/{task_name}/{trigger_name}/activate, POST /trigger/{task_name}/{trigger_name}/deactivate Activate and deactivate triggers
build POST /build-image Build container images
prefetch POST /prefetch/hf-model, GET /prefetch/hf-model/{run_name}, GET /prefetch/hf-model/{run_name}/io, POST /prefetch/hf-model/{run_name}/abort Prefetch HuggingFace models

All endpoints except /health, /docs, and /openapi.json use passthrough authentication, forwarding the caller’s credentials to the Union.ai control plane.

Basic usage

Create a webhook with all endpoints enabled:

flyte_webhook_examples.py
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)

Deploy and activate it:

flyte_webhook_examples.py
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)

Once running, the webhook exposes OpenAPI docs at {endpoint}/docs (Swagger UI) and {endpoint}/redoc.

Filtering endpoints

You can restrict which endpoints the webhook exposes using either endpoint groups or individual endpoints.

Endpoint groups

Enable groups of related endpoints with endpoint_groups:

flyte_webhook_examples.py
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)

Available groups: all, core, task, run, app, trigger, build, prefetch.

Individual endpoints

For finer control, specify exact endpoints with endpoints:

flyte_webhook_examples.py
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)

You cannot specify both endpoint_groups and endpoints at the same time. Use one or the other.

Allow-listing

Restrict which resources the webhook can access using allow-lists.

Task allow-list

Limit which tasks can be run or queried through the webhook:

flyte_webhook_examples.py
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)

Task identifiers support three formats:

  • domain/project/name — exact match
  • project/name — matches any domain
  • name — matches any domain and project

App allow-list

Limit which apps can be managed through the webhook:

flyte_webhook_examples.py
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)

Trigger allow-list

Limit which triggers can be activated or deactivated:

flyte_webhook_examples.py
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)

Trigger identifiers support two formats:

  • task_name/trigger_name — exact match
  • trigger_name — matches any task

Calling the webhook

Authenticate requests with a Union.ai API key passed as a Bearer token:

flyte_webhook_examples.py
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")

Authentication

FlyteWebhookAppEnvironment uses FastAPIPassthroughAuthMiddleware, which extracts the caller’s auth token from the Authorization header and sets up a Union.ai context so that every control-plane call (e.g. remote.Task.get, flyte.run) runs with the caller’s identity.

The /health, /docs, /openapi.json, and /redoc endpoints are excluded from authentication.

Self-reference protection

App endpoints (get_app, activate_app, deactivate_app, call_app) prevent the webhook from targeting itself. Attempting to activate, deactivate, or call the webhook’s own name returns a 400 Bad Request error.