# Kubeflow Pipelines (KFP) version
from kfp import dsl
from kfp import client


@dsl.component
def addition_component(num1: int, num2: int) -> int:
    return num1 + num2


@dsl.pipeline(name='addition-pipeline')
def my_pipeline(a: int, b: int, c: int = 10):
    add_task_1 = addition_component(num1=a, num2=b)
    add_task_2 = addition_component(num1=add_task_1.output, num2=c)
# Flyte version
from flytekit import task, workflow


@task
def addition_component(num1: int, num2: int) -> int:
    return num1 + num2


@workflow
def my_pipeline(a: int, b: int, c: int = 10):
    add_task_1 = addition_component(num1=a, num2=b)
    add_task_2 = addition_component(num1=add_task_1, num2=c)
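# Local sanity check (sketch, not in the original): flytekit tasks and
# workflows also execute as plain Python when called directly, so the example
# can be exercised without a cluster.
if __name__ == "__main__":
    print(addition_component(num1=1, num2=2))  # 3
    my_pipeline(a=1, b=2)  # runs both tasks locally with the default c=10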
# KFP: compile the pipeline, then create a run from the compiled package
kfp dsl compile --py path/to/pipeline.py --output path/to/output.yaml
kfp run create --experiment-name my-experiment --package-file path/to/output.yaml
# Flyte: run the workflow on a remote cluster in a single step
pyflyte run --remote example.py my_pipeline --a 1 --b 2
# KFP: submit the pipeline from Python using the KFP SDK client
endpoint = '<KFP_ENDPOINT>'
kfp_client = client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'a': 1,
        'b': 2,
    },
)
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)
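# Optional follow-up (sketch, not in the original): block until the run
# reaches a terminal state; Client.wait_for_run_completion polls the backend
# for up to `timeout` seconds.
kfp_client.wait_for_run_completion(run_id=run.run_id, timeout=600)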
# Flyte: register and launch the workflow from Python with FlyteRemote
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

from <your-module> import my_pipeline

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)

registered_workflow = remote.register_script(
    my_pipeline,
    source_path="../../",  # depends on where the __init__.py file is present
    module_name="<your-module>",
)

execution = remote.execute(
    registered_workflow,
    inputs={"a": 100, "b": 19},
)
print(f"Execution successfully started: {execution.id.name}")
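# Optional follow-up (sketch, not in the original): FlyteRemote.wait blocks
# until the execution completes; declared workflow outputs would then be
# available via execution.outputs.
execution = remote.wait(execution)
print(f"Execution finished: {execution.id.name}")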
# Kubeflow Pipelines (KFP) version of the iris training pipeline
from typing import List

from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output


@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
    import pandas as pd

    csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
    col_names = [
        'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
    ]
    df = pd.read_csv(csv_url, names=col_names)

    with open(iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
    min_max_scaler: bool,
):
    if standard_scaler is min_max_scaler:
        raise ValueError(
            'Exactly one of standard_scaler or min_max_scaler must be True.')

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    labels = df.pop('Labels')

    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    df = pd.DataFrame(scaler.fit_transform(df))
    df['Labels'] = labels
    with open(normalized_iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier

    with open(normalized_iris_dataset.path) as f:
        df = pd.read_csv(f)

    y = df.pop('Labels')
    X = df

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    clf = KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(X_train, y_train)
    with open(model.path, 'wb') as f:
        pickle.dump(clf, f)


@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
    standard_scaler: bool,
    min_max_scaler: bool,
    neighbors: List[int],
):
    create_dataset_task = create_dataset()

    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
        standard_scaler=standard_scaler,
        min_max_scaler=min_max_scaler)

    with dsl.ParallelFor(neighbors) as n_neighbors:
        train_model(
            normalized_iris_dataset=normalize_dataset_task.outputs['normalized_iris_dataset'],
            n_neighbors=n_neighbors)
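# Programmatic alternative to the `kfp dsl compile` CLI step (sketch, not in
# the original): kfp's compiler writes the same YAML package directly from
# the pipeline function.
from kfp import compiler

compiler.Compiler().compile(pipeline_func=my_pipeline, package_path='pipeline.yaml')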
# Flyte version of the iris training pipeline
from dataclasses import dataclass
from typing import List

import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import map_task, task, workflow
from flytekit.types.structured import StructuredDataset
from sklearn.base import ClassifierMixin
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MinMaxScaler, StandardScaler

COL_NAMES = ["Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Labels"]


@dataclass_json
@dataclass
class TrainInputs:
    normalized_iris_dataset: StructuredDataset
    n_neighbors: int


@task
def create_dataset() -> pd.DataFrame:
    csv_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    df = pd.read_csv(csv_url, names=COL_NAMES)
    return df


@task
def normalize_dataset(
    input_iris_dataset: pd.DataFrame, standard_scaler: bool, min_max_scaler: bool
) -> pd.DataFrame:
    if standard_scaler is min_max_scaler:
        raise ValueError(
            "Exactly one of standard_scaler or min_max_scaler must be True."
        )

    labels = input_iris_dataset.pop("Labels")
    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    df = pd.DataFrame(
        scaler.fit_transform(input_iris_dataset),
        columns=[col for col in COL_NAMES if col != "Labels"],  # keep column order stable
    )
    df["Labels"] = labels
    return df


@task
def train_model(input: TrainInputs) -> ClassifierMixin:
    df = input.normalized_iris_dataset.open(pd.DataFrame).all()
    y = df.pop("Labels")
    X = df
    X_train, _, y_train, _ = train_test_split(X, y, random_state=0)
    clf = KNeighborsClassifier(n_neighbors=input.n_neighbors)
    clf.fit(X_train, y_train)
    return clf


@task
def prepare_map_inputs(
    list_neighbors: List[int], normalized_iris_dataset: StructuredDataset
) -> List[TrainInputs]:
    return [
        TrainInputs(normalized_iris_dataset, neighbor) for neighbor in list_neighbors
    ]


@workflow
def my_pipeline(standard_scaler: bool, min_max_scaler: bool, neighbors: List[int]):
    create_dataset_task = create_dataset()
    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task,
        standard_scaler=standard_scaler,
        min_max_scaler=min_max_scaler,
    )
    map_task(train_model)(
        input=prepare_map_inputs(
            list_neighbors=neighbors, normalized_iris_dataset=normalize_dataset_task
        )
    )
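# Local sanity check (sketch, not in the original): the whole workflow,
# including the map_task fan-out, can also be executed locally as plain Python
# before building or pushing any image.
if __name__ == "__main__":
    my_pipeline(standard_scaler=True, min_max_scaler=False, neighbors=[3, 6, 9])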
# KFP: compile the iris pipeline, then create a run from the compiled package
kfp dsl compile --py path/to/pipeline.py --output path/to/output.yaml
kfp run create --experiment-name my-experiment --package-file path/to/output.yaml
# Flyte: run the iris workflow remotely with a prebuilt container image
pyflyte run --remote --image ghcr.io/flyteorg/flytecookbook:core-latest test.py my_pipeline --standard_scaler --neighbors '[3,6,9]'
# KFP: submit the iris pipeline from Python using the KFP SDK client
endpoint = '<KFP_UI_URL>'
kfp_client = client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'min_max_scaler': True,
        'standard_scaler': False,
        'neighbors': [3, 6, 9],
    },
)
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)
# Flyte: register and launch the iris workflow from Python with FlyteRemote
from flytekit.configuration import Config, ImageConfig
from flytekit.remote import FlyteRemote

from <your-module> import my_pipeline

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)

registered_workflow = remote.register_script(
    my_pipeline,
    source_path="../../",  # depends on where the __init__.py file is present
    module_name="<your-module>",
    image_config=ImageConfig.from_images("ghcr.io/flyteorg/flytecookbook:core-latest"),
)

execution = remote.execute(
    registered_workflow,
    inputs={"standard_scaler": True, "min_max_scaler": False, "neighbors": [3, 6, 9]},
)
print(f"Execution successfully started: {execution.id.name}")
# KFP: containerized component built into a custom target image
@dsl.component(
    base_image='python:3.7',
    target_image='gcr.io/my-project/my-component:v1',
    packages_to_install=['tensorflow'],
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model],
    num_epochs: int,
):
    ...
# Flyte: the same task pinned to a custom container image
from flytekit.types.file import FlyteFile


@task(container_image="ghcr.io/my-project/my-component:v1")
def train_model(
    dataset: pd.DataFrame,
    num_epochs: int,
) -> FlyteFile:  # Flyte tasks return outputs rather than writing to an output parameter
    ...
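# Closer analogue of KFP's packages_to_install/target_image (sketch, not in
# the original): recent flytekit versions can build the task image from an
# ImageSpec instead of a prebuilt tag. Assumes a configured image builder
# (e.g. the envd plugin) and a registry you can push to; the names below are
# illustrative.
from flytekit import ImageSpec

custom_image = ImageSpec(
    name="my-component",
    registry="ghcr.io/my-project",
    packages=["tensorflow"],
)


@task(container_image=custom_image)
def train_model_with_image_spec(dataset: pd.DataFrame, num_epochs: int) -> FlyteFile:
    ...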