Track XGBoost Training with Neptune
Once you have a Union account, install union:
pip install union
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then run the following commands to run the workflow:
$ git clone https://github.com/unionai/unionai-examples
$ cd unionai-examples
$ union run --remote <path/to/file.py> <workflow_name> <params>
The source code for this example can be found in the unionai-examples repository.
First, we need to import the necessary libraries.
from typing import List, Tuple
import numpy as np
import flytekit
from flytekitplugins.neptune import neptune_scale_run
We then specify the Neptune project that was created on Neptune’s platform.
Please update NEPTUNE_PROJECT
to the value associated with your account.
NEPTUNE_PROJECT = (
"username/project" # TODO: Update this to your Neptune "workspace/project"
)
Neptune requires an API key for authentication. You can securely provide this key to Flyte by creating a secret using the secrets manager.
NEPTUNE_API_KEY = flytekit.Secret(group="neptune-api-group", key="neptune-api-token")
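For example, on a Flyte cluster that uses the default Kubernetes secrets manager, the secret group maps to a Kubernetes Secret name and the key to a data key within it. A minimal sketch (the flytesnacks-development namespace is an assumption; substitute your own project-domain namespace):

# Hypothetical example: store the Neptune API token as a Kubernetes secret
kubectl create secret generic neptune-api-group \
  --from-literal=neptune-api-token='<your-neptune-api-token>' \
  --namespace flytesnacks-development

Managed platforms such as Union provide their own secrets tooling; consult your platform’s secrets documentation for the equivalent command.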
We use ImageSpec
to build a container image with the dependencies required for the XGBoost training task.
Make sure to set REGISTRY
to a container registry accessible by your cluster.
REGISTRY = "localhost:30000"
image = flytekit.ImageSpec(
name="flytekit-xgboost",
packages=[
"neptune",
"neptune-xgboost",
"flytekitplugins-neptune",
"scikit-learn==1.5.1",
"numpy==1.26.1",
"matplotlib==3.9.2",
],
builder="default",
registry=REGISTRY,
)
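If you would rather reuse the IMAGE_SPEC_REGISTRY environment variable exported earlier instead of hard-coding the registry, a minimal sketch (assuming the variable is set in your shell) is:

import os

# Fall back to a local registry for development clusters if the variable is unset
REGISTRY = os.environ.get("IMAGE_SPEC_REGISTRY", "localhost:30000")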
To train and track our XGBoost model, we’ll first need a dataset. In this example, we use the California housing dataset, which provides a clean and structured foundation for regression tasks. We can easily download it using the fetch_california_housing function from sklearn.datasets.
@flytekit.task(
container_image=image,
cache=True,
cache_version="v1",
requests=flytekit.Resources(cpu="2", mem="2Gi"),
)
def get_dataset() -> Tuple[np.ndarray, np.ndarray]:
from sklearn.datasets import fetch_california_housing
X, y = fetch_california_housing(return_X_y=True, as_frame=False)
return X, y
Next, we integrate Neptune into our training workflow using the neptune_scale_run decorator. This decorator initializes a Neptune run and handles authentication with the NEPTUNE_API_KEY secret, which must be both defined in the decorator and explicitly requested in the task definition. Once initialized, the Neptune Run object becomes available via current_context().neptune_run. In this example, we log metadata to the Neptune run, such as model parameters and evaluation metrics.
@flytekit.task(
container_image=image,
secret_requests=[NEPTUNE_API_KEY],
requests=flytekit.Resources(cpu="2", mem="4Gi"),
)
@neptune_scale_run(project=NEPTUNE_PROJECT, secret=NEPTUNE_API_KEY)
def train_model(max_depth: int, X: np.ndarray, y: np.ndarray):
import xgboost as xgb
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=123
)
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_test, label=y_test)
model_params = {
"tree_method": "hist",
"eta": 0.7,
"gamma": 0.001,
"max_depth": max_depth,
"objective": "reg:squarederror",
"eval_metric": ["mae", "rmse"],
}
evals = [(dtrain, "train"), (dval, "valid")]
ctx = flytekit.current_context()
run = ctx.neptune_run
run.log_configs(
{
"parameters/eta": 0.7,
"parameters/gamma": 0.001,
"parameters/tree_method": "hist",
"parameters/eval_metric": ["mae", "rmse"],
"parameters/max_depth": max_depth,
"parameters/objective": "reg:squarederror",
}
)
# Train the model and log metadata to the run in Neptune
xgb.train(
params=model_params,
dtrain=dtrain,
num_boost_round=57,
evals=evals,
callbacks=[
xgb.callback.LearningRateScheduler(lambda epoch: 0.99**epoch),
xgb.callback.EarlyStopping(rounds=30),
],
)
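The task above logs configuration values with run.log_configs. If you also want per-round evaluation metrics to appear in Neptune, one option is a small custom XGBoost callback that forwards them to the run. The sketch below is an illustration rather than part of the original example; it assumes the Neptune Scale Run.log_metrics(data=..., step=...) API and uses the hypothetical class name NeptuneMetricsCallback:

import xgboost as xgb

class NeptuneMetricsCallback(xgb.callback.TrainingCallback):
    # Hypothetical helper: forward XGBoost evaluation metrics to the Neptune run
    def __init__(self, run):
        self.run = run

    def after_iteration(self, model, epoch, evals_log):
        # evals_log maps dataset name -> metric name -> list of values per round
        for data_name, metrics in evals_log.items():
            for metric_name, values in metrics.items():
                self.run.log_metrics(
                    data={f"training/{data_name}/{metric_name}": values[-1]},
                    step=epoch,
                )
        return False  # False means "continue training"

Passing NeptuneMetricsCallback(run) in the callbacks list of xgb.train would then record the mae and rmse values for both the train and valid sets at every boosting round.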
You can also specify run_id and experiment_name in the neptune_scale_run decorator.
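For example, a variant of the task above might pin both values (the strings shown are placeholders, not values from the original example):

@flytekit.task(
    container_image=image,
    secret_requests=[NEPTUNE_API_KEY],
)
@neptune_scale_run(
    project=NEPTUNE_PROJECT,
    secret=NEPTUNE_API_KEY,
    run_id="xgboost-max-depth-10",  # placeholder; any unique string works
    experiment_name="california-housing",  # placeholder; groups related runs
)
def train_model_named_run(max_depth: int, X: np.ndarray, y: np.ndarray):
    ...  # same training logic as train_model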
Using Flyte’s dynamic workflows, we scale out multiple training jobs with different max_depth
values.
@flytekit.dynamic(container_image=image)
def train_multiple_models(max_depths: List[int], X: np.ndarray, y: np.ndarray):
for max_depth in max_depths:
train_model(max_depth=max_depth, X=X, y=y)
@flytekit.workflow
def train_wf(max_depths: List[int] = [2, 4, 10]):
X, y = get_dataset()
train_multiple_models(max_depths=max_depths, X=X, y=y)
To run this workflow on a remote cluster, execute the following command:
$ union run --remote neptune_example.py train_wf