Example code components
The ML workflow example code contains a @workflow-decorated function made up of several tasks decorated with the @task decorator. The code also contains an ImageSpec block and tasks with the enable_deck=True parameter set, which enables visualizations in the Union UI.
Note
You can find the full ML workflow example code on GitHub.
Workflow
The @workflow decorator indicates a function that defines a workflow. This function contains calls to the tasks defined earlier in the code.
A workflow looks like a Python function, but its body is actually written in a domain-specific language (DSL) that supports only a subset of Python syntax and semantics.
When deployed to Union, the workflow function is “compiled” to construct the directed acyclic graph (DAG) of tasks, defining the order of execution of task pods and the data flow dependencies between them.
```python
@workflow
def main(max_bins: int = 64) -> float:
    train, test = get_dataset()
    model = train_model(dataset=train, max_bins=max_bins)
    return evaluate_model(model=model, dataset=test)
```
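To make the idea of "compiling" a workflow into a DAG concrete, here is a toy model in plain Python. It is a sketch under simplifying assumptions, not how Union or Flyte actually compile workflows: the hypothetical toy_task decorator makes each task call record a graph node and return a placeholder (Promise) instead of running any code, and the standard-library graphlib module then derives an execution order from the recorded dependencies.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter

# Toy model only: NOT how Union or Flyte compile workflows internally.
# Maps each task name to the set of task names it depends on.
EDGES: dict[str, set[str]] = {}


@dataclass(frozen=True)
class Promise:
    """Placeholder for a task output that will only exist at execution time."""
    node: str
    index: int = 0


def toy_task(n_outputs: int = 1):
    """Hypothetical decorator: calling the task records a node instead of running it."""
    def decorator(fn):
        def traced(**kwargs):
            name = fn.__name__
            # Every upstream Promise passed in becomes a dependency edge.
            deps = {v.node for v in kwargs.values() if isinstance(v, Promise)}
            EDGES.setdefault(name, set()).update(deps)
            if n_outputs == 1:
                return Promise(name)
            return tuple(Promise(name, i) for i in range(n_outputs))
        return traced
    return decorator


@toy_task(n_outputs=2)
def get_dataset(): ...


@toy_task()
def train_model(dataset, max_bins): ...


@toy_task()
def evaluate_model(model, dataset): ...


def main(max_bins: int = 64):
    # Same body as the workflow above; in this toy, max_bins stays a plain value.
    train, test = get_dataset()
    model = train_model(dataset=train, max_bins=max_bins)
    return evaluate_model(model=model, dataset=test)


main()  # "compile" step: runs the body once, recording nodes and edges
order = list(TopologicalSorter(EDGES).static_order())
print(order)  # ['get_dataset', 'train_model', 'evaluate_model']
```

Because no task body runs during tracing, the workflow body can only wire outputs to inputs, which is one intuition for why the workflow DSL supports just a subset of Python.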
Workflow parameters can be configured on the command line. In this example, the main workflow's max_bins parameter can be set to a value other than its default of 64:
```shell
$ union run --remote guide/first_workflow/ml_workflow/ml_workflow.py main --max_bins 128
```
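The mapping from a type-annotated workflow signature to command-line flags can be sketched with argparse. This is a hypothetical illustration of the idea, not the implementation of union run: each parameter becomes a --flag, the parameter's type hint converts the string from the command line, and the Python default applies when the flag is omitted.

```python
import argparse
import inspect


def main(max_bins: int = 64) -> float:
    ...  # workflow body omitted; only the signature matters here


def build_parser(fn) -> argparse.ArgumentParser:
    """Hypothetical helper: one --flag per parameter of fn."""
    parser = argparse.ArgumentParser(prog=fn.__name__)
    for name, param in inspect.signature(fn).parameters.items():
        required = param.default is inspect.Parameter.empty
        parser.add_argument(
            f"--{name}",
            type=param.annotation,  # the type hint doubles as the converter
            default=None if required else param.default,
            required=required,
        )
    return parser


defaults = vars(build_parser(main).parse_args([]))
overridden = vars(build_parser(main).parse_args(["--max_bins", "128"]))
print(defaults, overridden)  # {'max_bins': 64} {'max_bins': 128}
```

This is one reason the type hints matter: without them there would be nothing to convert "128" into an int.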
@task and @workflow syntax
- The @task and @workflow decorators only work on functions defined at the top-level scope of a module.
- You can invoke tasks and workflows as regular Python functions, and even import and use them in other Python modules or scripts.
- Task and workflow function signatures must be type-annotated with Python type hints.
- Task and workflow functions must be invoked with keyword arguments.
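Two of the rules above can be illustrated with a hypothetical toy_task decorator written in plain Python. It is not the real @task implementation; it only checks that a function is fully type-annotated when it is decorated, and rejects positional arguments when it is called.

```python
import functools
import inspect


def toy_task(fn):
    """Hypothetical decorator (not flytekit's @task) enforcing two of the rules above."""
    sig = inspect.signature(fn)
    # Rule: every parameter and the return value must carry a type hint.
    unannotated = [
        name for name, p in sig.parameters.items() if p.annotation is inspect.Parameter.empty
    ]
    if unannotated or sig.return_annotation is inspect.Signature.empty:
        raise TypeError(f"{fn.__name__} must be fully type-annotated")

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Rule: tasks are invoked with keyword arguments only.
        if args:
            raise TypeError(f"{fn.__name__} must be called with keyword arguments")
        return fn(**kwargs)

    return wrapper


@toy_task
def add(a: int, b: int) -> int:
    return a + b


print(add(a=1, b=2))  # 3
try:
    add(1, 2)
except TypeError as err:
    print(err)  # add must be called with keyword arguments
```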
Tasks
The @task decorator indicates a Python function that defines a task. A task takes some input and produces an output. When deployed to a Kubernetes cluster, each task runs in its own Kubernetes pod.
train_model
The train_model task sets the requests parameter to Resources(cpu="3", mem="2Gi"), a declarative infrastructure specification that requests 3 CPUs and 2Gi of memory for the task. The task also sets the container_image parameter, which specifies the image (defined in an ImageSpec block) to use for the task.
```python
@task(
    container_image=image,
    requests=Resources(cpu="3", mem="2Gi"),
)
def train_model(dataset: pd.DataFrame, max_bins: int) -> BaseEstimator:
    X_train, y_train = dataset.drop("species", axis="columns"), dataset["species"]
    hist = HistGradientBoostingClassifier(
        random_state=0, max_bins=max_bins, categorical_features="from_dtype"
    )
    return hist.fit(X_train, y_train)
```
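The values in Resources(cpu="3", mem="2Gi") use Kubernetes quantity notation: binary suffixes such as Gi are powers of 1024, decimal suffixes such as G are powers of 1000, and the m suffix means thousandths (500m CPU is half a core). The sketch below converts a few common forms into absolute numbers; parse_quantity is a hypothetical helper that covers only these suffixes, not the full Kubernetes quantity grammar.

```python
from decimal import Decimal

# Hypothetical helper: handles only a few common Kubernetes quantity suffixes.
# Binary suffixes are checked first so "Mi" is not mistaken for "M".
BINARY = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
DECIMAL = {"m": Decimal("0.001"), "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}


def parse_quantity(quantity: str) -> Decimal:
    """Convert a quantity such as "2Gi", "500m", or "3" into an absolute number."""
    for table in (BINARY, DECIMAL):
        for suffix, factor in table.items():
            if quantity.endswith(suffix):
                return Decimal(quantity[: -len(suffix)]) * factor
    return Decimal(quantity)  # no suffix: plain number (e.g. whole CPU cores)


print(parse_quantity("3"))     # 3          -> three CPU cores
print(parse_quantity("500m"))  # 0.500      -> half a CPU core
print(parse_quantity("2Gi"))   # 2147483648 bytes
print(parse_quantity("2G"))    # 2000000000 bytes
```

Note that 2Gi is roughly 7% more memory than 2G, so the suffix matters when sizing a task.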
get_dataset
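get_dataset returns the training and test data as pandas DataFrames. Setting cache=True tells Union to cache the task's output, so future executions of the workflow use the cached data instead of running the task again. The cache_version value is part of the cache key: changing it invalidates the previously cached output and forces the task to run again.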
```python
@task(
    cache=True,
    cache_version="4",
    container_image=image,
    requests=Resources(cpu="2", mem="2Gi"),
)
def get_dataset() -> tuple[pd.DataFrame, pd.DataFrame]:
    dataset = fetch_openml(name="penguins", version=1, as_frame=True)
    train_dataset, test_dataset = train_test_split(
        dataset.frame, random_state=0, stratify=dataset.target
    )
    return train_dataset, test_dataset
```
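The following toy cache, in plain Python, illustrates the caching behavior of get_dataset. It is a hypothetical sketch, not Union's caching service: the cache key combines the task name, the cache_version string, and the inputs, so a repeated call with the same inputs is served from the cache, while bumping cache_version forces a rerun. The n input is added here only to show that inputs are part of the key.

```python
import functools
import hashlib
import json

# Toy in-memory cache (hypothetical; not Union's caching service).
_CACHE: dict[str, object] = {}
CALLS = {"count": 0}  # counts real executions, to make cache hits visible


def toy_cached_task(cache_version: str):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(**kwargs):
            # Cache key: task name + cache_version + (JSON-serializable) inputs.
            payload = json.dumps(
                {"task": fn.__name__, "version": cache_version, "inputs": kwargs},
                sort_keys=True,
            )
            key = hashlib.sha256(payload.encode()).hexdigest()
            if key not in _CACHE:  # cache miss: actually run the task
                _CACHE[key] = fn(**kwargs)
            return _CACHE[key]
        return wrapper
    return decorator


@toy_cached_task(cache_version="4")
def get_dataset(n: int) -> list[int]:
    CALLS["count"] += 1
    return list(range(n))


first = get_dataset(n=3)
second = get_dataset(n=3)  # same inputs and version: served from the cache
print(CALLS["count"])  # 1

# Bumping cache_version changes the key, so the task body runs again.
get_dataset_v5 = toy_cached_task(cache_version="5")(get_dataset.__wrapped__)
get_dataset_v5(n=3)
print(CALLS["count"])  # 2
```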
Note
For a full list of task parameters, see Task parameters.
ImageSpec
The ImageSpec object is used to define the container image that will run the tasks in the workflow. The tasks require custom dependencies, which are included in the ImageSpec:
```python
image = ImageSpec(
    requirements=Path(__file__).parent / "requirements.txt",
)
```
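The requirements parameter is set to the location of the requirements file used to build the image. In this case, it is the same requirements file used to configure the local development environment. Because the path is built from Path(__file__).parent, it resolves relative to the workflow module rather than to the current working directory.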
You build the image on your local machine based on the ImageSpec definition and push it to the specified registry. When Union executes the workflow, it pulls the image from the registry and uses it for the designated tasks.
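A common way to decide whether an image needs rebuilding is to tag it with a hash of its inputs. The sketch below assumes such a content-hash scheme and is not ImageSpec's actual tagging algorithm; image_tag and its base_image default are hypothetical. It shows how, under that assumption, an unchanged requirements file yields the same tag while adding a dependency yields a new one.

```python
import hashlib
import tempfile
from pathlib import Path


def image_tag(requirements: Path, base_image: str = "python:3.11-slim") -> str:
    """Hypothetical: derive a short, deterministic tag from the build inputs."""
    digest = hashlib.sha256()
    digest.update(base_image.encode())
    digest.update(requirements.read_bytes())
    return digest.hexdigest()[:12]


with tempfile.TemporaryDirectory() as tmp:
    req = Path(tmp) / "requirements.txt"
    req.write_text("pandas\nscikit-learn\n")
    tag_a = image_tag(req)
    tag_b = image_tag(req)  # unchanged requirements -> same tag
    req.write_text("pandas\nscikit-learn\nmatplotlib\n")
    tag_c = image_tag(req)  # added dependency -> new tag

print(tag_a == tag_b, tag_a == tag_c)  # True False
```

With a scheme like this, an existing image in the registry can be reused whenever the tag already exists, and a rebuild is needed only after the dependencies change.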
Visualizations
The Flyte Deck feature allows you to enable visualizations in Union's user interface by setting enable_deck=True in a task's parameters:
```python
@task(
    container_image=image,
    enable_deck=True,
    requests=Resources(cpu="2", mem="2Gi"),
)
def evaluate_model(model: BaseEstimator, dataset: pd.DataFrame) -> float:
    ctx = current_context()

    X_test, y_test = dataset.drop("species", axis="columns"), dataset["species"]
    y_pred = model.predict(X_test)

    # Plot confusion matrix in deck
    fig, ax = plt.subplots()
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)

    metrics_deck = Deck("Metrics")
    # _convert_fig_into_html is a helper defined elsewhere in the full example file
    metrics_deck.append(_convert_fig_into_html(fig))

    # Add classification report
    report = html.escape(classification_report(y_test, y_pred))
    html_report = dedent(
        f"""\
    <h2>Classification report</h2>
    <pre>{report}</pre>"""
    )
    metrics_deck.append(html_report)

    # Show the metrics deck first in the UI
    ctx.decks.insert(0, metrics_deck)

    return accuracy_score(y_test, y_pred)
```