DataFrames
By default, return values in Python are materialized - meaning the actual data is downloaded and loaded into memory. This applies to simple types like integers, as well as more complex types like DataFrames.
To avoid downloading large datasets into memory, Flyte V2 provides flyte.io.DataFrame: a thin, uniform wrapper type for DataFrame-style objects that lets you pass a reference to the data rather than the fully materialized contents.
The flyte.io.DataFrame type provides serialization support for common engines such as pandas, PyArrow, Dask, and Polars, enabling you to move data between different DataFrame backends.
Setting up the environment and sample data
For our example, we start by setting up a task environment with the required dependencies and creating some sample data.
from typing import Annotated
import numpy as np
import pandas as pd
import flyte
import flyte.io
env = flyte.TaskEnvironment(
    name="dataframe_usage",
    image=flyte.Image.from_debian_base().with_pip_packages("pandas", "pyarrow", "numpy"),
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)
BASIC_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona", "George", "Hannah"],
    "department": ["HR", "Engineering", "Engineering", "Marketing", "Finance", "Finance", "HR", "Engineering"],
    "hire_date": pd.to_datetime(
        ["2018-01-15", "2019-03-22", "2020-07-10", "2017-11-01", "2021-06-05", "2018-09-13", "2022-01-07", "2020-12-30"]
    ),
}
ADDL_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "salary": [55000, 75000, 72000, 50000, 68000, 70000, np.nan, 80000],
    "bonus_pct": [0.05, 0.10, 0.07, 0.04, np.nan, 0.08, 0.03, 0.09],
    "full_time": [True, True, True, False, True, True, False, True],
    "projects": [
        ["Recruiting", "Onboarding"],
        ["Platform", "API"],
        ["API", "Data Pipeline"],
        ["SEO", "Ads"],
        ["Budget", "Forecasting"],
        ["Auditing"],
        [],
        ["Platform", "Security", "Data Pipeline"],
    ],
}
Create a raw DataFrame
Now, let’s create a task that returns a native Pandas DataFrame:
@env.task
async def create_raw_dataframe() -> pd.DataFrame:
    return pd.DataFrame(BASIC_EMPLOYEE_DATA)
This is the most basic way to pass DataFrames (of any supported kind, not just pandas): create the DataFrame as usual and return it.
Because the task is declared to return a supported native DataFrame type (in this case, pandas.DataFrame), Flyte automatically detects it, serializes it correctly, and uploads it at task completion, enabling it to be passed transparently to the next task.
Flyte supports auto-serialization for the following DataFrame types:
pandas.DataFrame
pyarrow.Table
dask.dataframe.DataFrame
polars.DataFrame
flyte.io.DataFrame (see below)
Create a flyte.io.DataFrame
Alternatively you can also create a flyte.io.DataFrame object directly from a native object with the from_df method:
@env.task
async def create_flyte_dataframe() -> Annotated[flyte.io.DataFrame, "parquet"]:
    pd_df = pd.DataFrame(ADDL_EMPLOYEE_DATA)
    fdf = flyte.io.DataFrame.from_df(pd_df)
    return fdf
The flyte.io.DataFrame class creates a thin wrapper around objects of any standard DataFrame type. It serves as a generic “any DataFrame type” (a concept that Python itself does not currently offer).
As with native DataFrame types, Flyte will automatically serialize and upload the data at task completion.
The advantage of the unified flyte.io.DataFrame wrapper is that you can be explicit about the storage format that makes sense for your use case, by using an Annotated type where the second argument encodes the format or other lightweight hints. The task above, for example, specifies that the DataFrame should be stored as Parquet.
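Annotated attaches metadata to a type without changing its runtime behavior, which is how such hints can ride along on an ordinary annotation. A minimal stdlib-only sketch (the function f and its dict return type are hypothetical, purely for illustration):

```python
from typing import Annotated, get_args, get_type_hints

# Hypothetical function: the "parquet" string is metadata attached to the
# annotation, invisible at runtime unless a framework asks for it.
def f() -> Annotated[dict, "parquet"]:
    return {}

# include_extras=True preserves the Annotated wrapper instead of stripping it.
hints = get_type_hints(f, include_extras=True)
base_type, fmt_hint = get_args(hints["return"])
print(base_type, fmt_hint)  # <class 'dict'> parquet
```

Frameworks that inspect signatures this way can act on the hint while type checkers continue to see the plain underlying type.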
Automatically convert between types
You can leverage Flyte to automatically download and convert the DataFrame between types when needed:
@env.task
async def join_data(raw_dataframe: pd.DataFrame, flyte_dataframe: pd.DataFrame) -> flyte.io.DataFrame:
    joined_df = raw_dataframe.merge(flyte_dataframe, on="employee_id", how="inner")
    return flyte.io.DataFrame.from_df(joined_df)
This task takes two DataFrames as input. We’ll pass one raw Pandas DataFrame, and one flyte.io.DataFrame.
Flyte automatically converts the flyte.io.DataFrame to a Pandas DataFrame (since we declared that as the input type) before passing it to the task.
The actual download and conversion happens only when we access the data, in this case, when we do the merge.
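The merge itself is plain pandas. Here is a local sketch with a trimmed-down version of the sample data (columns reduced for brevity):

```python
import pandas as pd

# Trimmed versions of BASIC_EMPLOYEE_DATA and ADDL_EMPLOYEE_DATA from above.
basic = pd.DataFrame({
    "employee_id": range(1001, 1005),
    "department": ["HR", "Engineering", "Engineering", "Marketing"],
})
addl = pd.DataFrame({
    "employee_id": range(1001, 1005),
    "salary": [55000, 75000, 72000, 50000],
})

# Inner join on the shared key, exactly as join_data does.
joined = basic.merge(addl, on="employee_id", how="inner")
print(joined.shape)  # (4, 3)
```

Because both frames share the full set of employee_id values, the inner join keeps every row and simply widens the result with the salary column.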
Downloading DataFrames
When a task receives a flyte.io.DataFrame, you can request a concrete backend representation. For example, to download as a pandas DataFrame:
@env.task
async def download_data(joined_df: flyte.io.DataFrame):
    downloaded = await joined_df.open(pd.DataFrame).all()
    print("Downloaded Data:\n", downloaded)
The open() call delegates to the DataFrame handler for the stored format and converts to the requested in-memory type.
Run the example
Finally, we can define a main function to run the tasks defined above and a __main__ block to execute the workflow:
@env.task
async def main():
    raw_df = await create_raw_dataframe()
    flyte_df = await create_flyte_dataframe()
    joined_df = await join_data(raw_df, flyte_df)
    await download_data(joined_df)
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
Polars DataFrames
The flyteplugins-polars package extends Flyte’s DataFrame support to polars.DataFrame and polars.LazyFrame. Install it alongside the core SDK and it registers automatically — no additional configuration required.
pip install flyteplugins-polars
Both types are serialized as Parquet when passed between tasks, just like other DataFrame backends.
Setup
import polars as pl
import flyte
env = flyte.TaskEnvironment(
    name="polars-dataframes",
    image=flyte.Image.from_debian_base(name="polars").with_pip_packages(
        "flyteplugins-polars>=2.0.0", "polars"
    ),
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)
EMPLOYEE_DATA = {
    "employee_id": [1001, 1002, 1003, 1004, 1005, 1006],
    "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona"],
    "department": ["Engineering", "Engineering", "Marketing", "Finance", "Finance", "Engineering"],
    "salary": [75000, 72000, 50000, 68000, 70000, 80000],
    "years_experience": [5, 4, 2, 6, 5, 7],
}
Eager DataFrames
Use pl.DataFrame when you want immediate evaluation. Flyte serializes it to Parquet on output and deserializes it on input:
@env.task
async def create_dataframe() -> pl.DataFrame:
    """Create a Polars DataFrame.

    Polars DataFrames are passed between tasks as serialized Parquet files
    stored in the Flyte blob store — no manual upload required.
    """
    return pl.DataFrame(EMPLOYEE_DATA)
@env.task
async def filter_high_earners(df: pl.DataFrame) -> pl.DataFrame:
    """Filter and enrich a Polars DataFrame."""
    return (
        df.filter(pl.col("salary") > 60000)
        .with_columns(
            (pl.col("salary") / pl.col("years_experience")).alias("salary_per_year")
        )
        .sort("salary", descending=True)
    )
Lazy DataFrames
Use pl.LazyFrame when you want to defer computation and let Polars optimize the full query plan before executing. Flyte handles serialization the same way as pl.DataFrame:
@env.task
async def create_lazyframe() -> pl.LazyFrame:
    """Create a Polars LazyFrame.

    LazyFrames defer computation until collected, allowing Polars to
    optimize the full query plan. They are serialized to Parquet just
    like DataFrames when passed between tasks.
    """
    return pl.LazyFrame(EMPLOYEE_DATA)
@env.task
async def aggregate_by_department(lf: pl.LazyFrame) -> pl.DataFrame:
    """Aggregate salary statistics by department using a LazyFrame.

    The query plan is built lazily and executed only when collect() is called.
    """
    return (
        lf.group_by("department")
        .agg(
            pl.col("salary").mean().alias("avg_salary"),
            pl.col("salary").max().alias("max_salary"),
            pl.len().alias("headcount"),
        )
        .sort("avg_salary", descending=True)
        .collect()
    )
The collect() call in aggregate_by_department is what triggers execution of the lazy plan inside that task. A LazyFrame passed between tasks is itself serialized to Parquet at the task boundary.
Run the example
@env.task
async def main():
    df = await create_dataframe()
    filtered = await filter_high_earners(df=df)
    print("High earners:")
    print(filtered)

    lf = await create_lazyframe()
    summary = await aggregate_by_department(lf=lf)
    print("Department summary:")
    print(summary)
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()