Serving a model#

In this section we use a Union app to serve a model created with a Union workflow.

Example app#

In this example we first use a Union workflow to train a model and output it as a Union Artifact. We then use a Union app to serve the model using FastAPI.

In a local directory, create the following files:

├── app.py
├── main.py
└── wf.py

App declaration#

app.py#
"""A Union app that uses FastAPI to serve model created by a Union workflow."""

import os
import union

SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that will run the `App`.
# `union-runtime` must be declared as a dependency, 
# in addition to any other dependencies needed by the app code.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
image_spec = union.ImageSpec(
    name="union-serve-sklearn-fastapi",
    packages=["union-runtime>=0.1.10", "scikit-learn==1.5.2", "fastapi[standard]"],
    registry=os.getenv("REGISTRY"),
)

# The `App` declaration.
# Uses the `ImageSpec` declared above.
# Your core logic of the app resides in the files declared 
# in the `include` parameter, in this case, `main.py`.
# Input arttifacts are declared in the `inputs` parameter
fast_api_app = union.app.App(
    name="simple-fastapi-sklearn",
    inputs=[
        union.app.Input(
            name="sklearn_model",
            value=SklearnModel.query(),
            download=True,
        )
    ],
    container_image=image_spec,
    limits=Resources(cpu="1", mem="1Gi"),
    port=8082,
    include=["main.py"],
    args=["fastapi", "dev", "--port", "8082"],
)

main.py#

main.py#
"""Set up the FastAPI app."""

from contextlib import asynccontextmanager

import joblib
from fastapi import FastAPI
import union_runtime

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    model_file = union_runtime.get_input("sklearn_model")
    ml_models["model"] = joblib.load(model_file)
    yield


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float, y: float) -> float:
    result = ml_models["model"]([[x, y]])
    return {"result": result}

wf.py#

wf.py#
"""A Union workflow that trains a model and evaluates it."""

from pathlib import Path
from typing import Annotated

import joblib
import numpy as np
import union
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_percentage_error
from sklearn.model_selection import train_test_split

# Declare the `Artifact`.
SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that runs the tasks.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
image_spec = union.ImageSpec(
    packages=["scikit-learn==1.5.2"],
    registry=os.getenv("REGISTRY"),
)


# The `task` that generates the example data
# and splits it into training and testing sets.
@union.task(
    cache=True,
    cache_version="2",
    container_image=image_spec,
)
def load_data() -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Generate the example using sklearn.datasets.make_regression and split it into training and testing sets."""
    X, y = make_regression(n_samples=3)
    return train_test_split(X, y, random_state=42)


# The `task` that trains a `RandomForestRegressor` model.
@union.task(
    limits=union.Resources(cpu="2", mem="4Gi"),
    cache=True,
    cache_version="2",
    container_image=image_spec,
)
def train_model(X_train: np.ndarray, y_train: np.ndarray) -> Annotated[union.FlyteFile, SklearnModel]:
    """Train a RandomForestRegressor model and save it as a file."""
    working_dir = Path(union.current_context().working_directory)
    model_file = working_dir / "model.joblib"

    rf = RandomForestRegressor().fit(X_train, y_train)
    joblib.dump(rf, model_file)
    return model_file


# The `task` that evaluates the model.
@union.task(
    container_image=image_spec,
    limits=union.Resources(cpu="2", mem="2Gi"),
    cache=True,
    cache_version="2",
)
def evaluate_model(model: union.FlyteFile, X_test: np.ndarray, y_test: np.ndarray) -> float:
    """Evaluate the model using mean absolute percentage error."""
    model_ = joblib.load(model.download())
    y_pred = model_.predict(X_test)
    return float(mean_absolute_percentage_error(y_test, y_pred))

# The `workflow` that trains a model and evaluates it.
@union.workflow
def wf() -> float:
    """Train a model and evaluate it."""
    X_train, X_test, y_train, y_test = load_data()
    model = train_model(X_train=X_train, y_train=y_train)
    return evaluate_model(model=model, X_test=X_test, y_test=y_test)

Run the example#

To run this example you will need to register and run the workflow first:

Run the workflow#
$ union run --remote wf.py wf

The workflow first uses sklearn.datasets.make_regression to generate a random regression problem. It then trains a model against that problem, evaluates the model and saves it as a file. The file is defined as a Union Artifact, enabling it to be retrieved later by the serving app for display.

Once the workflow has completed, you can deploy the app:

$ union deploy apps app.py simple-fastapi-sklearn