2.0.9

DynamicBatcher

Package: flyte.extras

Batches records from many concurrent producers and runs them through a single async processing function, maximizing resource utilization.

The batcher runs two internal loops:

1. **Aggregation loop** — drains the submission queue and assembles
   cost-budgeted batches, respecting `target_batch_cost`,
   `max_batch_size`, and `batch_timeout_s`.
2. **Processing loop** — pulls assembled batches and calls
   `process_fn`, resolving each record's `asyncio.Future`.
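The two-loop design above can be illustrated with plain `asyncio`. What follows is a minimal sketch under stated assumptions, not Flyte's implementation. `MiniBatcher`, the `_STOP` sentinel and all default values are hypothetical. The sketch leaves out `min_batch_size`, cost estimators and error handling, and it uses a bounded queue of assembled batches to stand in for `prefetch_batches`.

```python
import asyncio
from typing import Awaitable, Callable, Generic, TypeVar

RecordT = TypeVar("RecordT")
ResultT = TypeVar("ResultT")

_STOP = object()  # hypothetical sentinel: "flush what you have, then exit"


class MiniBatcher(Generic[RecordT, ResultT]):
    """Toy two-loop batcher for illustration; not flyte.extras.DynamicBatcher."""

    def __init__(
        self,
        process_fn: Callable[[list[RecordT]], Awaitable[list[ResultT]]],
        target_batch_cost: int = 8,
        max_batch_size: int = 4,
        batch_timeout_s: float = 0.05,
        prefetch_batches: int = 2,
    ) -> None:
        self.process_fn = process_fn
        self.target_batch_cost = target_batch_cost
        self.max_batch_size = max_batch_size
        self.batch_timeout_s = batch_timeout_s
        self._inbox: asyncio.Queue = asyncio.Queue()  # (record, cost, future) triples
        self._batches: asyncio.Queue = asyncio.Queue(maxsize=prefetch_batches)
        self._tasks: list[asyncio.Task] = []

    def start(self) -> None:
        if self._tasks:
            raise RuntimeError("already running")
        self._tasks = [
            asyncio.create_task(self._aggregate()),
            asyncio.create_task(self._process()),
        ]

    async def submit(self, record: RecordT, cost: int = 1) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        await self._inbox.put((record, cost, fut))
        return fut

    async def stop(self) -> None:
        # Graceful: the sentinel is queued behind every earlier submission.
        await self._inbox.put(_STOP)
        await asyncio.gather(*self._tasks)

    async def _aggregate(self) -> None:
        loop = asyncio.get_running_loop()
        carry = None  # record that would have pushed the previous batch over budget
        while True:
            first = carry if carry is not None else await self._inbox.get()
            carry = None
            if first is _STOP:
                break
            batch, cost, stopping = [first], first[1], False
            deadline = loop.time() + self.batch_timeout_s
            while len(batch) < self.max_batch_size and cost < self.target_batch_cost:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break  # timeout: dispatch a partial batch
                try:
                    item = await asyncio.wait_for(self._inbox.get(), remaining)
                except asyncio.TimeoutError:
                    break
                if item is _STOP:
                    stopping = True
                    break
                if cost + item[1] > self.target_batch_cost:
                    carry = item  # over budget: it opens the next batch instead
                    break
                batch.append(item)
                cost += item[1]
            await self._batches.put(batch)  # waits while prefetch_batches are queued
            if stopping:
                break
        await self._batches.put(_STOP)  # tell the processing loop to finish

    async def _process(self) -> None:
        while (batch := await self._batches.get()) is not _STOP:
            results = await self.process_fn([record for record, _, _ in batch])
            # Results come back in input order, so zip pairs each with its future.
            for (_, _, fut), result in zip(batch, results):
                fut.set_result(result)
```

The `carry` slot is one way to honor "fill up to the budget" with a queue that cannot be peeked: a record that does not fit is held back and starts the next batch.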

Type Parameters:
    RecordT: The input record type produced by your tasks.
    ResultT: The per-record output type returned by `process_fn`.

Args:
    process_fn:
        `async def f(batch: list[RecordT]) -> list[ResultT]`
        Must return results in the **same order** as the input batch.

    cost_estimator:
        Optional `(RecordT) -> int` function.  When provided, it is
        called to estimate the cost of each submitted record.
        Falls back to `record.estimate_cost()` if the record
        implements `CostEstimator`, then to `default_cost`.

    target_batch_cost:
        Cost budget per batch.  The aggregator fills batches up to
        this limit before dispatching.

    max_batch_size:
        Hard cap on records per batch regardless of cost budget.

    min_batch_size:
        Minimum records before dispatching.  Ignored when the timeout
        fires or shutdown is in progress.

    batch_timeout_s:
        Maximum seconds to wait for a full batch.  Lower values reduce
        idle time but may produce smaller batches.

    max_queue_size:
        Bounded queue size.  When full, `submit` awaits
        (backpressure).

    prefetch_batches:
        Number of pre-assembled batches to buffer between the
        aggregation and processing loops.

    default_cost:
        Fallback cost when no estimator is available.
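The cost-lookup order described above (plus the explicit `estimated_cost` that `submit()` accepts) can be sketched as a small function. This is a hedged illustration, not the library's code: `resolve_cost` and `HasEstimateCost` are hypothetical names, the latter standing in for the `CostEstimator` protocol whose exact definition is not shown on this page, and the default of `1` is an assumption.

```python
from typing import Callable, Optional, Protocol, TypeVar, runtime_checkable

RecordT = TypeVar("RecordT")


@runtime_checkable
class HasEstimateCost(Protocol):
    """Hypothetical stand-in for the CostEstimator protocol."""

    def estimate_cost(self) -> int: ...


def resolve_cost(
    record: RecordT,
    estimated_cost: Optional[int] = None,
    cost_estimator: Optional[Callable[[RecordT], int]] = None,
    default_cost: int = 1,  # assumed default, for illustration only
) -> int:
    """Pick a record's cost using the documented precedence order."""
    if estimated_cost is not None:  # 1. explicit cost passed to submit()
        return estimated_cost
    if cost_estimator is not None:  # 2. batcher-wide cost_estimator function
        return cost_estimator(record)
    if isinstance(record, HasEstimateCost):  # 3. record implements estimate_cost()
        return record.estimate_cost()
    return default_cost  # 4. last resort
```

A record type only needs an `estimate_cost()` method to take part in step 3; plain dicts fall through to `default_cost`.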

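Example::

    import asyncio

    from flyte.extras import DynamicBatcher

    async def process(batch: list[dict]) -> list[str]:
        # Return exactly one result per input record, in the same order.
        return [str(record) for record in batch]

    async def main(my_records: list[dict]) -> list[str]:
        async with DynamicBatcher(process_fn=process) as batcher:
            futures = []
            for record in my_records:
                f = await batcher.submit(record)  # may wait when the queue is full
                futures.append(f)
            return await asyncio.gather(*futures)

    results = asyncio.run(main([{"id": 1}, {"id": 2}]))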

Parameters

class DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None,
    target_batch_cost: int,
    max_batch_size: int,
    min_batch_size: int,
    batch_timeout_s: float,
    max_queue_size: int,
    prefetch_batches: int,
    default_cost: int,
)
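| Parameter | Type | Description |
|---|---|---|
| `process_fn` | `ProcessFn[RecordT, ResultT]` | Async function that processes one batch; must return results in the same order as the input batch. |
| `cost_estimator` | `CostEstimatorFn[RecordT] \| None` | Optional per-record cost function. Falls back to `record.estimate_cost()`, then to `default_cost`. |
| `target_batch_cost` | `int` | Cost budget per batch. |
| `max_batch_size` | `int` | Hard cap on records per batch, regardless of the cost budget. |
| `min_batch_size` | `int` | Minimum records before dispatching; ignored when the timeout fires or during shutdown. |
| `batch_timeout_s` | `float` | Maximum seconds to wait for a full batch. |
| `max_queue_size` | `int` | Bounded submission-queue size; `submit` awaits when the queue is full. |
| `prefetch_batches` | `int` | Number of assembled batches buffered between the aggregation and processing loops. |
| `default_cost` | `int` | Fallback cost when no estimator is available. |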
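How `target_batch_cost`, `max_batch_size`, `min_batch_size` and the timeout combine can be summarized as a dispatch predicate. This is a sketch under assumptions: `BatchLimits` and `should_dispatch` are hypothetical, and the precedence used here (the size cap wins over `min_batch_size`; reaching the cost budget still waits for `min_batch_size`) is one plausible reading of the parameter descriptions, not confirmed library behavior.

```python
from dataclasses import dataclass


@dataclass
class BatchLimits:
    # Hypothetical container mirroring the constructor parameters of the same names.
    target_batch_cost: int
    max_batch_size: int
    min_batch_size: int


def should_dispatch(
    limits: BatchLimits,
    count: int,
    cost: int,
    timed_out: bool = False,
    shutting_down: bool = False,
) -> bool:
    """Return True when a pending batch of `count` records and total `cost` should go out."""
    if count == 0:
        return False  # nothing to send
    if timed_out or shutting_down:
        return True  # min_batch_size is ignored in these cases
    if count >= limits.max_batch_size:
        return True  # hard cap, regardless of cost
    if count < limits.min_batch_size:
        return False  # keep waiting for more records
    return cost >= limits.target_batch_cost  # cost budget reached
```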

Properties

| Property | Type | Description |
|---|---|---|
| `is_running` | `bool` | Whether the aggregation and processing loops are active. |
| `stats` | `BatchStats` | Current `BatchStats` snapshot. |

Methods

| Method | Description |
|---|---|
| `start()` | Start the aggregation and processing loops. |
| `stop()` | Graceful shutdown: process all enqueued work, then stop. |
| `submit()` | Submit a single record for batched processing. |
| `submit_batch()` | Convenience: submit multiple records and return their futures. |

start()

def start()

Start the aggregation and processing loops.

Raises

| Exception | Description |
|---|---|
| `RuntimeError` | If the batcher is already running. |

stop()

def stop()

Graceful shutdown: process all enqueued work, then stop.

Blocks until every pending future is resolved.

submit()

def submit(
    record: RecordT,
    estimated_cost: int | None,
) -> asyncio.Future[ResultT]

Submit a single record for batched processing.

Returns an asyncio.Future that resolves once the batch containing this record has been processed.

Example::

    future = await batcher.submit(my_record, estimated_cost=128)
    result = await future


| Parameter | Type | Description |
|---|---|---|
| `record` | `RecordT` | The input record. |
| `estimated_cost` | `int \| None` | Optional explicit cost. When omitted, the batcher tries `cost_estimator`, then `record.estimate_cost()`, then `default_cost`. |

Returns

A future whose result is the corresponding entry from the list returned by process_fn.
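Because `process_fn` must return results in input order, result *i* belongs to record *i*. The helper below sketches that mapping. `run_batch` is hypothetical, and two behaviors are assumptions this page does not document: a length mismatch is treated as an error, and an exception from `process_fn` is set on every future in the batch.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def run_batch(
    process_fn: Callable[[list[Any]], Awaitable[list[Any]]],
    records: list[Any],
    futures: Sequence[asyncio.Future],
) -> None:
    """Hypothetical helper: run one batch and settle one future per record."""
    try:
        results = await process_fn(records)
        if len(results) != len(records):
            raise ValueError(
                f"process_fn returned {len(results)} results for {len(records)} records"
            )
    except Exception as exc:
        # Assumption: a failed batch fails every future in it.
        for fut in futures:
            if not fut.done():
                fut.set_exception(exc)
        return
    # Same order in, same order out: result i belongs to record i.
    for fut, result in zip(futures, results):
        if not fut.done():  # skip futures the caller already cancelled
            fut.set_result(result)
```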

Raises

| Exception | Description |
|---|---|
| `RuntimeError` | If the batcher is not running. |

If the internal queue is full this coroutine awaits until space is available, providing natural backpressure to fast producers.
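The backpressure mechanism can be seen with a plain bounded `asyncio.Queue`: a fast producer suspends in `await queue.put(...)` until a slow consumer frees a slot. This sketch assumes, without confirmation from this page, that the submission queue behaves like `asyncio.Queue(maxsize=max_queue_size)`; `demo_backpressure` and the `-1` end marker are hypothetical.

```python
import asyncio


async def demo_backpressure(max_queue_size: int = 2) -> list[str]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=max_queue_size)
    log: list[str] = []

    async def producer() -> None:
        for i in range(5):
            await queue.put(i)  # suspends here while the queue is full
            log.append(f"put {i}")
        await queue.put(-1)  # end-of-stream marker

    async def consumer() -> None:
        while (item := await queue.get()) != -1:
            log.append(f"got {item}")
            await asyncio.sleep(0.01)  # deliberately slow consumer

    await asyncio.gather(producer(), consumer())
    return log
```

Reading the returned log, the producer never runs more than `max_queue_size` items ahead of the consumer.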

submit_batch()

def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None,
) -> list[asyncio.Future[ResultT]]

Convenience: submit multiple records and return their futures.

| Parameter | Type | Description |
|---|---|---|
| `records` | `Sequence[RecordT]` | Sequence of input records. |
| `estimated_cost` | `Sequence[int] \| None` | Optional per-record cost estimates. When provided, the length must match `records`. |

Returns

List of futures, one per record.
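A convenience method like this is typically a thin loop over single-record submission. The sketch below is a hypothetical free function, not the library's method: it takes any async `submit(record, cost)` callable, submits records one at a time so backpressure applies per record, and raises `ValueError` on a length mismatch (the error type is an assumption; the page only says the lengths must match).

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Sequence


async def submit_batch(
    submit: Callable[[Any, Optional[int]], Awaitable[asyncio.Future]],
    records: Sequence[Any],
    estimated_cost: Optional[Sequence[int]] = None,
) -> list[asyncio.Future]:
    """Hypothetical sketch: submit each record and collect one future per record."""
    if estimated_cost is not None and len(estimated_cost) != len(records):
        raise ValueError(
            f"got {len(estimated_cost)} cost estimates for {len(records)} records"
        )
    # None means "let the batcher pick the cost" for that record.
    costs = estimated_cost if estimated_cost is not None else [None] * len(records)
    return [await submit(record, cost) for record, cost in zip(records, costs)]
```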