First project
This example demonstrates a two-stage RAG (Retrieval-Augmented Generation) pattern: an offline embedding pipeline that processes and stores quotes, followed by an online serving application that enables semantic search.
Concepts covered
- TaskEnvironment for defining task execution environments
- Dir artifacts for passing directories between tasks
- AppEnvironment for serving applications
- Parameter and RunOutput for connecting apps to task outputs
- Semantic search with sentence-transformers and ChromaDB
Part 1: The embedding pipeline
The embedding pipeline fetches quotes from a public API, creates vector embeddings using sentence-transformers, and stores them in a ChromaDB database.
Setting up the environment
The TaskEnvironment defines the execution environment for all tasks in the pipeline.
It specifies the container image, required packages, and resource allocations:
# Define the embedding environment
# Every task decorated with @embedding_env.task runs with this configuration.
embedding_env = flyte.TaskEnvironment(
name="quote-embedding",
# Debian-based image with Python 3.12 plus the packages the tasks import
# (sentence_transformers, chromadb, requests).
image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
"sentence-transformers>=2.2.0",
"chromadb>=0.4.0",
"requests>=2.31.0",
),
# Request 2 CPUs and 4Gi of memory; no GPU is requested.
resources=flyte.Resources(cpu=2, memory="4Gi"),
# Enable automatic caching of task outputs.
cache="auto",
)
# {{end-fragment}}
@embedding_env.task
async def fetch_quotes() -> list[dict]:
    """
    Fetch quotes from a public quotes API.

    Returns:
        List of quote dictionaries taken from the "quotes" key of the JSON
        response (an empty list when that key is absent). The embedding step
        in this example reads the 'id', 'quote' and 'author' fields.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
        requests.Timeout: If the API does not answer within the timeout.
    """
    import requests

    print("Fetching quotes from API...")
    # requests has no default timeout; without one an unresponsive server
    # would block this task indefinitely.
    response = requests.get("https://dummyjson.com/quotes?limit=100", timeout=30)
    response.raise_for_status()
    data = response.json()
    quotes = data.get("quotes", [])
    print(f"Fetched {len(quotes)} quotes")
    return quotes
# {{end-fragment}}
@embedding_env.task
async def embed_quotes(quotes: list[dict]) -> Dir:
    """
    Embed every quote and persist the vectors in a ChromaDB collection.

    Args:
        quotes: Quote dictionaries; each one must provide the 'id', 'quote'
            and 'author' keys.

    Returns:
        Dir created from the local directory that holds the ChromaDB database.
    """
    import chromadb
    from sentence_transformers import SentenceTransformer

    print("Loading embedding model...")
    encoder = SentenceTransformer("all-MiniLM-L6-v2")

    # The database is written into a fresh temporary directory.
    db_dir = tempfile.mkdtemp()
    print(f"Creating ChromaDB at {db_dir}...")
    quote_collection = chromadb.PersistentClient(path=db_dir).create_collection(
        name="quotes",
        metadata={"hnsw:space": "cosine"},
    )

    # Parallel lists: position i in each list describes the same quote.
    texts = [entry["quote"] for entry in quotes]
    ids = [str(entry["id"]) for entry in quotes]
    metadatas = [
        {"author": entry["author"], "quote": entry["quote"]} for entry in quotes
    ]

    print(f"Embedding {len(texts)} quotes...")
    vectors = encoder.encode(texts, show_progress_bar=True)

    # Store ids, vectors, metadata and raw text together.
    quote_collection.add(
        ids=ids,
        embeddings=vectors.tolist(),
        metadatas=metadatas,
        documents=texts,
    )
    print(f"Stored {len(quotes)} quotes in ChromaDB")
    return await Dir.from_local(db_dir)
# {{end-fragment}}
@embedding_env.task
async def embedding_pipeline() -> Dir:
    """
    Run the two pipeline stages in order: fetch the quotes, then embed them.

    Returns:
        The Dir returned by embed_quotes, containing the ChromaDB database.
    """
    print("Starting embedding pipeline...")
    # Stage 1: download the raw quotes.
    fetched = await fetch_quotes()
    # Stage 2: embed them and persist the vectors in ChromaDB.
    database = await embed_quotes(fetched)
    print("Embedding pipeline complete!")
    return database
# {{end-fragment}}
if __name__ == "__main__":
    # Script entry point: initialise flyte, launch the pipeline, then block
    # until it finishes. The URL is printed before wait() so it is visible
    # while the run is still in progress.
    flyte.init_from_config()
    pipeline_run = flyte.run(embedding_pipeline)
    print(f"Embedding run URL: {pipeline_run.url}")
    pipeline_run.wait()
    print(f"Embedding complete! Database directory: {pipeline_run.outputs()}")
