Google BigQuery agent example

Google BigQuery agent example

This example shows how to use a BigQueryTask to execute a query.

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask
from typing_extensions import Annotated

# A simple query.
# Note that in order for registration to work properly, you'll need to give your
# BigQuery task a name that's unique across your project/domain for your deployment.
bigquery_task_no_io = BigQueryTask(
    # The task name must be unique across the project/domain for registration.
    name="sql.bigquery.no_io",
    # This query takes no parameters.
    inputs={},
    query_template="SELECT 1",
    # Placeholder GCP project ID; replace it with the project that should run the query.
    task_config=BigQueryConfig(ProjectID="flyte"),
)

@workflow
def no_io_wf():
    """Run the parameterless BigQuery task as a Flyte workflow."""
    return bigquery_task_no_io()

# Of course, in real world applications we are usually more interested in using BigQuery to query a dataset.
# In this case we use the crypto_dogecoin data, a public BigQuery dataset available as `bigquery-public-data.crypto_dogecoin`.

# You can parameterize the query so that it returns only transactions of a specific version, supplied by the user as an input.
# Column subset (name -> type) of the query result that downstream tasks consume.
DogeCoinDataset = Annotated[StructuredDataset, kwtypes(hash=str, size=int, block_number=int)]

bigquery_task_templatized_query = BigQueryTask(
    # The task name must be unique across the project/domain for registration.
    name="sql.bigquery.w_io",
    # Define inputs as well as their types that can be used to customize the query.
    # Each input is referenced in the query template as `@<input name>`.
    inputs=kwtypes(version=int),
    # The query result is returned as a StructuredDataset with the columns declared above.
    output_structured_dataset_type=DogeCoinDataset,
    # Placeholder GCP project ID; replace it with the project that should run the query.
    task_config=BigQueryConfig(ProjectID="flyte"),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)

# The StructuredDataset transformer can convert the query result to a pandas dataframe here.
# We can also change `pd.DataFrame` to `pyarrow.Table`, and convert the result to an Arrow table.
@task
def convert_bq_table_to_pandas_dataframe(sd: DogeCoinDataset) -> pd.DataFrame:
    """Load the BigQuery result referenced by ``sd`` into a pandas DataFrame.

    Args:
        sd: Structured dataset produced by the BigQuery task.

    Returns:
        The query result materialized as a ``pandas.DataFrame``.
    """
    # `open` selects the target dataframe type; `all` reads the whole dataset.
    return sd.open(pd.DataFrame).all()

@workflow
def full_bigquery_wf(version: int) -> pd.DataFrame:
    """Query Dogecoin transactions of one version and return them as a DataFrame.

    Args:
        version: Transaction version bound to the ``@version`` query parameter.

    Returns:
        The query result converted to a ``pandas.DataFrame``.
    """
    sd = bigquery_task_templatized_query(version=version)
    return convert_bq_table_to_pandas_dataframe(sd=sd)

After running the workflow, check the query result in the BigQuery console.