Asynchronous model
Why we need an async model
The shift to an asynchronous model in Flyte 2 is driven by the need for more efficient and flexible workflow execution.
We believe, in particular, that with the rise of the agentic AI pattern, asynchronous programming has become an essential part of the AI/ML engineering and data science toolkit.
With Flyte 2, the entire framework is now written with async constructs, allowing for:
- Seamless overlapping of I/O and independent external operations.
- Composing multiple tasks and external tool invocations within the same Python process.
- Native support for streaming operations for data, observability, and downstream invocations.
It is also a natural fit for expressing parallelism in workflows.
Understanding concurrency vs. parallelism
Before diving into Flyte 2’s approach, it’s essential to understand the distinction between concurrency and parallelism:
| Concurrency | Parallelism |
|---|---|
| Dealing with multiple tasks at once through interleaved execution, even on a single thread. | Executing multiple tasks truly simultaneously across multiple cores or machines. |
| Performance benefits come from allowing the system to switch between tasks when one is waiting for external operations. | This is a subset of concurrency where tasks run at the same time rather than being interleaved. |
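The concurrency column can be observed in plain Python, without Flyte. In this minimal sketch, `asyncio.sleep` stands in for waiting on an external service (the `wait_for_service` helper, service names, and delays are made up for illustration). Three 0.2-second waits on a single thread overlap, so the batch takes about 0.2 seconds instead of 0.6.

```python
import asyncio
import time
from typing import List, Tuple


async def wait_for_service(name: str, delay: float) -> str:
    # asyncio.sleep stands in for an external call; it yields to the event loop while waiting
    await asyncio.sleep(delay)
    return name


async def main() -> Tuple[List[str], float]:
    start = time.perf_counter()
    names = await asyncio.gather(*(wait_for_service(f"svc{i}", 0.2) for i in range(3)))
    return names, time.perf_counter() - start


names, elapsed = asyncio.run(main())
print(names, f"{elapsed:.2f}s")  # the waits overlap: about 0.2 s total rather than 0.6 s
```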
Python’s async evolution
Python’s asynchronous programming capabilities have evolved significantly:
- The GIL challenge: Python’s Global Interpreter Lock (GIL) traditionally prevented true parallelism for CPU-bound tasks, limiting threading effectiveness to I/O-bound operations.
- Traditional solutions:
  - `multiprocessing`: Creates separate processes to sidestep the GIL; effective but resource-intensive.
  - `threading`: Useful for I/O-bound tasks where the GIL can be released during external operations.
- The async revolution: The `asyncio` library introduced cooperative multitasking within a single thread, using an event loop to manage multiple tasks efficiently.
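A minimal sketch of cooperative multitasking in plain `asyncio`: two coroutines hand control back with `await asyncio.sleep(0)`, the event loop interleaves their steps, and every step runs on the same thread. The `worker` function and its step counts are illustrative only.

```python
import asyncio
import threading
from typing import List, Set

log: List[str] = []   # order in which steps actually ran
threads: Set[int] = set()  # identities of the threads that ran them


async def worker(name: str, steps: int) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        threads.add(threading.get_ident())
        await asyncio.sleep(0)  # yield control back to the event loop


async def main() -> None:
    await asyncio.gather(worker("a", 3), worker("b", 3))


asyncio.run(main())
print(log)           # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
print(len(threads))  # 1: interleaved, but never simultaneous
```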
Parallelism in Flyte 1 vs Flyte 2
| | Flyte 1 | Flyte 2 |
|---|---|---|
| Parallelism | The workflow DSL automatically parallelized tasks that weren’t dependent on each other. The `map` operator allowed running a task multiple times in parallel with different inputs. | Leverages Python’s `asyncio` as the primary mechanism for expressing parallelism, but with a crucial difference: the Flyte orchestrator acts as the event loop, managing task execution across distributed infrastructure. |
Core async concepts
- `async def`: Declares a function as a coroutine function. Calling it returns a coroutine object instead of executing the body immediately; the body runs once the coroutine is awaited or scheduled on the event loop.
- `await`: Pauses coroutine execution until the awaited object completes, passing control back to the event loop in the meantime. In standard Python, this enables other tasks to run while waiting for I/O operations. In Flyte 2, it signals where tasks can be executed in parallel.
- `asyncio.gather`: The primary tool for concurrent execution. In standard Python, it schedules multiple awaitable objects to run concurrently within a single event loop. In Flyte 2, it signals to the orchestrator that these tasks can be distributed across separate compute resources.
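The first and third concepts above can be checked in standard Python (no Flyte involved). This sketch shows that calling an `async def` function runs none of its body, and that `asyncio.gather` returns results in argument order even when later coroutines finish first. `square` is a hypothetical example coroutine.

```python
import asyncio
from typing import List

ran: List[int] = []  # records which calls actually executed their body


async def square(x: int) -> int:
    ran.append(x)
    await asyncio.sleep(0.01 * (3 - x))  # larger inputs finish sooner
    return x * x


coro = square(5)
print(asyncio.iscoroutine(coro), ran)  # True []: calling did not run the body
coro.close()  # discard the never-awaited coroutine


async def main() -> List[int]:
    # gather returns results in argument order, not completion order
    return await asyncio.gather(square(1), square(2), square(3))


print(asyncio.run(main()))  # [1, 4, 9]
```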
A practical example
Consider this pattern for parallel data processing:
```python
import asyncio
from typing import List

import flyte

env = flyte.TaskEnvironment("data_pipeline")


@env.task
async def process_chunk(chunk_id: int, data: str) -> str:
    # This could be any computational work - CPU or I/O bound
    await asyncio.sleep(1)  # Simulating work
    return f"Processed chunk {chunk_id}: {data}"


@env.task
async def parallel_pipeline(data_chunks: List[str]) -> List[str]:
    # Create coroutines for all chunks
    tasks = []
    for i, chunk in enumerate(data_chunks):
        tasks.append(process_chunk(i, chunk))
    # Execute all chunks in parallel
    results = await asyncio.gather(*tasks)
    return results
```
In standard Python, this would provide concurrency benefits primarily for I/O-bound operations.
In Flyte 2, the orchestrator schedules each `process_chunk` task on separate Kubernetes pods or configured plugins, achieving true parallelism for any type of work.
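To see why standard `asyncio` helps mainly with I/O-bound work, compare two coroutines that block with two that await. In this sketch a blocking `time.sleep` stands in for CPU-bound code that never yields: it holds the event loop, so the gathered pair takes about twice as long as the pair that awaits `asyncio.sleep`. This is plain Python only and does not model Flyte's scheduling; the helper names are hypothetical.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def blocking_step() -> None:
    time.sleep(0.2)  # blocks the event loop, like CPU-bound code with no await


async def awaiting_step() -> None:
    await asyncio.sleep(0.2)  # non-blocking wait: the loop can run other coroutines


async def timed(step: Callable[[], Awaitable[None]]) -> float:
    start = time.perf_counter()
    await asyncio.gather(step(), step())
    return time.perf_counter() - start


blocking = asyncio.run(timed(blocking_step))
overlapped = asyncio.run(timed(awaiting_step))
print(f"blocking: {blocking:.2f}s, awaiting: {overlapped:.2f}s")
```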
True parallelism for all workloads
This is where Flyte 2’s approach differs most from standard Python: async syntax is not just for I/O-bound operations.
The `async`/`await` syntax becomes a powerful way to declare your workflow’s parallel structure for any type of computation.
When Flyte’s orchestrator encounters `await asyncio.gather(...)`, it understands that these tasks are independent and can be executed simultaneously across different compute resources.
This means you achieve true parallelism for:
- CPU-bound computations: Heavy mathematical operations, model training, data transformations
- I/O-bound operations: Database queries, API calls, file operations
- Mixed workloads: Any combination of computational and I/O tasks
The Flyte platform handles the complex orchestration while you express parallelism using intuitive `async` syntax.
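As a local analogy only (not how Flyte schedules tasks), the same `gather` structure can hand independent work to a different executor. Here `asyncio.to_thread` sends hashing, a CPU-bound step, to the default thread pool while an I/O-style step waits on the event loop; the code that declares the parallelism stays the same. Threads give true parallelism only when the work releases the GIL, which CPython's `hashlib` does for large buffers. `checksum`, `fetch`, and `fan_out` are hypothetical names.

```python
import asyncio
import hashlib
from typing import List, Tuple


def checksum(blob: bytes) -> str:
    # CPU-bound step; hashlib releases the GIL for large inputs, so threads can overlap
    return hashlib.sha256(blob).hexdigest()


async def fetch(i: int) -> int:
    # Stand-in for an I/O-bound call such as an API request
    await asyncio.sleep(0.05)
    return i


async def fan_out(blobs: List[bytes]) -> Tuple[List[str], List[int]]:
    # Independent work is declared with gather; to_thread decides it runs in worker threads
    cpu = asyncio.gather(*(asyncio.to_thread(checksum, b) for b in blobs))
    io = asyncio.gather(*(fetch(i) for i in range(len(blobs))))
    hashes, ids = await asyncio.gather(cpu, io)  # mixed workload, one fan-in point
    return hashes, ids


blobs = [bytes([i]) * 1_000_000 for i in range(4)]
hashes, ids = asyncio.run(fan_out(blobs))
print(ids)  # [0, 1, 2, 3]
```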
Bridging the transition: Sync support and migration tools
Seamless synchronous task support
Recognizing that many existing codebases use synchronous functions, Flyte 2 provides seamless backward compatibility:
```python
import asyncio
from typing import List

import flyte

env = flyte.TaskEnvironment("data_pipeline")


@env.task
def legacy_computation(x: int) -> int:
    # Existing synchronous function works unchanged
    return x * x + 2 * x + 1


@env.task
async def modern_workflow(numbers: List[int]) -> List[int]:
    # Call sync tasks from async context using .aio()
    tasks = []
    for num in numbers:
        tasks.append(legacy_computation.aio(num))
    results = await asyncio.gather(*tasks)
    return results
```
Under the hood, Flyte automatically “asyncifies” synchronous functions, wrapping them so they participate in the async execution model.
You don’t need to rewrite existing code; just use the `.aio()` method when calling sync tasks from async contexts.
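Flyte's own asyncification is internal and is not shown here. As a rough plain-Python analogy, a hypothetical `with_aio` decorator can attach an `.aio()` coroutine to a sync function by running it in a worker thread via `asyncio.to_thread`. The function stays callable synchronously and can also be gathered from async code.

```python
import asyncio
from typing import Callable, List


def with_aio(fn: Callable) -> Callable:
    """Hypothetical helper: attach an async `.aio()` variant that runs `fn` in a worker thread."""

    async def aio(*args, **kwargs):
        # to_thread keeps the blocking call off the event loop
        return await asyncio.to_thread(fn, *args, **kwargs)

    fn.aio = aio  # the original sync function is returned unchanged
    return fn


@with_aio
def legacy_computation(x: int) -> int:
    return x * x + 2 * x + 1


async def modern_workflow(numbers: List[int]) -> List[int]:
    return await asyncio.gather(*(legacy_computation.aio(n) for n in numbers))


print(legacy_computation(2))                    # 9: still callable synchronously
print(asyncio.run(modern_workflow([1, 2, 3])))  # [4, 9, 16]
```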
The `flyte.map` function: Familiar patterns
For scenarios that previously used Flyte 1’s `map` operation, Flyte 2 provides `flyte.map` as a direct replacement.
The new `flyte.map` can be used in either synchronous or asynchronous contexts, allowing you to express parallelism without changing your existing patterns.
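The dual sync/async shape can be sketched in plain Python with two hypothetical helpers, `map_aio` and `map_sync`. This is an assumption-laden illustration, not `flyte.map`'s implementation: failures come back as values (mirroring the `isinstance(result, Exception)` checks in the examples that follow), and the sync facade drives the async version with `asyncio.run`.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Iterable, Iterator, List


async def map_aio(fn: Callable[[Any], Any], items: Iterable[Any]) -> AsyncIterator[Any]:
    # Run every call concurrently in worker threads; exceptions are returned, not raised
    results = await asyncio.gather(
        *(asyncio.to_thread(fn, x) for x in items), return_exceptions=True
    )
    for result in results:  # yield in input order
        yield result


def map_sync(fn: Callable[[Any], Any], items: Iterable[Any]) -> Iterator[Any]:
    # Sync facade: collect the async results on a fresh event loop
    async def collect() -> List[Any]:
        return [r async for r in map_aio(fn, items)]

    return iter(asyncio.run(collect()))


def label_item(x: int) -> str:
    # Hypothetical per-item work that fails for one input
    if x == 2:
        raise ValueError("cannot label item 2")
    return f"item-{x}"


sync_out = list(map_sync(label_item, range(4)))


async def main() -> List[Any]:
    return [r async for r in map_aio(label_item, range(4))]


async_out = asyncio.run(main())
print(sync_out[:2], type(sync_out[2]).__name__)  # ['item-0', 'item-1'] ValueError
```

One design consequence: `asyncio.run` refuses to start inside an already-running event loop, so async callers must use `map_aio` rather than the sync facade.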
```python
from typing import List

import flyte

env = flyte.TaskEnvironment("data_pipeline")


@env.task
def process_item(x: int) -> str:
    # Placeholder task for this example; substitute your own per-item work
    return f"item-{x}"


@env.task
def sync_map_example(n: int) -> List[str]:
    # Synchronous version for easier migration
    results = []
    for result in flyte.map(process_item, range(n)):
        if isinstance(result, Exception):
            raise result
        results.append(result)
    return results


@env.task
async def async_map_example(n: int) -> List[str]:
    # Async version using flyte.map.aio
    results = []
    async for result in flyte.map.aio(process_item, range(n)):
        if isinstance(result, Exception):
            raise result
        results.append(result)
    return results
```
The `flyte.map` function provides:
- Dual interfaces: `flyte.map.aio()` for async contexts, `flyte.map()` for sync contexts.
- Built-in error handling: a `return_exceptions` parameter for graceful failure handling. This matches the `asyncio.gather` interface, allowing you to decide how to handle errors. If you are coming from Flyte 1, it allows you to replace `min_success_ratio` in a more flexible way.
- Automatic UI grouping: Creates logical groups for better workflow visualization.
- Concurrency control: Optional limits for resource management.
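The error-handling and concurrency-control features can be approximated in plain `asyncio` to see what they do. In this sketch, which is not `flyte.map`'s implementation, `return_exceptions=True` turns failures into values, a hypothetical `min_success_ratio` check fails the batch only when too many items fail, and an `asyncio.Semaphore` caps how many items run at once. `process_item` and `map_with_policy` are made-up names.

```python
import asyncio
from typing import Iterable, List, Tuple


async def process_item(x: int) -> str:
    # Hypothetical item processor: fails on multiples of 4 to exercise error handling
    await asyncio.sleep(0.01)
    if x % 4 == 0:
        raise ValueError(f"bad item {x}")
    return f"item-{x}"


async def map_with_policy(
    items: Iterable[int], concurrency: int, min_success_ratio: float
) -> Tuple[List[str], int]:
    sem = asyncio.Semaphore(concurrency)  # at most `concurrency` items in flight
    active = 0
    peak = 0  # highest number of items observed running at once

    async def guarded(x: int) -> str:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            try:
                return await process_item(x)
            finally:
                active -= 1

    # As with asyncio.gather's return_exceptions: failures become values, the batch keeps going
    results = await asyncio.gather(*(guarded(x) for x in items), return_exceptions=True)
    ok = [r for r in results if not isinstance(r, BaseException)]
    # Hypothetical success threshold in the spirit of Flyte 1's min_success_ratio
    if results and len(ok) / len(results) < min_success_ratio:
        raise RuntimeError(f"only {len(ok)} of {len(results)} items succeeded")
    return ok, peak


ok, peak = asyncio.run(map_with_policy(range(8), concurrency=3, min_success_ratio=0.7))
print(ok)  # ['item-1', 'item-2', 'item-3', 'item-5', 'item-6', 'item-7']
```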