# {{end-fragment}}
The environment uses:
- Image.from_debian_base() to create a container with Python 3.12
- with_pip_packages() to install sentence-transformers and ChromaDB
- Resources to request 2 CPUs and 4 GiB of memory
- cache="auto" to enable automatic caching of task outputs
Fetching data
The fetch_quotes task retrieves quotes from a public API:
@embedding_env.task
async def fetch_quotes() -> list[dict]:
    """
    Fetch quotes from a public quotes API.

    Returns:
        List of quote dictionaries taken from the "quotes" key of the JSON
        response (an empty list when that key is absent). The embedding step
        in this example reads the 'id', 'quote' and 'author' fields.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
        requests.Timeout: If the API does not answer within the timeout.
    """
    import requests

    print("Fetching quotes from API...")
    # requests has no default timeout; without one an unresponsive server
    # would block this task indefinitely.
    response = requests.get("https://dummyjson.com/quotes?limit=100", timeout=30)
    response.raise_for_status()
    data = response.json()
    quotes = data.get("quotes", [])
    print(f"Fetched {len(quotes)} quotes")
    return quotes
# {{end-fragment}}
@embedding_env.task
async def embed_quotes(quotes: list[dict]) -> Dir:
    """
    Embed every quote and persist the vectors in a ChromaDB collection.

    Args:
        quotes: Quote dictionaries; each one must provide the 'id', 'quote'
            and 'author' keys.

    Returns:
        Dir created from the local directory that holds the ChromaDB database.
    """
    import chromadb
    from sentence_transformers import SentenceTransformer

    print("Loading embedding model...")
    encoder = SentenceTransformer("all-MiniLM-L6-v2")

    # The database is written into a fresh temporary directory.
    db_dir = tempfile.mkdtemp()
    print(f"Creating ChromaDB at {db_dir}...")
    quote_collection = chromadb.PersistentClient(path=db_dir).create_collection(
        name="quotes",
        metadata={"hnsw:space": "cosine"},
    )

    # Parallel lists: position i in each list describes the same quote.
    texts = [entry["quote"] for entry in quotes]
    ids = [str(entry["id"]) for entry in quotes]
    metadatas = [
        {"author": entry["author"], "quote": entry["quote"]} for entry in quotes
    ]

    print(f"Embedding {len(texts)} quotes...")
    vectors = encoder.encode(texts, show_progress_bar=True)

    # Store ids, vectors, metadata and raw text together.
    quote_collection.add(
        ids=ids,
        embeddings=vectors.tolist(),
        metadatas=metadatas,
        documents=texts,
    )
    print(f"Stored {len(quotes)} quotes in ChromaDB")
    return await Dir.from_local(db_dir)
# {{end-fragment}}
@embedding_env.task
async def embedding_pipeline() -> Dir:
    """
    Run the two pipeline stages in order: fetch the quotes, then embed them.

    Returns:
        The Dir returned by embed_quotes, containing the ChromaDB database.
    """
    print("Starting embedding pipeline...")
    # Stage 1: download the raw quotes.
    fetched = await fetch_quotes()
    # Stage 2: embed them and persist the vectors in ChromaDB.
    database = await embed_quotes(fetched)
    print("Embedding pipeline complete!")
    return database
# {{end-fragment}}
if __name__ == "__main__":
    # Script entry point: initialise flyte, launch the pipeline, then block
    # until it finishes. The URL is printed before wait() so it is visible
    # while the run is still in progress.
    flyte.init_from_config()
    pipeline_run = flyte.run(embedding_pipeline)
    print(f"Embedding run URL: {pipeline_run.url}")
    pipeline_run.wait()
    print(f"Embedding complete! Database directory: {pipeline_run.outputs()}")
