2.0.11

DynamicBatcher

Package: flyte.extras

Batches records from many concurrent producers and runs them through a single async processing function, maximizing resource utilization.

The batcher runs two internal loops:

  1. Aggregation loop — drains the submission queue and assembles cost-budgeted batches, respecting target_batch_cost, max_batch_size, and batch_timeout_s.
  2. Processing loop — pulls assembled batches and calls process_fn, resolving each record’s asyncio.Future.
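The two loops can be sketched with stdlib asyncio. Everything below (the names, the `(record, future, cost)` tuple layout, the doubling stand-in for `process_fn`, and the sentinel-based shutdown) is illustrative, not the `flyte.extras` internals:

```python
import asyncio

async def two_loop_demo() -> list[int]:
    # Illustrative stand-ins for target_batch_cost, max_batch_size,
    # and batch_timeout_s.
    submissions: asyncio.Queue = asyncio.Queue()
    batches: asyncio.Queue = asyncio.Queue()
    TARGET_COST, MAX_SIZE, TIMEOUT_S = 10, 4, 0.05

    async def aggregation_loop():
        while True:
            batch, cost = [], 0
            while cost < TARGET_COST and len(batch) < MAX_SIZE:
                try:
                    item = await asyncio.wait_for(submissions.get(), TIMEOUT_S)
                except asyncio.TimeoutError:
                    break  # batch_timeout_s fired: dispatch what we have
                if item is None:  # shutdown sentinel
                    if batch:
                        await batches.put(batch)
                    await batches.put(None)
                    return
                batch.append(item)
                cost += item[2]  # item = (record, future, cost)
            if batch:
                await batches.put(batch)

    async def processing_loop():
        while (batch := await batches.get()) is not None:
            results = [record * 2 for record, _, _ in batch]  # stand-in process_fn
            for (_, fut, _), res in zip(batch, results):
                fut.set_result(res)  # resolved in input order

    tasks = [asyncio.create_task(aggregation_loop()),
             asyncio.create_task(processing_loop())]
    loop = asyncio.get_running_loop()
    futures = []
    for record in range(6):
        fut = loop.create_future()
        await submissions.put((record, fut, 3))  # fixed cost of 3 per record
        futures.append(fut)
    await submissions.put(None)  # request graceful shutdown
    results = await asyncio.gather(*futures)
    await asyncio.gather(*tasks)
    return results

print(asyncio.run(two_loop_demo()))  # each record doubled, in submission order
```

With a cost of 3 per record against a budget of 10, the first batch closes at four records (also the size cap) and the remaining two are flushed by the shutdown sentinel.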

Type Parameters

  RecordT: The input record type produced by your tasks.
  ResultT: The per-record output type returned by process_fn.
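A small sketch of how the two type parameters line up, assuming `ProcessFn` is a callable alias shaped like the one below (the real alias lives in `flyte.extras`):

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

RecordT = TypeVar("RecordT")
ResultT = TypeVar("ResultT")
# Assumed shape of ProcessFn: an async callable from a batch of records
# to a same-length, same-order list of results.
ProcessFn = Callable[[list[RecordT]], Awaitable[list[ResultT]]]

async def measure_texts(batch: list[str]) -> list[int]:
    # A conforming process_fn with RecordT=str, ResultT=int;
    # output order matches input order.
    return [len(text) for text in batch]

print(asyncio.run(measure_texts(["hello", "hi"])))  # [5, 2]
```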

Parameters

class DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None,
    target_batch_cost: int,
    max_batch_size: int,
    min_batch_size: int,
    batch_timeout_s: float,
    max_queue_size: int,
    prefetch_batches: int,
    default_cost: int,
)
| Parameter | Type | Description |
| --- | --- | --- |
| `process_fn` | `ProcessFn[RecordT, ResultT]` | `async def f(batch: list[RecordT]) -> list[ResultT]`. Must return results in the same order as the input batch. |
| `cost_estimator` | `CostEstimatorFn[RecordT] \| None` | Optional `(RecordT) -> int` function. When provided, it is called to estimate the cost of each submitted record. Falls back to `record.estimate_cost()` if the record implements `CostEstimator`, then to `default_cost`. |
| `target_batch_cost` | `int` | Cost budget per batch. The aggregator fills batches up to this limit before dispatching. |
| `max_batch_size` | `int` | Hard cap on records per batch, regardless of cost budget. |
| `min_batch_size` | `int` | Minimum records before dispatching. Ignored when the timeout fires or shutdown is in progress. |
| `batch_timeout_s` | `float` | Maximum seconds to wait for a full batch. Lower values reduce idle time but may produce smaller batches. |
| `max_queue_size` | `int` | Bounded queue size. When full, `submit` awaits (backpressure). |
| `prefetch_batches` | `int` | Number of pre-assembled batches to buffer between the aggregation and processing loops. |
| `default_cost` | `int` | Fallback cost when no estimator is available. |
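The documented cost-resolution order (explicit estimate, then `cost_estimator`, then `record.estimate_cost()`, then `default_cost`) can be sketched as a standalone helper. `resolve_cost` is a hypothetical name for illustration, not part of the API:

```python
def resolve_cost(record, estimated_cost=None, cost_estimator=None, default_cost=1):
    # 1. An explicit per-submission estimate wins.
    if estimated_cost is not None:
        return estimated_cost
    # 2. Otherwise, the constructor-supplied estimator function.
    if cost_estimator is not None:
        return cost_estimator(record)
    # 3. Otherwise, the record's own estimate_cost() method, if present.
    estimate = getattr(record, "estimate_cost", None)
    if callable(estimate):
        return estimate()
    # 4. Finally, the configured default.
    return default_cost

class SelfCosting:
    def estimate_cost(self):
        return 7

print(resolve_cost(SelfCosting()))                           # 7 (method)
print(resolve_cost(SelfCosting(), estimated_cost=3))         # 3 (explicit wins)
print(resolve_cost(SelfCosting(), cost_estimator=len.__call__ if False else (lambda r: 5)))  # 5 (estimator wins over method)
print(resolve_cost(object()))                                # 1 (default)
```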

Properties

| Property | Type | Description |
| --- | --- | --- |
| `is_running` | `bool` | Whether the aggregation and processing loops are active. |
| `stats` | `BatchStats` | Current `BatchStats` snapshot. |

Methods

| Method | Description |
| --- | --- |
| `start()` | Start the aggregation and processing loops. |
| `stop()` | Graceful shutdown: process all enqueued work, then stop. |
| `submit()` | Submit a single record for batched processing. |
| `submit_batch()` | Convenience: submit multiple records and return their futures. |

start()

def start()

Start the aggregation and processing loops.

Raises

| Exception | Description |
| --- | --- |
| `RuntimeError` | If the batcher is already running. |

stop()

def stop()

Graceful shutdown: process all enqueued work, then stop.

Blocks until every pending future is resolved.

submit()

def submit(
    record: RecordT,
    estimated_cost: int | None,
) -> asyncio.Future[ResultT]

Submit a single record for batched processing.

Returns an asyncio.Future that resolves once the batch containing this record has been processed.

| Parameter | Type | Description |
| --- | --- | --- |
| `record` | `RecordT` | The input record. |
| `estimated_cost` | `int \| None` | Optional explicit cost. When omitted, the batcher tries `cost_estimator`, then `record.estimate_cost()`, then `default_cost`. |

Returns

A future whose result is the corresponding entry from the list returned by process_fn.

Raises

| Exception | Description |
| --- | --- |
| `RuntimeError` | If the batcher is not running. |

If the internal queue is full this coroutine awaits until space is available, providing natural backpressure to fast producers.
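This backpressure behavior comes from bounded-queue semantics: `put` on a full `asyncio.Queue` suspends until a consumer makes room. A minimal illustration of the mechanism (not the batcher itself), with an assumed capacity of 2:

```python
import asyncio

async def backpressure_demo():
    q = asyncio.Queue(maxsize=2)  # stand-in for max_queue_size
    blocked = False

    async def producer():
        nonlocal blocked
        for i in range(4):
            if q.full():
                blocked = True  # the next put will await until a get frees a slot
            await q.put(i)

    prod = asyncio.create_task(producer())
    await asyncio.sleep(0)  # let the producer run until it blocks on a full queue
    consumed = []
    while len(consumed) < 4:
        consumed.append(await q.get())  # each get unblocks the waiting producer
    await prod
    return blocked, consumed

print(asyncio.run(backpressure_demo()))  # (True, [0, 1, 2, 3])
```

The producer fills the queue, observes it full, and is parked inside `put` until the consumer drains a slot, exactly the "natural backpressure" a full submission queue applies to fast producers.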

submit_batch()

def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None,
) -> list[asyncio.Future[ResultT]]

Convenience: submit multiple records and return their futures.

| Parameter | Type | Description |
| --- | --- | --- |
| `records` | `Sequence[RecordT]` | Sequence of input records. |
| `estimated_cost` | `Sequence[int] \| None` | Optional per-record cost estimates. Length must match `records` when provided. |

Returns

A list of futures, one per record.
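The one-future-per-record contract can be sketched with plain asyncio futures; the names and the `len`-based stand-in for `process_fn` below are illustrative:

```python
import asyncio

async def batch_futures_demo() -> list[int]:
    loop = asyncio.get_running_loop()
    records = ["a", "bb", "ccc"]
    # One future per submitted record, as submit_batch returns.
    futures = [loop.create_future() for _ in records]

    async def process_batch():
        results = [len(r) for r in records]  # stand-in process_fn
        for fut, res in zip(futures, results):
            fut.set_result(res)  # all resolved when the batch is processed

    asyncio.create_task(process_batch())
    # gather preserves submission order regardless of resolution timing.
    return await asyncio.gather(*futures)

print(asyncio.run(batch_futures_demo()))  # [1, 2, 3]
```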