Fanout
Flyte is designed to scale effortlessly, allowing you to run workflows with large fan-outs. When you need to execute many tasks in parallel—such as processing a large dataset or running hyperparameter sweeps—Flyte provides powerful patterns to implement these operations efficiently.
Understanding fanout
A “fanout” pattern occurs when you spawn multiple tasks concurrently.
Each task runs in its own container and contributes an output that you later collect.
The most common way to implement this is with the asyncio.gather() function.
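As a refresher, here is what asyncio.gather() does in plain Python, independent of Flyte: it awaits several coroutines concurrently on one event loop and returns their results in input order. A minimal sketch:

```python
import asyncio


async def double(x: int) -> int:
    # Simulate a small amount of I/O-bound work
    await asyncio.sleep(0.01)
    return x * 2


async def main() -> list:
    # gather runs the coroutines concurrently and preserves input order
    return await asyncio.gather(double(1), double(2), double(3))


print(asyncio.run(main()))  # [2, 4, 6]
```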
In Flyte terminology, each individual task execution is called an “action”—this represents a specific invocation of a task with particular inputs. When you call a task multiple times in a loop, you create multiple actions.
Example
We start by importing our required packages, defining our Flyte environment, and creating a simple task that fetches user data from a mock API.
```python
import asyncio
from typing import List

import flyte

env = flyte.TaskEnvironment("fanout_env")


@env.task
async def fetch_data(user_id: int) -> dict:
    """Simulate fetching user data from an API - good for parallel execution."""
    # Simulate network I/O delay
    await asyncio.sleep(0.1)
    return {
        "user_id": user_id,
        "name": f"User_{user_id}",
        "score": user_id * 10,
        "data": f"fetched_data_{user_id}",
    }
```

Parallel execution
Next we implement the most common fanout pattern, which is to collect task invocations and execute them in parallel using asyncio.gather():
```python
@env.task
async def parallel_data_fetching(user_ids: List[int]) -> List[dict]:
    """Fetch data for multiple users in parallel - ideal for I/O-bound operations."""
    tasks = []
    # Collect all fetch tasks - these can run in parallel since they're independent
    for user_id in user_ids:
        tasks.append(fetch_data(user_id))
    # Execute all fetch operations in parallel
    results = await asyncio.gather(*tasks)
    return results
```

Running the example
To actually run our example, we create a main guard that initializes Flyte and runs our main driver task:
```python
if __name__ == "__main__":
    flyte.init_from_config()
    user_ids = [1, 2, 3, 4, 5]
    r = flyte.run(parallel_data_fetching, user_ids)
    print(r.name)
    print(r.url)
    r.wait()
```

How Flyte handles concurrency and parallelism
In the example we use a standard asyncio.gather() pattern.
When this pattern is used in a normal Python environment, the tasks execute concurrently (cooperatively sharing a single thread through the event loop), but not truly in parallel (on multiple CPU cores simultaneously).
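You can observe this single-threaded concurrency directly: gathering three coroutines that each sleep 0.1 seconds takes roughly 0.1 seconds of wall time, not 0.3, because the sleeps overlap on one event loop. A small sketch (plain asyncio, no Flyte involved):

```python
import asyncio
import time


async def sleeper(seconds: float) -> float:
    await asyncio.sleep(seconds)
    return seconds


async def main() -> float:
    start = time.perf_counter()
    # Three coroutines cooperatively share one thread via the event loop,
    # so their sleeps overlap rather than run back to back
    await asyncio.gather(sleeper(0.1), sleeper(0.1), sleeper(0.1))
    return time.perf_counter() - start


elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # roughly 0.1s, not 0.3s
```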
However, Flyte transforms this concurrency model into true parallelism. When you use asyncio.gather() in a Flyte task:
- Flyte acts as a distributed event loop: Instead of scheduling coroutines on a single machine, Flyte schedules each task action to run in its own container across the cluster
- Concurrent becomes parallel: What would be cooperative multitasking in regular Python becomes true parallel execution across multiple machines
- Native Python patterns: You use familiar asyncio patterns, but Flyte automatically distributes the work
This means that when you write:
```python
results = await asyncio.gather(fetch_data(1), fetch_data(2), fetch_data(3))
```

Instead of three coroutines sharing one CPU, you get three separate containers running simultaneously, each with its own CPU, memory, and resources. Flyte seamlessly bridges the gap between Python’s concurrency model and distributed parallel computing, allowing for massive scalability while maintaining the familiar async/await programming model.
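One practical note when fanning out: by default, asyncio.gather() raises the first exception and discards the remaining results. Passing return_exceptions=True (a standard asyncio option, shown here with a hypothetical plain-Python fetch helper rather than the Flyte task above) returns exception objects in place of failed results so you can handle each one individually:

```python
import asyncio


async def fetch(i: int) -> str:
    # Hypothetical helper: simulates I/O and fails for one input
    await asyncio.sleep(0.01)
    if i == 2:
        raise ValueError(f"user {i} not found")
    return f"fetched_{i}"


async def main() -> list:
    # return_exceptions=True yields exception objects in place of results,
    # so one failure does not discard the successful fetches
    return await asyncio.gather(*(fetch(i) for i in range(3)), return_exceptions=True)


results = asyncio.run(main())
print(results)
```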