"""Example Flyte 2 pipeline: load the iris CSV, train a classifier, score it."""

import os

import flyte
import joblib
import pandas as pd
from flyte import Image, Resources, TaskEnvironment
from flyte.io import File
from sklearn.ensemble import RandomForestClassifier

# 1. Define the Image using the fluent builder API
image = (
    Image.from_debian_base(
        name="ml-image",
        python_version=(3, 11),
    )
    .with_pip_packages("pandas", "scikit-learn", "joblib", "pyarrow")
)

# 2. Define the TaskEnvironment
env = TaskEnvironment(
    name="ml_env",
    image=image,
    resources=Resources(cpu="2", memory="4Gi"),
    cache="auto",
)


@env.task
async def load_data() -> pd.DataFrame:
    """Read the iris CSV from its public URL and return it as a DataFrame."""
    CSV_URL = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
    return pd.read_csv(CSV_URL)


@env.task
async def train_model(data: pd.DataFrame) -> File:
    """Fit a RandomForestClassifier on ``data`` and return the saved model.

    Every column except ``"species"`` is used as a feature; ``"species"`` is
    the label. The fitted model is dumped with joblib to ``model.joblib`` in
    the current working directory and handed back via ``File.from_local``.
    """
    model = RandomForestClassifier()
    X = data.drop("species", axis=1)
    y = data["species"]
    model.fit(X, y)

    root_dir = os.getcwd()
    model_path = os.path.join(root_dir, "model.joblib")
    joblib.dump(model, model_path)
    return await File.from_local(model_path)


@env.task
async def evaluate(model_file: File, data: pd.DataFrame) -> float:
    """Download ``model_file``, load it with joblib, and score it on ``data``.

    Returns the value of the model's ``score`` method, converted to ``float``,
    using every column except ``"species"`` as features and ``"species"`` as
    the label.
    """
    local_path = await model_file.download()
    model = joblib.load(local_path)
    X = data.drop("species", axis=1)
    y = data["species"]
    return float(model.score(X, y))


# 3. The workflow is now just an orchestrating task
@env.task
async def ml_pipeline() -> float:
    """Run load_data -> train_model -> evaluate and return the score."""
    data = await load_data()
    model = await train_model(data)
    # NOTE: the model is scored on the same frame it was trained on.
    score = await evaluate(model, data)
    return score
# Flyte 1: Resources per task
# (`task` is the Flyte 1 decorator; it is not imported in the visible code.)
@task(requests=Resources(cpu="1"), limits=Resources(cpu="2"))
def my_task():
    ...


# Flyte 2: Resources at TaskEnvironment level
env = flyte.TaskEnvironment(
    name="my_env",
    resources=flyte.Resources(cpu="1"),  # No separate limits
)
# Flyte 1: Workflow is the entrypoint
# (`workflow` is the Flyte 1 decorator; it is not imported in the visible code.)
@workflow
def my_workflow():
    ...


# Flyte 2: Use a main() task or any task name
@env.task
def main():
    ...  # Common convention


# Run with: flyte run my_module.py main
# Flyte 1: Strict about type annotations
# (`task` is the Flyte 1 decorator; it is not imported in the visible code.)
@task
def my_task(x: int) -> dict:  # Would fail, need dict[str, int]
    return {"a": x}


# Flyte 2: More lenient
# (Side-by-side illustration: this definition rebinds `my_task` from above.)
@env.task
def my_task(x: int) -> dict:  # Works, v2 will pickle untyped I/O
    return {"a": x}
# CLI COMMANDS
# flyte run my_module.py main --items '[1,2,3,4,5]'
# flyte run --local my_module.py main --items '[1,2,3,4,5]'
# flyte deploy my_module.py my_env