HDBSCAN Soft Clustering With Headline Embeddings on GPUs#
HDBSCAN is a state-of-the-art, density-based clustering algorithm that is used to
uncover hidden patterns and structures in data. Some common applications of HDBSCAN
include custom segmentation, anomaly detection, document clustering, and bio-informatics.
In this tutorial, we will use the NVIDIA RAPIDS cuML
’s version of HDBSCAN and UMAP
to find soft clusters in a headlines dataset. We’ll configure Flyte tasks to use
NVIDIA’s A100
accelerators to embed the dataset and RAPIDS cuML
for clustering.
Run on Union BYOC
Once you have a Union account, install union
:
pip install union flytekitplugins-envd
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then run the following commands to run the workflow:
git clone https://github.com/unionai/unionai-examples
cd unionai-examples/tutorials/soft_clustering_hdbscan
union run --remote soft_clustering_hdbscan.py hdscan_wf
The source code for this tutorial can be found here .
Downloading data#
We start by importing the dependencies for this workflow. Then, we download
the headline dataset and cache it using cache=True
and a cache_version
:
import os
import tarfile
from pathlib import Path
from typing import Tuple
from flytekit import task, workflow, Resources, ImageSpec, current_context, Deck
from flytekit.deck import DeckField
from flytekit.types.file import FlyteFile
from flytekit.extras.accelerators import A100
import fsspec
@task(requests=Resources(cpu="2", mem="2Gi"), cache=True, cache_version="v1")
def download_headline_data() -> FlyteFile:
headline_data = (
"https://github.com/thomasjpfan/headlines-data/raw/main/headlines.parquet"
)
new_file = FlyteFile.new_remote_file("headlines.parquet")
with fsspec.open(headline_data, "rb") as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file
Defining Python Dependencies#
The tasks in this workflow require python dependencies such as RAPIDS’ cuML
for
clustering and sentence_transformers
for embedding our headline dataset. Here we use
flytekit
’s ImageSpec
to build an image with our require dependencies.
image = ImageSpec(
name="sentence-transformer",
python_version="3.11",
packages=["union", "sentence-transformers==3.0.1"],
conda_packages=[
"cuml=24.08",
"scikit-learn==1.4.*",
"pytorch-cuda=12.1",
"pytorch==2.4.0",
],
conda_channels=["nvidia", "pytorch", "rapidsai"],
registry=os.environ.get("IMAGE_SPEC_REGISTRY"),
)
Embedding the headline data#
For embedding the headlines, we will use the
all-MiniLM-L6-v2
model to convert the headlines into a 384 dimensional vector. For faster iterations,
we define a download_sentence_transformer
task to download the model and cache
it with Union.
@task(
requests=Resources(cpu="2", mem="6Gi"),
container_image=image,
cache=True,
cache_version="v2",
)
def download_sentence_transformer() -> FlyteFile:
from sentence_transformers import SentenceTransformer
ctx = current_context()
working_dir = Path(ctx.working_directory)
model_cache = working_dir / "sentence_model"
model = SentenceTransformer("all-MiniLM-L6-v2")
model.save(os.fspath(model_cache))
model_cache_compressed = working_dir / "sentence_model.tar.gz"
_compress(model_cache, model_cache_compressed)
return model_cache_compressed
Finally, we use the sentence transformer to embed the headline data into an embedding
matrix with 384 columns and the number of rows is equal to the number of headlines.
With accelerator=A100
and gpu="1"
, the SentenceTransformer
uses the GPU to
compute the embedding.
@task(
requests=Resources(gpu="1", cpu="2", mem="2Gi"),
accelerator=A100,
container_image=image,
cache=True,
cache_version="v1",
)
def embed_headlines(
headline_data: FlyteFile, sentence_transformer: FlyteFile
) -> FlyteFile:
from sentence_transformers import SentenceTransformer
import numpy as np
import pandas as pd
ctx = current_context()
working_dir = Path(ctx.working_directory)
sentence_model = working_dir / "sentence_transformer"
sentence_model.mkdir(exist_ok=True)
# Load headline data
df = pd.read_parquet(headline_data.remote_source)
# Load sentence transformer
_decompress(Path(sentence_transformer), sentence_model)
model = SentenceTransformer(os.fspath(sentence_model), local_files_only=True)
embeddings = model.encode(df["headline_text"])
# Serialize model
embedding_path = working_dir / "embedding.npy"
np.save(embedding_path, embeddings)
return embedding_path
Soft Clustering#
Next, we use UMAP from RAPIDS cuML
to reduce the dimensionality of the headline
embeddings and pipe the results into HDBSCAN to soft cluster the data. Given that
cuML
’s UMAP and HDBSCAN are GPU accelerated, we set accelerator=A100
to
run the task with a GPU.
@task(
requests=Resources(gpu="1", cpu="2", mem="2Gi"),
accelerator=A100,
container_image=image,
cache=True,
cache_version="v1",
)
def soft_clustering(embeddings: FlyteFile) -> Tuple[FlyteFile, FlyteFile]:
import numpy as np
import cuml
embeddings.download()
embeddings_np = np.load(embeddings.path)
umap = cuml.manifold.UMAP(
n_components=5, n_neighbors=15, min_dist=0.0, random_state=12
)
reduced_data = umap.fit_transform(embeddings_np)
clusterer = cuml.cluster.hdbscan.HDBSCAN(
min_cluster_size=50, metric="euclidean", prediction_data=True
)
clusterer.fit(reduced_data)
soft_clusters = cuml.cluster.hdbscan.all_points_membership_vectors(clusterer)
# Save clusters
ctx = current_context()
working_dir = Path(ctx.working_directory)
cluster_labels_path = working_dir / "cluster_labels.npy"
soft_cluster_path = working_dir / "soft_clusters.npy"
np.save(cluster_labels_path, clusterer.labels_)
np.save(soft_cluster_path, soft_clusters)
return cluster_labels_path, soft_cluster_path
Plotting Cluster Membership Uncertainty#
For plotting the soft clustering results, we define another ImageSpec
that contains
the requirements for plotting.
plot_image = ImageSpec(
name="plot_cluster",
packages=["numpy==1.26.4", "union", "seaborn==0.13.2", "matplotlib==3.9.1"],
registry=os.environ.get("IMAGE_SPEC_REGISTRY"),
)
Soft clustering assigns probabilities to each point that represents the likelihood a
headline belongs to a cluster. To measure the confidence of each point, we take the
difference between the probabilities between the point’s top two clusters.
In this Flyte task, we set enable_deck=True
and build a histogram and
empirical cumulative distribution to visualize the probability differences.
@task(
requests=Resources(cpu="2", mem="2Gi"),
container_image=plot_image,
enable_deck=True,
deck_fields=[DeckField.SOURCE_CODE, DeckField.DEPENDENCIES],
)
def plot_cluster_membership_uncertainty(
cluster_labels: FlyteFile, soft_clusters: FlyteFile
):
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
cluster_labels.download()
soft_clusters.download()
cluster_labels_np = np.load(cluster_labels.path)
soft_clusters_np = np.load(soft_clusters.path)
soft_non_noise = soft_clusters_np[cluster_labels_np != -1]
probs_top2_non_noise = np.take_along_axis(
soft_non_noise, soft_non_noise.argsort(), axis=1
)[:, -2:]
diffs = np.diff(probs_top2_non_noise).ravel()
fig, axes = plt.subplots(1, 2, figsize=(10, 6), sharey=False)
fig.suptitle("Cluster Membership Uncertainty Evaluation")
sns.histplot(ax=axes[0], data=diffs)
axes[0].set_title("Difference between top two membership probabilities")
sns.ecdfplot(ax=axes[1], data=diffs)
axes[1].set_title("Cumulative distribution of differences")
ctx = current_context()
cluster_deck = Deck("Cluster Membership", _fig_to_html(fig))
ctx.decks.insert(0, cluster_deck)
Workflow#
Finally, we define the workflow that calls each Flyte task and route the data between each task. We run the workflow with:
union run --remote soft_clustering_hdbscan.py hdscan_wf
@workflow
def hdscan_wf():
headline_data = download_headline_data()
sentence_transformer = download_sentence_transformer()
embeddings = embed_headlines(
headline_data=headline_data, sentence_transformer=sentence_transformer
)
cluster_labels, soft_cluster_path = soft_clustering(embeddings=embeddings)
plot_cluster_membership_uncertainty(
cluster_labels=cluster_labels, soft_clusters=soft_cluster_path
)
Appendix#
The following are helper functions used by our Flyte tasks. We include functions that decompress & compress tar files, and convert a matplotlib figure into HTML.
def _compress(src: Path, dest: Path):
"""Compress src into a tarfile."""
import tarfile
with tarfile.open(dest, "w:gz") as tar:
for file in src.rglob("*"):
tar.add(file, arcname=file.relative_to(src))
def _decompress(src: Path, dest: Path):
"""Decompress a tarfile into dest."""
with tarfile.open(src, "r:gz") as tar:
tar.extractall(path=dest)
def _fig_to_html(fig) -> str:
"""Convert matplotlib figure to HTML."""
import io
import base64
fig_bytes = io.BytesIO()
fig.savefig(fig_bytes, format="jpg")
fig_bytes.seek(0)
image_base64 = base64.b64encode(fig_bytes.read()).decode()
return f'<img src="data:image/png;base64,{image_base64}" alt="Rendered Image" />'