Snowflake agent example

Snowflake agent example#

This example shows how to use the SnowflakeTask to execute a query in Snowflake.

The output of a Snowflake task can be configured to be represented as a StructuredDataset. This StructuredDataset serves as a reference to the results generated by the Snowflake query, allowing for seamless integration and manipulation of the query data within subsequent processes or analytical workflows.

  1. Instantiate a flytekitplugins.snowflake.SnowflakeTask to execute a query.

  2. Incorporate flytekitplugins.snowflake.SnowflakeConfig within the task to define the appropriate configuration.

from flytekit import kwtypes, workflow
from flytekitplugins.snowflake import SnowflakeConfig, SnowflakeTask

snowflake_task_no_io = SnowflakeTask(
    name="sql.snowflake.no_io",
    inputs={},
    query_template="SELECT 1",
    output_schema_type=None,
    task_config=SnowflakeConfig(
        account="<SNOWFLAKE_ACCOUNT_ID>",
        user="<SNOWFLAKE_USER>",
        database="SNOWFLAKE_SAMPLE_DATA",
        schema="TPCH_SF1000",
        warehouse="COMPUTE_WH",
    ),
)

You can parameterize the query to filter results for a specific country. This country will be provided as user input, using a nation key to specify it.

snowflake_task_templatized_query = SnowflakeTask(
    name="sql.snowflake.w_io",
    # Define inputs as well as their types that can be used to customize the query.
    inputs=kwtypes(nation_key=int),
    output_schema_type=StructuredDataset,
    task_config=SnowflakeConfig(
        account="<SNOWFLAKE_ACCOUNT_ID>",
        user="<SNOWFLAKE_USER>",
        database="SNOWFLAKE_SAMPLE_DATA",
        schema="TPCH_SF1000",
        warehouse="COMPUTE_WH",
    ),
    query_template="SELECT * from CUSTOMER where C_NATIONKEY =  %(nation_key)s limit 100",
)

Insert data into a Snowflake table.

snowflake_task_insert_query = SnowflakeTask(
    name="insert-query",
    inputs=kwtypes(id=int, name=str, age=int),
    task_config=SnowflakeConfig(
        user="FLYTE",
        account="FLYTE_SNOFLAKE_ACCOUNT",
        database="FLYTEAGENT",
        schema="PUBLIC",
        warehouse="COMPUTE_WH",
    ),
    query_template="""
            INSERT INTO FLYTEAGENT.PUBLIC.TEST (ID, NAME, AGE)
            VALUES (%(id)s, %(name)s, %(age)s);
            """,
)

Note

Make sure to create a secret for the python task to access the Snowflake table.

unionai create secret snowflake --value-file <SNOWFLAKE_PRIVATE_KEY>
image = ImageSpec(
    registry="ghcr.io/unionai",
    packages=["flytekitplugins-snowflake", "pyarrow", "pandas"],
)

@task(container_image=image, secret_requests=[Secret(key="snowflake")])
def print_table(input_sd: StructuredDataset):
    df = input_sd.open(pd.DataFrame).all()
    print(df)


@workflow
def snowflake_wf(nation_key: int):
    sd = snowflake_task_templatized_query(nation_key=nation_key)
    print_table(sd)

# To review the query results, access the Snowflake console at
# https://<SNOWFLAKE_ACCOUNT_ID>.snowflakecomputing.com/console#/monitoring/queries/detail
# You can also execute the task and workflow locally.
if __name__ == "__main__":
    print(snowflake_task_no_io())
    print(snowflake_wf(nation_key=10))

Writing data to Snowflake#

You can also create a pandas DataFrame and save it as a table within your Snowflake data warehouse to enable further high-performance data analysis and storage.

image = ImageSpec(
    registry="ghcr.io/unionai",
    packages=["flytekitplugins-snowflake", "pyarrow", "pandas"],
)

@task(container_image=image, secret_requests=[Secret(key="snowflake")])
def write_table() -> StructuredDataset:
    df = pd.DataFrame({
        "ID": [1, 2, 3],
        "NAME": ["union", "union", "union"],
        "AGE": [30, 30, 30]
    })
    return StructuredDataset(dataframe=df, uri="snowflake://<USER>/<ACCOUNT>/<WAREHOUSE>/<DATABASE>/<SCHEMA>/<TABLE>")