Task types#
Task types include:
PythonFunctionTask: This Python class represents the standard default task. It is the type that is created when you use the @task decorator.
ContainerTask: This Python class represents a raw container. It allows you to use any image you like, giving you complete control of the task.
Map tasks: The map task functionality enables you to run multiple copies of the same task across multiple containers in parallel.
Specialized plugin tasks: These include both specialized classes and specialized configurations of the PythonFunctionTask. They implement integrations with third-party systems.
PythonFunctionTask#
This is the task type that is created when you add the @task decorator to a Python function.
It represents a Python function that will be run within a single container. For example:
import pandas as pd
from flytekit import task
from sklearn.datasets import load_wine

@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame
See the Python Function Task example.
This is the most common task variant and the one that, thus far, we have focused on in this documentation.
ContainerTask#
This task variant represents a raw container, with no assumptions made about what is running within it.
Here is an example of declaring a ContainerTask:
from flytekit import ContainerTask, kwtypes

greeting_task = ContainerTask(
    name="echo_and_return_greeting",
    image="alpine:latest",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(name=str),
    outputs=kwtypes(greeting=str),
    command=["/bin/sh", "-c", "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting"],
)
The ContainerTask enables you to include a task in your workflow that executes arbitrary code in any language, not just Python.
See the Container Task example.
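Once declared, a ContainerTask instance is called in a workflow like any other task. Here is a minimal sketch, assuming the greeting_task declaration above; the hello_container_wf name and default input are illustrative, not part of the example:

from flytekit import workflow

@workflow
def hello_container_wf(name: str = "flyte") -> str:
    # The container reads `name` from /var/inputs, writes to
    # /var/outputs/greeting, and Flyte returns that file's contents
    # as the task's `greeting` output.
    return greeting_task(name=name)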
Map tasks#
A map task allows you to execute many instances of a task within a single workflow node. This enables you to execute a task across a set of inputs without having to create a node for each input, resulting in significant performance improvements.
Map tasks find application in various scenarios, including:
Running multiple inputs through the same code logic.
Processing multiple data batches concurrently.
Conducting hyperparameter optimization.
Just like normal tasks, map tasks are automatically parallelized to the extent possible, given the resources available in the cluster. For example:
from flytekit import map_task, task, workflow

THRESHOLD = 11

@task
def detect_anomalies(data_point: int) -> bool:
    return data_point > THRESHOLD

@workflow
def map_workflow(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return map_task(detect_anomalies)(data_point=data)
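The map_task function also accepts optional tuning arguments. Below is a minimal sketch, assuming the flytekit signature in which concurrency caps how many instances run at once and min_success_ratio sets the fraction of instances that must succeed; the throttled_map_workflow name is illustrative:

from flytekit import map_task, workflow

@workflow
def throttled_map_workflow(data: list[int] = [10, 12, 11, 100]) -> list[bool]:
    # Run at most four detect_anomalies instances at a time; with
    # min_success_ratio=1.0, every instance must succeed for the node to succeed.
    return map_task(
        detect_anomalies, concurrency=4, min_success_ratio=1.0
    )(data_point=data)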
Note
Map tasks can also map over launch plans. For more information and example code, see Mapping over launch plans.
For more details, see the Map Task example in the unionai-examples repository and Map Tasks in the Flyte docs.
Specialized plugin task classes and configs#
Union supports a wide variety of plugin tasks.
Some of these are enabled as specialized task classes, others as specialized configurations of the default @task (PythonFunctionTask).
They enable things like:
Querying external databases (AWS Athena, BigQuery, DuckDB, SQL, Snowflake, Hive).
Executing specialized processing right in Union (Spark in virtual cluster, Dask in virtual cluster, SageMaker, Airflow, Modin, Ray, MPI and Horovod).
Handing off processing to external services (AWS Batch, Spark on Databricks, Ray on external cluster).
Data transformation (Great Expectations, DBT, Dolt, ONNX, Pandera).
Data tracking and presentation (MLFlow, Papermill).
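As an illustration of the specialized-configuration pattern, here is a hedged sketch using the Spark plugin. It assumes the flytekitplugins-spark package is installed and Spark is enabled on the cluster; the task name and settings are illustrative:

import flytekit
from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        # Spark settings applied to the driver/executors for this task only.
        spark_conf={
            "spark.driver.memory": "1g",
            "spark.executor.instances": "2",
        }
    )
)
def count_rows() -> int:
    # The plugin provisions a Spark session for the task and exposes it
    # via the task's execution context.
    sess = flytekit.current_context().spark_session
    return sess.range(1000).count()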
See the Integration section of the Flyte documentation for examples.