Connecting workflows with artifact event triggers#

In the following example, we define an upstream workflow and a downstream workflow, and define a trigger in a launch plan to connect the two workflows via an artifact event.

Imports#

First we import the required packages:

from datetime import datetime

import pandas as pd
from flytekit import LaunchPlan
from flytekit.core.artifact import Artifact, Inputs
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated
from unionai.artifacts import OnArtifact

Upstream artifact and workflow definition#

Then we define an upstream artifact and a workflow that emits a new version of Upstream when executed:

UpstreamArtifact = Artifact(
    name="my_upstream_artifact",
    time_partitioned=True,
    partition_keys=["key1"],
)

@task
def upstream_t1(key1: str) -> Annotated[pd.DataFrame,
                                        UpstreamArtifact(key1=Inputs.key1)]:
    dt = datetime.now()
    my_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return UpstreamArtifact.create_from(my_df, key1=key1,
                                        time_partition=dt)

@workflow
def upstream_wf() -> pd.DataFrame:
    return upstream_t1(key1="value1")

Artifact event definition#

Next we define the artifact event that will link the upstream and downstream workflows together:

on_upstream_artifact = OnArtifact(
    trigger_on=UpstreamArtifact,
)

Downstream workflow definition#

Then we define the downstream task and workflow that will be triggered when the upstream artifact is created:

@task
def downstream_t1():
    print("Downstream task triggered")

@workflow
def downstream_wf():
    downstream_t1()

Launch plan with trigger definition#

Finally we create a launch plan with a trigger set to an OnArtifact object to link the two workflows via the Upstream artifact. The trigger will initiate an execution of the downstream downstream_wf workflow upon the creation of a new version of the Upstream artifact.

downstream_triggered = LaunchPlan.create(
    "downstream_with_trigger_lp",
    downstream_wf,
    trigger=on_upstream_artifact
)

Note

The OnArtifact object must be attached to a launch plan in order for the launch plan to be triggered by the creation of a new version of the artifact.

Full example code#

Here is the full example code file:

from datetime import datetime

import pandas as pd
from flytekit import LaunchPlan
from flytekit.core.artifact import Artifact, Inputs
from flytekit.core.task import task
from flytekit.core.workflow import workflow
from typing_extensions import Annotated
from unionai.artifacts import OnArtifact

UpstreamArtifact = Artifact(
    name="my_upstream_artifact",
    time_partitioned=True,
    partition_keys=["key1"],
)

@task
def upstream_t1(key1: str) -> Annotated[pd.DataFrame,
                                        UpstreamArtifact(key1=Inputs.key1)]:
    dt = datetime.now()
    my_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return UpstreamArtifact.create_from(my_df, key1=key1,
                                        time_partition=dt)

@workflow
def upstream_wf() -> pd.DataFrame:
    return upstream_t1(key1="value1")

on_upstream_artifact = OnArtifact(
    trigger_on=UpstreamArtifact,
)

@task
def downstream_t1():
    print("Downstream task triggered")

@workflow
def downstream_wf():
    downstream_t1()

downstream_triggered = LaunchPlan.create(
    "downstream_with_trigger_lp",
    downstream_wf,
    trigger=on_upstream_artifact
)