Serving a model from a Workflow#
In this section, we create a Union app to serve a scikit-learn model created by a Union workflow.
Example app#
In this example, we first use a Union workflow to train a model and output it as a Union Artifact
.
We then use a Union app to serve the model using FastAPI
.
In a local directory, create the following files:
├── app.py
├── main.py
└── train_wf.py
App configuration#
First, we declare the resources, runtime image, and the Scikit-learn model required by the FastAPI app.
"""A Union app that uses FastAPI to serve model created by a Union workflow."""
import os
import union
SklearnModel = union.Artifact(name="sklearn-model")
# The `ImageSpec` for the container that will run the `App`.
# `union-runtime` must be declared as a dependency,
# in addition to any other dependencies needed by the app code.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
# If you are using `ghcr.io` as your registry, make sure the image is public.
image_spec = union.ImageSpec(
name="union-serve-sklearn-fastapi",
packages=["union-runtime>=0.1.10", "scikit-learn==1.5.2", "fastapi[standard]"],
registry=os.getenv("REGISTRY"),
)
# The `App` declaration.
# Uses the `ImageSpec` declared above.
# Your core logic of the app resides in the files declared
# in the `include` parameter, in this case, `main.py`.
# Input artifacts are declared in the `inputs` parameter
fast_api_app = union.app.App(
name="simple-fastapi-sklearn",
inputs=[
union.app.Input(
value=SklearnModel.query(),
download=True,
env_var="SKLEARN_MODEL",
)
],
container_image=image_spec,
limits=union.Resources(cpu="1", mem="1Gi"),
port=8082,
include=["main.py"],
args="fastapi dev --port 8082",
)
Note that the Artifact is provided as an Input
to the App definition. With download=True
,
the model is downloaded to the container’s working directory. The full local path to the
model is set to SKLEARN_MODEL
by the runtime.
FastAPI App#
During startup, the FastAPI app loads the model using the SKLEARN_MODEL
environment
variable. Then it serves an endpoint
"""Set up the FastAPI app."""
from contextlib import asynccontextmanager
import os
import joblib
from fastapi import FastAPI
import union_runtime
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
model_file = os.getenv("SKLEARN_MODEL")
ml_models["model"] = joblib.load(model_file)
yield
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float, y: float) -> float:
result = ml_models["model"]([[x, y]])
return {"result": result}
Training workflow#
The training workflow trains a random forest regression and saves it to an Union
Artifact
.
"""A Union workflow that trains a model."""
import os
from pathlib import Path
from typing import Annotated
import joblib
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor
import union
# Declare the `Artifact`.
SklearnModel = union.Artifact(name="sklearn-model")
# The `ImageSpec` for the container that runs the tasks.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
# If you are using `ghcr.io` as your registry, make sure the image is public.
image_spec = union.ImageSpec(
packages=["scikit-learn==1.5.2"],
registry=os.getenv("REGISTRY"),
)
# The `task` that trains a `RandomForestRegressor` model.
@union.task(
limits=union.Resources(cpu="2", mem="2Gi"),
container_image=image_spec,
)
def train_model() -> Annotated[union.FlyteFile, SklearnModel]:
"""Train a RandomForestRegressor model and save it as a file."""
X, y = make_regression(n_features=2, random_state=42)
working_dir = Path(union.current_context().working_directory)
model_file = working_dir / "model.joblib"
rf = RandomForestRegressor().fit(X, y)
joblib.dump(rf, model_file)
return model_file
Run the example#
To run this example you will need to register and run the workflow first:
$ union run --remote train_wf.py train_model
This task trains a RandomForestRegressor
, saves it to a file, and uploads it to
a Union Artifact
. This Union
Artifact is retrieved layer by the FastAPI app for
serving the model.
Once the workflow has completed, you can deploy the app:
$ union deploy apps app.py simple-fastapi-sklearn
The output displays the console URL and endpoint for the FastAPI App:
✨ Deploying Application: simple-fastapi-sklearn
🔎 Console URL: https://<union-host-url>/org/...
[Status] Pending: OutOfDate: The Configuration is still working to reflect the latest desired
specification.
[Status] Pending: IngressNotConfigured: Ingress has not yet been reconciled.
[Status] Pending: Uninitialized: Waiting for load balancer to be ready
[Status] Started: Service is ready
🚀 Deployed Endpoint: https://<unique-subhost>.apps.<union-host-url>
You can see the Swagger docs of the FastAPI endpoint, by going to /docs
: