First project

This example demonstrates a two-stage RAG (Retrieval-Augmented Generation) pattern: an offline embedding pipeline that fetches, embeds, and stores quotes, followed by an online serving application that provides semantic search over them.

Concepts covered

  • TaskEnvironment for defining task execution environments
  • Dir artifacts for passing directories between tasks
  • AppEnvironment for serving applications
  • Parameter and RunOutput for connecting apps to task outputs
  • Semantic search with sentence-transformers and ChromaDB

Part 1: The embedding pipeline

The embedding pipeline fetches quotes from a public API, creates vector embeddings using sentence-transformers, and stores them in a ChromaDB database.

Setting up the environment

The TaskEnvironment defines the execution environment for all tasks in the pipeline. It specifies the container image, required packages, and resource allocations:

embed.py
# Define the embedding environment
embedding_env = flyte.TaskEnvironment(
    name="quote-embedding",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "sentence-transformers>=2.2.0",
        "chromadb>=0.4.0",
        "requests>=2.31.0",
    ),
    resources=flyte.Resources(cpu=2, memory="4Gi"),
    cache="auto",
)

The environment uses:

  • Image.from_debian_base() to create a container with Python 3.12
  • with_pip_packages() to install sentence-transformers and ChromaDB
  • Resources to request 2 CPUs and 4 GiB of memory
  • cache="auto" to enable automatic caching of task outputs

Fetching data

The fetch_quotes task retrieves quotes from a public API:

embed.py
@embedding_env.task
async def fetch_quotes() -> list[dict]:
    """
    Fetch quotes from a public quotes API.

    Returns:
        List of quote dictionaries with 'quote' and 'author' fields.
    """
    import requests

    print("Fetching quotes from API...")
    response = requests.get("https://dummyjson.com/quotes?limit=100")
    response.raise_for_status()

    data = response.json()
    quotes = data.get("quotes", [])

    print(f"Fetched {len(quotes)} quotes")
    return quotes

This task demonstrates:

  • Async task definition with async def
  • Returning structured data (list[dict]) from a task (a representative record is shown below)
  • Using the @embedding_env.task decorator to associate the task with its environment
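
For reference, each record in the returned list carries at least the fields used downstream: an id, the quote text, and the author. A representative record, with placeholder values, looks like this:

# Illustrative shape of one quote record (placeholder values; the field
# names match those used by embed_quotes below).
record = {
    "id": 1,
    "quote": "Life is really simple, but we insist on making it complicated.",
    "author": "Confucius",
}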

Creating embeddings

The embed_quotes task creates vector embeddings and stores them in ChromaDB:

embed.py
@embedding_env.task
async def embed_quotes(quotes: list[dict]) -> Dir:
    """
    Create embeddings for quotes and store them in ChromaDB.

    Args:
        quotes: List of quote dictionaries with 'quote' and 'author' fields.

    Returns:
        Directory containing the ChromaDB database.
    """
    import tempfile

    import chromadb
    from sentence_transformers import SentenceTransformer

    print("Loading embedding model...")
    model = SentenceTransformer("all-MiniLM-L6-v2")

    # Create ChromaDB in a temporary directory
    db_dir = tempfile.mkdtemp()
    print(f"Creating ChromaDB at {db_dir}...")

    client = chromadb.PersistentClient(path=db_dir)
    collection = client.create_collection(
        name="quotes",
        metadata={"hnsw:space": "cosine"},
    )

    # Prepare data for insertion
    texts = [q["quote"] for q in quotes]
    ids = [str(q["id"]) for q in quotes]
    metadatas = [{"author": q["author"], "quote": q["quote"]} for q in quotes]

    print(f"Embedding {len(texts)} quotes...")
    embeddings = model.encode(texts, show_progress_bar=True)

    # Add to collection
    collection.add(
        ids=ids,
        embeddings=embeddings.tolist(),
        metadatas=metadatas,
        documents=texts,
    )

    print(f"Stored {len(quotes)} quotes in ChromaDB")
    return await Dir.from_local(db_dir)

Key points:

  • Uses the all-MiniLM-L6-v2 model from sentence-transformers (runs on CPU)
  • Creates a persistent ChromaDB database with cosine similarity (see the distance-to-similarity sketch after this list)
  • Returns a Dir artifact that captures the entire database directory
  • The await Dir.from_local() call uploads the directory to artifact storage
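
One subtlety behind the cosine setting: with "hnsw:space": "cosine", ChromaDB reports cosine distance, which is 1 minus cosine similarity. That is why the serving app in Part 2 converts results with similarity = 1 - distance. A minimal NumPy sketch of the relationship, independent of the pipeline:

# With "hnsw:space": "cosine", ChromaDB's query results report cosine
# *distance*, i.e. 1 - cosine similarity.
import numpy as np

a = np.array([1.0, 0.0])  # embedding of one text
b = np.array([0.6, 0.8])  # embedding of another text (unit length)

cos_sim = (a @ b) / (np.linalg.norm(a) * np.linalg.norm(b))  # 0.6
cos_dist = 1.0 - cos_sim  # 0.4, what ChromaDB returns as "distances"
print(f"similarity={cos_sim:.2f}, distance={cos_dist:.2f}")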

Orchestrating the pipeline

The main pipeline task composes the individual tasks:

embed.py
@embedding_env.task
async def embedding_pipeline() -> Dir:
    """
    Main pipeline that fetches quotes and creates embeddings.

    Returns:
        Directory containing the ChromaDB database with quote embeddings.
    """
    print("Starting embedding pipeline...")

    # Fetch quotes from API
    quotes = await fetch_quotes()

    # Create embeddings and store in ChromaDB
    db_dir = await embed_quotes(quotes)

    print("Embedding pipeline complete!")
    return db_dir

Running the pipeline

To run the embedding pipeline:

embed.py
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(embedding_pipeline)
    print(f"Embedding run URL: {run.url}")
    run.wait()
    print(f"Embedding complete! Database directory: {run.outputs()}")

uv run embed.py

The pipeline will:

  1. Fetch 100 quotes from the API
  2. Create embeddings using sentence-transformers
  3. Store everything in a ChromaDB database
  4. Return the database as a Dir artifact (which you can inspect locally, as sketched below)
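
If you want to inspect the result outside of Flyte, the database can be queried directly with the same libraries. A minimal sketch, assuming the output directory has been downloaded to ./quotes_db (a hypothetical local path):

# Minimal local sanity check. Assumes the pipeline's output directory
# has been downloaded to ./quotes_db (hypothetical path).
import chromadb
from sentence_transformers import SentenceTransformer

client = chromadb.PersistentClient(path="./quotes_db")
collection = client.get_collection("quotes")
model = SentenceTransformer("all-MiniLM-L6-v2")

query_embedding = model.encode(["perseverance"])[0].tolist()
results = collection.query(query_embeddings=[query_embedding], n_results=3)
for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
    print(f'"{doc}" — {meta["author"]}')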

Part 2: The serving application

The serving application provides a Streamlit web interface for searching quotes using the embeddings created by the pipeline.

App environment configuration

The AppEnvironment defines how the application runs:

serve.py
# Define the app environment
env = AppEnvironment(
    name="quote-search-app",
    description="Semantic search over quotes using embeddings",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit>=1.41.0",
        "sentence-transformers>=2.2.0",
        "chromadb>=0.4.0",
    ),
    args=["streamlit", "run", "app.py", "--server.port", "8080"],
    port=8080,
    resources=flyte.Resources(cpu=2, memory="4Gi"),
    parameters=[
        Parameter(
            name="quotes_db",
            value=RunOutput(task_name="quote-embedding.embedding_pipeline", type="directory"),
            download=True,
            env_var="QUOTES_DB_PATH",
        ),
    ],
    include=["app.py"],
    requires_auth=False,
)

Key configuration:

  • args specifies the command to run the Streamlit app
  • port=8080 exposes the application on port 8080
  • parameters defines inputs to the app:
    • RunOutput connects to the embedding pipeline’s output
    • download=True downloads the directory to local storage
    • env_var="QUOTES_DB_PATH" makes the path available to the app
  • include=["app.py"] bundles the Streamlit app with the deployment

The Streamlit application

The app loads the ChromaDB database using the path from the environment variable:

app.py
# Load the database
@st.cache_resource
def load_db():
    db_path = os.environ.get("QUOTES_DB_PATH")
    if not db_path:
        st.error("QUOTES_DB_PATH environment variable not set")
        st.stop()

    client = chromadb.PersistentClient(path=db_path)
    collection = client.get_collection("quotes")
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return collection, model


collection, model = load_db()

The search interface provides a text input and result count slider:

app.py
# Search interface
query = st.text_input("Enter your search query:", placeholder="e.g., love, wisdom, success")
top_k = st.slider("Number of results:", min_value=1, max_value=20, value=5)

col1, col2 = st.columns([1, 1])
with col1:
    search_button = st.button("Search", type="primary", use_container_width=True)
with col2:
    random_button = st.button("Random Quote", use_container_width=True)

st.divider()

When the user searches, the app encodes the query and finds similar quotes:

app.py
if search_button and query:
    # Encode query and search
    query_embedding = model.encode([query])[0].tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
    )

    if results["documents"] and results["documents"][0]:
        for i, (doc, metadata, distance) in enumerate(
            zip(results["documents"][0], results["metadatas"][0], results["distances"][0])
        ):
            similarity = 1 - distance  # Convert distance to similarity
            st.markdown(f'**{i+1}.** "{doc}"')
            st.caption(f"— {metadata['author']} | Similarity: {similarity:.2%}")
            st.write("")
    else:
        st.info("No results found.")

The app also includes a random quote feature:

app.py
elif random_button:
    # Get a random quote from the collection
    all_data = collection.get(limit=100)
    if all_data["documents"]:
        idx = random.randint(0, len(all_data["documents"]) - 1)
        quote = all_data["documents"][idx]
        author = all_data["metadatas"][idx]["author"]
        st.markdown(f'**"{quote}"**')
        st.caption(f"— {author}")

elif search_button and not query:
    st.warning("Please enter a search query.")

Deploying the app

To deploy the quote search application:

serve.py
if __name__ == "__main__":
    flyte.init_from_config()

    # Deploy the quote search app
    print("Deploying quote search app...")
    deployment = flyte.serve(env)
    print(f"App deployed at: {deployment.url}")

uv run serve.py

The app will be deployed and automatically connected to the embedding pipeline’s output through the RunOutput parameter.

Key takeaways

  1. Two-stage RAG pattern: Separate offline embedding creation from online serving for better resource utilization and cost efficiency.

  2. Dir artifacts: Use Dir to pass entire directories (like databases) between tasks and to serving applications.

  3. RunOutput: Connect applications to task outputs declaratively, enabling automatic data flow between pipelines and apps.

  4. CPU-friendly embeddings: The all-MiniLM-L6-v2 model runs efficiently on CPU, making this pattern accessible without GPU resources.