# Flyte webhook

`FlyteWebhookAppEnvironment` is a pre-built `FastAPIAppEnvironment` that exposes
HTTP endpoints for common Union.ai operations. Instead of writing
your own FastAPI routes to interact with the control plane, you get a
ready-to-deploy webhook service with a single constructor call.

## Available endpoints

The webhook provides endpoints for the following operations:

| Group | Endpoints | Description |
|---|---|---|
| **core** | `GET /health`, `GET /me` | Health check and authenticated user info |
| **task** | `POST /run-task/{domain}/{project}/{name}`, `GET /task/{domain}/{project}/{name}` | Run tasks and retrieve task metadata |
| **run** | `GET /run/{name}`, `GET /run/{name}/io`, `POST /run/{name}/abort` | Get run status, inputs/outputs, and abort runs |
| **app** | `GET /app/{name}`, `POST /app/{name}/activate`, `POST /app/{name}/deactivate`, `POST /app/{name}/call` | Manage apps and call other app endpoints |
| **trigger** | `POST /trigger/{task_name}/{trigger_name}/activate`, `POST /trigger/{task_name}/{trigger_name}/deactivate` | Activate and deactivate triggers |
| **build** | `POST /build-image` | Build container images |
| **prefetch** | `POST /prefetch/hf-model`, `GET /prefetch/hf-model/{run_name}`, `GET /prefetch/hf-model/{run_name}/io`, `POST /prefetch/hf-model/{run_name}/abort` | Prefetch HuggingFace models |

All endpoints except `/health`, `/docs`, and `/openapi.json` use passthrough
authentication, forwarding the caller's credentials to the
Union.ai control plane.

## Basic usage

Create a webhook with all endpoints enabled:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

Deploy and activate it:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

Once running, the webhook exposes OpenAPI docs at `{endpoint}/docs` (Swagger UI)
and `{endpoint}/redoc`.

## Filtering endpoints

You can restrict which endpoints the webhook exposes using either **endpoint
groups** or **individual endpoints**.

### Endpoint groups

Enable groups of related endpoints with `endpoint_groups`:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

Available groups: `all`, `core`, `task`, `run`, `app`, `trigger`, `build`, `prefetch`.

### Individual endpoints

For finer control, specify exact endpoints with `endpoints`:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

> [!NOTE]
> You cannot specify both `endpoint_groups` and `endpoints` at the same time. Use
> one or the other.

## Allow-listing

Restrict which resources the webhook can access using allow-lists.

### Task allow-list

Limit which tasks can be run or queried through the webhook:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

Task identifiers support three formats:
- `domain/project/name` — exact match
- `project/name` — matches any domain
- `name` — matches any domain and project

### App allow-list

Limit which apps can be managed through the webhook:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

### Trigger allow-list

Limit which triggers can be activated or deactivated:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

Trigger identifiers support two formats:
- `task_name/trigger_name` — exact match
- `trigger_name` — matches any task

## Calling the webhook

Authenticate requests with a Union.ai API key passed as a Bearer
token:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0",
#    "fastapi",
#    "uvicorn",
#    "httpx",
# ]
# ///

"""Examples showing how to use FlyteWebhookAppEnvironment."""

import logging

import flyte
import flyte.app
from flyte.app.extras import FlyteWebhookAppEnvironment

# {{docs-fragment basic-webhook}}
webhook_env = FlyteWebhookAppEnvironment(
    name="my-webhook",
    title="My Flyte Webhook",
    description="A pre-built webhook service for Flyte operations",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    scaling=flyte.app.Scaling(replicas=1),
)
# {{/docs-fragment basic-webhook}}

# {{docs-fragment endpoint-groups}}
task_runner_webhook = FlyteWebhookAppEnvironment(
    name="task-runner-webhook",
    title="Task Runner Webhook",
    endpoint_groups=["core", "task", "run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment endpoint-groups}}

# {{docs-fragment individual-endpoints}}
minimal_webhook = FlyteWebhookAppEnvironment(
    name="minimal-webhook",
    title="Minimal Webhook",
    endpoints=["health", "run_task", "get_run"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment individual-endpoints}}

# {{docs-fragment task-allowlist}}
restricted_webhook = FlyteWebhookAppEnvironment(
    name="restricted-webhook",
    title="Restricted Webhook",
    endpoint_groups=["core", "task", "run"],
    task_allowlist=[
        "production/my-project/allowed-task",
        "my-project/another-task",
        "any-domain-task",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment task-allowlist}}

# {{docs-fragment app-allowlist}}
app_manager_webhook = FlyteWebhookAppEnvironment(
    name="app-manager-webhook",
    title="App Manager Webhook",
    endpoint_groups=["core", "app"],
    app_allowlist=["my-app", "another-app"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment app-allowlist}}

# {{docs-fragment trigger-allowlist}}
trigger_manager_webhook = FlyteWebhookAppEnvironment(
    name="trigger-manager-webhook",
    title="Trigger Manager Webhook",
    endpoint_groups=["core", "trigger"],
    trigger_allowlist=["my-task/my-trigger", "another-trigger"],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
)
# {{/docs-fragment trigger-allowlist}}

# {{docs-fragment deploy-webhook}}
if __name__ == "__main__":
    import os

    import httpx

    flyte.init_from_config(log_level=logging.DEBUG)

    served_app = flyte.serve(webhook_env)
    url = served_app.url
    endpoint = served_app.endpoint
    print(f"Webhook is served on {url}")
    print(f"OpenAPI docs available at: {endpoint}/docs")

    served_app.activate(wait=True)
# {{/docs-fragment deploy-webhook}}

# {{docs-fragment call-webhook}}
    token = os.getenv("FLYTE_API_KEY")
    if not token:
        raise ValueError("FLYTE_API_KEY not set. Obtain with: flyte get api-key")

    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "flyte-webhook-client/1.0",
    }

    with httpx.Client(headers=headers) as client:
        # Health check (no auth required)
        health = client.get(f"{endpoint}/health")
        print(f"/health: {health.json()}")

        # Get current user info (requires auth)
        me = client.get(f"{endpoint}/me")
        print(f"/me: {me.json()}")

        # Run a task
        resp = client.post(
            f"{endpoint}/run-task/development/my-project/my-task",
            json={"x": 42, "y": "hello"},
        )
        result = resp.json()
        print(f"Run task: {result}")

        # Check run status
        run_name = result["name"]
        run = client.get(f"{endpoint}/run/{run_name}")
        print(f"Run status: {run.json()}")
# {{/docs-fragment call-webhook}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/flyte_webhook_examples.py*

## Authentication

`FlyteWebhookAppEnvironment` uses `FastAPIPassthroughAuthMiddleware`, which
extracts the caller's auth token from the `Authorization` header and sets up
a Union.ai context so that every control-plane call (e.g.
`remote.Task.get`, `flyte.run`) runs with the caller's identity.

The `/health`, `/docs`, `/openapi.json`, and `/redoc` endpoints are excluded
from authentication.

## Self-reference protection

App endpoints (`get_app`, `activate_app`, `deactivate_app`, `call_app`) prevent
the webhook from targeting itself. Attempting to activate, deactivate, or call
the webhook's own name returns a `400 Bad Request` error.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/build-apps/flyte-webhook.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/build-apps/flyte-webhook/
