Declaring artifacts

To define a task or workflow that emits an artifact, you must first declare the artifact, along with the keys of any partitions you want it to have. For the Artifact class parameters and methods, see the Flytekit Artifact documentation.

Basic artifact

In the following example, an artifact called BasicTaskData is declared, along with a task that emits that artifact. Since it is a basic artifact, it doesn’t have any partitions:

import pandas as pd
from flytekit.core.artifact import Artifact
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated

# Artifact declaration with only a name: no time partition and no partition keys.
BasicTaskData = Artifact(name="my_basic_artifact")


@task
def t1() -> Annotated[pd.DataFrame, BasicTaskData]:
    """Build a small two-column DataFrame and emit it as the ``BasicTaskData`` artifact."""
    columns = {"col1": [1, 2, 3], "col2": ["a", "b", "c"]}
    frame = pd.DataFrame(columns)
    emitted = BasicTaskData.create_from(frame)
    return emitted


@workflow
def wf() -> pd.DataFrame:
    """Run ``t1`` and return the DataFrame it produces."""
    task_output = t1()
    return task_output

Time-partitioned artifact

By default, time partitioning is not enabled for artifacts. To enable it, declare the artifact with time_partitioned set to True. You can optionally set the granularity for the time partition to MINUTE, HOUR, DAY, or MONTH; the default is DAY.

You must also pass a value to time_partition, which you can do at runtime or by binding time_partition to an input.

Passing a value to time_partition at runtime

from datetime import datetime

import pandas as pd
from flytekit.core.artifact import Artifact, Granularity
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated

# Time partitioning is switched on explicitly; the granularity is set to HOUR
# instead of being left at its default.
BasicArtifact = Artifact(
    name="my_basic_artifact", time_partitioned=True, time_partition_granularity=Granularity.HOUR
)


@task
def t1() -> Annotated[pd.DataFrame, BasicArtifact]:
    """Emit a small DataFrame as ``BasicArtifact``, time-partitioned by the current time.

    The time partition value is computed inside the task body rather than
    received as a task input.
    """
    frame = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicArtifact.create_from(frame, time_partition=datetime.now())


@workflow
def wf() -> pd.DataFrame:
    """Run ``t1`` and return the DataFrame it produces."""
    task_output = t1()
    return task_output

Passing a value to time_partition by input

from datetime import datetime

import pandas as pd
from flytekit.core.artifact import Artifact, Granularity
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated

# Time partitioning is switched on explicitly; the granularity is set to HOUR
# instead of being left at its default.
BasicArtifact = Artifact(
    name="my_basic_artifact", time_partitioned=True, time_partition_granularity=Granularity.HOUR
)


@task
def t1(date: datetime) -> Annotated[pd.DataFrame, BasicArtifact]:
    """Emit a small DataFrame as ``BasicArtifact``, time-partitioned by ``date``.

    Args:
        date: Value forwarded to ``create_from`` as the artifact's time partition.
    """
    frame = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    partitioned = BasicArtifact.create_from(frame, time_partition=date)
    return partitioned


@workflow
def wf(run_date: datetime) -> pd.DataFrame:
    """Run ``t1`` with ``run_date`` as the artifact's time partition value.

    The return annotation declares the DataFrame returned below as the
    workflow's output, matching the other workflows on this page that
    return the result of ``t1``.

    Args:
        run_date: Passed to ``t1`` as its ``date`` input.

    Returns:
        The DataFrame produced by ``t1``.
    """
    return t1(date=run_date)

Artifact with custom partition keys

You can specify up to 10 custom partition keys when declaring an artifact. Custom partition keys can be set at runtime or be passed as inputs.

Passing a value to a custom partition key at runtime

from datetime import datetime

import pandas as pd
from flytekit.core.artifact import Artifact, Inputs, Granularity
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated


# Hour-granularity time partition plus a single custom partition key, "key1".
BasicArtifact = Artifact(
    name="my_basic_artifact",
    partition_keys=["key1"],
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR,
)


@task
def t1(
    key1: str,
    date: datetime,
) -> Annotated[pd.DataFrame, BasicArtifact(key1=Inputs.key1)]:
    """Emit a small DataFrame as ``BasicArtifact``, time-partitioned by ``date``.

    ``key1`` is not passed to ``create_from`` here; it is referenced only
    through ``Inputs.key1`` in the return annotation.

    Args:
        key1: Task input referenced by the annotation's ``key1`` partition.
        date: Value forwarded to ``create_from`` as the time partition.
    """
    frame = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicArtifact.create_from(frame, time_partition=date)


@workflow
def wf():
    """Call ``t1`` once for each of three partition values, all sharing one timestamp."""
    # NOTE(review): datetime.now() is called in the workflow body; confirm
    # whether this is evaluated when the workflow is compiled rather than on
    # each execution.
    stamp = datetime.now()
    for partition_value in ("value1", "value2", "value3"):
        t1(key1=partition_value, date=stamp)

Passing a value to a custom partition key by input

from datetime import datetime

import pandas as pd
from flytekit.core.artifact import Artifact, Inputs, Granularity
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated


# Hour-granularity time partition plus a single custom partition key, "key1".
BasicArtifact = Artifact(
    name="my_basic_artifact",
    partition_keys=["key1"],
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR,
)


@task
def t1(
    key1: str,
    dt: datetime,
) -> Annotated[pd.DataFrame, BasicArtifact(key1=Inputs.key1)]:
    """Emit a small DataFrame as ``BasicArtifact`` with both partitions set.

    Both the time partition and the ``key1`` partition are passed to
    ``create_from`` explicitly; the return annotation additionally references
    ``Inputs.key1``.

    Args:
        key1: Value for the custom ``key1`` partition.
        dt: Value for the time partition.
    """
    frame = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    emitted = BasicArtifact.create_from(frame, time_partition=dt, key1=key1)
    return emitted


@workflow
def wf(dt: datetime, val: str):
    """Forward the workflow inputs to ``t1``.

    Args:
        dt: Passed to ``t1`` as ``dt``.
        val: Passed to ``t1`` as ``key1``.
    """
    # The result of t1 is not returned from the workflow.
    t1(dt=dt, key1=val)

Artifact with model card example

You can attach a model card, written in Markdown, that carries additional metadata about your artifact:

import pandas as pd
from flytekit.core.artifact import Artifact
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from unionai.artifacts import ModelCard
from typing_extensions import Annotated

# Artifact declaration with only a name: no time partition and no partition keys.
BasicArtifact = Artifact(
    name="my_basic_artifact",
)


def generate_md_contents(df: pd.DataFrame) -> str:
    """Return Markdown card text for ``df``.

    The text is a fixed heading block ("# Dataset Card", a blank line,
    "## Tabular Data") followed immediately by ``df.to_markdown()``.
    """
    heading_lines = ["# Dataset Card", "", "## Tabular Data", ""]
    heading = "\n".join(heading_lines)
    return heading + df.to_markdown()


@task
def t1() -> Annotated[pd.DataFrame, BasicArtifact]:
    """Emit a small DataFrame as ``BasicArtifact`` with a Markdown model card attached.

    The card text comes from ``generate_md_contents`` applied to the same
    DataFrame that is emitted.
    """
    frame = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    card = ModelCard(generate_md_contents(frame))
    return BasicArtifact.create_from(frame, card)


@workflow
def wf():
    """Run ``t1``; the workflow itself declares no inputs or outputs."""
    # The result of t1 is not returned from the workflow.
    t1()