Artifacts

class flytekit.Artifact(project=None, domain=None, name=None, version=None, time_partitioned=False, time_partition=None, time_partition_granularity=None, partition_keys=None, partitions=None)

An Artifact is effectively a metadata layer on top of data that already exists in Flyte. Most data of interest is the output of tasks and workflows; the other category is user uploads.

This Python class has a limited purpose: it lets users specify that tasks/workflows create Artifacts, and the manner (i.e. name, partitions) in which they are created.

Control creation parameters at task/workflow execution time:

@task
def t1() -> Annotated[nn.Module, Artifact(name="my.artifact.name")]:
    ...

Parameters:
  • project (Optional[str])

  • domain (Optional[str])

  • name (Optional[str])

  • version (Optional[str])

  • time_partitioned (bool)

  • time_partition (Optional[TimePartition])

  • time_partition_granularity (Optional[Granularity])

  • partition_keys (Optional[List[str]])

  • partitions (Optional[Union[Partitions, Dict[str, str]]])
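To make the role of `name`, `partition_keys` and partition values more concrete, here is a plain-Python sketch that does not use flytekit at all. `ArtifactSpec` and `partition_values` are hypothetical names, and the rule that undeclared partition keys are rejected is an assumption of this sketch, not documented flytekit behavior.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ArtifactSpec:
    # Hypothetical stand-in for a few of flytekit.Artifact's creation parameters.
    name: str
    partition_keys: List[str] = field(default_factory=list)
    time_partitioned: bool = False

    def partition_values(self, **values: str) -> Dict[str, str]:
        # Assumption of this sketch: only keys declared in partition_keys are accepted.
        unknown = sorted(set(values) - set(self.partition_keys))
        if unknown:
            raise ValueError(f"undeclared partition keys: {unknown}")
        return dict(values)


# Mirrors the "pricing" artifact used in the create_from examples.
pricing = ArtifactSpec(name="pricing", partition_keys=["region"])
print(pricing.partition_values(region="dubai"))  # {'region': 'dubai'}
```

The metadata (name, partition keys) lives on the spec, while the data itself is supplied separately; that split is the "metadata layer" idea in miniature.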

create_from(o, card=None, *args, **kwargs)

This function lets users declare partition values dynamically from the body of a task. You still need to annotate your task function's output with the relevant Artifact object. In the first example below, all partition values are set at runtime; in the second, one partition value is bound to an input and the other is set at runtime. Because tasks are not run at compile time, flytekit cannot check that you have bound all the partition values. It is up to you to ensure that you have done so.

Pricing = Artifact(name="pricing", partition_keys=["region"])
EstError = Artifact(name="estimation_error", partition_keys=["dataset"], time_partitioned=True)

@task
def t1() -> Tuple[Annotated[pd.DataFrame, Pricing], Annotated[float, EstError]]:
    # get_pricing_results, get_time and msq_error stand in for your own code
    df = get_pricing_results()
    dt = get_time()
    return Pricing.create_from(df, region="dubai"), EstError.create_from(msq_error, dataset="train", time_partition=dt)

You can mix and match with the input syntax as well.

@task
def my_task(region: str) -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]:
    ...
    return RideCountData.create_from(df, time_partition=datetime.datetime.now())
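Mixing the two styles means some partition values come from the output annotation (bound to a task input) and the rest are supplied at runtime. The plain-Python sketch below only illustrates that combination; `InputRef` and `resolve_partitions` are hypothetical names, not flytekit APIs, and the choice that runtime values win on a key clash is this sketch's own.

```python
from typing import Dict, Union


class InputRef:
    # Hypothetical placeholder for a partition bound to a task input, like Inputs.region.
    def __init__(self, input_name: str) -> None:
        self.input_name = input_name


def resolve_partitions(
    declared: Dict[str, Union[str, InputRef]],
    task_inputs: Dict[str, str],
    runtime: Dict[str, str],
) -> Dict[str, str]:
    """Combine partition values from the output annotation with values set at runtime."""
    resolved: Dict[str, str] = {}
    for key, value in declared.items():
        # Input-bound partitions take their value from the matching task input.
        resolved[key] = task_inputs[value.input_name] if isinstance(value, InputRef) else value
    # Runtime values (as passed to create_from) are merged last, so they win on a clash here.
    resolved.update(runtime)
    return resolved


declared = {"region": InputRef("region")}
print(resolve_partitions(declared, {"region": "SEA"}, {"time_partition": "2024-01-01"}))
# → {'region': 'SEA', 'time_partition': '2024-01-01'}
```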

Parameters:
  • o (O)

  • card (SerializableToString | None)

  • args (SerializableToString)

Return type:

O
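Since flytekit cannot check at compile time that every partition value has been bound, you may want a small guard in your own task code before calling create_from. The helper below is a hypothetical plain-Python sketch, not a flytekit API; it assumes you pass in the declared partition keys, whether the artifact is time-partitioned, and the values you intend to bind.

```python
from datetime import datetime
from typing import Dict, List, Optional


def missing_partitions(
    partition_keys: List[str],
    time_partitioned: bool,
    bound: Dict[str, str],
    time_partition: Optional[datetime] = None,
) -> List[str]:
    """Return the names of partitions that still have no value."""
    missing = [key for key in partition_keys if key not in bound]
    # A time-partitioned artifact also needs a time partition value.
    if time_partitioned and time_partition is None:
        missing.append("time_partition")
    return missing


# An EstError-like artifact: one partition key plus a time partition.
print(missing_partitions(["dataset"], True, {"dataset": "train"}))
# → ['time_partition']
print(missing_partitions(["dataset"], True, {"dataset": "train"}, datetime(2024, 1, 1)))
# → []
```

Raising when the returned list is non-empty would turn a silently incomplete artifact into an explicit task failure.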

embed_as_query(partition=None, bind_to_time_partition=None, expr=None, op=None)

This should only be called in the context of a Trigger. The query it returns is a different type from the one returned by query(): it references the triggering artifact rather than running a query.

Parameters:
  • partition (str | None) – Can embed a time partition.

  • bind_to_time_partition (bool | None) – Set to True if you want to bind to a time partition.

  • expr (str | None) – Only valid if there is a time partition.

  • op (protobuf enum value | None) – If expr is given, op is what to do with it.
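One way to picture `expr` and `op` is as a shift applied to the triggering artifact's time partition. The sketch below assumes exactly that interpretation: `Op`, `apply_time_expr`, and the use of a `timedelta` in place of the string `expr` are all hypothetical, chosen only to illustrate the idea, and this is not how Flyte itself evaluates such queries.

```python
from datetime import datetime, timedelta
from enum import Enum


class Op(Enum):
    # Hypothetical plus/minus operator, standing in for the protobuf enum.
    MINUS = 0
    PLUS = 1


def apply_time_expr(trigger_time: datetime, expr: timedelta, op: Op) -> datetime:
    """Shift the triggering artifact's time partition by expr in the direction given by op."""
    return trigger_time - expr if op is Op.MINUS else trigger_time + expr


# E.g. "the partition one day before the one that fired the trigger".
print(apply_time_expr(datetime(2024, 3, 10), timedelta(days=1), Op.MINUS))
# → 2024-03-09 00:00:00
```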

Return type:

ArtifactQuery

to_id_idl()

Converts this object to its IDL representation (an ArtifactID). This method lives here rather than in the translator because ArtifactID is part of the interface: a relatively simple proto object that is exposed to the user.

Return type:

ArtifactID