# {{end-fragment}}
This task demonstrates:
- Async task definition with async def
- Returning structured data (list[dict]) from a task
- Using the @embedding_env.task decorator to associate the task with its environment
Creating embeddings
The embed_quotes task creates vector embeddings and stores them in ChromaDB:
@embedding_env.task
async def embed_quotes(quotes: list[dict]) -> Dir:
    """
    Embed every quote and persist the vectors in a ChromaDB collection.

    Args:
        quotes: Quote dictionaries; each one must provide the 'id', 'quote'
            and 'author' keys.

    Returns:
        Dir created from the local directory that holds the ChromaDB database.
    """
    import chromadb
    from sentence_transformers import SentenceTransformer

    print("Loading embedding model...")
    encoder = SentenceTransformer("all-MiniLM-L6-v2")

    # The database is written into a fresh temporary directory.
    db_dir = tempfile.mkdtemp()
    print(f"Creating ChromaDB at {db_dir}...")
    quote_collection = chromadb.PersistentClient(path=db_dir).create_collection(
        name="quotes",
        metadata={"hnsw:space": "cosine"},
    )

    # Parallel lists: position i in each list describes the same quote.
    texts = [entry["quote"] for entry in quotes]
    ids = [str(entry["id"]) for entry in quotes]
    metadatas = [
        {"author": entry["author"], "quote": entry["quote"]} for entry in quotes
    ]

    print(f"Embedding {len(texts)} quotes...")
    vectors = encoder.encode(texts, show_progress_bar=True)

    # Store ids, vectors, metadata and raw text together.
    quote_collection.add(
        ids=ids,
        embeddings=vectors.tolist(),
        metadatas=metadatas,
        documents=texts,
    )
    print(f"Stored {len(quotes)} quotes in ChromaDB")
    return await Dir.from_local(db_dir)
# {{end-fragment}}
@embedding_env.task
async def embedding_pipeline() -> Dir:
    """
    Run the two pipeline stages in order: fetch the quotes, then embed them.

    Returns:
        The Dir returned by embed_quotes, containing the ChromaDB database.
    """
    print("Starting embedding pipeline...")
    # Stage 1: download the raw quotes.
    fetched = await fetch_quotes()
    # Stage 2: embed them and persist the vectors in ChromaDB.
    database = await embed_quotes(fetched)
    print("Embedding pipeline complete!")
    return database
# {{end-fragment}}
if __name__ == "__main__":
    # Script entry point: initialise flyte, launch the pipeline, then block
    # until it finishes. The URL is printed before wait() so it is visible
    # while the run is still in progress.
    flyte.init_from_config()
    pipeline_run = flyte.run(embedding_pipeline)
    print(f"Embedding run URL: {pipeline_run.url}")
    pipeline_run.wait()
    print(f"Embedding complete! Database directory: {pipeline_run.outputs()}")
# {{end-fragment}}
Key points:
- Uses the all-MiniLM-L6-v2 model from sentence-transformers (runs on CPU)
- Creates a persistent ChromaDB database with cosine similarity
- Returns a Dir artifact that captures the entire database directory
- The await Dir.from_local() call uploads the directory to artifact storage
Orchestrating the pipeline
The main pipeline task composes the individual tasks:
@embedding_env.task
async def embedding_pipeline() -> Dir:
    """
    Run the two pipeline stages in order: fetch the quotes, then embed them.

    Returns:
        The Dir returned by embed_quotes, containing the ChromaDB database.
    """
    print("Starting embedding pipeline...")
    # Stage 1: download the raw quotes.
    fetched = await fetch_quotes()
    # Stage 2: embed them and persist the vectors in ChromaDB.
    database = await embed_quotes(fetched)
    print("Embedding pipeline complete!")
    return database
# {{end-fragment}}
if __name__ == "__main__":
    # Script entry point: initialise flyte, launch the pipeline, then block
    # until it finishes. The URL is printed before wait() so it is visible
    # while the run is still in progress.
    flyte.init_from_config()
    pipeline_run = flyte.run(embedding_pipeline)
    print(f"Embedding run URL: {pipeline_run.url}")
    pipeline_run.wait()
    print(f"Embedding complete! Database directory: {pipeline_run.outputs()}")
# {{end-fragment}}
Running the pipeline
To run the embedding pipeline:
if __name__ == "__main__":
    # Script entry point: initialise flyte, launch the pipeline, then block
    # until it finishes. The URL is printed before wait() so it is visible
    # while the run is still in progress.
    flyte.init_from_config()
    pipeline_run = flyte.run(embedding_pipeline)
    print(f"Embedding run URL: {pipeline_run.url}")
    pipeline_run.wait()
    print(f"Embedding complete! Database directory: {pipeline_run.outputs()}")
# {{end-fragment}}
uv run embed.py
The pipeline will:
- Fetch 100 quotes from the API
- Create embeddings using sentence-transformers
- Store everything in a ChromaDB database
- Return the database as a Dir artifact
Part 2: The serving application
The serving application provides a Streamlit web interface for searching quotes using the embeddings created by the pipeline.
App environment configuration
The AppEnvironment defines how the application runs:
# Define the app environment
env = AppEnvironment(
name="quote-search-app",
description="Semantic search over quotes using embeddings",
# Same Python 3.12 Debian base as the embedding image, with Streamlit added
# for the UI; sentence-transformers and chromadb are needed to query the DB.
image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
"streamlit>=1.41.0",
"sentence-transformers>=2.2.0",
"chromadb>=0.4.0",
),
# Command that starts the Streamlit app; the --server.port value must match
# the `port` argument below.
args=["streamlit", "run", "app.py", "--server.port", "8080"],
port=8080,
resources=flyte.Resources(cpu=2, memory="4Gi"),
parameters=[
Parameter(
name="quotes_db",
# Refers to the embedding_pipeline task of the "quote-embedding"
# environment; its directory output is the ChromaDB database.
value=RunOutput(task_name="quote-embedding.embedding_pipeline", type="directory"),
# Download the directory locally and hand its path to the app through
# the QUOTES_DB_PATH environment variable (read in app.py's load_db).
download=True,
env_var="QUOTES_DB_PATH",
),
],
# Bundle the Streamlit script with the deployment.
include=["app.py"],
# NOTE(review): presumably makes the app reachable without authentication;
# confirm this is intended outside of a demo.
requires_auth=False,
)
# {{end-fragment}}
if __name__ == "__main__":
    # Script entry point: initialise flyte, then serve the app environment.
    flyte.init_from_config()

    # Deploy the quote search app
    print("Deploying quote search app...")
    app_deployment = flyte.serve(env)
    print(f"App deployed at: {app_deployment.url}")
# {{end-fragment}}
Key configuration:
- args specifies the command to run the Streamlit app
- port=8080 exposes the application on port 8080
- parameters defines inputs to the app:
  - RunOutput connects to the embedding pipeline’s output
  - download=True downloads the directory to local storage
  - env_var="QUOTES_DB_PATH" makes the path available to the app
- include=["app.py"] bundles the Streamlit app with the deployment
The Streamlit application
The app loads the ChromaDB database using the path from the environment variable:
# Load the database
@st.cache_resource
def load_db():
    """
    Open the "quotes" ChromaDB collection and load the embedding model.

    The database location is read from the QUOTES_DB_PATH environment
    variable. When it is missing or empty, an error is shown in the page and
    the script run is stopped via st.stop().

    Returns:
        Tuple of (ChromaDB collection, SentenceTransformer model).
    """
    db_path = os.getenv("QUOTES_DB_PATH")
    if not db_path:
        st.error("QUOTES_DB_PATH environment variable not set")
        st.stop()

    quotes_collection = chromadb.PersistentClient(path=db_path).get_collection("quotes")
    # Must be the same model that produced the stored embeddings.
    encoder = SentenceTransformer("all-MiniLM-L6-v2")
    return quotes_collection, encoder


collection, model = load_db()
# {{end-fragment}}
# Search interface: free-text query plus the number of results to return.
query = st.text_input("Enter your search query:", placeholder="e.g., love, wisdom, success")
top_k = st.slider("Number of results:", min_value=1, max_value=20, value=5)

# Two equal-width columns, one action button in each.
search_col, random_col = st.columns([1, 1])
with search_col:
    search_button = st.button("Search", type="primary", use_container_width=True)
with random_col:
    random_button = st.button("Random Quote", use_container_width=True)

st.divider()
# {{end-fragment}}
# Dispatch on the pressed button: semantic search, random quote, or a warning
# when Search was pressed with an empty query.
if search_button and query:
    # Encode query and search
    query_embedding = model.encode([query])[0].tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
    )
    # Results are nested one list per query embedding; only one was sent,
    # so index 0 holds the hits.
    if results["documents"] and results["documents"][0]:
        for i, (doc, metadata, distance) in enumerate(
            zip(results["documents"][0], results["metadatas"][0], results["distances"][0])
        ):
            # The collection is created with cosine space in the embedding
            # pipeline, so 1 - distance is the cosine similarity.
            similarity = 1 - distance  # Convert distance to similarity
            st.markdown(f'**{i+1}.** "{doc}"')
            st.caption(f"— {metadata['author']} | Similarity: {similarity:.2%}")
            st.write("")
    else:
        st.info("No results found.")
# {{end-fragment}}
elif random_button:
    # Get a random quote from the collection
    all_data = collection.get(limit=100)
    if all_data["documents"]:
        idx = random.randint(0, len(all_data["documents"]) - 1)
        quote = all_data["documents"][idx]
        author = all_data["metadatas"][idx]["author"]
        st.markdown(f'**"{quote}"**')
        st.caption(f"— {author}")
# {{end-fragment}}
elif search_button and not query:
    st.warning("Please enter a search query.")
