Producer batching unification
Status: Implemented in PR #401. Date: 2026-05-28. Issue: #400. Closes: #293, #294, #295, #296, #377 (as a side effect of the refactor).
Problem
Seven producer blocks each handle (or fail to handle) batching differently:
| Producer | Bounded/Streaming | batch_size today | Behavior |
|---|---|---|---|
| std/read | bounded | yes, default 1000 (on batch_size_in_std_read_block branch) | custom process_batch accumulator |
| files/read_csv | bounded | yes, default 1000 | own islice(reader, batch_size) loop |
| relational/read | bounded | no (hardcoded fetchmany(10000)) | yields one row at a time downstream (bug) |
| parquet/read | bounded | no | yields one row at a time (bug) |
| redis/read_stream | streaming | no | yields one record at a time (bug #377) |
| azure/read_event_hub | streaming | yes, default 300, but it controls the SDK callback batch size, not the pipeline batch size | drains internal queue in unbounded batches |
| http/receiver | streaming | no | yields one record per HTTP request (bug) |
Four are actively buggy (they yield single records into the pipeline where batches are intended). One uses batch_size with a different meaning. The two that do batch (std/read and files/read_csv) each implement it differently.
The duplication is the root cause of issues #294, #295, #296, and #377 — all four are the same gap, in different blocks.
Goal
Make the Producer base class own batching. Subclasses describe how to fetch records; the base class controls the size and timing of batches yielded to the pipeline.
After the change:
- batch_size means the same thing in every producer: the maximum number of records yielded per downstream batch.
- Adding a new producer that overrides produce_chunks cannot reintroduce the “yield single records” bug, because subclasses no longer yield to the pipeline directly.
- Streaming producers get an optional flush_ms so partial batches flush on inactivity instead of being held indefinitely.
Non-goals: changing the Job/Step pipeline, adding new sources, restructuring the Result/payload model (that’s #245).
Design
Base-class contract
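```python
# core/src/datayoga_core/producer.py
from typing import AsyncGenerator, List, Optional

# Block and Message are the existing datayoga_core types.


class Producer(Block):
    DEFAULT_BATCH_SIZE = 1000
    DEFAULT_FLUSH_MS: Optional[int] = None  # streaming subclasses override

    # Deliberately not @abstractmethod, so legacy subclasses that still
    # override produce() directly keep working (see Risks and trade-offs).
    async def produce_chunks(self) -> AsyncGenerator[List[Message], None]:
        """Yield natural chunks of any size. Base class re-chunks to batch_size."""
        raise NotImplementedError
        yield  # unreachable; makes this an async generator so `async for` raises NotImplementedError

    async def produce(self) -> AsyncGenerator[List[Message], None]:
        """Public entry point. Reads chunks from produce_chunks() and re-emits
        them in batches of up to batch_size (smaller on EOS or flush_ms), with
        bounded backpressure and source-error propagation."""
        ...
```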
Subclasses override produce_chunks instead of produce. They emit chunks of any size — whatever’s natural to the source (a Parquet row group, a fetchmany result, an xreadgroup response, an Event Hub callback batch, a single record).
The base class accumulates chunks and re-emits them in batches of up to batch_size, flushing whatever’s left on end-of-stream and (for streaming sources) on flush_ms inactivity.
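Stripped of the queue, timers, and error handling, the accumulation rule is small. The following is a minimal synchronous sketch of just the re-chunking step; the name rechunk is illustrative and not part of the codebase, but the buffer/slice loop mirrors the one in produce().

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rechunk(chunks: Iterable[List[T]], batch_size: int) -> Iterator[List[T]]:
    """Re-emit arbitrarily sized chunks as batches of at most batch_size.

    Full batches are emitted as soon as enough records have accumulated; the
    smaller leftover batch is emitted once, at end of stream. Empty chunks add
    nothing, so no empty batch is ever produced.
    """
    buffer: List[T] = []
    for chunk in chunks:
        buffer.extend(chunk)
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    if buffer:  # end of stream: flush the partial batch
        yield buffer


# Chunks of 200, 300 and 400 records accumulate into one 900-record batch at EOS.
print([len(b) for b in rechunk([[0] * 200, [0] * 300, [0] * 400], 1000)])  # [900]
# A single 1500-record chunk splits into a full batch plus the remainder.
print([len(b) for b in rechunk([list(range(1500))], 1000)])  # [1000, 500]
```

Chunk boundaries never leak downstream: only batch_size and end-of-stream decide where batches split.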
batch_size and flush_ms are read lazily
produce() reads self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE) on first call, not in init(). This avoids the “subclass forgot super().init(context)” footgun.
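The footgun is easiest to see side by side. In this sketch, Base, EagerProducer, LazyProducer and the two Forgetful subclasses are hypothetical stand-ins, not the real Block/Producer classes. The eager variant caches batch_size in init(), so a subclass whose init() skips super().init(context) never gets the attribute; the lazy variant looks the property up at use time and is unaffected.

```python
from typing import Any, Dict, Optional


class Base:
    """Hypothetical stand-in for Block: stores properties; init() is a hook."""

    DEFAULT_BATCH_SIZE = 1000

    def __init__(self, properties: Dict[str, Any]):
        self.properties = properties

    def init(self, context: Optional[dict] = None) -> None:
        pass


class EagerProducer(Base):
    def init(self, context: Optional[dict] = None) -> None:
        # Caches the setting; only runs if subclasses call super().init().
        self.batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE))

    def current_batch_size(self) -> int:
        return self.batch_size


class LazyProducer(Base):
    def current_batch_size(self) -> int:
        # Looked up at use time, so it does not depend on the init() chain.
        return int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE))


class ForgetfulEager(EagerProducer):
    def init(self, context: Optional[dict] = None) -> None:
        self.ready = True  # forgot super().init(context)


class ForgetfulLazy(LazyProducer):
    def init(self, context: Optional[dict] = None) -> None:
        self.ready = True  # same mistake, but harmless here


eager = ForgetfulEager({"batch_size": 50})
eager.init()
try:
    eager.current_batch_size()
except AttributeError as exc:
    print("eager:", exc)  # batch_size was never set

lazy = ForgetfulLazy({"batch_size": 50})
lazy.init()
print("lazy:", lazy.current_batch_size())  # lazy: 50
```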
flush_ms implementation
For streaming sources, partial batches must flush on inactivity, otherwise a low-traffic stream could hold records indefinitely.
Implementation uses an internal bounded queue + background pump task. The pump captures source errors and re-raises on the consumer side, so failures aren’t silently treated as EOS:
```python
import asyncio
from contextlib import suppress
from typing import Any, AsyncGenerator, Dict, List


async def produce(self) -> AsyncGenerator[List[Dict[str, Any]], None]:
    batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE))
    flush_ms = self.properties.get("flush_ms", self.DEFAULT_FLUSH_MS)
    timeout = (flush_ms / 1000) if flush_ms else None  # None disables time-based flush

    # maxsize=1 preserves the natural backpressure the old yield-driven model had:
    # the pump holds at most one queued chunk plus the one it is waiting to put.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    EOS = object()  # end-of-stream sentinel
    pump_error: List[BaseException] = []  # captured non-cancellation errors

    async def pump() -> None:
        cancelled = False
        try:
            async for chunk in self.produce_chunks():
                if chunk:  # ignore empty chunks so no empty batches are emitted
                    await queue.put(chunk)
        except asyncio.CancelledError:
            cancelled = True
            raise
        except BaseException as exc:
            pump_error.append(exc)
        finally:
            # Skip the EOS put on cancellation: the consumer's finally is
            # awaiting us and the queue may be full, so putting would deadlock.
            if not cancelled:
                await queue.put(EOS)

    pump_task = asyncio.create_task(pump())
    buffer: List[Dict[str, Any]] = []
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=timeout)
            except asyncio.TimeoutError:
                # No chunk arrived within flush_ms: flush the partial batch, if any.
                if buffer:
                    yield buffer
                    buffer = []
                continue
            if item is EOS:
                if buffer:
                    yield buffer  # records read before a failure are still delivered
                if pump_error:
                    raise pump_error[0]  # propagate source error to caller
                return
            buffer.extend(item)
            while len(buffer) >= batch_size:
                yield buffer[:batch_size]
                buffer = buffer[batch_size:]
    finally:
        pump_task.cancel()
        with suppress(asyncio.CancelledError, Exception):
            await pump_task
```
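Why a queue and not asyncio.wait_for(anext(gen), timeout): on every flush timeout, wait_for cancels the pending __anext__, which throws CancelledError into the generator at its current await point (mid-read, with connections open). Unless the source catches it, the generator ends there, and every later call reports end-of-stream while unread records are lost. With the pump, the timeout only cancels queue.get(), which is safe to retry; the source generator is cancelled exactly once, when the consumer shuts down and its finally cancels the pump task.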
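The failure mode the pump avoids can be reproduced in a few lines. Here slow_source is a hypothetical stand-in for a source that goes briefly idle, and read_with_anext_timeout is the naive loop a queue-less produce() would use: the first timeout ends the generator, so records 2 and 3 never arrive.

```python
import asyncio
from typing import AsyncIterator, List


async def slow_source() -> AsyncIterator[int]:
    yield 1
    await asyncio.sleep(0.2)  # idle for longer than the flush timeout
    yield 2  # never reached once the timeout fires
    yield 3


async def read_with_anext_timeout(timeout: float) -> List[int]:
    gen = slow_source()
    received: List[int] = []
    while True:
        try:
            received.append(await asyncio.wait_for(gen.__anext__(), timeout))
        except asyncio.TimeoutError:
            continue  # "flush" and try again, as a naive produce() would
        except StopAsyncIteration:
            return received


print(asyncio.run(read_with_anext_timeout(0.05)))  # [1]: records 2 and 3 are lost
```

With a timeout longer than the idle gap the same loop returns all three records, which is why the bug only shows up on quiet streams.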
Why maxsize=1 and the cancelled flag: an unbounded queue removes backpressure, so the pump could pre-load an entire Parquet file or relational table into memory while the consumer is still processing batch 1 (flagged by Copilot review). Bounding the queue at 1 caps read-ahead at one queued chunk plus the one the pump is waiting to put, but it introduces a deadlock when the consumer is cancelled mid-flow: the pump’s finally runs put(EOS) against a full queue that nobody will drain, while the consumer’s finally is awaiting the pump. The cancelled flag skips the EOS put on cancellation, since the consumer is gone and EOS no longer needs to be delivered.
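The deadlock and its fix can be reproduced with a toy pump. In this sketch the run() helper and its skip_eos_on_cancel switch are illustrative, not the production code: the consumer cancels the pump while the queue is full. Without the cancelled flag, the pump’s finally blocks on put(EOS) and the task never finishes. The demo uses asyncio.wait with a timeout instead of a plain await so that it reports the hang instead of hanging, then cancels a second time to clean up.

```python
import asyncio


async def run(skip_eos_on_cancel: bool) -> bool:
    """Return True if the pump task finishes promptly after being cancelled."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    EOS = object()

    async def pump() -> None:
        cancelled = False
        try:
            for i in range(10):
                await queue.put([i])  # blocks once one chunk is queued
        except asyncio.CancelledError:
            cancelled = True
            raise
        finally:
            if not (skip_eos_on_cancel and cancelled):
                await queue.put(EOS)  # the queue is full and nobody drains it

    task = asyncio.create_task(pump())
    await asyncio.sleep(0.01)  # let the pump fill the queue and block
    task.cancel()  # the consumer goes away without draining
    done, _ = await asyncio.wait({task}, timeout=0.2)
    if not done:
        task.cancel()  # a second cancel interrupts the stuck put(EOS)
        await asyncio.gather(task, return_exceptions=True)
    return bool(done)


print(asyncio.run(run(skip_eos_on_cancel=False)))  # False: stuck in finally
print(asyncio.run(run(skip_eos_on_cancel=True)))   # True: exits cleanly
```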
Why pump_error: catching all exceptions in the pump and letting it terminate via EOS would silently truncate input on a source failure (Redis disconnect, broken CSV, DB error) — the consumer would see clean end-of-stream against partial data. Capturing the exception and re-raising on the consumer side makes the job fail loudly instead (also flagged by Copilot review).
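The difference between swallowing and capturing a source error shows up with any source that fails mid-stream. In this sketch flaky_source and consume are hypothetical, and ConnectionError stands in for a Redis disconnect or DB error. With capture_errors=False the consumer sees a clean EOS and returns partial data; with capture_errors=True the same run raises.

```python
import asyncio
from typing import AsyncIterator, List


async def flaky_source() -> AsyncIterator[List[int]]:
    """Hypothetical source: one good chunk, then a failure."""
    yield [1, 2]
    raise ConnectionError("source went away")


async def consume(capture_errors: bool) -> List[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    EOS = object()
    pump_error: List[BaseException] = []

    async def pump() -> None:
        try:
            async for chunk in flaky_source():
                await queue.put(chunk)
        except Exception as exc:
            if capture_errors:
                pump_error.append(exc)  # hand the error to the consumer side
            # otherwise it is swallowed and the EOS below looks like a normal end
        finally:
            await queue.put(EOS)

    task = asyncio.create_task(pump())
    received: List[int] = []
    while True:
        item = await queue.get()
        if item is EOS:
            break
        received.extend(item)
    await task
    if pump_error:
        raise pump_error[0]  # the job fails instead of finishing on partial data
    return received


print(asyncio.run(consume(capture_errors=False)))  # [1, 2], and no error raised
```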
flush_ms = None ⇒ timeout = None ⇒ queue.get() waits forever ⇒ no time-based flush. Bounded sources don’t set flush_ms and aren’t affected.
Schema composition (standard JSON Schema)
Two shared fragments in core/src/datayoga_core/resources/schemas/ declare the common properties:
- batchable.schema.json declares batch_size.
- streamable.schema.json declares both batch_size and flush_ms.
Each block schema uses standard JSON Schema composition: allOf + $ref to the fragment file, plus unevaluatedProperties: false (rather than additionalProperties: false) so the fragment-contributed properties are recognized as evaluated. Example:
```json
{
  "$schema": "https://json-schema.org/draft/2019-09/schema",
  "title": "std.read",
  "type": "object",
  "allOf": [{ "$ref": "../../../resources/schemas/batchable.schema.json" }],
  "properties": {},
  "unevaluatedProperties": false
}
```
At load time, schema_utils.resolve_refs(schema, schema_path) walks the schema, finds any local-file $ref (relative path, ends in .json, no URI scheme, no in-document fragment), and inlines the referenced file’s contents in place. The resulting in-memory schema is self-contained — no remaining $refs — so Block.validate() keeps using the simple jsonschema.validate(instance, schema) code path. The on-disk schemas remain standards-compliant; the resolution is purely a runtime detail to avoid threading a RefResolver through every validation site.
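The resolution rule can be sketched in plain Python. This is an illustration under stated assumptions, not the actual schema_utils.resolve_refs: is_local_file_ref and resolve_refs here are hypothetical re-implementations of the rule described above (relative path, ends in .json, no URI scheme, no fragment), refs resolve relative to the file that contains them, and there is no cycle detection.

```python
import json
import tempfile
from pathlib import Path
from typing import Any
from urllib.parse import urlparse


def is_local_file_ref(ref: str) -> bool:
    """Relative path ending in .json, with no URI scheme and no '#' fragment."""
    return (
        ref.endswith(".json")
        and "#" not in ref
        and not urlparse(ref).scheme
        and not Path(ref).is_absolute()
    )


def resolve_refs(schema: Any, schema_path: Path) -> Any:
    """Return a copy of schema with every local-file $ref replaced by that file's content."""
    if isinstance(schema, list):
        return [resolve_refs(item, schema_path) for item in schema]
    if not isinstance(schema, dict):
        return schema
    ref = schema.get("$ref")
    if isinstance(ref, str) and is_local_file_ref(ref):
        target = (Path(schema_path).parent / ref).resolve()
        # Inlined files may contain their own refs, relative to *their* location.
        inlined = resolve_refs(json.loads(target.read_text(encoding="utf-8")), target)
        siblings = {k: resolve_refs(v, schema_path) for k, v in schema.items() if k != "$ref"}
        # Keywords next to $ref apply alongside it, so keep them via allOf.
        return {"allOf": [inlined], **siblings} if siblings else inlined
    return {k: resolve_refs(v, schema_path) for k, v in schema.items()}


with tempfile.TemporaryDirectory() as tmp:
    fragment = Path(tmp, "resources", "batchable.schema.json")
    fragment.parent.mkdir()
    Path(tmp, "blocks").mkdir()
    fragment.write_text(json.dumps({"properties": {"batch_size": {"type": "integer"}}}))
    block = {
        "allOf": [{"$ref": "../resources/batchable.schema.json"}],
        "properties": {},
        "unevaluatedProperties": False,
    }
    resolved = resolve_refs(block, Path(tmp, "blocks", "std.read.schema.json"))
    print(resolved["allOf"][0])  # {'properties': {'batch_size': {'type': 'integer'}}}
```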
unevaluatedProperties: false (introduced in draft 2019-09) is what makes composition + strict property validation work: with additionalProperties: false, a property contributed by an allOf member would be rejected as “additional” at the parent level. unevaluatedProperties is composition-aware.
External tools that ARE $ref-aware (IDE schema validators, OpenAPI exporters) read the on-disk schemas correctly without our resolver. The jsonschema2mk docs generator is not $ref-aware, so scripts/generate-docs.sh pre-resolves $ref and flattens allOf properties for docs rendering only.
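The docs-only flattening step can be illustrated the same way. The real logic lives in scripts/generate-docs.sh; this Python sketch (flatten_all_of is a hypothetical name) assumes $refs are already resolved and hoists properties and required from each allOf member into the parent, so a generator that does not understand composition sees one flat properties map.

```python
from typing import Any, Dict, List


def flatten_all_of(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Hoist properties/required from (already resolved) allOf members into the parent."""
    flat = {k: v for k, v in schema.items() if k != "allOf"}
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for member in schema.get("allOf", []):
        member = flatten_all_of(member)  # members may themselves compose
        properties.update(member.get("properties", {}))
        required.extend(member.get("required", []))
    properties.update(schema.get("properties", {}))  # the block's own entries win
    required.extend(schema.get("required", []))
    if properties:
        flat["properties"] = properties
    if required:
        flat["required"] = list(dict.fromkeys(required))  # de-duplicate, keep order
    return flat


block = {
    "title": "example.read",  # hypothetical block
    "allOf": [{"properties": {"batch_size": {"type": "integer"},
                              "flush_ms": {"type": "integer"}}}],
    "properties": {"file": {"type": "string"}},
    "required": ["file"],
}
print(sorted(flatten_all_of(block)["properties"]))  # ['batch_size', 'file', 'flush_ms']
```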
Per-producer changes
std/read (bounded)
Replace process_batch with a single-chunk yield. Base class slices.
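```python
import select
import sys


async def produce_chunks(self):
    if select.select([sys.stdin], [], [], 0.0)[0]:
        # Data is already waiting on stdin (e.g. piped input): read all of it.
        all_records = [r for line in sys.stdin for r in self.get_records(line)]
    else:
        # Interactive session: prompt for a single line.
        print("Enter data to process:")
        all_records = self.get_records(input())
    if all_records:
        # One chunk; the base class slices it into batch_size batches.
        yield [self.get_message(r) for r in all_records]
```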
files/read_csv (bounded)
Keeps its islice loop, but each batch_size slice is now yielded as a chunk instead of being handed to the pipeline directly; the base class re-emits.
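```python
from csv import DictReader
from itertools import count, islice


async def produce_chunks(self):
    batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE))
    # newline="" is how the csv module expects files to be opened.
    with open(self.file, "r", encoding=self.encoding, newline="") as f:
        reader = DictReader(f, fieldnames=self.fields,
                            delimiter=self.delimiter, quotechar=self.quotechar)
        for _ in range(self.skip):
            next(reader, None)
        counter = count()
        while True:
            # Reading in batch_size slices keeps memory bounded; the base
            # class still decides the downstream batch size.
            chunk = [{self.MSG_ID_FIELD: f"{next(counter)}", **r}
                     for r in islice(reader, batch_size)]
            if not chunk:
                return
            yield chunk
```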
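A runnable stand-in for the loop above, reading from an in-memory string instead of a file. csv_chunks is a hypothetical name, "msg_id" stands in for self.MSG_ID_FIELD, and DictReader consumes the header row itself here rather than relying on fieldnames plus skip.

```python
from csv import DictReader
from io import StringIO
from itertools import count, islice
from typing import Dict, Iterator, List


def csv_chunks(text: str, batch_size: int, skip: int = 0) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of up to batch_size rows, each tagged with a sequential msg_id."""
    reader = DictReader(StringIO(text))
    for _ in range(skip):
        next(reader, None)
    counter = count()
    while True:
        chunk = [{"msg_id": str(next(counter)), **row} for row in islice(reader, batch_size)]
        if not chunk:
            return
        yield chunk


sample = "id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n"
print([len(c) for c in csv_chunks(sample, batch_size=2)])  # [2, 2, 1]
```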
relational/read (bounded)
batch_size uses the framework default (1000). fetch_size defaults to 10000 to preserve today’s driver-roundtrip count as the no-config baseline. Result: a strict improvement over today (downstream goes from 1-record batches to 1000-record batches, while each DB roundtrip still fetches 10000 rows).
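```python
class Block(DyProducer):
    DEFAULT_FETCH_SIZE = 10000

    async def produce_chunks(self):
        fetch_size = int(self.properties.get("fetch_size", self.DEFAULT_FETCH_SIZE))
        # stream_results=True asks for a server-side cursor where the dialect
        # supports it, so only fetch_size rows are buffered per round trip.
        result = self.connection.execution_options(stream_results=True).execute(self.tbl.select())
        try:
            while True:
                rows = result.fetchmany(fetch_size)
                if not rows:
                    return
                yield [utils.add_uid(r._asdict()) for r in rows]
        finally:
            # Release the cursor even if the consumer stops early.
            result.close()
```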
Schema adds optional fetch_size with default 10000.
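The independence of the two knobs can be checked with the standard library alone. This sketch uses sqlite3 as a stand-in for the SQLAlchemy connection; fetch_chunks and rechunk are illustrative names. For 25,000 rows with fetch_size=10000, the driver sees three non-empty fetchmany() calls plus the final empty one, while downstream still receives 25 batches of 1000.

```python
import sqlite3
from typing import Iterable, Iterator, List, Tuple


def fetch_chunks(cursor: sqlite3.Cursor, fetch_size: int, calls: List[int]) -> Iterator[List[Tuple]]:
    """Mimic produce_chunks: one chunk per fetchmany() round trip."""
    while True:
        rows = cursor.fetchmany(fetch_size)
        calls.append(len(rows))  # record every driver call, including the final empty one
        if not rows:
            return
        yield rows


def rechunk(chunks: Iterable[List[Tuple]], batch_size: int) -> Iterator[List[Tuple]]:
    """Base-class side: re-emit fetched rows in batches of at most batch_size."""
    buffer: List[Tuple] = []
    for chunk in chunks:
        buffer.extend(chunk)
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    if buffer:
        yield buffer


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(25_000)])
calls: List[int] = []
batches = list(rechunk(fetch_chunks(conn.execute("SELECT id FROM t"), 10_000, calls), 1_000))
print(calls)         # [10000, 10000, 5000, 0]
print(len(batches))  # 25
```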
parquet/read (bounded)
Fix one-by-one yield. Each row group becomes one chunk; base class re-emits in batch_size slices.
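```python
from itertools import count

from fastparquet import ParquetFile


async def produce_chunks(self):
    pf = ParquetFile(self.file)
    counter = count()
    for df in pf.iter_row_groups():
        # One chunk per row group. to_dict(orient="records") keeps each column's
        # own type, whereas iterrows() coerces every row to a single Series dtype.
        yield [{self.MSG_ID_FIELD: str(next(counter)), **record}
               for record in df.to_dict(orient="records")]
```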
redis/read_stream (streaming, closes #377)
Use count=batch_size on xreadgroup. Yield each batch as a chunk. Class overrides DEFAULT_FLUSH_MS = 1000.
```python
import orjson


class Block(DyProducer):
    DEFAULT_FLUSH_MS = 1000

    async def produce_chunks(self):
        batch_size = int(self.properties.get("batch_size", self.DEFAULT_BATCH_SIZE))
        # Start with this consumer's pending entries ("0"), then switch to new ones (">").
        read_pending = True
        while True:
            streams = self.redis_client.xreadgroup(
                self.consumer_group, self.requesting_consumer,
                {self.stream: "0" if read_pending else ">"},
                count=batch_size,
                block=100 if self.snapshot else 0,  # streaming blocks forever; snapshot polls briefly
            )
            for stream in streams:
                chunk = []
                for key, value in stream[1]:
                    payload = orjson.loads(value[next(iter(value))])
                    payload[self.MSG_ID_FIELD] = key
                    chunk.append(payload)
                if chunk:
                    yield chunk
            if self.snapshot and not read_pending:
                return
            read_pending = False
```
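flush_ms (default 1000) ensures partial batches flush during low-volume periods. The pump task can sit inside xreadgroup indefinitely because the pump and the consumer side of the base-class queue are decoupled: when a single message finally arrives, it lands in the queue immediately, and once the stream goes quiet again flush_ms flushes the partial batch downstream. This relies on xreadgroup not blocking the event loop. With a synchronous redis-py client, a blocking read (block=0) stalls the whole loop, so the consumer’s flush timer cannot fire until the next message arrives; in that case the read has to run off the loop (for example via asyncio.to_thread) or go through an async client.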
azure/read_event_hub (streaming, breaking change)
Existing batch_size property → renamed max_batch_size (matches SDK semantic, default 300). New batch_size (pipeline semantic, default 1000) comes from the streamable fragment.
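```python
import asyncio


class Block(DyProducer):
    DEFAULT_FLUSH_MS = 1000

    def init(self, context=None):
        self.max_batch_size = int(self.properties.get("max_batch_size", 300))
        # ... existing client setup ...
        self.events = {}
        self.messages = asyncio.Queue()

    async def produce_chunks(self):
        # Keep a reference: the event loop holds only weak references to tasks,
        # so an unreferenced receiver task could disappear mid-run.
        self.receive_task = asyncio.create_task(self.receive_batch())  # uses self.max_batch_size
        while True:
            # Wait for one event, then drain everything already queued into the same chunk.
            msg = await self.messages.get()
            chunk = [msg]
            while not self.messages.empty():
                chunk.append(self.messages.get_nowait())
            yield chunk
```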
Migration: users who set batch_size: 300 in YAML to control SDK callback size must rename it to max_batch_size: 300. There is no backward-compat shim. A literal batch_size: 300 still validates after the rename, but it now means pipeline batch size, not SDK callback size; that semantic shift is documented in the PR description.
The schema for azure/read_event_hub also gains unevaluatedProperties: false (it had no additionalProperties declaration before). Typos like batch_sz: 300 now fail validation loudly with a clear error.
http/receiver (streaming)
Drain the queue per chunk; flush_ms flushes partial batches when traffic is low.
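```python
import logging
from asyncio import Queue
from contextlib import suppress
from itertools import count

import orjson
from aiohttp.web import HTTPInternalServerError, HTTPOk, Server, ServerRunner, TCPSite

logger = logging.getLogger(__name__)


class Block(DyProducer):
    DEFAULT_FLUSH_MS = 1000

    async def produce_chunks(self):
        queue: Queue = Queue(maxsize=1000)

        async def handler(request):
            try:
                # put_nowait raises QueueFull (answered with a 500) once 1000 requests are buffered.
                queue.put_nowait(orjson.loads(await request.read()))
                return HTTPOk()
            except Exception:
                logger.exception("Got exception while parsing request:")
                return HTTPInternalServerError()

        runner = ServerRunner(Server(handler))
        await runner.setup()
        srv = TCPSite(runner, self.host, self.port)
        await srv.start()
        try:
            counter = count()
            while True:
                # Wait for one request, then drain everything already received into one chunk.
                msg = await queue.get()
                chunk = [{self.MSG_ID_FIELD: f"{next(counter)}", **msg}]
                while not queue.empty():
                    chunk.append({self.MSG_ID_FIELD: f"{next(counter)}", **queue.get_nowait()})
                yield chunk
        finally:
            with suppress(Exception):
                # cleanup() stops the site and shuts down the server and runner.
                await runner.cleanup()
```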
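The drain-per-chunk pattern shared by http/receiver and azure/read_event_hub turns whatever has arrived since the last read into one chunk, so chunk sizes follow arrival bursts rather than batch_size, and the base class normalizes them afterwards. A stdlib sketch, where next_chunk and demo are illustrative names:

```python
import asyncio
from typing import Any, List


async def next_chunk(queue: asyncio.Queue) -> List[Any]:
    """Wait for at least one item, then drain whatever else is already queued."""
    chunk = [await queue.get()]
    while not queue.empty():
        chunk.append(queue.get_nowait())
    return chunk


async def demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)  # a burst: three requests already waiting
    burst = await next_chunk(queue)

    async def late_arrival() -> None:
        await asyncio.sleep(0.01)
        queue.put_nowait(3)  # one more request, after the burst

    task = asyncio.create_task(late_arrival())
    single = await next_chunk(queue)  # blocks until the late request lands
    await task
    return [len(burst), len(single)]


print(asyncio.run(demo()))  # [3, 1]
```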
Defaults summary
| Producer | batch_size | flush_ms | Other |
|---|---|---|---|
| std/read | 1000 | n/a | |
| files/read_csv | 1000 | n/a | |
| relational/read | 1000 | n/a | optional fetch_size, defaults to 10000 |
| parquet/read | 1000 | n/a | |
| redis/read_stream | 1000 | 1000 | |
| azure/read_event_hub | 1000 | 1000 | max_batch_size 300 (renamed from old batch_size) |
| http/receiver | 1000 | 1000 | |
Tests
New base-class tests (core/src/datayoga_core/tests/test_producer_batching.py):
A FakeProducer whose produce_chunks yields scripted chunks. Cases:
- One 5000-record chunk + batch_size=1000 → five batches of 1000.
- Three chunks of [200, 300, 400] + batch_size=1000 → one batch of 900 on EOS (no empty trailing batch).
- 1500 records + batch_size=1000 → batches of [1000, 500].
- flush_ms=100 with a producer that sleeps 200ms between chunks → partial batches flush on inactivity.
- flush_ms=None holds records indefinitely (asserted by waiting with a timeout and checking that the next batch does not arrive early).
- Empty chunk yields are ignored (no empty batches emitted).
- Pump-task cleanup: cancelling the consumer cancels the pump cleanly (no warnings, no leaks).
Per-producer tests:
- std/read, files/read_csv: existing tests adapted; assert batch counts and sizes match batch_size.
- relational/read: assert it yields batches (not single rows); assert fetch_size controls driver calls independently of batch_size.
- parquet/read: multi-row-group file; batches honor batch_size regardless of row-group boundaries.
- redis/read_stream: assert xreadgroup is called with count=batch_size. The redis_to_relational integration test (mentioned in #377) provides the end-to-end signal; it depends on the batch fallback in relational/write shipped in commit 7e5b6f7, which is already in place.
- azure/read_event_hub: assert that a legacy config with only batch_size: 500 still validates and now sets the pipeline batch size, with SDK callbacks falling back to the default max_batch_size of 300; assert max_batch_size: 500, batch_size: 100 results in SDK callbacks of 500 and downstream batches of 100.
- http/receiver: send N records via webhook; assert they land in batches of batch_size, or in partial batches after flush_ms.
Documentation
- Update docs/reference/blocks/*_read.md for each affected producer (batch_size, flush_ms, fetch_size, max_batch_size where applicable).
- Add a section in docs/processing-strategies.md explaining the producer batching model: chunked subclass output, base-class re-chunking, and flush_ms for streaming sources.
- PR description carries the breaking-change note (no CHANGELOG file in this repo):
  - New batch_size / flush_ms on previously non-batching producers.
  - Breaking: azure/read_event_hub.batch_size renamed to max_batch_size; the name batch_size now means pipeline batch size.
Risks and trade-offs
- Producer ABC change. produce_chunks is the new override hook (it raises NotImplementedError by default; it is not formally @abstractmethod, so legacy subclasses that still override produce() directly continue to validate). All 7 in-tree producers were migrated to override produce_chunks; external/downstream subclasses that override produce() directly continue to work but bypass the base-class batching. Called out in the PR description.
- Event Hub silent-semantic-change risk. The breaking rename is intentional. Adding unevaluatedProperties: false to the Event Hub schema (which lacked any additionalProperties declaration before) catches typos loudly. The literal batch_size: 300 still validates after the rename but now means pipeline batch size, not SDK callback size; that semantic shift is documented in the PR description and the processing-strategies docs.
- flush_ms semantics on Job shutdown. When the producer is cancelled (Job.shutdown → Step.stop), the consumer’s finally cancels the pump, and the pump deliberately skips the EOS put (the cancelled flag described above), so a buffered partial batch is dropped rather than flushed. The final partial batch is flushed only when produce_chunks ends on its own, in which case the pump queues EOS normally. The pump-task cleanup case in test_producer_batching covers the cancellation path.
- relational/read defaults. fetch_size defaults to 10000 to preserve today’s DB roundtrip count. batch_size defaults to 1000, matching the framework default. Net effect vs. today: downstream batches grow from 1 to 1000 records; DB roundtrips are unchanged. Users with memory pressure on large rows can set a smaller fetch_size explicitly. Documented in the block’s reference page.
- Re-chunking cost. Lists are sliced with buffer[:n] / buffer[n:]. The yielded slice costs O(batch_size), but rebinding buffer = buffer[n:] also copies the remaining tail, so a single chunk of m records costs roughly O(m²/batch_size) element copies in total. That is negligible for row-group- or fetchmany-sized chunks; std/read, which yields all of stdin as one chunk, is the one place it could show up, and slicing at a moving offset would bring it back to O(m).
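A small cost model makes the re-chunking trade-off concrete. Both functions below are illustrative (neither exists in the codebase): each splits one buffer into full batches plus a leftover tail and counts how many list elements get copied. They produce identical batches, but the tail-slicing pattern copies 15,000 elements for a 5,000-record chunk at batch_size=1000, while the offset variant copies each record once.

```python
from typing import List, Tuple

Split = Tuple[List[List[int]], List[int], int]  # (full batches, leftover tail, elements copied)


def split_tail_slicing(buffer: List[int], batch_size: int) -> Split:
    """Slice a batch off the front, then rebind the buffer to the tail."""
    batches: List[List[int]] = []
    copied = 0
    while len(buffer) >= batch_size:
        batches.append(buffer[:batch_size])
        buffer = buffer[batch_size:]
        copied += batch_size + len(buffer)  # the batch plus the re-copied tail
    return batches, buffer, copied


def split_offset_slicing(buffer: List[int], batch_size: int) -> Split:
    """Advance an offset through the buffer and copy the leftover tail once."""
    batches: List[List[int]] = []
    start = 0
    while len(buffer) - start >= batch_size:
        batches.append(buffer[start:start + batch_size])
        start += batch_size
    tail = buffer[start:]
    return batches, tail, start + len(tail)  # every record copied exactly once


records = list(range(5000))
naive = split_tail_slicing(records, 1000)
offset = split_offset_slicing(records, 1000)
print(naive[0] == offset[0], naive[2], offset[2])  # True 15000 5000
```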
Out of scope
- Changing the Result/payload internal field representation (issue #245).
- Adding new connector blocks (Snowflake #392, Kafka, S3 #351, RabbitMQ #265, Kinesis #264).
- Pulling Prometheus out of core (#336).
- Backpressure / queue sizing changes to the Step pipeline.