
As with most type systems, Python has primitives, container types like maps and tuples, and support for user-defined structures. However, while there’s a rich variety of dataframe classes (Pandas, Spark, Pandera, etc.), there’s no native Python type that represents a dataframe in the abstract. This is the gap that the StructuredDataset type is meant to fill. It offers the following benefits:

  • Eliminate boilerplate code you would otherwise need to write to serialize/deserialize from file objects into dataframe instances,

  • Eliminate additional inputs/outputs that convey metadata around the format of the tabular data held in those files,

  • Add flexibility around how dataframe files are loaded,

  • Offer a range of dataframe specific functionality - enforce compatibility of different schemas (not only at compile time, but also runtime since type information is carried along in the literal), store third-party schema definitions, and potentially in the future, render sample data, provide summary statistics, etc.


To use the StructuredDataset type, import pandas and define a task that returns a Pandas Dataframe. Union will detect the Pandas DataFrame return signature and convert the interface for the task to the StructuredDataset type.


This example demonstrates how to work with a structured dataset using Union entities.


To use the StructuredDataset type, you only need to import pandas. The other imports specified below are only necessary for this specific example.

To begin, import the dependencies for the example:

import typing
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import union
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.types.structured.structured_dataset import (
from typing_extensions import Annotated

Define a task that returns a Pandas DataFrame.

def generate_pandas_df(a: int) -> pd.DataFrame:
    return pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]})

Using this simplest form, however, the user is not able to set the additional dataframe information alluded to above,

  • Column type information

  • Serialized byte format

  • Storage driver and location

  • Additional third party schema information

This is by design as we wanted the default case to suffice for the majority of use-cases, and to require as few changes to existing code as possible. Specifying these is simple, however, and relies on Python variable annotations, which is designed explicitly to supplement types with arbitrary metadata.

Column type information#

If you want to extract a subset of actual columns of the dataframe and specify their types for type validation, you can just specify the column names and their types in the structured dataset type annotation.

First, initialize column types you want to extract from the StructuredDataset.

all_cols = union.kwtypes(Name=str, Age=int, Height=int)
col = union.kwtypes(Age=int)

Define a task that opens a structured dataset by calling all(). When you invoke all() with pandas.DataFrame, the Union engine downloads the parquet file on S3, and deserializes it to pandas.DataFrame. Keep in mind that you can invoke open() with any dataframe type that’s supported or added to structured dataset. For instance, you can use pa.Table to convert the Pandas DataFrame to a PyArrow table.

def get_subset_pandas_df(df: Annotated[StructuredDataset, all_cols]) -> Annotated[StructuredDataset, col]:
    df =
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    return StructuredDataset(dataframe=df)

def simple_sd_wf(a: int = 19) -> Annotated[StructuredDataset, col]:
    pandas_df = generate_pandas_df(a=a)
    return get_subset_pandas_df(df=pandas_df)

The code may result in runtime failures if the columns do not match. The input df has Name, Age and Height columns, whereas the output structured dataset will only have the Age column.

Serialized byte format#

You can use a custom serialization format to serialize your dataframes. Here’s how you can register the Pandas to CSV handler, which is already available, and enable the CSV serialization by annotating the structured dataset with the CSV format:

from flytekit.types.structured import register_csv_handlers
from flytekit.types.structured.structured_dataset import CSV


def pandas_to_csv(df: pd.DataFrame) -> Annotated[StructuredDataset, CSV]:
    return StructuredDataset(dataframe=df)

def pandas_to_csv_wf() -> Annotated[StructuredDataset, CSV]:
    pandas_df = generate_pandas_df(a=19)
    return pandas_to_csv(df=pandas_df)

Storage driver and location#

By default, the data will be written to the same place that all other pointer-types (FlyteFile, FlyteDirectory, etc.) are written to. This is controlled by the output data prefix option in Union which is configurable on multiple levels.

That is to say, in the simple default case, Union will,

  • Look up the default format for say, Pandas dataframes,

  • Look up the default storage location based on the raw output prefix setting,

  • Use these two settings to select an encoder and invoke it.

So what’s an encoder? To understand that, let’s look into how the structured dataset plugin works.

Inner workings of a structured dataset plugin#

Two things need to happen with any dataframe instance when interacting with Union:

  • Serialization/deserialization from/to the Python instance to bytes (in the format specified above).

  • Transmission/retrieval of those bits to/from somewhere.

Each structured dataset plugin (called encoder or decoder) needs to perform both of these steps. Union decides which of the loaded plugins to invoke based on three attributes:

  • The byte format

  • The storage location

  • The Python type in the task or workflow signature.

