Serving a model from a Workflow#

In this section, we create a Union app to serve a scikit-learn model created by a Union workflow.

Example app#

In this example, we first use a Union workflow to train a model and output it as a Union Artifact. We then use a Union app to serve the model using FastAPI.

In a local directory, create the following files:

├── app.py
├── main.py
└── train_wf.py

App configuration#

First, we declare the resources, runtime image, and the Scikit-learn model required by the FastAPI app.

app.py#
"""A Union app that uses FastAPI to serve model created by a Union workflow."""

import os
import union

SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that will run the `App`.
# `union-runtime` must be declared as a dependency,
# in addition to any other dependencies needed by the app code.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
# If you are using `ghcr.io` as your registry, make sure the image is public.
image_spec = union.ImageSpec(
    name="union-serve-sklearn-fastapi",
    packages=["union-runtime>=0.1.10", "scikit-learn==1.5.2", "fastapi[standard]"],
    registry=os.getenv("REGISTRY"),
)

# The `App` declaration.
# Uses the `ImageSpec` declared above.
# Your core logic of the app resides in the files declared
# in the `include` parameter, in this case, `main.py`.
# Input artifacts are declared in the `inputs` parameter
fast_api_app = union.app.App(
    name="simple-fastapi-sklearn",
    inputs=[
        union.app.Input(
            value=SklearnModel.query(),
            download=True,
            env_var="SKLEARN_MODEL",
        )
    ],
    container_image=image_spec,
    limits=union.Resources(cpu="1", mem="1Gi"),
    port=8082,
    include=["main.py"],
    args="fastapi dev --port 8082",
)

Note that the Artifact is provided as an Input to the App definition. With download=True, the model is downloaded to the container’s working directory. The full local path to the model is set to SKLEARN_MODEL by the runtime.

FastAPI App#

During startup, the FastAPI app loads the model using the SKLEARN_MODEL environment variable. Then it serves an endpoint

main.py#
"""Set up the FastAPI app."""

from contextlib import asynccontextmanager
import os

import joblib
from fastapi import FastAPI
import union_runtime

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    model_file = os.getenv("SKLEARN_MODEL")
    ml_models["model"] = joblib.load(model_file)
    yield

app = FastAPI(lifespan=lifespan)

@app.get("/predict")
async def predict(x: float, y: float) -> float:
    result = ml_models["model"]([[x, y]])
    return {"result": result}

Training workflow#

The training workflow trains a random forest regression and saves it to an Union Artifact.

train_wf.py#
"""A Union workflow that trains a model."""

import os
from pathlib import Path
from typing import Annotated

import joblib
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

import union

# Declare the `Artifact`.
SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that runs the tasks.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
# If you are using `ghcr.io` as your registry, make sure the image is public.
image_spec = union.ImageSpec(
    packages=["scikit-learn==1.5.2"],
    registry=os.getenv("REGISTRY"),
)

# The `task` that trains a `RandomForestRegressor` model.
@union.task(
    limits=union.Resources(cpu="2", mem="2Gi"),
    container_image=image_spec,
)
def train_model() -> Annotated[union.FlyteFile, SklearnModel]:
    """Train a RandomForestRegressor model and save it as a file."""
    X, y = make_regression(n_features=2, random_state=42)
    working_dir = Path(union.current_context().working_directory)
    model_file = working_dir / "model.joblib"

    rf = RandomForestRegressor().fit(X, y)
    joblib.dump(rf, model_file)
    return model_file

Run the example#

To run this example you will need to register and run the workflow first:

Run the workflow#
$ union run --remote train_wf.py train_model

This task trains a RandomForestRegressor, saves it to a file, and uploads it to a Union Artifact. This Union Artifact is retrieved layer by the FastAPI app for serving the model.

scikit-learn Artifact

Once the workflow has completed, you can deploy the app:

$ union deploy apps app.py simple-fastapi-sklearn

The output displays the console URL and endpoint for the FastAPI App:

 Deploying Application: simple-fastapi-sklearn
🔎 Console URL: https://<union-host-url>/org/...
[Status] Pending: OutOfDate: The Configuration is still working to reflect the latest desired
specification.
[Status] Pending: IngressNotConfigured: Ingress has not yet been reconciled.
[Status] Pending: Uninitialized: Waiting for load balancer to be ready
[Status] Started: Service is ready

🚀 Deployed Endpoint: https://<unique-subhost>.apps.<union-host-url>

You can see the Swagger docs of the FastAPI endpoint, by going to /docs:

scikit-learn FastAPI App