Caching
Flyte 2 provides intelligent task output caching that automatically avoids redundant computation by reusing previously computed task results.
Caching works at the task level and caches complete task outputs. For function-level checkpointing and resumption within tasks, see Traces.
Overview
By default, caching is disabled.
If caching is enabled for a task, then Flyte determines a cache key for the task. The key is composed of the following:
- Final inputs: The set of inputs after removing any specified in the
ignored_inputs. - Task name: The fully-qualified name of the task.
- Interface hash: A hash of the task’s input and output types.
- Cache version: The cache version string.
If the cache behavior is set to "auto", the cache version is automatically generated using a hash of the task’s source code (or according to the custom policy if one is specified).
If the cache behavior is set to "override", the cache version can be specified explicitly using the version_override parameter.
When the task runs, Flyte checks if a cache entry exists for the key. If found, the cached result is returned immediately instead of re-executing the task.
Basic caching usage
Flyte 2 supports three main cache behaviors:
"auto" - automatic versioning
@env.task(cache=flyte.Cache(behavior="auto"))
async def auto_versioned_task(data: str) -> str:
return await transform_data(data)
With behavior="auto", the cache version is automatically generated based on the function’s source code.
If you change the function implementation, the cache is automatically invalidated.
- When to use: Development and most production scenarios.
- Cache invalidation: Automatic when function code changes.
- Benefits: Zero-maintenance caching that “just works”.
You can also use the direct string shorthand:
@env.task(cache="auto")
async def auto_versioned_task_2(data: str) -> str:
return await transform_data(data)
"override"
With behavior="override", you can specify a custom cache key in the version_override parameter.
Since the cache key is fixed as part of the code, it can be manually changed when you need to invalidate the cache.
@env.task(cache=flyte.Cache(behavior="override", version_override="v1.2"))
async def manually_versioned_task(data: str) -> str:
return await transform_data(data)
- When to use: When you need explicit control over cache invalidation.
- Cache invalidation: Manual, by changing
version_override. - Benefits: Stable caching across code changes that don’t affect logic.
"disable" - No caching
To explicitly disable caching, use the "disable" behavior.
This is the default behavior.
@env.task(cache=flyte.Cache(behavior="disable"))
async def always_fresh_task(data: str) -> str:
return get_current_timestamp() + await transform_data(data)
- When to use: Non-deterministic functions, side effects, or always-fresh data.
- Cache invalidation: N/A - never cached.
- Benefits: Ensures execution every time.
You can also use the direct string shorthand:
@env.task(cache="disable")
async def always_fresh_task_2(data: str) -> str:
return get_current_timestamp() + await transform_data(data)
Advanced caching configuration
Ignoring specific inputs
Sometimes you want to cache based on some inputs but not others:
@env.task(cache=flyte.Cache(behavior="auto", ignored_inputs=("debug_flag",)))
async def selective_caching(data: str, debug_flag: bool) -> str:
if debug_flag:
print(f"Debug: transforming {data}")
return await transform_data(data)
This is useful for:
- Debug flags that don’t affect computation
- Logging levels or output formats
- Metadata that doesn’t impact results
Cache serialization
Cache serialization ensures that only one instance of a task runs at a time for identical inputs:
@env.task(cache=flyte.Cache(behavior="auto", serialize=True))
async def expensive_model_training(data: str) -> str:
return await transform_data(data)
When to use serialization:
- Very expensive computations (model training, large data processing)
- Shared resources that shouldn’t be accessed concurrently
- Operations where multiple parallel executions provide no benefit
How it works:
- First execution acquires a reservation and runs normally.
- Concurrent executions with identical inputs wait for the first to complete.
- Once complete, all waiting executions receive the cached result.
- If the running execution fails, another waiting execution takes over.
Salt for cache key variation
Use salt to vary cache keys without changing function logic:
@env.task(cache=flyte.Cache(behavior="auto", salt="experiment_2024_q4"))
async def experimental_analysis(data: str) -> str:
return await transform_data(data)
salt is useful for:
- A/B testing with identical code.
- Temporary cache namespaces for experiments.
- Environment-specific cache isolation.
Content-based caching for DataFrames, files, and directories
When a task input is a DataFrame (pandas, polars, or flyte.io.DataFrame), a flyte.io.File, or a flyte.io.Dir, the value is passed by reference - the cache key is derived from the data’s storage location, not its contents. As a result, a downstream task keyed on such an input does not get a cache hit when the underlying data is identical but lives at a new path (the common case, since each run writes to a fresh location).
To cache on content instead, attach a hash of the data at the point where it is produced. Flyte then uses that content hash when computing the cache key of any downstream consuming task, so identical content produces a cache hit regardless of where it is stored.
Caching applies only on a remote cluster - local execution does not produce cache hits across runs.
DataFrames
For a raw pandas or polars DataFrame, supply a content hash with flyte.io.HashFunction.from_fn in a typing.Annotated return type. Define the hash function once and reuse the annotated alias:
def hash_pandas_dataframe(df: pd.DataFrame) -> str:
# Content-based hash using pandas' built-in row hashing.
return str(pd.util.hash_pandas_object(df).sum())
# Reusable type alias: a pandas DataFrame whose cache key is its content hash.
HashedPandasDataFrame = Annotated[pd.DataFrame, HashFunction.from_fn(hash_pandas_dataframe)]
@env.task
async def produce_pandas() -> HashedPandasDataFrame:
# The HashFunction in the return annotation tells Flyte to compute a
# content hash for this output.
return pd.DataFrame(SAMPLE_DATA)
@env.task(cache=Cache(behavior="override", version_override="v1"))
async def consume_pandas(df: pd.DataFrame) -> int:
# Cached on the input's content hash: identical content -> cache hit.
return int(df["value"].sum())
The producer’s return annotation tells Flyte to compute the content hash; the consumer is cached on it. The same pattern works for polars:
def hash_polars_dataframe(df: pl.DataFrame) -> str:
return str(df.hash_rows().sum())
HashedPolarsDataFrame = Annotated[pl.DataFrame, HashFunction.from_fn(hash_polars_dataframe)]
@env.task
async def produce_polars() -> HashedPolarsDataFrame:
return pl.DataFrame(SAMPLE_DATA)
@env.task(cache=Cache(behavior="override", version_override="v1"))
async def consume_polars(df: pl.DataFrame) -> int:
return int(df["value"].sum())
For flyte.io.DataFrame, pass the HashFunction to DataFrame.from_local via the hash_method parameter instead of annotating the return type:
@env.task
async def produce_flyte_dataframe() -> DataFrame:
df = pd.DataFrame(SAMPLE_DATA)
# For flyte.io.DataFrame, pass the HashFunction to `from_local` instead of
# annotating the return type.
hash_method = HashFunction.from_fn(hash_pandas_dataframe)
return await DataFrame.from_local(df, hash_method=hash_method)
@env.task(cache=Cache(behavior="override", version_override="v1"))
async def consume_flyte_dataframe(df: DataFrame) -> int:
pdf = await df.open(pd.DataFrame).all()
return int(pdf["value"].sum())
Files
A flyte.io.File accepts a hash_method on File.from_local (and on File.new_remote). Pass a HashFunction that hashes the file’s bytes, and the file is cached on its content rather than its remote path:
def hash_bytes(data: bytes) -> str:
import hashlib
return hashlib.sha256(data).hexdigest()
@env.task
async def produce_file() -> File:
import aiofiles
async with aiofiles.open("/tmp/data.csv", "w") as fh:
await fh.write("id,value\n1,100\n2,200\n")
# Pass a HashFunction (over the uploaded bytes) to `from_local` - the same
# mechanism works for `File.new_remote(...)`. The File is then cached on its
# content rather than its remote path.
return await File.from_local("/tmp/data.csv", hash_method=HashFunction.from_fn(hash_bytes))
@env.task(cache=Cache(behavior="override", version_override="v1"))
async def consume_file(f: File) -> str:
async with f.open("rb") as fh:
return hash_bytes(bytes(await fh.read()))
Directories
flyte.io.Dir.from_local does not take a HashFunction callable. Instead, compute a content key yourself and pass it as the precomputed dir_cache_key:
@env.task
async def produce_dir() -> Dir:
import os
os.makedirs("/tmp/data_dir", exist_ok=True)
with open("/tmp/data_dir/part.csv", "w") as fh:
fh.write("id,value\n1,100\n")
# `Dir.from_local` does not take a HashFunction callable. Instead, compute a
# content key yourself and pass it as the precomputed `dir_cache_key`.
content_key = hash_bytes(b"id,value\n1,100\n")
return await Dir.from_local("/tmp/data_dir/", dir_cache_key=content_key)
@env.task(cache=Cache(behavior="override", version_override="v1"))
async def consume_dir(d: Dir) -> int:
count = 0
async for _ in d.walk():
count += 1
return count
Cache policies
For details on implementing custom cache policies, see the
CachePolicy protocol and
Cache class API references.
For behavior="auto", Flyte uses cache policies to generate version hashes.
Function body policy (default)
The default FunctionBodyPolicy generates cache versions from the function’s source code:
from flyte._cache import FunctionBodyPolicy
@env.task(cache=flyte.Cache(
behavior="auto",
policies=[FunctionBodyPolicy()] # This is the default. Does not actually need to be specified.
))
async def code_sensitive_task(data: str) -> str:
return await transform_data(data)
Custom cache policies
You can implement custom cache policies by following the CachePolicy protocol:
from flyte._cache import CachePolicy
class DatasetVersionPolicy(CachePolicy):
def get_version(self, salt: str, params) -> str:
# Generate version based on custom logic
dataset_version = get_dataset_version()
return f"{salt}_{dataset_version}"
@env.task(cache=flyte.Cache(behavior="auto", policies=[DatasetVersionPolicy()]))
async def dataset_dependent_task(data: str) -> str:
# Cache invalidated when dataset version changes
return await transform_data(data)
Caching configuration at different levels
You can configure caching at three levels: TaskEnvironment definition, @env.task decorator, and task invocation.
TaskEnvironment level
You can configure caching at the TaskEnvironment level.
This will set the default cache behavior for all tasks defined using that environment.
For example:
cached_env = flyte.TaskEnvironment(
name="cached_environment",
cache=flyte.Cache(behavior="auto") # Default for all tasks
)
@cached_env.task # Inherits auto caching from environment
async def inherits_caching(data: str) -> str:
return await transform_data(data)
@env.task decorator level
By setting the cache parameter in the @env.task decorator, you can override the environment’s default cache behavior for specific tasks:
@cached_env.task(cache=flyte.Cache(behavior="disable")) # Override environment default
async def decorator_caching(data: str) -> str:
return await transform_data(data)
task.override level
By setting the cache parameter in the task.override method, you can override the cache behavior for specific task invocations:
@env.task
async def override_caching_on_call(data: str) -> str:
# Create an overridden version and call it
overridden_task = inherits_caching.override(cache=flyte.Cache(behavior="disable"))
return await overridden_task(data)
Runtime cache control
You can also force cache invalidation for a specific run:
# Disable caching for this specific execution
run = flyte.with_runcontext(overwrite_cache=True).run(my_cached_task, data="test")Project and domain cache isolation
Caches are automatically isolated by:
- Project: Tasks in different projects have separate cache namespaces.
- Domain: Development, staging, and production domains maintain separate caches.
Local development caching
When running locally, Flyte maintains a local cache:
# Local execution uses ~/.flyte/local-cache/
flyte.init() # Local mode
result = flyte.run(my_cached_task, data="test")Local cache behavior:
- Stored in
~/.flyte/local-cache/directory - No project/domain isolation (since running locally)
- Disabled by setting
FLYTE_LOCAL_CACHE_ENABLED=false