Credit Default Prediction with XGBoost & NVIDIA RAPIDS#

In this tutorial, we use the NVIDIA RAPIDS cuDF DataFrame library for preprocessing data and XGBoost, an optimized gradient boosting library, for credit default prediction. Along the way, we’ll learn how to request an NVIDIA A100 GPU for our training task and how to specify our Python dependencies with ImageSpec.

Run on Union BYOC

To run this example on Union BYOC, first create an account. Once you have a Union account, install union[byoc] together with the envd plugin:

pip install 'union[byoc]' flytekitplugins-envd

Export the following environment variable to build and push images to your own container registry:

# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"

Then run the following commands to run the workflow:

git clone https://github.com/unionai/unionai-examples
cd unionai-examples/tutorials/credit_default
union run --remote credit_default.py credit_default_wf

The source code for this tutorial can be found in the unionai-examples repository.

Declaring workflow dependencies#

First, we import all of the dependencies required by this workflow:

import os
import gc
from pathlib import Path
from typing import Tuple
import fsspec
from flytekit import task, workflow, current_context, Resources, ImageSpec, Deck
from flytekit.types.file import FlyteFile
from flytekit.extras.accelerators import A100

We download the credit default dataset and return it as two FlyteFiles. With cache=True, the results of this task are cached, so downstream executions that need the data can reuse the cached version instead of re-downloading it. For this example, we use a subset of the data so the task completes quickly.

@task(cache=True, cache_version="v5", requests=Resources(cpu="2", mem="4Gi"))
def download_data() -> Tuple[FlyteFile, FlyteFile]:
    working_dir = Path(current_context().working_directory)

    train_data = working_dir / "train.parquet"
    train_label_data = working_dir / "train_label.parquet"
    _download_file(
        "https://github.com/thomasjpfan/credit-data/raw/main/train/part_0.parquet",
        train_data,
    )
    _download_file(
        "https://github.com/thomasjpfan/credit-data/raw/main/train_labels.parquet",
        train_label_data,
    )

    return train_data, train_label_data
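
If you want to sanity-check the files outside of the workflow, here is a quick sketch with pandas (assuming pandas and pyarrow are installed locally, and that you have downloaded the two parquet files into your working directory):

import pandas as pd

train = pd.read_parquet("train.parquet")
labels = pd.read_parquet("train_label.parquet")
print(train.shape, labels.shape)  # row/column counts of the cached subset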

Defining Python Dependencies#

We use flytekit’s ImageSpec to specify the Python packages required by the XGBoost training task and the cuDF preprocessing:

credit_default_image = ImageSpec(
    name="credit-default",
    conda_packages=[
        "cudf=24.08",
        "scikit-learn==1.4.*",
        "pytorch-cuda=12.1",
        "pytorch==2.4.0",
    ],
    packages=["union", "xgboost==2.1.1"],
    conda_channels=["nvidia", "pytorch", "rapidsai"],
    python_version="3.11",
    registry=os.environ.get("IMAGE_SPEC_REGISTRY"),
)
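
Because cudf is only installed inside this image, the training task below imports it inside the function body. Recent flytekit versions also provide an is_container() helper to guard module-level imports; a minimal sketch, assuming that helper is available in your flytekit version:

# only import GPU-only packages when actually running inside the built container
if credit_default_image.is_container():
    import cudf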

Training XGBoost on an NVIDIA A100#

Next, we define our XGBoost training job using flytekit’s @task. We set container_image to credit_default_image so that this task runs with the required Python packages, and we configure it to run on an A100 simply by setting accelerator=A100. The task preprocesses the data with RAPIDS’ cuDF DataFrame library and passes the result to XGBoost for training.

@task(
    requests=Resources(gpu="1"),
    accelerator=A100,
    container_image=credit_default_image,
    cache=True,
    cache_version="v0",
)
def train_xgboost(
    train_data: FlyteFile, train_labels: FlyteFile
) -> Tuple[FlyteFile, float]:
    import cudf

    train_data.download()
    train_labels.download()

    train_df = cudf.read_parquet(train_data.path)
    train_labels_df = cudf.read_parquet(train_labels.path)
    train = train_df.merge(train_labels_df, on="customer_ID", how="left")

    X_train, y_train, X_test, y_test = prepare_for_training(train)

    model = get_xgb_model()

    model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=100)

    ctx = current_context()
    working_dir = Path(ctx.working_directory)
    model_path = working_dir / "model.ubj"
    model.save_model(model_path)

    return model_path, model.best_score
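
The task returns the serialized model as a FlyteFile. Once you download model.ubj (for example, from the Union console), you can reload it anywhere with the XGBoost scikit-learn API; a sketch, assuming the file sits in your current directory:

import xgboost as xgb

model = xgb.XGBClassifier()
model.load_model("model.ubj")  # UBJSON file produced by train_xgboost
# probabilities = model.predict_proba(X_new)[:, 1]  # X_new: your feature matrix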

Plot feature importances#

XGBoost’s feature importances measure the relative contribution of each feature to the model’s predictions. In this next task, we use a Flyte Deck to plot the feature importances with matplotlib:

matplotlib_image = ImageSpec(
    "plot-xgboost",
    packages=[
        "xgboost==2.1.0",
        "matplotlib==3.9.1",
        "union",
        "scikit-learn==1.4.*",
    ],
    registry=os.environ.get("IMAGE_SPEC_REGISTRY"),
)

@task(container_image=matplotlib_image, enable_deck=True)
def plot_feature_importances(model: FlyteFile):
    import matplotlib.pyplot as plt
    from xgboost import plot_importance

    model.download()

    xgb_model = get_xgb_model()
    xgb_model.load_model(model.path)

    fig, ax = plt.subplots(figsize=(8, 6))
    plot_importance(xgb_model, max_num_features=15, ax=ax)

    importances_deck = Deck("Feature Importance", _fig_to_html(fig))
    decks = current_context().decks
    decks.insert(0, importances_deck)

Full Workflow#

Finally, we define the workflow that calls download_data and passes its output to train_xgboost. We run the workflow with:

union run --remote credit_default.py credit_default_wf

@workflow
def credit_default_wf() -> Tuple[FlyteFile, float]:
    train_data, train_labels = download_data()
    model, best_score = train_xgboost(train_data=train_data, train_labels=train_labels)
    plot_feature_importances(model=model)
    return model, best_score
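
Besides the CLI, you can launch the workflow programmatically. A rough sketch with UnionRemote, assuming you are authenticated against your Union tenant and relying on Flyte’s default output names (o0, o1):

from union import UnionRemote

remote = UnionRemote()
execution = remote.execute(credit_default_wf, inputs={}, wait=True)
print(execution.outputs["o1"])  # best validation score from train_xgboost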

Appendix#

The following are helper functions used by our Flyte tasks. They include helpers for downloading files, preprocessing the data, computing the evaluation metric, configuring the XGBoost model, and rendering the feature importance plot as HTML.

def _download_file(src, dest):
    """Download file from src to dest."""
    with fsspec.open(src, mode="rb") as r:
        with dest.open("wb") as w:
            w.write(r.read())

def preprocess(df):
    """Preprocess the dataframe by dropping duplicates, keeping the latest entry per cid."""
    return (
        df.sort_values(["cid", "S_2"])
        .drop_duplicates("cid", keep="last")
        .sort_values("cid")
        .reset_index(drop=True)
    )
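
To make the deduplication concrete, here is a toy run with pandas (the cuDF behavior is identical; the column values are made up for illustration):

import pandas as pd

df = pd.DataFrame({
    "cid": [0, 0, 1],
    "S_2": ["2023-01", "2023-02", "2023-01"],
    "x": [1.0, 2.0, 3.0],
})
# keeps the most recent statement per customer: x == 2.0 for cid 0, x == 3.0 for cid 1
print(preprocess(df))
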
def prepare_for_training(train):
    """Split data into training and validation."""
    train["cid"], _ = train.customer_ID.factorize()
    mask = train["cid"] % 4 == 0

    tr, va = train.loc[~mask], train.loc[mask]

    tr = preprocess(tr)
    va = preprocess(va)

    # prepare for training
    not_used = [
        i for i in tr.columns if i in ["cid", "target", "S_2"] or tr[i].dtype == "O"
    ]
    not_used += [
        "B_30",
        "B_38",
        "D_114",
        "D_116",
        "D_117",
        "D_120",
        "D_126",
        "D_63",
        "D_64",
        "D_66",
        "D_68",
    ]

    X_train = tr.drop(not_used, axis=1)
    y_train = tr["target"]

    X_test = va.drop(not_used, axis=1)
    y_test = va["target"]

    for i in X_train.columns:
        X_train[i] = X_train[i].astype("float32")
        X_test[i] = X_test[i].astype("float32")

    del train, tr, va
    gc.collect()
    return X_train, y_train, X_test, y_test
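
The cid % 4 == 0 mask routes roughly one in four customers to validation; a quick check on synthetic ids:

import numpy as np

cids = np.arange(1000)
print((cids % 4 == 0).mean())  # 0.25: a 75/25 train/validation split by customer
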
def amex_metric_np(target, preds) -> float:
    """Custom metric based on the evaluation metric from
    https://www.kaggle.com/competitions/amex-default-prediction/overview."""
    import numpy as np

    indices = np.argsort(preds)[::-1]
    preds, target = preds[indices], target[indices]

    weight = 20.0 - target * 19.0
    cum_norm_weight = (weight / weight.sum()).cumsum()
    four_pct_mask = cum_norm_weight <= 0.04
    d = np.sum(target[four_pct_mask]) / np.sum(target)

    weighted_target = target * weight
    lorentz = (weighted_target / weighted_target.sum()).cumsum()
    gini = ((lorentz - cum_norm_weight) * weight).sum()

    n_pos = np.sum(target)
    n_neg = target.shape[0] - n_pos
    gini_max = 10 * n_neg * (n_pos + 20 * n_neg - 19) / (n_pos + 20 * n_neg)

    g = gini / gini_max
    return 0.5 * (g + d)
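
A quick usage sketch with made-up scores (higher is better; the metric averages a normalized Gini coefficient with the default rate captured in the top 4% of predictions):

import numpy as np

target = np.array([1.0, 0.0, 0.0, 1.0, 0.0])
preds = np.array([0.9, 0.2, 0.4, 0.7, 0.1])
print(amex_metric_np(target, preds))
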
def get_xgb_model():
    """Create XGBoost model with amex_metric evaluation."""
    import xgboost as xgb

    max_depth = 7
    num_trees = 1000
    min_child_weight = 50
    early_stop = xgb.callback.EarlyStopping(
        rounds=10, maximize=True, metric_name="amex_metric_np", data_name="validation_0"
    )
    model = xgb.XGBClassifier(
        tree_method="hist",
        objective="binary:logistic",
        max_depth=max_depth,
        n_estimators=num_trees,
        min_child_weight=min_child_weight,
        eval_metric=amex_metric_np,
        callbacks=[early_stop],
        device="cuda",
    )
    return model
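
get_xgb_model pins device="cuda"; for a quick local smoke test without a GPU you could, as a rough sketch, override the device through the scikit-learn API:

model = get_xgb_model()
model.set_params(device="cpu")  # fall back to CPU for local experimentation
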
def _fig_to_html(fig) -> str:
    """Convert matplotlib figure to html."""
    import io
    import base64

    fig_bytes = io.BytesIO()
    fig.savefig(fig_bytes, format="png")  # match the image/png data URI below
    fig_bytes.seek(0)
    image_base64 = base64.b64encode(fig_bytes.read()).decode()
    return f'<img src="data:image/png;base64,{image_base64}" alt="Rendered Image" />'