Kubeflow Pipelines v2 vs. Flyte: feature comparison

Multi-Tenancy
Type Checking
Caching
Versioning and Reproducibility
Sub DAG
Data Lineage
Scalability
Map Tasks
Dynamic DAGs
Retries
Reruns
Scheduling
Branching
Task Timeout
Spark Support
Extensible
Data Visualization
Model Serving (Flyte: check out UnionML!)
Notifications
Recovery
Ease of Development
Ease of Local Deployment
Human-in-the-Loop (Flyte: UI in progress)
Intratask Checkpointing
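
Several of the capabilities above surface directly as task-level settings in Flyte. A minimal sketch (the task body and parameter values are illustrative only, not recommendations):

from datetime import timedelta

from flytekit import task


# Caching, retries and timeouts are all declared on the @task decorator.
# cache_version lets you invalidate previously cached results when the
# task logic changes.
@task(
    cache=True,
    cache_version="1.0",
    retries=3,
    timeout=timedelta(minutes=10),
)
def train(num_epochs: int) -> float:
    ...

The side-by-side examples that follow show how the same pipelines are written and triggered in each system.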

Code

Kubeflow Pipelines v2:

from kfp import dsl
from kfp import client


@dsl.component
def addition_component(num1: int, num2: int) -> int:
    return num1 + num2


@dsl.pipeline(name='addition-pipeline')
def my_pipeline(a: int, b: int, c: int = 10):
    add_task_1 = addition_component(num1=a, num2=b)
    add_task_2 = addition_component(num1=add_task_1.output, num2=c)

Flyte:

from flytekit import task, workflow


@task
def addition_component(num1: int, num2: int) -> int:
    return num1 + num2


@workflow
def my_pipeline(a: int, b: int, c: int = 10):
    add_task_1 = addition_component(num1=a, num2=b)
    add_task_2 = addition_component(num1=add_task_1, num2=c)

Trigger (CLI)

Kubeflow Pipelines v2:

kfp dsl compile --py path/to/pipeline.py --output path/to/output.yaml

kfp run create --experiment-name my-experiment --package-file path/to/output.yaml 

Flyte:

pyflyte run --remote example.py my_pipeline --a 1 --b 2

Trigger (Python)

Kubeflow Pipelines v2:

endpoint = '<KFP_ENDPOINT>'
kfp_client = client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'a': 1,
        'b': 2,
    },
)
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)

Flyte:

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

from <your-module> import my_pipeline

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)

registered_workflow = remote.register_script(
    my_pipeline,
    source_path="../../",  # depends on where the __init__.py file is present
    module_name="<your-module>",
)

execution = remote.execute(
    registered_workflow,
    inputs={"a": 100, "b": 19},
)
print(f"Execution successfully started: {execution.id.name}")

Code

Kubeflow Pipelines v2:

from typing import List

from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output


@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
    import pandas as pd

    csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
    col_names = [
        'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
    ]
    df = pd.read_csv(csv_url, names=col_names)

    with open(iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
    min_max_scaler: bool,
):
    if standard_scaler is min_max_scaler:
        raise ValueError(
            'Exactly one of standard_scaler or min_max_scaler must be True.')

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    labels = df.pop('Labels')

    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    df = pd.DataFrame(scaler.fit_transform(df))
    df['Labels'] = labels
    with open(normalized_iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier

    with open(normalized_iris_dataset.path) as f:
        df = pd.read_csv(f)

    y = df.pop('Labels')
    X = df

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    clf = KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(X_train, y_train)
    with open(model.path, 'wb') as f:
        pickle.dump(clf, f)


@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
    standard_scaler: bool,
    min_max_scaler: bool,
    neighbors: List[int],
):
    create_dataset_task = create_dataset()

    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
        standard_scaler=standard_scaler,
        min_max_scaler=min_max_scaler)

    with dsl.ParallelFor(neighbors) as n_neighbors:
        train_model(
            normalized_iris_dataset=normalize_dataset_task
            .outputs['normalized_iris_dataset'],
            n_neighbors=n_neighbors)

Flyte:

from dataclasses import dataclass
from typing import List

import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import map_task, task, workflow
from flytekit.types.structured import StructuredDataset
from sklearn.base import ClassifierMixin
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MinMaxScaler, StandardScaler

COL_NAMES = ["Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Labels"]


# map_task maps a task over a list of a single input type, so the per-run
# inputs are bundled into one serializable dataclass.
@dataclass_json
@dataclass
class TrainInputs:
    normalized_iris_dataset: StructuredDataset
    n_neighbors: int


@task
def create_dataset() -> pd.DataFrame:
    csv_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    df = pd.read_csv(csv_url, names=COL_NAMES)
    return df


@task
def normalize_dataset(
    input_iris_dataset: pd.DataFrame, standard_scaler: bool, min_max_scaler: bool
) -> pd.DataFrame:
    if standard_scaler is min_max_scaler:
        raise ValueError(
            "Exactly one of standard_scaler or min_max_scaler must be True."
        )

    labels = input_iris_dataset.pop("Labels")

    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    df = pd.DataFrame(
        scaler.fit_transform(input_iris_dataset),
        columns=[col for col in COL_NAMES if col != "Labels"],
    )
    df["Labels"] = labels
    return df


@task
def train_model(input: TrainInputs) -> ClassifierMixin:
    df = input.normalized_iris_dataset.open(pd.DataFrame).all()
    y = df.pop("Labels")
    X = df

    X_train, _, y_train, _ = train_test_split(X, y, random_state=0)

    clf = KNeighborsClassifier(n_neighbors=input.n_neighbors)
    clf.fit(X_train, y_train)

    return clf


@task
def prepare_map_inputs(
    list_neighbors: List[int], normalized_iris_dataset: StructuredDataset
) -> List[TrainInputs]:
    return [
        TrainInputs(normalized_iris_dataset, neighbor) for neighbor in list_neighbors
    ]


@workflow
def my_pipeline(standard_scaler: bool, min_max_scaler: bool, neighbors: List[int]):
    create_dataset_task = create_dataset()
    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task,
        standard_scaler=standard_scaler,
        min_max_scaler=min_max_scaler,
    )
    # Train one model per value of n_neighbors, in parallel.
    map_task(train_model)(
        input=prepare_map_inputs(
            list_neighbors=neighbors, normalized_iris_dataset=normalize_dataset_task
        )
    )

Trigger (CLI)

Kubeflow Pipelines v2:

kfp dsl compile --py path/to/pipeline.py --output path/to/output.yaml

kfp run create --experiment-name my-experiment --package-file path/to/output.yaml

Flyte:

pyflyte run --remote --image ghcr.io/flyteorg/flytecookbook:core-latest test.py my_pipeline --standard_scaler --neighbors '[3,6,9]'

Trigger (Python)

Kubeflow Pipelines v2:

endpoint = '<KFP_UI_URL>'
kfp_client = client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'min_max_scaler': True,
        'standard_scaler': False,
        'neighbors': [3, 6, 9]
    },
)
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)

Flyte:

from flytekit.configuration import Config, ImageConfig
from flytekit.remote import FlyteRemote

from <your-module> import my_pipeline

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)

registered_workflow = remote.register_script(
    my_pipeline,
    source_path="../../",  # depends on where the __init__.py file is present
    module_name="<your-module>",
    image_config=ImageConfig.from_images("ghcr.io/flyteorg/flytecookbook:core-latest"),
)

execution = remote.execute(
    registered_workflow,
    inputs={"standard_scaler": True, "min_max_scaler": False, "neighbors": [3, 6, 9]},
)
print(f"Execution successfully started: {execution.id.name}")

Custom container images

Kubeflow Pipelines v2:

@dsl.component(
    base_image='python:3.7',
    target_image='gcr.io/my-project/my-component:v1',
    packages_to_install=['tensorflow'],
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model],
    num_epochs: int,
):
    ...

Flyte:

# In Flyte there are no output parameters: the trained model is returned
# from the task instead of being written to an Output[...] artifact.
@task(container_image="ghcr.io/my-project/my-component:v1")
def train_model(
    dataset: pd.DataFrame,
    num_epochs: int,
) -> FlyteFile:
    ...
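
Once such an image has been built and pushed, it can be supplied at registration time much like the --image flag in the earlier pyflyte run example; a sketch, assuming the task above lives in a file named train.py:

pyflyte register --image ghcr.io/my-project/my-component:v1 train.py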
