Pydantic BaseModel#


You can put Dataclass and UnionTypes (FlyteFile, FlyteDirectory, FlyteSchema, and StructuredDataset) in a pydantic BaseModel.

To begin, import the necessary dependencies:

import os
import tempfile
import pandas as pd
from union
from flytekit.types.structured import StructuredDataset
from pydantic import BaseModel

Build your custom image with ImageSpec:

image_spec = union.ImageSpec(
    packages=["pandas", "pyarrow", "pydantic"],

Python types#

We define a pydantic basemodel with int, str and dict as the data types.

class Datum(BaseModel):
    x: int
    y: str
    z: dict[int, str]

You can send a pydantic basemodel between different tasks written in various languages, and input it through the Union console as raw JSON.


All variables in a data class should be annotated with their type. Failure to do will result in an error.

Once declared, a dataclass can be returned as an output or accepted as an input.

def stringify(s: int) -> Datum:
    A Pydantic model return will be treated as a single complex JSON return.
    return Datum(x=s, y=str(s), z={s: str(s)})

def add(x: Datum, y: Datum) -> Datum:
    return Datum(x=x.x + y.x, y=x.y + y.y, z=x.z)

Union types#

We also define a data class that accepts StructuredDataset, FlyteFile and FlyteDirectory.

class UnionTypes(BaseModel):
    dataframe: StructuredDataset
    file: union.FlyteFile
    directory: union.FlyteDirectory

def upload_data() -> UnionTypes:
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})

    temp_dir = tempfile.mkdtemp(prefix="union-")
    df.to_parquet(os.path.join(temp_dir, "df.parquet"))

    file_path = tempfile.NamedTemporaryFile(delete=False)
    file_path.write(b"Hello, World!")

    fs = FlyteTypes(
    return fs

def download_data(res: UnionTypes):
    expected_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    actual_df =
    assert expected_df.equals(actual_df), "DataFrames do not match!"

    with open(res.file, "r") as f:
        assert == "Hello, World!", "File contents do not match!"

    assert os.listdir( == ["df.parquet"], "Directory contents do not match!"

A data class supports the usage of data associated with Python types, data classes, FlyteFile, FlyteDirectory and StructuredDataset.

We define a workflow that calls the tasks created above.

def basemodel_wf(x: int, y: int) -> (Datum, UnionTypes):
    o1 = add(x=stringify(s=x), y=stringify(s=y))
    o2 = upload_data()
    return o1, o2

You can run the workflow locally as follows:

if __name__ == "__main__":
    basemodel_wf(x=10, y=20)

To trigger a task that accepts a dataclass as an input with union run, you can provide a JSON file as an input:

union run \ \
  basemodel_wf --x 1 --y 2