The search interface provides a text input and result count slider:
# Search interface: free-text query plus the number of results to return.
query = st.text_input("Enter your search query:", placeholder="e.g., love, wisdom, success")
top_k = st.slider("Number of results:", min_value=1, max_value=20, value=5)

# Two equal-width columns, one action button in each.
search_col, random_col = st.columns([1, 1])
with search_col:
    search_button = st.button("Search", type="primary", use_container_width=True)
with random_col:
    random_button = st.button("Random Quote", use_container_width=True)

st.divider()
# {{end-fragment}}
# Dispatch on the pressed button: semantic search, random quote, or a warning
# when Search was pressed with an empty query.
if search_button and query:
    # Encode query and search
    query_embedding = model.encode([query])[0].tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
    )
    # Results are nested one list per query embedding; only one was sent,
    # so index 0 holds the hits.
    if results["documents"] and results["documents"][0]:
        for i, (doc, metadata, distance) in enumerate(
            zip(results["documents"][0], results["metadatas"][0], results["distances"][0])
        ):
            # The collection is created with cosine space in the embedding
            # pipeline, so 1 - distance is the cosine similarity.
            similarity = 1 - distance  # Convert distance to similarity
            st.markdown(f'**{i+1}.** "{doc}"')
            st.caption(f"— {metadata['author']} | Similarity: {similarity:.2%}")
            st.write("")
    else:
        st.info("No results found.")
# {{end-fragment}}
elif random_button:
    # Get a random quote from the collection
    all_data = collection.get(limit=100)
    if all_data["documents"]:
        idx = random.randint(0, len(all_data["documents"]) - 1)
        quote = all_data["documents"][idx]
        author = all_data["metadatas"][idx]["author"]
        st.markdown(f'**"{quote}"**')
        st.caption(f"— {author}")
# {{end-fragment}}
elif search_button and not query:
    st.warning("Please enter a search query.")
When the user searches, the app encodes the query and finds similar quotes:
# Dispatch on the pressed button: semantic search, random quote, or a warning
# when Search was pressed with an empty query.
if search_button and query:
    # Encode query and search
    query_embedding = model.encode([query])[0].tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
    )
    # Results are nested one list per query embedding; only one was sent,
    # so index 0 holds the hits.
    if results["documents"] and results["documents"][0]:
        for i, (doc, metadata, distance) in enumerate(
            zip(results["documents"][0], results["metadatas"][0], results["distances"][0])
        ):
            # The collection is created with cosine space in the embedding
            # pipeline, so 1 - distance is the cosine similarity.
            similarity = 1 - distance  # Convert distance to similarity
            st.markdown(f'**{i+1}.** "{doc}"')
            st.caption(f"— {metadata['author']} | Similarity: {similarity:.2%}")
            st.write("")
    else:
        st.info("No results found.")
# {{end-fragment}}
elif random_button:
    # Get a random quote from the collection
    all_data = collection.get(limit=100)
    if all_data["documents"]:
        idx = random.randint(0, len(all_data["documents"]) - 1)
        quote = all_data["documents"][idx]
        author = all_data["metadatas"][idx]["author"]
        st.markdown(f'**"{quote}"**')
        st.caption(f"— {author}")
# {{end-fragment}}
elif search_button and not query:
    st.warning("Please enter a search query.")
The app also includes a random quote feature:
elif random_button:
# Get a random quote from the collection
all_data = collection.get(limit=100)
if all_data["documents"]:
idx = random.randint(0, len(all_data["documents"]) - 1)
quote = all_data["documents"][idx]
author = all_data["metadatas"][idx]["author"]
st.markdown(f'**___STRING_5___**')
st.caption(f"— {author}")
# {{end-fragment}}
elif search_button and not query:
st.warning("Please enter a search query.")
Deploying the app
To deploy the quote search application:
if __name__ == "__main__":
    # Script entry point: initialise flyte, then serve the app environment.
    flyte.init_from_config()

    # Deploy the quote search app
    print("Deploying quote search app...")
    app_deployment = flyte.serve(env)
    print(f"App deployed at: {app_deployment.url}")
# {{end-fragment}}
uv run serve.py
The app will be deployed and automatically connected to the embedding pipeline’s
output through the RunOutput parameter.
Key takeaways
- Two-stage RAG pattern: Separate offline embedding creation from online serving for better resource utilization and cost efficiency.
- Dir artifacts: Use Dir to pass entire directories (like databases) between tasks and to serving applications.
- RunOutput: Connect applications to task outputs declaratively, enabling automatic data flow between pipelines and apps.
- CPU-friendly embeddings: The all-MiniLM-L6-v2 model runs efficiently on CPU, making this pattern accessible without GPU resources.