These three keys uniquely identify which encoder (used when converting a dataframe in Python memory to a Union value, e.g. when a task finishes and returns a dataframe) or decoder (used when hydrating a dataframe in memory from a Union value, e.g. when a task starts and has a dataframe input) to invoke.

However, it is awkward to require users to use typing.Annotated on every signature. Therefore, Union has a default byte-format for every Python dataframe type registered with .

The uri argument#

BigQuery uri allows you to load and retrieve data from cloud using the uri argument. The uri comprises of the bucket name and the filename prefixed with gs://. If you specify BigQuery uri for structured dataset, BigQuery creates a table in the location specified by the uri. The uri in structured dataset reads from or writes to S3, GCP, BigQuery or any storage.

Before writing DataFrame to a BigQuery table,

  1. Create a GCP account and create a service account.

  2. Create a project and add the GOOGLE_APPLICATION_CREDENTIALS environment variable to your .bashrc file.

  3. Create a dataset in your project.

Here’s how you can define a task that converts a pandas DataFrame to a BigQuery table:

def pandas_to_bq() -> StructuredDataset:
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    return StructuredDataset(dataframe=df, uri="gs://<BUCKET_NAME>/<FILE_NAME>")

Replace BUCKET_NAME with the name of your GCS bucket and FILE_NAME with the name of the file the dataframe should be copied to.

Note that no format was specified in the structured dataset constructor, or in the signature. So how did the BigQuery encoder get invoked?#

This is because the stock BigQuery encoder is loaded into Union with an empty format. The Union StructuredDatasetTransformerEngine interprets that to mean that it is a generic encoder (or decoder) and can work across formats, if a more specific format is not found.

And here’s how you can define a task that converts the BigQuery table to a pandas DataFrame:

def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:


Union creates a table inside the dataset in the project upon BigQuery query execution.

How to return multiple dataframes from a task?#

For instance, how would a task return say two dataframes:

  • The first dataframe be written to BigQuery and serialized by one of their libraries,

  • The second needs to be serialized to CSV and written at a specific location in GCS different from the generic pointer-data bucket

If you want the default behavior (which is itself configurable based on which plugins are loaded), you can work just with your current raw dataframe classes.

def t1() -> typing.Tuple[StructuredDataset, StructuredDataset]:
   return StructuredDataset(df1, uri="bq://project:flyte.table"), \
          StructuredDataset(df2, uri="gs://auxiliary-bucket/data")

If you want to customize the Union interaction behavior, you’ll need to wrap your dataframe in a StructuredDataset wrapper object.

How to define a custom structured dataset plugin?#

StructuredDataset ships with an encoder and a decoder that handles the conversion of a Python value to a Union literal and vice-versa, respectively. Here is a quick demo showcasing how one might build a NumPy encoder and decoder, enabling the use of a 2D NumPy array as a valid type within structured datasets.

NumPy encoder#

Extend StructuredDatasetEncoder and implement the encode function. The encode function converts NumPy array to an intermediate format (parquet file format in this case).

