Integrations
Flyte 2 is designed to be extensible by default. While the core platform covers the most common orchestration needs, many production workloads require specialized infrastructure, external services or execution semantics that go beyond the core runtime.
Flyte 2 exposes these capabilities through integrations.
Under the hood, integrations are implemented using Flyte 2’s plugin system, which provides a consistent way to extend the platform without modifying core execution logic.
An integration allows you to declaratively enable new capabilities such as distributed compute frameworks or third-party services without manually managing infrastructure. You specify what you need, and Flyte takes care of how it is provisioned, used and cleaned up.
This page covers:
- The types of integrations Flyte 2 supports today
- How integrations fit into Flyte 2’s execution model
- How to use integrations in your tasks
- The integrations available out of the box
If you need functionality that doesn’t exist yet, Flyte 2’s plugin system is intentionally open-ended. You can build and register your own integrations using the same architecture described here.
Integration categories
Flyte 2 integrations generally fall into three broad categories:
- Distributed compute: Provision transient compute clusters to run tasks across multiple nodes, with automatic lifecycle management.
- External services: Enable Flyte to interact with third-party systems such as APIs, platforms, and SaaS services.
- Connectors: Stateless, long‑running services that receive execution requests via gRPC and then submit work to external (or internal) systems.
Distributed compute
Distributed compute integrations allow tasks to run on dynamically provisioned clusters. These clusters are created just-in-time, scoped to the task execution and torn down automatically when the task completes.
This enables large-scale parallelism without requiring users to operate or maintain long-running infrastructure.
Supported distributed compute integrations
| Plugin | Description | Common use cases |
|---|---|---|
| Ray | Provisions Ray clusters via KubeRay | Distributed Python, ML training, hyperparameter tuning |
| Spark | Provisions Spark clusters via Spark Operator | Large-scale data processing, ETL pipelines |
| Dask | Provisions Dask clusters via Dask Operator | Parallel Python workloads, dataframe operations |
| PyTorch | Distributed PyTorch training with elastic launch | Single-node and multi-node training |
Each plugin encapsulates:
- Cluster provisioning
- Resource configuration
- Networking and service discovery
- Lifecycle management and teardown
From the task author’s perspective, these details are abstracted away.
How the plugin system works
At a high level, Flyte 2’s distributed compute plugin architecture follows a simple and consistent pattern.
1. Registration
Each plugin registers itself with Flyte 2’s core plugin registry:
- TaskPluginRegistry: The central registry for all distributed compute plugins
- Each plugin declares:
  - Its configuration schema
  - How that configuration maps to execution behavior
This registration step makes the plugin discoverable by the runtime.
2. Task environments and plugin configuration
Integrations are activated through a TaskEnvironment.
A TaskEnvironment bundles:
- A container image
- Execution settings
- A plugin configuration object, enabled via plugin_config
The plugin configuration describes what infrastructure or integration the task requires.
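As a sketch, these pieces come together as follows. The Dask configuration is just one example of a plugin configuration (covered in full in the example below), and image is assumed to be a container image defined elsewhere:

```python
import flyte
from flyteintegrations.dask import Dask, WorkerGroup

# A TaskEnvironment bundles the container image, execution settings,
# and the plugin configuration that activates an integration.
env = flyte.TaskEnvironment(
    name="my_env",
    image=image,  # container image defined elsewhere
    plugin_config=Dask(workers=WorkerGroup(number_of_workers=2)),
)
```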
3. Automatic provisioning and execution
When a task associated with a TaskEnvironment runs:
- Flyte inspects the environment’s plugin configuration
- The plugin provisions the required infrastructure or integration
- The task executes with access to that capability
- Flyte cleans up all transient resources after completion
Example: Using the Dask plugin
Below is a complete example showing how a task gains access to a Dask cluster simply by running inside an environment configured with the Dask plugin.
```python
from flyteintegrations.dask import Dask, WorkerGroup

import flyte

# Define the Dask cluster configuration
dask_config = Dask(
    workers=WorkerGroup(number_of_workers=4)
)

# Create a task environment that enables Dask
env = flyte.TaskEnvironment(
    name="dask_env",
    plugin_config=dask_config,
    image=image,
)

# Any task in this environment has access to the Dask cluster
@env.task
async def process_data(data: list) -> list:
    from distributed import Client

    client = Client()  # Automatically connects to the provisioned cluster
    futures = client.map(transform, data)
    return client.gather(futures)
```

When process_data executes, Flyte performs the following steps:
- Provisions a Dask cluster with 4 workers
- Executes the task with network access to the cluster
- Tears down the cluster once the task completes
No cluster management logic appears in the task code. The task only expresses intent.
Key design principle
All distributed compute integrations follow the same mental model:
- You declare the required capability via configuration
- You attach that configuration to a task environment
- Tasks decorated with that environment automatically gain access to the capability
This makes it easy to swap execution backends or introduce distributed compute incrementally without rewriting workflows.
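For example, introducing distributed compute can be as small a change as attaching tasks to a different environment. The sketch below reuses the Dask configuration from the example above and assumes image is defined elsewhere; the task code itself stays the same:

```python
import flyte
from flyteintegrations.dask import Dask, WorkerGroup

# Two environments, same task code: switch which environment a task is
# attached to in order to add (or remove) Dask-backed execution.
plain_env = flyte.TaskEnvironment(name="plain_env", image=image)

dask_env = flyte.TaskEnvironment(
    name="dask_env",
    image=image,
    plugin_config=Dask(workers=WorkerGroup(number_of_workers=8)),
)
```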
External services
External service integrations allow Flyte to interact with third-party services in a structured, first-class way.
These integrations typically handle:
- Authentication and credentials
- API lifecycle management
- Standardized interfaces for task authors
Supported external service integrations
| Plugin | Description | Common use cases |
|---|---|---|
| OpenAI | Drop-in replacement for OpenAI FunctionTool | Agentic workflows |
| Weights and Biases | Weights & Biases integration | Experiment tracking and hyperparameter tuning |
Connectors
Connectors are stateless, long‑running services that receive execution requests via gRPC and then submit work to external (or internal) systems. Each connector runs as its own Kubernetes deployment, and is triggered when a Flyte task of the matching type is executed.
Although they normally run inside the control plane, you can also run connectors locally as long as the required secrets/credentials are present, because connectors are just Python services that can be spawned in‑process.
Connectors are designed to scale horizontally and reduce load on the core Flyte backend because they execute outside the core system. This decoupling makes connectors efficient, resilient and easy to iterate on. You can even test them locally without modifying backend configuration, which reduces friction during development.
Creating a new connector
If none of the existing connectors meet your needs, you can build your own.
Connectors communicate via Protobuf, so in theory they can be implemented in any language. Today, only Python connectors are supported.
Async connector interface
To implement a new async connector, extend AsyncConnector and implement the following methods, all of which must be idempotent:
| Method | Purpose |
|---|---|
| create | Launch the external job (via REST, gRPC, SDK, or other API) |
| get | Fetch current job state (return job status or output) |
| delete | Delete / cancel the external job |
To test the connector locally, the connector task should inherit from AsyncConnectorExecutorMixin. This mixin simulates how the Flyte 2 system executes asynchronous connector tasks, making it easier to validate your connector implementation before deploying it.
Example: Model training connector
The following example implements a connector that launches a model training job on an external training service.
```python
import typing
from dataclasses import dataclass

import httpx
from flyte.connectors import AsyncConnector, Resource, ResourceMeta
from flyteidl2.core.execution_pb2 import TaskExecution, TaskLog
from flyteidl2.core.tasks_pb2 import TaskTemplate
from google.protobuf import json_format


@dataclass
class ModelTrainJobMeta(ResourceMeta):
    job_id: str
    endpoint: str


class ModelTrainingConnector(AsyncConnector):
    """
    Example connector that launches an ML model training job on an external training service.

    POST   → launch training job
    GET    → poll training progress
    DELETE → cancel training job
    """

    name = "Model Training Connector"
    task_type_name = "external_model_training"
    metadata_type = ModelTrainJobMeta

    async def create(
        self,
        task_template: TaskTemplate,
        inputs: typing.Optional[typing.Dict[str, typing.Any]],
        **kwargs,
    ) -> ModelTrainJobMeta:
        """
        Submit the training job via POST.
        The response returns the job_id we later use in get().
        """
        custom = json_format.MessageToDict(task_template.custom) if task_template.custom else None
        async with httpx.AsyncClient() as client:
            r = await client.post(
                custom["endpoint"],
                json={"dataset_uri": inputs["dataset_uri"], "epochs": inputs["epochs"]},
            )
            r.raise_for_status()
            return ModelTrainJobMeta(job_id=r.json()["job_id"], endpoint=custom["endpoint"])

    async def get(self, resource_meta: ModelTrainJobMeta, **kwargs) -> Resource:
        """
        Poll the external API until the training job finishes.
        Must be safe to call repeatedly.
        """
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{resource_meta.endpoint}/{resource_meta.job_id}")
            if r.status_code != 200:
                return Resource(phase=TaskExecution.RUNNING)
            data = r.json()
            if data["status"] == "finished":
                return Resource(
                    phase=TaskExecution.SUCCEEDED,
                    log_links=[TaskLog(name="training-dashboard", uri=f"https://example-mltrain.com/train/{resource_meta.job_id}")],
                    outputs={"results": data["results"]},
                )
            return Resource(phase=TaskExecution.RUNNING)

    async def delete(self, resource_meta: ModelTrainJobMeta, **kwargs):
        """
        Optionally call DELETE on the external API.
        Safe even if the job has already completed.
        """
        async with httpx.AsyncClient() as client:
            await client.delete(f"{resource_meta.endpoint}/{resource_meta.job_id}")
```

To actually use this connector, you must also define a task whose task_type matches the connector.
```python
import flyte.io
from typing import Any, Dict, Optional

from flyte.extend import TaskTemplate
from flyte.connectors import AsyncConnectorExecutorMixin
from flyte.models import NativeInterface, SerializationContext


class ModelTrainTask(AsyncConnectorExecutorMixin, TaskTemplate):
    _TASK_TYPE = "external_model_training"

    def __init__(
        self,
        name: str,
        endpoint: str,
        **kwargs,
    ):
        super().__init__(
            name=name,
            interface=NativeInterface(
                inputs={"epochs": int, "dataset_uri": str},
                outputs={"results": flyte.io.File},
            ),
            task_type=self._TASK_TYPE,
            **kwargs,
        )
        self.endpoint = endpoint

    def custom_config(self, sctx: SerializationContext) -> Optional[Dict[str, Any]]:
        return {"endpoint": self.endpoint}
```

Here is an example of how to use the ModelTrainTask:
```python
import flyte

env = flyte.TaskEnvironment(name="hello_world", resources=flyte.Resources(memory="250Mi"))

model_train_task = ModelTrainTask(
    name="model_train",
    endpoint="https://example-mltrain.com",
)


@env.task
def data_prep() -> str:
    return "gs://my-bucket/dataset.csv"


@env.task
def train_model(epochs: int) -> flyte.io.File:
    dataset_uri = data_prep()
    return model_train_task(epochs=epochs, dataset_uri=dataset_uri)
```
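Because ModelTrainTask mixes in AsyncConnectorExecutorMixin, you can exercise the connector locally before deploying anything. The snippet below is a minimal sketch that assumes flyte.init() and flyte.run() from the Flyte 2 SDK as the local entry point, and that the external training endpoint and any required credentials are reachable from your machine:

```python
import flyte

# Minimal local test sketch (assumes flyte.init()/flyte.run() as the local entry point).
# With AsyncConnectorExecutorMixin, running the task locally drives the connector's
# create()/get()/delete() methods in-process instead of going through the Flyte backend.
if __name__ == "__main__":
    flyte.init()
    run = flyte.run(train_model, epochs=5)
    print(run)
```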
Build a custom connector image
When you're ready to deploy your connector to your cluster, build a Docker image for it by running the following script:
```python
import asyncio

from flyte import Image
from flyte.extend import ImageBuildEngine


async def build_flyte_connector_image(
    registry: str, name: str, builder: str = "local"
):
    """
    Build the SDK default connector image, optionally overriding
    the container registry and image name.

    Args:
        registry: e.g. "ghcr.io/my-org" or "123456789012.dkr.ecr.us-west-2.amazonaws.com".
        name: e.g. "my-connector".
        builder: e.g. "local" or "remote".
    """
    default_image = Image.from_debian_base(registry=registry, name=name).with_pip_packages(
        "flyteintegrations-connectors[bigquery]", pre=True
    )
    await ImageBuildEngine.build(default_image, builder=builder)


if __name__ == "__main__":
    print("Building connector image...")
    asyncio.run(build_flyte_connector_image(registry="<YOUR_REGISTRY>", name="flyte-connectors", builder="local"))
```