Controlling parallel execution

When you fan out to many tasks, you often need to limit how many run at the same time. Common reasons include rate-limited APIs, GPU quotas, database connection limits, or simply avoiding overwhelming a downstream service.

There are two ways to control concurrency in Flyte 2: asyncio.Semaphore from the Python standard library for fine-grained control, and flyte.map with its built-in concurrency parameter for simpler cases.

The problem: unbounded parallelism

A straightforward asyncio.gather launches every task at once. If you are calling an external API that allows only a few concurrent requests, this can cause throttling or errors:

controlling_parallelism.py
import asyncio

import flyte

env = flyte.TaskEnvironment("controlling_parallelism")


@env.task
async def call_llm_api(prompt: str) -> str:
    """Simulate calling a rate-limited LLM API."""
    # In a real workflow, this would call an external API.
    # The API might allow only a few concurrent requests.
    await asyncio.sleep(0.5)
    return f"Response to: {prompt}"

controlling_parallelism.py
@env.task
async def process_all_at_once(prompts: list[str]) -> list[str]:
    """Send all requests in parallel with no concurrency limit.

    This can overwhelm a rate-limited API, causing errors or throttling.
    """
    results = await asyncio.gather(*[call_llm_api(p) for p in prompts])
    return list(results)

With eight prompts, this fires eight concurrent API calls. That works when the API has no limits, but requests can be throttled or rejected once the API enforces a concurrency cap.
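The failure mode can be reproduced locally with plain asyncio, without Flyte. The following is a minimal sketch under stated assumptions: fake_rate_limited_api, MAX_CONCURRENT, and the simulated "429" error are hypothetical stand-ins for a real API and its cap, not part of Flyte or any real service.

```python
import asyncio

MAX_CONCURRENT = 3  # hypothetical cap enforced by the simulated API
in_flight = 0       # requests currently inside the simulated API


async def fake_rate_limited_api(prompt: str) -> str:
    """Hypothetical stand-in for an API that rejects excess concurrent requests."""
    global in_flight
    in_flight += 1
    try:
        if in_flight > MAX_CONCURRENT:
            raise RuntimeError(f"429 Too Many Requests: {prompt}")
        await asyncio.sleep(0.01)  # simulated latency
        return f"Response to: {prompt}"
    finally:
        in_flight -= 1


async def call_all_at_once(prompts: list[str]) -> list:
    # return_exceptions=True collects failures instead of raising the first one.
    return await asyncio.gather(
        *[fake_rate_limited_api(p) for p in prompts], return_exceptions=True
    )


prompts = [f"prompt {i}" for i in range(8)]
results = asyncio.run(call_all_at_once(prompts))
rejected = [r for r in results if isinstance(r, Exception)]
print(f"{len(rejected)} of {len(prompts)} requests rejected")
```

In this simulation the first three requests fit under the cap and the remaining five are rejected, because every request starts before any of them finishes.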

Using asyncio.Semaphore

An asyncio.Semaphore acts as a gate: only a fixed number of tasks can pass through at a time. The rest wait until a slot opens up.

controlling_parallelism.py
@env.task
async def process_batch_with_semaphore(
    prompts: list[str],
    max_concurrent: int = 3,
) -> list[str]:
    """Process prompts in parallel, limiting concurrency with a semaphore.

    At most `max_concurrent` calls to the API run at any given time.
    The remaining tasks wait until a slot is available.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_call(prompt: str) -> str:
        async with semaphore:
            return await call_llm_api(prompt)

    results = await asyncio.gather(*[limited_call(p) for p in prompts])
    return list(results)

The pattern is:

  1. Create a semaphore with the desired limit.
  2. Wrap each task call in an inner async function that acquires the semaphore before calling and releases it after.
  3. Pass all wrapped calls to asyncio.gather.

All eight wrapped calls are scheduled immediately, but the semaphore lets only three of them reach call_llm_api at a time. As each call completes, it releases its slot and the next waiting call starts.

The semaphore controls how many tasks execute concurrently on the Flyte cluster. Each task still runs in its own container with its own resources — the semaphore simply limits how many containers are active at a time.
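The gating itself is ordinary asyncio behavior, so it can be checked locally before running on a cluster. The sketch below is illustrative only: fake_call is a hypothetical stand-in for a Flyte task call, and the in_flight and peak counters exist purely to observe how many calls overlap.

```python
import asyncio

in_flight = 0  # calls currently running
peak = 0       # highest number of overlapping calls observed


async def fake_call(prompt: str) -> str:
    """Hypothetical stand-in for a task call; records how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulated work
    in_flight -= 1
    return f"Response to: {prompt}"


async def run_limited(prompts: list[str], max_concurrent: int) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_call(prompt: str) -> str:
        # Waits here until one of the max_concurrent slots is free.
        async with semaphore:
            return await fake_call(prompt)

    # gather returns results in input order, regardless of completion order.
    return list(await asyncio.gather(*[limited_call(p) for p in prompts]))


prompts = [f"prompt {i}" for i in range(8)]
results = asyncio.run(run_limited(prompts, max_concurrent=3))
print(f"peak concurrency: {peak}")
```

The observed peak never exceeds max_concurrent, and the results come back in the same order as the prompts.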

Using flyte.map with concurrency

For uniform work — applying the same task to a list of inputs — flyte.map with the concurrency parameter is simpler:

controlling_parallelism.py
@env.task
async def process_batch_with_map(prompts: list[str]) -> list[str]:
    """Process prompts using flyte.map with a built-in concurrency limit.

    This is the simplest approach when every item goes through the same task.
    """
    results = list(flyte.map(call_llm_api, prompts, concurrency=3))
    return results

This achieves the same concurrency limit with less boilerplate.

Running the example

controlling_parallelism.py
if __name__ == "__main__":
    flyte.init_from_config()
    prompts = [
        "Summarize this text",
        "Translate to French",
        "Extract key points",
        "Generate a title",
        "Write a conclusion",
        "List the main topics",
        "Identify the tone",
        "Suggest improvements",
    ]
    r = flyte.run(process_batch_with_semaphore, prompts)
    print(r.name)
    print(r.url)
    r.wait()

When to use each approach

Use flyte.map(concurrency=N) when:

  • Every item goes through the same task.
  • You want the simplest possible code.

Use asyncio.Semaphore when:

  • You need different concurrency limits for different task types within the same workflow.
  • You want to combine concurrency control with error handling (e.g., asyncio.gather(*tasks, return_exceptions=True)).
  • You are calling multiple different tasks in one parallel batch.
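The semaphore cases above can be combined in one batch. The following is a rough sketch in plain asyncio, assuming two hypothetical coroutines, summarize and translate, in place of real Flyte tasks. Each gets its own limit, and return_exceptions=True keeps one failure from discarding the other results.

```python
import asyncio


async def summarize(text: str) -> str:
    """Hypothetical task type with a strict concurrency limit."""
    await asyncio.sleep(0.01)
    if not text:
        raise ValueError("empty input")
    return f"summary of {text}"


async def translate(text: str) -> str:
    """Hypothetical task type that tolerates more parallel calls."""
    await asyncio.sleep(0.01)
    return f"translation of {text}"


async def run_mixed_batch(texts: list[str]) -> list:
    summarize_slots = asyncio.Semaphore(2)  # at most 2 summarize calls at once
    translate_slots = asyncio.Semaphore(4)  # at most 4 translate calls at once

    async def limited(semaphore: asyncio.Semaphore, fn, text: str):
        # async with releases the slot even if fn raises.
        async with semaphore:
            return await fn(text)

    calls = [limited(summarize_slots, summarize, t) for t in texts]
    calls += [limited(translate_slots, translate, t) for t in texts]
    # Exceptions are returned in place of results instead of being raised.
    return await asyncio.gather(*calls, return_exceptions=True)


results = asyncio.run(run_mixed_batch(["a", "", "b"]))
failures = [r for r in results if isinstance(r, Exception)]
successes = [r for r in results if not isinstance(r, Exception)]
print(f"{len(successes)} succeeded, {len(failures)} failed")
```

Because failures appear in the result list at the position of the call that raised, the caller can decide per item whether to retry, skip, or abort.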