Connecting workflows with artifact event triggers

In the following example, we define an upstream workflow and a downstream workflow, and define a trigger in a launch plan to connect the two workflows via an artifact event.

Upstream artifact and workflow definition

First, we define an upstream artifact, UpstreamArtifact, and a workflow that emits a new version of that artifact each time it is executed:

from datetime import datetime

import pandas as pd
from flytekit import LaunchPlan
from flytekit.core.artifact import Artifact, Inputs
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated
from unionai.artifacts import OnArtifact

UpstreamArtifact = Artifact(
    name="my_upstream_artifact",
    time_partitioned=True,
    partition_keys=["key1"],
)


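@task
def upstream_t1(key1: str) -> Annotated[pd.DataFrame,
                                        UpstreamArtifact(key1=Inputs.key1)]:
    # The annotation binds the "key1" partition to this task's key1 input.
    dt = datetime.now()
    my_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    # Return the DataFrame as an artifact with explicit partition values.
    return UpstreamArtifact.create_from(my_df, key1=key1,
                                        time_partition=dt)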


@workflow
def upstream_wf() -> pd.DataFrame:
    return upstream_t1(key1="value1")

Artifact event definition

Next we define the artifact event that will link the upstream and downstream workflows together:

on_upstream_artifact = OnArtifact(
    trigger_on=UpstreamArtifact,
)

Downstream workflow definition

Then we define the downstream task and workflow that will be triggered when the upstream artifact is created:

@task
def downstream_t1():
    print("Downstream task triggered")


@workflow
def downstream_wf():
    downstream_t1()

Launch plan with trigger definition

Finally, we create a launch plan whose trigger is set to the OnArtifact object, linking the two workflows via UpstreamArtifact. The trigger initiates an execution of the downstream workflow, downstream_wf, whenever a new version of UpstreamArtifact is created.

downstream_triggered = LaunchPlan.create(
    "downstream_with_trigger_lp",
    downstream_wf,
    trigger=on_upstream_artifact
)

Note

The OnArtifact object must be attached to a launch plan in order for the launch plan to be triggered by the creation of a new version of the artifact.
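
As a rough mental model of the note above, the following plain-Python sketch simulates how an artifact event might be dispatched to launch plans. It does not use flytekit or unionai and is not how the platform implements triggers. ToyTrigger, ToyLaunchPlan, ToyBackend, and the launch plan name downstream_no_trigger_lp are hypothetical names invented for this illustration, and matching on the artifact name alone is a simplifying assumption.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass(frozen=True)
class ToyTrigger:
    """Hypothetical stand-in for an artifact trigger: the artifact name to watch."""
    artifact_name: str


@dataclass
class ToyLaunchPlan:
    """Hypothetical stand-in for a launch plan with an optional trigger."""
    name: str
    workflow: Callable[[], None]
    trigger: Optional[ToyTrigger] = None


@dataclass
class ToyBackend:
    """Hypothetical backend that starts launch plans when artifacts are created."""
    launch_plans: List[ToyLaunchPlan] = field(default_factory=list)
    executions: List[str] = field(default_factory=list)

    def register(self, launch_plan: ToyLaunchPlan) -> None:
        self.launch_plans.append(launch_plan)

    def emit_artifact(self, artifact_name: str) -> None:
        # Simulate a new artifact version: run every launch plan whose
        # attached trigger watches this artifact name.
        for lp in self.launch_plans:
            if lp.trigger is not None and lp.trigger.artifact_name == artifact_name:
                lp.workflow()
                self.executions.append(lp.name)


def downstream_wf() -> None:
    print("Downstream task triggered")


backend = ToyBackend()
on_upstream_artifact = ToyTrigger(artifact_name="my_upstream_artifact")

# One launch plan has the trigger attached; the other does not.
backend.register(
    ToyLaunchPlan("downstream_with_trigger_lp", downstream_wf,
                  trigger=on_upstream_artifact)
)
backend.register(ToyLaunchPlan("downstream_no_trigger_lp", downstream_wf))

# Simulate upstream_wf producing a new version of the artifact.
backend.emit_artifact("my_upstream_artifact")
print(backend.executions)
```

In this toy model, only the launch plan with the trigger attached is executed. The launch plan registered without a trigger is never started, even though it wraps the same workflow.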