Example code components

The ML workflow example code contains a @workflow-decorated function made up of several tasks decorated with the @task decorator. The code also contains an ImageSpec block, and tasks with the enable_deck=True parameter set, which enables visualizations in the Union UI.

Note

You can find the full ML workflow example code on GitHub.

Workflow

The @workflow decorator indicates a function that defines a workflow. This function contains references to the tasks defined earlier in the code.

A workflow appears to be a Python function but is actually a DSL that only supports a subset of Python syntax and semantics.

When deployed to Union, the workflow function is “compiled” to construct the directed acyclic graph (DAG) of tasks, defining the order of execution of task pods and the data flow dependencies between them.

@workflow
def main(max_bins: int = 64) -> float:
    train, test = get_dataset()
    model = train_model(dataset=train, max_bins=max_bins)
    return evaluate_model(model=model, dataset=test)
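
To make the idea of compiling a workflow function into a DAG more concrete, here is a toy sketch that uses only the Python standard library. It is not how Union or Flyte implements compilation; toy_task, Placeholder, and graph are hypothetical names introduced purely for illustration. The workflow body is called once with placeholder outputs, which records which task consumes which output, and graphlib then derives a valid execution order.

```python
from graphlib import TopologicalSorter


class Placeholder:
    """Stands in for a task output that has no value while the graph is built."""

    def __init__(self, producer: str) -> None:
        self.producer = producer

    def __bool__(self) -> bool:
        # Branching on a placeholder is impossible: there is no value yet.
        raise TypeError("task outputs have no value at compile time")


# Maps each task name to the set of tasks whose outputs it consumes.
graph: dict[str, set[str]] = {}


def toy_task(n_outputs: int = 1):
    """Hypothetical stand-in for @task: records dependencies instead of running."""

    def decorate(fn):
        def record(**kwargs):
            graph[fn.__name__] = {
                value.producer
                for value in kwargs.values()
                if isinstance(value, Placeholder)
            }
            outputs = tuple(Placeholder(fn.__name__) for _ in range(n_outputs))
            return outputs if n_outputs > 1 else outputs[0]

        return record

    return decorate


@toy_task(n_outputs=2)
def get_dataset(): ...


@toy_task()
def train_model(dataset, max_bins): ...


@toy_task()
def evaluate_model(model, dataset): ...


def main(max_bins=64):
    train, test = get_dataset()
    model = train_model(dataset=train, max_bins=max_bins)
    return evaluate_model(model=model, dataset=test)


main()  # "compile": run the body once to record nodes and edges
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['get_dataset', 'train_model', 'evaluate_model']
```

Because the task outputs are placeholders during this pass, ordinary Python constructs that need a real value, such as an if statement on a task output, cannot work. This is one way to see why a workflow function only supports a subset of Python.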

Workflow parameters are available for configuration on the command line. In this example, the main workflow’s max_bins parameter can be set to a different value from the default:

$ union run --remote guide/first_workflow/ml_workflow/ml_workflow.py main --max_bins 128

@task and @workflow syntax

  • The @task and @workflow decorators will only work on functions at the top-level scope of the module.

  • You can invoke tasks and workflows as regular Python functions and even import and use them in other Python modules or scripts.

  • Task and workflow function signatures must be type-annotated with Python type hints.

  • Task and workflow functions must be invoked with keyword arguments.
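
The last two rules can be illustrated with a small standard-library sketch. The strict decorator below is hypothetical and is not part of flytekit or union; it only mimics the two constraints (type-annotated parameters and keyword-only invocation) so that you can see what a conforming function and call look like.

```python
import inspect
from functools import wraps


def strict(fn):
    """Hypothetical decorator: require type hints and keyword-only calls."""
    params = inspect.signature(fn).parameters
    untyped = [
        name
        for name, param in params.items()
        if param.annotation is inspect.Parameter.empty
    ]
    if untyped:
        raise TypeError(f"{fn.__name__}: missing type hints for {untyped}")

    @wraps(fn)
    def wrapper(*args, **kwargs):
        if args:
            raise TypeError(f"{fn.__name__} must be called with keyword arguments")
        return fn(**kwargs)

    return wrapper


@strict
def scale(value: float, factor: float) -> float:
    return value * factor


result = scale(value=2.0, factor=3.0)  # accepted: every argument is named
```

In this sketch, calling scale(2.0, 3.0) raises a TypeError, as does decorating a function whose parameters lack type hints.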

Tasks

The @task decorator indicates a Python function that defines a task. A task takes some input and produces an output. When deployed to a Kubernetes cluster, each task runs in its own Kubernetes pod.

train_model

The train_model task sets the requests parameter to Resources(cpu="3", mem="2Gi"), a declarative infrastructure setting that requests 3 CPUs and 2Gi of memory for the task. The task also sets the container_image parameter, which specifies the image (defined in an ImageSpec block) that the task runs in.

@task(
    container_image=image,
    requests=Resources(cpu="3", mem="2Gi"),
)
def train_model(dataset: pd.DataFrame, max_bins: int) -> BaseEstimator:
    X_train, y_train = dataset.drop("species", axis="columns"), dataset["species"]
    hist = HistGradientBoostingClassifier(
        random_state=0, max_bins=max_bins, categorical_features="from_dtype"
    )
    return hist.fit(X_train, y_train)

get_dataset

get_dataset returns the training and test data as pandas DataFrames. Setting cache=True tells Union to cache the task's output, so future executions of the workflow reuse the cached data instead of running the task again. The cache_version value labels the cached output; changing it invalidates earlier cache entries.

@task(
    cache=True,
    cache_version="4",
    container_image=image,
    requests=Resources(cpu="2", mem="2Gi"),
)
def get_dataset() -> tuple[pd.DataFrame, pd.DataFrame]:
    dataset = fetch_openml(name="penguins", version=1, as_frame=True)
    train_dataset, test_dataset = train_test_split(
        dataset.frame, random_state=0, stratify=dataset.target
    )
    return train_dataset, test_dataset
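
As a rough mental model of task caching, the sketch below keys a stored result on the task name, the cache version, and the inputs. It is a simplified illustration using only the standard library, not Union's actual cache implementation; cached_task, _store, and executions are hypothetical names.

```python
import hashlib
import json

_store: dict[str, object] = {}
executions: list[str] = []  # records every real (non-cached) run


def cached_task(cache_version: str):
    """Hypothetical decorator: reuse results for identical name, version, inputs."""

    def decorate(fn):
        def wrapper(**kwargs):
            key_source = json.dumps(
                {"task": fn.__name__, "version": cache_version, "inputs": kwargs},
                sort_keys=True,
            )
            key = hashlib.sha256(key_source.encode()).hexdigest()
            if key not in _store:
                executions.append(f"{fn.__name__}@{cache_version}")
                _store[key] = fn(**kwargs)
            return _store[key]

        return wrapper

    return decorate


def load_rows(n: int) -> list[int]:
    return list(range(n))


load_v4 = cached_task(cache_version="4")(load_rows)
load_v5 = cached_task(cache_version="5")(load_rows)

load_v4(n=3)  # runs the function
load_v4(n=3)  # same inputs and version: served from the store
load_v5(n=3)  # new version: runs again
print(executions)  # ['load_rows@4', 'load_rows@5']
```

The practical takeaway is the same as for the real feature: when you change what a cached task computes, bump cache_version so that stale results are not reused.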

Note

For a full list of task parameters, see Task parameters.

ImageSpec

The ImageSpec object is used to define the container image that will run the tasks in the workflow. The tasks require custom dependencies, which are included in the ImageSpec:

image = ImageSpec(
    requirements=Path(__file__).parent / "requirements.txt",
)

The requirements parameter is set to the location of the requirements file that will be used to build the image. In this case, the requirements file is the same one that is used to configure the local development environment.

You build the image on your local machine based on the ImageSpec definition and push it to the specified registry. When Union executes the workflow, it will pull the image from the registry and use it for the designated task.

Visualizations

The Flyte Deck feature allows you to enable visualizations in Union’s user interface by setting enable_deck=True in a task’s parameters:

@task(
    container_image=image,
    enable_deck=True,
    requests=Resources(cpu="2", mem="2Gi"),
)
def evaluate_model(model: BaseEstimator, dataset: pd.DataFrame) -> float:
    ctx = current_context()

    X_test, y_test = dataset.drop("species", axis="columns"), dataset["species"]
    y_pred = model.predict(X_test)

    # Plot confusion matrix in deck
    fig, ax = plt.subplots()
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)

    metrics_deck = Deck("Metrics")
    metrics_deck.append(_convert_fig_into_html(fig))

    # Add classification report
    report = html.escape(classification_report(y_test, y_pred))
    html_report = dedent(
        f"""\
    <h2>Classification report</h2>
    <pre>{report}</pre>"""
    )
    metrics_deck.append(html_report)

    ctx.decks.insert(0, metrics_deck)

    return accuracy_score(y_test, y_pred)
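
The evaluate_model task calls a helper named _convert_fig_into_html that is not shown in this excerpt. One plausible way to write such a helper, offered here as an assumption rather than as the example's actual code, is to embed the rendered PNG in an HTML img tag as a base64 data URI. To stay within the standard library, the sketch below takes raw PNG bytes; png_bytes_to_html is a hypothetical name.

```python
import base64
import html


def png_bytes_to_html(png_bytes: bytes, alt: str = "Rendered image") -> str:
    """Embed PNG bytes in an <img> tag using a base64 data URI."""
    encoded = base64.b64encode(png_bytes).decode("ascii")
    safe_alt = html.escape(alt, quote=True)
    return f'<img src="data:image/png;base64,{encoded}" alt="{safe_alt}" />'


# With matplotlib, the bytes could come from an in-memory buffer:
#   buffer = io.BytesIO()
#   fig.savefig(buffer, format="png")
#   html_img = png_bytes_to_html(buffer.getvalue())
png_signature = b"\x89PNG\r\n\x1a\n"  # first 8 bytes of any PNG file
html_img = png_bytes_to_html(png_signature)
```

The resulting string is self-contained HTML, so it can be appended to a Deck in the same way as the classification report above.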