EDA and Feature Engineering in Jupyter Notebook and Modeling in a Flyte Task
Once you have a Union account, install `union`:

```shell
pip install union
```

Export the following environment variable to build and push images to your own container registry:

```shell
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
```

Then run the following commands to run the workflow:
```shell
$ git clone https://github.com/unionai/unionai-examples
$ cd unionai-examples
$ union run --remote <path/to/file.py> <workflow_name> <params>
```

The source code for this example can be found here.
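As a concrete illustration, running the workflow defined below might look like this; the file path here is hypothetical, so substitute the example's actual location in the repository:

```shell
# hypothetical path; point it at this example's file in unionai-examples
$ union run --remote tutorials/notebook_and_task.py notebook_wf
```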
First, let’s import the libraries we will use in this example.
```python
import pathlib
from dataclasses import dataclass

import numpy as np
import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import Resources, kwtypes, task, workflow
from flytekitplugins.papermill import NotebookTask
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import RobustScaler
```

We define a dataclass to store the hyperparameters of the Gradient Boosting Regressor.
```python
@dataclass_json
@dataclass
class Hyperparameters(object):
    n_estimators: int = 150
    max_depth: int = 3
    max_features: str = "sqrt"
    min_samples_split: int = 4
    random_state: int = 2
    nfolds: int = 10
```
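The `@dataclass_json` decorator is what lets flytekit serialize the hyperparameters as JSON when they cross a task boundary. A quick sketch of the round-trip it enables:

```python
hp = Hyperparameters(n_estimators=200)
serialized = hp.to_json()  # a JSON string containing all six fields
restored = Hyperparameters.from_json(serialized)
assert restored.n_estimators == 200
```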
We define a `NotebookTask` to run the Jupyter notebook. The notebook returns `dummified_data` and `dataset` as its outputs; `dummified_data` is used in this example, and `dataset` will be used in the upcoming example.
```python
nb = NotebookTask(
    name="eda-feature-eng-nb",
    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "supermarket_regression_1.ipynb"),
    outputs=kwtypes(dummified_data=pd.DataFrame, dataset=str),
    requests=Resources(mem="500Mi"),
)
```
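For the outputs declared above to materialize, the notebook itself has to record them. With the papermill plugin this is done by calling `record_outputs` in the notebook's final cell; a minimal sketch, assuming the notebook builds variables with these names:

```python
# final cell of supermarket_regression_1.ipynb (sketch)
from flytekitplugins.papermill import record_outputs

# the names must match the `outputs` declared on the NotebookTask
record_outputs(dummified_data=dummified_data, dataset=dataset)
```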
Next, we define a `cross_validate` function and a `modeling` task to compute the mean absolute error (MAE) of the Gradient Boosting Regressor on the data.
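For reference, MAE is the average absolute deviation between the predicted and actual sales values:

$$\mathrm{MAE} = \frac{1}{n} \sum_{i=1}^{n} \lvert y_i - \hat{y}_i \rvert$$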
```python
def cross_validate(model, nfolds, feats, targets):
    # cross_val_score returns negated MAE values; flip the sign to report a positive MAE
    score = -1 * (cross_val_score(model, feats, targets, cv=nfolds, scoring="neg_mean_absolute_error"))
    return np.mean(score)


@task
def modeling(
    dataset: pd.DataFrame,
    hyperparams: Hyperparameters,
) -> float:
    # separate the target column from the features
    y_target = dataset["Product_Supermarket_Sales"].tolist()
    dataset.drop(["Product_Supermarket_Sales"], axis=1, inplace=True)
    X_train, X_test, y_train, _ = train_test_split(dataset, y_target, test_size=0.3)
    # fit RobustScaler on the training split only, then apply it to both splits
    scaler = RobustScaler()
    scaler.fit(X_train)
    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)
    gb_model = GradientBoostingRegressor(
        n_estimators=hyperparams.n_estimators,
        max_depth=hyperparams.max_depth,
        max_features=hyperparams.max_features,
        min_samples_split=hyperparams.min_samples_split,
        random_state=hyperparams.random_state,
    )
    # cross-validate on the training split and return the mean MAE
    return cross_validate(gb_model, hyperparams.nfolds, X_train, y_train)
```

We define a workflow to run the notebook and the modeling task.
```python
@workflow
def notebook_wf(hyperparams: Hyperparameters = Hyperparameters()) -> float:
    output = nb()
    mae_score = modeling(dataset=output.dummified_data, hyperparams=hyperparams)
    return mae_score
```

We can now run the notebook and the modeling task locally.
```python
if __name__ == "__main__":
    print(notebook_wf())
```
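To experiment locally with non-default hyperparameters, the workflow can also be invoked with an explicit `Hyperparameters` instance; a minimal sketch using the dataclass defined above:

```python
# local run with a couple of overridden hyperparameters
print(notebook_wf(hyperparams=Hyperparameters(n_estimators=200, nfolds=5)))
```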