Consuming artifacts#

Defining a workflow that consumes an artifact#

You can define a workflow that consumes an artifact by defining a query and passing it as an input to the consuming workflow.

The following code defines a query, data_query, that searches across all versions of BasicArtifact that match the partition values. This query binds parameters to the workflow’s key1 and time_partition inputs and returns the most recent version of the artifact:

from datetime import datetime

import pandas as pd
from flytekit.core.artifact import Artifact, Inputs
from flytekit.core.task import task
from flytekit.core.workflow import workflow


BasicArtifact = Artifact(
    name="my_basic_artifact"
)


@task
def t1(key1: str, dt: datetime, data: pd.DataFrame):
    print(f"key1: {key1}")
    print(f"Date: {dt}")
    print(f"Data retrieved from query: {data}")


data_query = BasicArtifact.query(
    time_partition=Inputs.dt,
    key1=Inputs.key1,
)


@workflow
def query_wf(
    key1: str,
    dt: datetime,
    data: pd.DataFrame = data_query
):
    t1(key1=key1, dt=dt, data=data)

You can also directly reference a particular artifact version in a query using the get() method:

data = BasicArtifact.get(<organization>/<domain>/BasicArtifact@<artifact-version>)

Note

For a full list of Artifact class methods, see the Flytekit Artifact documentation.

Launching a workflow that consumes an artifact#

To launch a workflow that consumes an artifact as one of its inputs, navigate to the workflow in the Union console and click Launch Workflow:

Launch workflow UI with artifact query

In the query_wf example, the workflow takes three inputs: key1, dt, and a BasicArtifact artifact query. In order to create the workflow execution, you would enter values for key1 and dt and click Launch. The artifacts service will supply the latest version of the BasicData artifact that meets the partition query criteria.

You can also override the artifact query from the launch form by clicking Override, directly supplying the input that the artifact references (in this case, a blob store URI), and clicking Launch:

Launch workflow UI with artifact query override