Fraud detection with Feast
Code available here.
This tutorial builds a credit-card fraud detection pipeline that combines Feast feature store materialization with an XGBoost classifier on the Sparkov simulated transactions dataset. The workflow engineers transaction and user-level features, registers them in Feast, materializes online feature values for low-latency scoring, and then trains a model that retrieves user features through Feast.
Flyte provides:
- Cached data preparation for the Kaggle dataset download and feature engineering.
- Report-backed training with confusion matrix and ROC-style metrics in the UI.
- Durable artifacts — the trained model and Feast repo are returned as
flyte.io.File and flyte.io.Dir.
Define the task environment
# Container image built from this script; the dependency list comes from the
# script's inline `# /// script` metadata block.
# NOTE(review): `pre=True` presumably allows pre-release packages during the
# image build -- confirm against the Flyte Image API.
main_img = flyte.Image.from_uv_script(
    __file__,
    name="fraud-detection-feast",
    pre=True,
)

# Task environment used by the pipeline task: it runs in the image above
# with 2 CPUs and 4Gi of memory requested.
env = flyte.TaskEnvironment(
    name="fraud-detection-feast",
    image=main_img,
    resources=flyte.Resources(cpu=2, memory="4Gi"),
)
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "flyte>=2.4.0",
# "feast==0.63.0",
# "xgboost==3.2.0",
# "scikit-learn==1.8.0",
# "kagglehub==0.3.12",
# ...
# ]
# ///
Orchestrate the pipeline
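The fraud_detection_pipeline task downloads and prepares the data, materializes features into Feast, and then trains the XGBoost model using Feast for feature retrieval. It returns the trained model file and the Feast artifacts directory.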
@env.task(report=True)
async def fraud_detection_pipeline(
    n_estimators: int = 300,
    max_depth: int = 6,
    learning_rate: float = 0.1,
    min_child_weight: int = 5,
    gamma: float = 1.0,
) -> tuple[flyte.io.File, flyte.io.Dir]:
    """
    Run the end-to-end fraud detection workflow and publish progress to the report.

    Stages, in execution order:
        1. ``prepare_data`` produces the data directory.
        2. ``materialize_features`` turns that directory into the Feast artifacts.
        3. ``train_model`` trains on the data directory and the Feast artifacts.

    The five hyperparameters are forwarded unchanged to ``train_model`` as
    keyword arguments. After training, the model and Feast artifacts are also
    copied into the current working directory as ``model.joblib`` and
    ``feast_artifacts/`` (any existing ``feast_artifacts`` directory is
    removed first).

    Returns:
        The model file and the Feast artifacts directory, as returned by
        ``train_model`` and ``materialize_features``.
    """
    stage_names = ["Prepare Data", "Materialize Features", "Train Model", "Done"]
    heading = '<h2>Fraud Detection Pipeline</h2>'

    async def publish(markup: str) -> None:
        # Swap the report content for `markup` and flush it.
        await flyte.report.replace.aio(rh.wrap(markup))
        await flyte.report.flush.aio()

    log.info("Starting fraud detection pipeline")

    # Stage 0: data preparation.
    await publish(heading + rh.pipeline_step_indicator(0, stage_names))
    data_dir = await prepare_data()

    # Stage 1: features are materialized before training so that training
    # can read them from Feast.
    await publish(heading + rh.pipeline_step_indicator(1, stage_names))
    feast_dir = await materialize_features(data_dir)

    # Stage 2: training, with Feast used for user feature retrieval.
    await publish(heading + rh.pipeline_step_indicator(2, stage_names))
    model_file = await train_model(
        data_dir,
        feast_dir,
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate,
        min_child_weight=min_child_weight,
        gamma=gamma,
    )

    # Keep copies in the working directory for local app testing.
    downloaded_model = await model_file.download()
    downloaded_feast = await feast_dir.download()
    shutil.copy2(downloaded_model, "model.joblib")
    if os.path.exists("feast_artifacts"):
        # copytree needs a fresh destination, so drop any stale copy first.
        shutil.rmtree("feast_artifacts")
    shutil.copytree(downloaded_feast, "feast_artifacts")
    log.info("Saved local copies: model.joblib, feast_artifacts/")

    # Final report: the indicator index is len(stage_names) == 4, one past the
    # last step -- presumably rendered as "all steps finished"; confirm
    # against rh.pipeline_step_indicator.
    completion_card = (
        '<div class="card">'
        '<div style="font-weight:600;color:#155724;font-size:1.1em;margin-bottom:8px;">Pipeline Complete</div>'
        '<p>Model and feature store artifacts are ready for serving.</p>'
        '<table>'
        '<tr><th>Next Step</th><th>Command</th></tr>'
        '<tr><td>Run locally</td><td><code>python app.py</code></td></tr>'
        '<tr><td>Deploy scoring app</td><td><code>flyte deploy app.py serving_env</code></td></tr>'
        '<tr><td>Deploy dashboard</td><td><code>flyte deploy dashboard.py dashboard_env</code></td></tr>'
        '</table></div>'
    )
    await publish(
        heading
        + rh.pipeline_step_indicator(len(stage_names), stage_names)
        + completion_card
    )

    log.info("Pipeline complete")
    return model_file, feast_dir
Run the workflow
From the example directory:
cd v2/tutorials/fraud_detection_feast
uv run --script fraud_detection_feast.py
The first run downloads the dataset via kagglehub (public dataset, no API key required). Open the run report to review the confusion matrix and feature-importance summary when training completes.