2.0.6
DynamicBatcher
Package: flyte.extras
Batches records from many concurrent producers and runs them through a single async processing function, maximizing resource utilization.
The batcher runs two internal loops:
- Aggregation loop: drains the submission queue and assembles cost-budgeted batches, respecting target_batch_cost, max_batch_size, and batch_timeout_s.
- Processing loop: pulls assembled batches and calls process_fn, resolving each record's asyncio.Future.
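
The two-loop design can be illustrated with plain asyncio primitives. The following is a minimal sketch, not the flyte.extras implementation: the names aggregation_loop, processing_loop, and run_demo, the use of None as a shutdown sentinel, and the doubling process_fn are all assumptions made for illustration, and cost budgeting is omitted for brevity.

```python
import asyncio


async def process_fn(batch: list[int]) -> list[int]:
    # Stand-in for a real batched call; results keep the input order.
    return [x * 2 for x in batch]


async def aggregation_loop(submissions: asyncio.Queue, batches: asyncio.Queue,
                           max_batch_size: int, batch_timeout_s: float) -> None:
    loop = asyncio.get_running_loop()
    while True:
        item = await submissions.get()
        if item is None:  # hypothetical shutdown sentinel
            await batches.put(None)
            return
        batch = [item]
        deadline = loop.time() + batch_timeout_s
        # Keep filling until the size cap or the timeout is reached.
        while len(batch) < max_batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                item = await asyncio.wait_for(submissions.get(), timeout=remaining)
            except asyncio.TimeoutError:
                break
            if item is None:
                await batches.put(batch)
                await batches.put(None)
                return
            batch.append(item)
        await batches.put(batch)


async def processing_loop(batches: asyncio.Queue) -> None:
    while True:
        batch = await batches.get()
        if batch is None:
            return
        records = [record for record, _ in batch]
        try:
            results = await process_fn(records)
        except Exception as exc:
            # Propagate a batch failure to every waiting future.
            for _, future in batch:
                if not future.done():
                    future.set_exception(exc)
            continue
        for (_, future), result in zip(batch, results):
            future.set_result(result)


async def run_demo() -> list[int]:
    submissions: asyncio.Queue = asyncio.Queue(maxsize=16)
    batches: asyncio.Queue = asyncio.Queue(maxsize=2)
    loop = asyncio.get_running_loop()
    workers = [
        asyncio.create_task(aggregation_loop(submissions, batches, 4, 0.05)),
        asyncio.create_task(processing_loop(batches)),
    ]
    futures = []
    for record in range(10):
        future = loop.create_future()
        await submissions.put((record, future))
        futures.append(future)
    await submissions.put(None)
    results = await asyncio.gather(*futures)
    await asyncio.gather(*workers)
    return list(results)
```

Running asyncio.run(run_demo()) returns the doubled inputs in submission order. The small queue between the two loops plays the role that prefetch_batches is described as playing here: it bounds how many assembled batches can wait for processing.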
Type Parameters:
RecordT: The input record type produced by your tasks.
ResultT: The per-record output type returned by process_fn.
class DynamicBatcher(
process_fn: ProcessFn[RecordT, ResultT],
cost_estimator: CostEstimatorFn[RecordT] | None,
target_batch_cost: int,
max_batch_size: int,
min_batch_size: int,
batch_timeout_s: float,
max_queue_size: int,
prefetch_batches: int,
default_cost: int,
)

| Parameter | Type | Description |
|---|---|---|
| process_fn | ProcessFn[RecordT, ResultT] | Async function with the signature async def f(batch: list[RecordT]) -> list[ResultT]. Must return results in the same order as the input batch. |
| cost_estimator | CostEstimatorFn[RecordT] \| None | Optional (RecordT) -> int function. When provided, it is called to estimate the cost of each submitted record. Falls back to record.estimate_cost() if the record implements CostEstimator, then to default_cost. |
| target_batch_cost | int | Cost budget per batch. The aggregator fills batches up to this limit before dispatching. |
| max_batch_size | int | Hard cap on records per batch regardless of cost budget. |
| min_batch_size | int | Minimum records before dispatching. Ignored when the timeout fires or shutdown is in progress. |
| batch_timeout_s | float | Maximum seconds to wait for a full batch. Lower values reduce idle time but may produce smaller batches. |
| max_queue_size | int | Bounded queue size. When full, submit awaits (backpressure). |
| prefetch_batches | int | Number of pre-assembled batches to buffer between the aggregation and processing loops. |
| default_cost | int | Fallback cost when no estimator is available. … |
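
To make the interaction between target_batch_cost and max_batch_size concrete, here is a small sketch of greedy cost-budgeted grouping. It is an illustration under stated assumptions, not the library's algorithm: the helper name assemble_batches is hypothetical, a batch is assumed to close before a record that would push it over budget, a single oversized record is assumed to form its own batch, and min_batch_size and batch_timeout_s are ignored.

```python
def assemble_batches(costs: list[int], target_batch_cost: int,
                     max_batch_size: int) -> list[list[int]]:
    """Greedily group record indices into batches under a cost budget."""
    batches: list[list[int]] = []
    current: list[int] = []
    current_cost = 0
    for index, cost in enumerate(costs):
        over_budget = current_cost + cost > target_batch_cost
        full = len(current) >= max_batch_size
        # Close the current batch before it would exceed either limit.
        if current and (over_budget or full):
            batches.append(current)
            current, current_cost = [], 0
        current.append(index)
        current_cost += cost
    if current:
        batches.append(current)
    return batches


# Eight records with per-record costs, a budget of 6, and at most 3 per batch.
example = assemble_batches([3, 3, 3, 10, 1, 1, 1, 1],
                           target_batch_cost=6, max_batch_size=3)
# example == [[0, 1], [2], [3], [4, 5, 6], [7]]
```

In this example the record with cost 10 exceeds the budget on its own and is dispatched alone, while the four cheap records are split by the size cap rather than by cost.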
Properties
| Property | Type | Description |
|---|---|---|
| is_running | None | Whether the aggregation and processing loops are active. |
| stats | None | Current BatchStats snapshot. |
Methods
| Method | Description |
|---|---|
| start() | Start the aggregation and processing loops. |
| stop() | Graceful shutdown: process all enqueued work, then stop. |
| submit() | Submit a single record for batched processing. |
| submit_batch() | Convenience: submit multiple records and return their futures. |
start()
def start()

Start the aggregation and processing loops.
Raises RuntimeError if the batcher is already running.
stop()
def stop()

Graceful shutdown: process all enqueued work, then stop.
Blocks until every pending future is resolved.
submit()
def submit(
record: RecordT,
estimated_cost: int | None,
) -> asyncio.Future[ResultT]

Submit a single record for batched processing.
Returns an asyncio.Future that resolves once the batch containing this record has been processed.
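
Because the submission queue is bounded by max_queue_size, a submission waits when the queue is full. The sketch below shows that backpressure behavior with a plain asyncio.Queue; it does not use the batcher itself, and the function name demo_backpressure and the queue size of 2 are assumptions chosen for illustration.

```python
import asyncio


async def demo_backpressure() -> tuple[bool, int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await queue.put(1)
    await queue.put(2)

    # The queue is full, so a third put cannot complete until a slot frees up.
    blocked = False
    try:
        await asyncio.wait_for(queue.put(3), timeout=0.05)
    except asyncio.TimeoutError:
        blocked = True

    queue.get_nowait()   # a consumer drains one item
    await queue.put(3)   # now the put completes immediately
    return blocked, queue.qsize()
```

asyncio.run(demo_backpressure()) returns (True, 2): the third put timed out while the queue was full and succeeded once an item had been consumed.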

| Parameter | Type | Description |
|---|---|---|
| record | RecordT | The input record. |
| estimated_cost | int \| None | Optional explicit cost. When omitted the batcher tries cost_estimator, then record.estimate_cost(), then default_cost. |
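
The cost fallback order described above can be sketched as a small function. This is an illustrative reconstruction, not the library's code: resolve_cost and Prompt are hypothetical names, the CostEstimator protocol below is a local stand-in for the class of the same name, and the default_cost value of 1 is an assumption.

```python
from typing import Any, Callable, Optional, Protocol, runtime_checkable


@runtime_checkable
class CostEstimator(Protocol):
    # Local stand-in: any record exposing estimate_cost() qualifies.
    def estimate_cost(self) -> int: ...


def resolve_cost(record: Any,
                 estimated_cost: Optional[int] = None,
                 cost_estimator: Optional[Callable[[Any], int]] = None,
                 default_cost: int = 1) -> int:
    if estimated_cost is not None:          # 1. explicit cost wins
        return estimated_cost
    if cost_estimator is not None:          # 2. batcher-level estimator
        return cost_estimator(record)
    if isinstance(record, CostEstimator):   # 3. record estimates itself
        return record.estimate_cost()
    return default_cost                     # 4. final fallback


class Prompt:
    """Hypothetical record type whose cost is its text length."""

    def __init__(self, text: str) -> None:
        self.text = text

    def estimate_cost(self) -> int:
        return len(self.text)
```

With these definitions, resolve_cost("abc", estimated_cost=7) returns 7, resolve_cost("abc", cost_estimator=len) returns 3, resolve_cost(Prompt("hello")) returns 5, and resolve_cost("abc") returns the default of 1.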
submit_batch()
def submit_batch(
records: Sequence[RecordT],
estimated_cost: Sequence[int] | None,
) -> list[asyncio.Future[ResultT]]

Convenience: submit multiple records and return their futures.

| Parameter | Type | Description |
|---|---|---|
| records | Sequence[RecordT] | Iterable of input records. |
| estimated_cost | Sequence[int] \| None | Optional per-record cost estimates. Length must match records when provided. |
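
Once a list of futures is in hand, a common way to collect the results is asyncio.gather, which preserves argument order. The sketch below creates the futures by hand rather than calling submit_batch, so it runs without the batcher; await_all and demo are hypothetical helper names, and using return_exceptions=True is one possible choice for keeping a single failed record from discarding the rest.

```python
import asyncio


async def await_all(futures: list[asyncio.Future]) -> list:
    # Results come back in the same order as the futures were passed in;
    # failures are returned as exception objects instead of being raised.
    return await asyncio.gather(*futures, return_exceptions=True)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    futures = [loop.create_future() for _ in range(3)]
    # Resolve out of order to show that gather still preserves input order.
    futures[2].set_result("c")
    futures[0].set_result("a")
    futures[1].set_exception(ValueError("bad record"))
    return await await_all(futures)
```

asyncio.run(demo()) yields a list whose first and third entries are "a" and "c" and whose second entry is the ValueError instance.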