Data flow
Understanding how data flows between tasks is critical for optimizing workflow performance in Flyte. Tasks take inputs and produce outputs, and Flyte moves that data between tasks for you over an efficient transport layer.
Overview
Flyte tasks are run to completion. Each task takes inputs and produces exactly one output. Even if multiple instances run concurrently (such as in retries), only one output will be accepted. This deterministic data flow model provides several key benefits:
- Reduced boilerplate: Automatic handling of files, DataFrames, directories, custom types, data classes, Pydantic models, and primitive types without manual serialization.
- Type safety: Optional type annotations enable deeper type understanding, automatic UI form generation, and runtime type validation.
- Efficient transport: Data is passed by reference (files, directories, DataFrames) or by value (primitives) based on type.
- Durable storage: All data is stored durably and accessible through APIs and the UI.
- Caching support: Efficient caching using shallow immutable references for referenced data.
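For example, a minimal sketch of this model might look like the following; the environment name and task bodies are illustrative placeholders:

```python
import flyte

# A shared task environment; the name here is illustrative.
env = flyte.TaskEnvironment(name="data_flow_example")


@env.task
async def preprocess(raw: list[int]) -> dict[str, float]:
    # Primitive and collection values are passed inline (MessagePack).
    total = float(sum(raw))
    return {"total": total, "mean": total / len(raw)}


@env.task
async def report(stats: dict[str, float]) -> str:
    # The dict produced by preprocess flows into this task via the object store.
    return f"mean={stats['mean']:.2f}, total={stats['total']:.0f}"


@env.task
async def pipeline(raw: list[int]) -> str:
    # Calling a task records its inputs, runs it remotely, and returns its output.
    stats = await preprocess(raw)
    return await report(stats)
```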
Data types and transport
Flyte handles different data types with different transport mechanisms:
Passed by reference
These types are not copied but passed as references to storage locations:
- Files: flyte.io.File
- Directories: flyte.io.Directory
- Dataframes: flyte.io.DataFrame, pd.DataFrame, pl.DataFrame, etc.
Dataframes are automatically converted to Parquet format and read using Arrow for zero-copy reads. Use flyte.io.DataFrame for lazy materialization to any supported type like pandas or polars.
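As a rough sketch, a dataframe can be produced by one task and consumed lazily by another; the exact flyte.io.DataFrame materialization call (shown here as open(...).all()) is an assumption and may be spelled differently in your SDK version:

```python
import pandas as pd

import flyte
from flyte.io import DataFrame

env = flyte.TaskEnvironment(name="dataframe_example")


@env.task
async def make_table() -> pd.DataFrame:
    # The returned DataFrame is written once as Parquet and passed by reference.
    return pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})


@env.task
async def summarize(table: DataFrame) -> float:
    # Annotating with flyte.io.DataFrame defers materialization until a concrete
    # type is requested; the open(...).all() call shown here is an assumption
    # and may be spelled differently (or be awaitable) in your SDK version.
    df = table.open(pd.DataFrame).all()
    return float(df["b"].mean())
```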
Passed by value (inline I/O)
Primitive and structured types are serialized and passed inline:
| Type Category | Examples | Serialization |
|---|---|---|
| Primitives | int, float, str, bool, None | MessagePack |
| Time types | datetime.datetime, datetime.date, datetime.timedelta | MessagePack |
| Collections | list, dict, tuple | MessagePack |
| Data structures | data classes, Pydantic BaseModel | MessagePack |
| Enums | enum.Enum subclasses | MessagePack |
| Unions | Union[T1, T2], Optional[T] | MessagePack |
| Protobuf | google.protobuf.Message | Binary |
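As an illustrative sketch, a dataclass input and a Pydantic model output can be passed directly, with Flyte handling serialization; the names below are placeholders:

```python
from dataclasses import dataclass

import flyte
from pydantic import BaseModel

env = flyte.TaskEnvironment(name="structured_io_example")


@dataclass
class TrainingConfig:
    learning_rate: float
    epochs: int


class Metrics(BaseModel):
    accuracy: float
    loss: float


@env.task
async def train(config: TrainingConfig) -> Metrics:
    # Both the dataclass input and the Pydantic output are serialized inline
    # with MessagePack; no manual (de)serialization is needed.
    return Metrics(accuracy=0.93, loss=0.21)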
Flyte uses efficient MessagePack serialization for most types, providing compact binary representation with strong type safety.
If type annotations are not used, or if typing.Any or unrecognized types are used, data will be pickled. By default, pickled objects smaller than 10KB are passed inline, while larger pickled objects are automatically passed as a file. Pickling allows for progressive typing but should be used carefully.
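A minimal sketch of the untyped fallback, using typing.Any (the environment name is a placeholder):

```python
from typing import Any

import flyte

env = flyte.TaskEnvironment(name="pickle_example")


@env.task
async def untyped(payload: Any) -> Any:
    # With Any (or no annotation) Flyte falls back to pickling: small objects
    # travel inline, larger ones are shipped as a pickle file. Prefer typed
    # signatures for validation, caching, and UI form generation.
    return payload
```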
Task execution and data flow
Input download
When a task starts:
- Inline inputs download: The task downloads inline inputs from the configured Flyte object store.
- Size limits: By default, inline inputs are limited to 10MB, but this can be adjusted using flyte.TaskEnvironment's max_inline_io parameter (see the sketch after this list).
- Memory consideration: Inline data is materialized in memory, so adjust your task resources accordingly.
- Reference materialization: Reference data (files, directories) is passed using special types in flyte.io. Dataframes are automatically materialized if using pd.DataFrame. Use flyte.io.DataFrame to avoid automatic materialization.
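A sketch of raising the inline limit is shown below; the value format accepted by max_inline_io (a byte count here) and the Resources signature are assumptions, so check the TaskEnvironment reference for the exact types:

```python
import flyte

env = flyte.TaskEnvironment(
    name="large_inline_io",
    # Inline data is materialized in memory, so size resources accordingly;
    # the Resources signature shown here is an assumption.
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    # Assumed to accept a byte count; raises the default 10MB inline limit.
    max_inline_io=20 * 1024 * 1024,
)


@env.task
async def consume(big_payload: dict[str, list[float]]) -> int:
    return sum(len(values) for values in big_payload.values())
```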
Output upload
When a task returns data:
- Inline data: Uploaded to the Flyte object store configured at the organization, project, or domain level.
- Reference data: Stored in the same metadata store by default, or configured using flyte.with_runcontext(raw_data_storage=...).
- Separate prefixes: Each task writes the output of each retry attempt under a separate prefix, so attempts can never corrupt one another's data.
Task-to-task data flow
When a task invokes downstream tasks:
- Input recording: The input to the downstream task is recorded to the object store.
- Reference upload: All referenced objects are uploaded (if not already present).
- Task invocation: The downstream task is invoked on the remote server.
- Parallel execution: When multiple tasks are invoked in parallel using flyte.map or asyncio, inputs are written in parallel (see the sketch after this list).
- Storage layer: Data writing uses the flyte.storage layer, backed by the Rust-based object-store crate and optionally fsspec plugins.
- Output download: Once the downstream task completes, inline outputs are downloaded and returned to the calling task.
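A sketch of parallel fan-out with asyncio; task names and values are placeholders:

```python
import asyncio

import flyte

env = flyte.TaskEnvironment(name="fanout_example")


@env.task
async def square(x: int) -> int:
    return x * x


@env.task
async def fan_out(values: list[int]) -> list[int]:
    # Each concurrent call writes its inputs to the object store in parallel
    # before the downstream task instances are launched.
    return list(await asyncio.gather(*(square(v) for v in values)))
```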
Caching and data hashing
Understanding how Flyte caches data is essential for performance optimization.
Cache key computation
A cache hit occurs when the following components match:
- Task name: The fully-qualified task name
- Computed input hash: Hash of all inputs (excluding ignored_inputs)
- Task interface hash: Hash of input and output types
- Task config hash: Hash of task configuration
- Cache version: User-specified or automatically computed
Inline data caching
All inline data is cached using a consistent hashing system. The cache key is derived from the data content.
Reference data hashing
Reference data (files, directories) is hashed shallowly by default using the hash of the storage location. You can customize hashing:
- Use flyte.io.File.new_remote() or flyte.io.File.from_existing_remote() with custom hash functions or values (see the sketch after this list).
- Provide explicit hash values for deep content hashing if needed.
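A sketch of attaching a hash to reference data is shown below; the keyword used to pass the hash value is an assumption, so consult the flyte.io.File reference for the exact signature:

```python
import flyte
from flyte.io import File

env = flyte.TaskEnvironment(name="file_hash_example")


@env.task
async def ingest() -> File:
    # Wrap data that already lives in blob storage; supplying a content hash
    # lets identical data count as a cache hit even if the task reruns.
    # The keyword used for the hash is an assumption; check the File reference.
    return File.from_existing_remote(
        "s3://my-bucket/datasets/2024-01.csv",
        hash="<precomputed-content-hash>",  # hypothetical value
    )
```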
Cache control
Control caching behavior using flyte.with_runcontext:
- Scope: Set cache_lookup_scope to "global" or "project/domain".
- Disable cache: Set overwrite_cache=True to force re-execution (see the sketch after this list).
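For example, combining both options (my_task and data are placeholders, as in the example at the end of this page):

```python
import flyte

# Force a fresh execution and restrict cache lookups to the current
# project/domain; my_task and data are placeholders.
run = flyte.with_runcontext(
    cache_lookup_scope="project/domain",
    overwrite_cache=True,
).run(my_task, input_data=data)
```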
For more details on caching configuration, see Caching.
Traces and data flow
When using traces, the data flow behavior is different:
- Full execution first: The trace is fully executed before inputs and outputs are recorded.
- Checkpoint behavior: Recording happens like a checkpoint at the end of trace execution.
- Streaming iterators: The entire output is buffered and recorded after the stream completes. Buffering is pass-through, so the caller can consume the stream while it is being recorded.
- Chained traces: All traces are recorded after the last one completes consumption.
- Same process with asyncio: Traces run within the same Python process and support asyncio parallelism, so failures can be retried, effectively re-running the trace.
- Lightweight overhead: Traces only have the overhead of data storage (no task orchestration overhead).
Traces are not a substitute for tasks if you need caching. Tasks provide full caching capabilities, while traces provide lightweight checkpointing with storage overhead. However, traces support concurrent execution using asyncio patterns within a single task.
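A sketch of traced helpers inside a task is shown below; it assumes a flyte.trace decorator, so adapt it if your SDK exposes tracing differently:

```python
import asyncio

import flyte

env = flyte.TaskEnvironment(name="trace_example")


@flyte.trace
async def call_model(prompt: str) -> str:
    # Traced helpers run inside the task's own process; inputs and outputs are
    # recorded as a checkpoint only after the call completes.
    return f"response to: {prompt}"


@env.task
async def agent(prompts: list[str]) -> list[str]:
    # On a retry, traces that already completed are replayed from their
    # recorded outputs instead of being re-executed.
    return list(await asyncio.gather(*(call_model(p) for p in prompts)))
```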
Object stores and latency considerations
By default, Flyte uses object stores like S3, GCS, Azure Storage, and R2 as metadata stores. These have high latency for smaller objects, so:
- Minimum task duration: Tasks should take at least a second to run to amortize storage overhead.
- Future improvements: High-performance metastores like Redis and PostgreSQL may be supported in the future. Contact the Union team if you’re interested.
Configuring data storage
Organization and project level
Object stores are configured at the organization level or per project/domain. Documentation for this configuration is coming soon.
Per-run configuration
Configure raw data storage on a per-run basis using flyte.with_runcontext:
```python
run = flyte.with_runcontext(
    raw_data_storage="s3://my-bucket/custom-path"
).run(my_task, input_data=data)
```

This allows you to control where reference data (files, directories, DataFrames) is stored for specific runs.