How to Trigger the Feast Workflow using FlyteRemote
The goal of this notebook is to train a simple Gaussian Naive Bayes model using sklearn on a modified Horse-Colic dataset from UCI.
The model classifies whether the horse’s lesion is surgical.
Let’s get started!
Set the AWS environment variables before importing Flytekit.
import os
os.environ["FLYTE_AWS_ENDPOINT"] = os.environ["FEAST_S3_ENDPOINT_URL"] = "http://localhost:30084/"
os.environ["FLYTE_AWS_ACCESS_KEY_ID"] = os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["FLYTE_AWS_SECRET_ACCESS_KEY"] = os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"
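The assignments above write each MinIO setting under two names, one read by Flytekit and one read by Feast or the AWS client. A minimal sketch of the same idea as a reusable helper, assuming the sandbox defaults shown above; `set_mirrored_env` and `SANDBOX_SETTINGS` are hypothetical names and not part of Flytekit or Feast.

```python
import os

# Each value must be visible under two names: one read by Flytekit and one
# read by Feast / the AWS client. Pairs and values are the sandbox defaults
# used in this notebook.
SANDBOX_SETTINGS = {
    ("FLYTE_AWS_ENDPOINT", "FEAST_S3_ENDPOINT_URL"): "http://localhost:30084/",
    ("FLYTE_AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID"): "minio",
    ("FLYTE_AWS_SECRET_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY"): "miniostorage",
}


def set_mirrored_env(settings, environ=os.environ):
    """Write each value under every name in its key tuple (hypothetical helper)."""
    for names, value in settings.items():
        for name in names:
            environ[name] = value


set_mirrored_env(SANDBOX_SETTINGS)
```

Keeping the pairs in one table makes it harder for the Flyte and Feast copies of a setting to drift apart when the endpoint or credentials change.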
01. Register the code
The workflow code is auto-documented and rendered with Sphinx. The pipeline is expressed in pure Python using Flytekit.
You can use FlyteConsole to launch, monitor, and introspect Flyte executions. Here, however, let’s use flytekit.remote to interact with the Flyte backend.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
# The `for_sandbox` method instantiates a connection to the demo cluster.
remote = FlyteRemote(
    config=Config.for_sandbox(),
    default_project="flytesnacks",
    default_domain="development",
)
The register_script method can be used to register the workflow.
from flytekit.configuration import ImageConfig
from feast_workflow import feast_workflow
wf = remote.register_script(
    feast_workflow,
    image_config=ImageConfig.from_images(
        "ghcr.io/flyteorg/flytecookbook:feast_integration-latest"
    ),
    version="v2",
    source_path="../",
    module_name="feast_workflow",
)
02. Launch an execution
FlyteRemote provides convenient methods to retrieve a version of the pipeline from the remote server.
NOTE: It is possible to fetch a specific version of the workflow and launch it, but let’s just get the latest.
lp = remote.fetch_launch_plan(name="feast_integration.feast_workflow.feast_workflow")
lp.id.version
'v1'
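To pin a version instead of taking the latest, `fetch_launch_plan` also accepts a `version` argument. A minimal sketch, assuming a `remote` object like the one created earlier; `fetch_pinned_launch_plan` is a hypothetical wrapper, and "v1" appears only because it is the version printed above.

```python
LAUNCH_PLAN_NAME = "feast_integration.feast_workflow.feast_workflow"


def fetch_pinned_launch_plan(remote, version, name=LAUNCH_PLAN_NAME):
    """Fetch one specific registered version of the launch plan.

    `remote` is expected to be a FlyteRemote. Calling `fetch_launch_plan`
    without `version` returns the latest version instead.
    """
    return remote.fetch_launch_plan(name=name, version=version)


# Usage (requires a running Flyte backend):
# lp_v1 = fetch_pinned_launch_plan(remote, version="v1")
```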
The execute method can be used to execute a Flyte entity, a launch plan in our case.
execution = remote.execute(
    lp,
    inputs={"num_features_univariate": 5},
    wait=True,
)
03. Sync an execution
You can sync an execution to retrieve the workflow’s outputs. sync_nodes is set to True to retrieve the intermediate nodes’ outputs as well.
NOTE: You can also fetch an existing execution, including one that is still running. If you launch an execution with a name that already exists, Flyte reuses the existing execution rather than starting a new one.
from flytekit.models.core.execution import WorkflowExecutionPhase
synced_execution = remote.sync(execution, sync_nodes=True)
print(f"Execution {synced_execution.id.name} is in {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)} phase")
Execution f218aba055ba34a3fb75 is in SUCCEEDED phase
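When an execution is launched without wait=True, the sync call can be repeated until the execution finishes. A minimal polling sketch, assuming the synced execution object exposes an `is_done` property; `wait_until_done` and its default intervals are hypothetical and chosen only for illustration.

```python
import time


def wait_until_done(remote, execution, poll_seconds=5.0, timeout_seconds=600.0):
    """Re-sync an execution until it reports completion or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        # sync() refreshes the execution's state from the Flyte backend.
        execution = remote.sync(execution, sync_nodes=True)
        if execution.is_done:  # assumed property on the execution object
            return execution
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"execution not finished after {timeout_seconds} seconds"
            )
        time.sleep(poll_seconds)


# Usage (requires a running Flyte backend):
# execution = remote.execute(lp, inputs={"num_features_univariate": 5})
# synced_execution = wait_until_done(remote, execution)
```

Note that a finished execution is not necessarily a successful one, so the phase check shown above still applies to the returned object.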
04. Retrieve the output
Fetch the model and the model prediction.
model = synced_execution.outputs["o0"]
prediction = synced_execution.outputs["o1"]
prediction
/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyteaqx6tlyu/control_plane_metadata/local_flytekit/e1a690494fe33da04a4dca7737096234/0c81c76dc3a029267a96f275431b5bc5.npy
NOTE: The output model is available locally as a JobLibSerialized file, which can be downloaded and loaded.
model
/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyteaqx6tlyu/control_plane_metadata/local_flytekit/91246ef2160dde99a7512ab3aa9aa2ce/model.joblib.dat
Fetch the repo_config.
repo_config = synced_execution.node_executions["n0"].outputs["o0"]
05. Generate predictions
Re-use the predict function from the workflow to generate predictions. Flytekit will automatically manage the IO for you!
Load features from the online feature store
import os
from feast_workflow import predict, FEAST_FEATURES, retrieve_online
inference_point = retrieve_online(
    repo_config=repo_config,
    online_store=synced_execution.node_executions["n4"].outputs["o0"],
    data_point=533738,
)
inference_point
{'total protein': [70.0],
'peripheral pulse': [3.0],
'nasogastric reflux PH': [4.718545454545455],
'surgical lesion': ['1'],
'rectal temperature': [38.17717842323652],
'nasogastric tube': ['1.751269035532995'],
'Hospital Number': ['533738'],
'packed cell volume': [43.0],
'outcome': ['1'],
'abdominal distension': [4.0]}
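The online store returns each field as a one-element list keyed by name, and some values arrive as strings. Before a model can consume such a response, the label and the entity key typically have to be dropped and the remaining values arranged in a fixed column order. A minimal sketch using a shortened copy of the response above; `to_feature_row` and the choice of feature columns are assumptions for illustration, not the logic of the workflow's predict function.

```python
def to_feature_row(online_response, feature_names):
    """Return one row of floats ordered by feature_names (hypothetical helper)."""
    row = []
    for name in feature_names:
        values = online_response[name]
        if len(values) != 1:
            raise ValueError(
                f"expected a single value for {name!r}, got {len(values)}"
            )
        # Values may arrive as strings, so convert explicitly.
        row.append(float(values[0]))
    return row


response = {
    "total protein": [70.0],
    "peripheral pulse": [3.0],
    "nasogastric tube": ["1.751269035532995"],
    "surgical lesion": ["1"],       # label, excluded below
    "Hospital Number": ["533738"],  # entity key, excluded below
}
row = to_feature_row(
    response, ["total protein", "peripheral pulse", "nasogastric tube"]
)
print(row)
```

Passing the column order explicitly keeps the row layout independent of the dictionary's key order.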
Generate a prediction
predict(model_ser=model, features=inference_point)
array(['2'], dtype='<U1')