2.0.6

DynamicBatcher

Package: flyte.extras

Collects records submitted by many concurrent producers into batches and runs each batch through a single async processing function, so the downstream resource handles well-sized batches instead of one record at a time.

The batcher runs two internal loops:

  1. Aggregation loop: drains the submission queue and assembles cost-budgeted batches, respecting target_batch_cost, max_batch_size, and batch_timeout_s.
  2. Processing loop: pulls assembled batches, calls process_fn on each one, and resolves each record's asyncio.Future with its result.
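The two-loop design can be sketched with plain asyncio. ToyBatcher below is a hypothetical, simplified stand-in written for illustration, not the flyte.extras implementation: it batches only by max_batch_size and batch_timeout_s (no cost budget or min_batch_size), uses a None sentinel to signal shutdown, and its method names merely mirror the documented ones.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical, simplified model of the two-loop design (not flyte.extras code).
# Cost budgeting and min_batch_size are omitted for brevity.
ProcessFn = Callable[[list[str]], Awaitable[list[int]]]
_STOP = None  # sentinel that marks the end of input


class ToyBatcher:
    def __init__(self, process_fn: ProcessFn, max_batch_size: int = 4,
                 batch_timeout_s: float = 0.05, max_queue_size: int = 64,
                 prefetch_batches: int = 2) -> None:
        self._process_fn = process_fn
        self._max_batch_size = max_batch_size
        self._timeout = batch_timeout_s
        # Bounded queues: submit() waits when the inbox is full (backpressure),
        # and at most prefetch_batches assembled batches wait for processing.
        self._inbox: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._batches: asyncio.Queue = asyncio.Queue(maxsize=prefetch_batches)
        self._tasks: list[asyncio.Task] = []

    async def start(self) -> None:
        self._tasks = [asyncio.create_task(self._aggregate()),
                       asyncio.create_task(self._process())]

    async def submit(self, record: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        await self._inbox.put((record, future))
        return future

    async def stop(self) -> None:
        # Everything queued before the sentinel is still batched and processed.
        await self._inbox.put(_STOP)
        await asyncio.gather(*self._tasks)

    async def _aggregate(self) -> None:
        loop = asyncio.get_running_loop()
        stopping = False
        while not stopping:
            first = await self._inbox.get()
            if first is _STOP:
                break
            batch = [first]
            deadline = loop.time() + self._timeout
            # Keep filling until the size cap or the timeout is reached.
            while len(batch) < self._max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self._inbox.get(), remaining)
                except asyncio.TimeoutError:
                    break
                if item is _STOP:
                    stopping = True
                    break
                batch.append(item)
            await self._batches.put(batch)
        await self._batches.put(_STOP)  # tell the processing loop to exit

    async def _process(self) -> None:
        while (batch := await self._batches.get()) is not _STOP:
            records = [record for record, _ in batch]
            try:
                results = await self._process_fn(records)
                if len(results) != len(batch):
                    raise ValueError("process_fn must return one result per record")
                for (_, future), result in zip(batch, results):
                    future.set_result(result)
            except Exception as exc:
                # Fail every unresolved future in the batch so no caller hangs.
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)


async def main() -> list[int]:
    async def lengths(batch: list[str]) -> list[int]:
        return [len(text) for text in batch]

    batcher = ToyBatcher(lengths, max_batch_size=2)
    await batcher.start()
    futures = [await batcher.submit(word) for word in ["a", "bb", "ccc"]]
    await batcher.stop()
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [1, 2, 3]
```

The bounded queue between the loops plays the role that prefetch_batches describes: the aggregator can run ahead of the processor by at most that many batches, and it waits once that buffer is full.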

Type parameters:

  RecordT: The input record type produced by your tasks.
  ResultT: The per-record output type returned by process_fn.
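For example, with RecordT = str and ResultT = int, a process_fn and an optional cost_estimator might look like the following. Both functions (count_words and approx_token_cost) are hypothetical and only illustrate the expected shapes; a real process_fn would typically call a batched model or service.

```python
import asyncio


async def count_words(batch: list[str]) -> list[int]:
    """Example process_fn with RecordT = str and ResultT = int."""
    await asyncio.sleep(0)  # placeholder for a real batched async call
    # Exactly one result per record, in the same order as the input batch.
    return [len(text.split()) for text in batch]


def approx_token_cost(text: str) -> int:
    """Example cost_estimator: about one cost unit per 4 characters, at least 1."""
    return max(1, len(text) // 4)


print(asyncio.run(count_words(["hello world", "hi"])))  # [2, 1]
print(approx_token_cost("a short sentence"))  # 4
```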

class DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None,
    target_batch_cost: int,
    max_batch_size: int,
    min_batch_size: int,
    batch_timeout_s: float,
    max_queue_size: int,
    prefetch_batches: int,
    default_cost: int,
)
Parameters:

  process_fn (ProcessFn[RecordT, ResultT]): An async function with the signature async def f(batch: list[RecordT]) -> list[ResultT]. It must return one result per record, in the same order as the input batch.
  cost_estimator (CostEstimatorFn[RecordT] | None): Optional function of type (RecordT) -> int. When provided, it is called to estimate the cost of each submitted record. Otherwise the batcher falls back to record.estimate_cost() if the record implements CostEstimator, and then to default_cost.
  target_batch_cost (int): Cost budget per batch. The aggregator adds records to a batch up to this limit before dispatching it.
  max_batch_size (int): Hard cap on the number of records per batch, regardless of the cost budget.
  min_batch_size (int): Minimum number of records before a batch is dispatched. Ignored when the timeout fires or a shutdown is in progress.
  batch_timeout_s (float): Maximum time, in seconds, to wait for a full batch. Lower values reduce idle time but may produce smaller batches.
  max_queue_size (int): Capacity of the bounded submission queue. When the queue is full, submit() waits for space (backpressure).
  prefetch_batches (int): Number of pre-assembled batches buffered between the aggregation and processing loops.
  default_cost (int): Fallback cost when no estimator is available. …
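One plausible reading of how target_batch_cost and max_batch_size interact is a greedy, order-preserving fill. plan_batches is a hypothetical helper, not the library's aggregation code; its handling of a single record whose cost alone exceeds the budget (it is dispatched by itself) is an assumption, and min_batch_size and batch_timeout_s are left out because they depend on timing.

```python
from collections.abc import Sequence


def plan_batches(
    costs: Sequence[int],
    target_batch_cost: int,
    max_batch_size: int,
) -> list[list[int]]:
    """Group record indices into batches with a greedy, order-preserving fill.

    Hypothetical helper: a batch is closed when the next record would push it
    over target_batch_cost or when it already holds max_batch_size records.
    A record that alone exceeds the budget still forms its own batch.
    """
    batches: list[list[int]] = []
    current: list[int] = []
    current_cost = 0
    for index, cost in enumerate(costs):
        over_budget = bool(current) and current_cost + cost > target_batch_cost
        full = len(current) >= max_batch_size
        if over_budget or full:
            batches.append(current)
            current, current_cost = [], 0
        current.append(index)
        current_cost += cost
    if current:
        batches.append(current)
    return batches


# Costs 3, 3, 3, 5, 1 with a budget of 6 and at most 2 records per batch:
print(plan_batches([3, 3, 3, 5, 1], target_batch_cost=6, max_batch_size=2))
# [[0, 1], [2], [3, 4]]
```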

Properties

  is_running (bool): Whether the aggregation and processing loops are active.
  stats (BatchStats): Current BatchStats snapshot.

Methods

  start(): Start the aggregation and processing loops.
  stop(): Graceful shutdown: process all enqueued work, then stop.
  submit(): Submit a single record for batched processing.
  submit_batch(): Convenience: submit multiple records and return their futures.

start()

def start()

Start the aggregation and processing loops.

Raises: RuntimeError: If the batcher is already running.

stop()

def stop()

Graceful shutdown: process all enqueued work, then stop.

Blocks until every pending future is resolved.

submit()

def submit(
    record: RecordT,
    estimated_cost: int | None,
) -> asyncio.Future[ResultT]

Submit a single record for batched processing.

Returns an asyncio.Future that resolves once the batch containing this record has been processed.

Parameters:

  record (RecordT): The input record.
  estimated_cost (int | None): Optional explicit cost. When omitted, the batcher tries cost_estimator, then record.estimate_cost(), then default_cost.

submit_batch()

def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None,
) -> list[asyncio.Future[ResultT]]

Convenience: submit multiple records and return their futures.

Parameters:

  records (Sequence[RecordT]): Sequence of input records.
  estimated_cost (Sequence[int] | None): Optional per-record cost estimates. When provided, its length must equal len(records).