class NumpyEncodingHandler(StructuredDatasetEncoder):
    def encode(
        ctx: union.FlyteContext,
        structured_dataset: StructuredDataset,
        structured_dataset_type: union.StructuredDatasetType,
    ) -> literals.StructuredDataset:
        df = typing.cast(np.ndarray, structured_dataset.dataframe)
        name = ["col" + str(i) for i in range(len(df))]
        table = pa.Table.from_arrays(df, name)
        path = ctx.file_access.get_random_remote_directory()
        local_dir = ctx.file_access.get_random_local_directory()
        local_path = Path(local_dir) / f"{0:05}"
        pq.write_table(table, str(local_path))
        ctx.file_access.upload_directory(local_dir, path)
        return literals.StructuredDataset(

NumPy decoder#

Extend StructuredDatasetDecoder and implement the decode() function. The decode() function converts the parquet file to a numpy.ndarray.

class NumpyDecodingHandler(StructuredDatasetDecoder):
    def decode(
        ctx: union.FlyteContext,
        flyte_value: literals.StructuredDataset,
        current_task_metadata: StructuredDatasetMetadata,
    ) -> np.ndarray:
        local_dir = ctx.file_access.get_random_local_directory()
        ctx.file_access.get_data(flyte_value.uri, local_dir, is_multipart=True)
        table = pq.read_table(local_dir)
        return table.to_pandas().to_numpy()

NumPy renderer#

Create a default renderer for numpy array, then Union will use this renderer to display schema of NumPy array on the Flyte deck.

class NumpyRenderer:
    def to_html(self, df: np.ndarray) -> str:
        assert isinstance(df, np.ndarray)
        name = ["col" + str(i) for i in range(len(df))]
        table = pa.Table.from_arrays(df, name)
        return pd.DataFrame(table.schema).to_html(index=False)

In the end, register the encoder, decoder and renderer with the StructuredDatasetTransformerEngine. Specify the Python type you want to register this encoder with (np.ndarray), the storage engine to register this against (if not specified, it is assumed to work for all the storage backends), and the byte format, which in this case is PARQUET.

StructuredDatasetTransformerEngine.register(NumpyEncodingHandler(np.ndarray, None, PARQUET))
StructuredDatasetTransformerEngine.register(NumpyDecodingHandler(np.ndarray, None, PARQUET))
StructuredDatasetTransformerEngine.register_renderer(np.ndarray, NumpyRenderer())

You can now use numpy.ndarray to deserialize the parquet file to NumPy and serialize a task’s output (NumPy array) to a parquet file.

def generate_pd_df_with_str() -> pd.DataFrame:
    return pd.DataFrame({"Name": ["Tom", "Joseph"]})

def to_numpy(sd: StructuredDataset) -> Annotated[StructuredDataset, None, PARQUET]:
    numpy_array =
    return StructuredDataset(dataframe=numpy_array)

def numpy_wf() -> Annotated[StructuredDataset, None, PARQUET]:
    return to_numpy(sd=generate_pd_df_with_str())


pyarrow raises an Expected bytes, got a 'int' object error when the dataframe contains integers.

You can run the code locally as follows:

if __name__ == "__main__":
    sd = simple_sd_wf()
    print(f"A simple Pandas dataframe workflow: {}")
    print(f"Using CSV as the serializer: {pandas_to_csv_wf().open(pd.DataFrame).all()}")
    print(f"NumPy encoder and decoder: {numpy_wf().open(np.ndarray).all()}")

The nested typed columns#

Like most storage formats (e.g. Avro, Parquet, and BigQuery), StructuredDataset support nested field structures.

data = [
        "company": "XYZ pvt ltd",
        "location": "London",
        "info": {"president": "Rakesh Kapoor", "contacts": {"email": "[email protected]", "tel": "9876543210"}},
        "company": "ABC pvt ltd",
        "location": "USA",
        "info": {"president": "Kapoor Rakesh", "contacts": {"email": "[email protected]", "tel": "0123456789"}},

class ContactsField:
    email: str
    tel: str

class InfoField:
    president: str
    contacts: ContactsField

class CompanyField:
    location: str
    info: InfoField
    company: str

MyArgDataset = Annotated[StructuredDataset, union.kwtypes(company=str)]
MyTopDataClassDataset = Annotated[StructuredDataset, CompanyField]
MyTopDictDataset = Annotated[StructuredDataset, {"company": str, "location": str}]

MyDictDataset = Annotated[StructuredDataset, union.kwtypes(info={"contacts": {"tel": str}})]
MyDictListDataset = Annotated[StructuredDataset, union.kwtypes(info={"contacts": {"tel": str, "email": str}})]
MySecondDataClassDataset = Annotated[StructuredDataset, union.kwtypes(info=InfoField)]
MyNestedDataClassDataset = Annotated[StructuredDataset, union.kwtypes(info=union.kwtypes(contacts=ContactsField))]

image = union.ImageSpec(packages=["pandas", "pyarrow", "pandas", "tabulate"], registry="")

def create_parquet_file() -> StructuredDataset:
    from tabulate import tabulate

    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", tabulate(df, headers="keys", tablefmt="psql"))

    return StructuredDataset(dataframe=df)

def print_table_by_arg(sd: MyArgDataset) -> pd.DataFrame:
    from tabulate import tabulate

    t =
    print("MyArgDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t

def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    from tabulate import tabulate

    t =
    print("MyDictDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t

def print_table_by_list_dict(sd: MyDictListDataset) -> pd.DataFrame:
    from tabulate import tabulate

    t =
    print("MyDictListDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t

def print_table_by_top_dataclass(sd: MyTopDataClassDataset) -> pd.DataFrame:
    from tabulate import tabulate

    t =
    print("MyTopDataClassDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t

def print_table_by_top_dict(sd: MyTopDictDataset) -> pd.DataFrame:
    from tabulate import tabulate

    t =
    print("MyTopDictDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t

def print_table_by_second_dataclass(sd: MySecondDataClassDataset) -> pd.DataFrame:
    from tabulate import tabulate

    t =
    print("MySecondDataClassDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t

def print_table_by_nested_dataclass(sd: MyNestedDataClassDataset) -> pd.DataFrame:
    from tabulate import tabulate

    t =
    print("MyNestedDataClassDataset dataframe: \n", tabulate(t, headers="keys", tablefmt="psql"))
    return t

def contacts_wf():
    sd = create_parquet_file()