Materializing artifacts

Materializing artifacts#

You can materialize an artifact by executing the task or workflow that emits the artifact.

In the example below, to materialize the BasicArtifact artifact, the t1 task must be executed. The wf workflow runs the t1 task three times with different values for the key1 partition each time. Note that each time t1 is executed, it emits a new version of the BasicArtifact artifact.

from datetime import datetime

import pandas as pd
from flytekit.core.artifact import Artifact, Inputs, Granularity
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated


BasicArtifact = Artifact(
    name="my_basic_artifact",
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR,
    partition_keys=["key1"]
)


@task
def t1(
    key1: str, date: datetime
) -> Annotated[pd.DataFrame, BasicArtifact(key1=Inputs.key1)]:
    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicArtifact.create_from(
        df,
        time_partition=date
    )


@workflow
def wf():
    run_date = datetime.now()
    values = ["value1", "value2", "value3"]
    for value in values:
        t1(key1=value, date=run_date)

Note

You can also materialize an artifact by executing the create_artifact method of UnionRemote. For more information, see the UnionRemote documentation.