
refactor(scan): generalize batch coalescer into coalescing_gpu_ingestible #944

Open
kevkrist wants to merge 2 commits into sirius-db:dev from kevkrist:scan-coalescing-gpu-ingestible



@kevkrist kevkrist commented Jun 15, 2026

Collaborator

What

Extracts the duckdb-native scan's batch coalescing into a reusable base so the native and
parquet scans share one path, and moves multi-GPU device assignment to the consumer side for
both formats.

Changes

  • New coalescing_gpu_ingestible base owns the sealed consume_next_input → coalesce → emit_batch pipeline over format-blind coalescing_unit / coalescing_carrier /
    coalescing_payload. Renames duckdb_native_batch_coalescer.hpp → batch_coalescer.hpp.
  • Parquet ported onto the base: the producer emits one coalescing_carrier of per-row-group
    units (group_key = hive partition values); make_batch rebuilds a parquet_split_info,
    merging consecutive same-file units back into multi-row-group slices.
  • Multi-GPU: the device id is stamped consumer-side in emit_batch. prepare_for_query dispatches via
    dynamic_cast<coalescing_gpu_ingestible*>.
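
The same-file merge mentioned in the parquet bullet above can be sketched roughly as follows. This is an illustration only: `unit`, `slice`, and `merge_same_file` are hypothetical stand-ins (not the PR's `coalescing_unit`, `row_group_slice`, or `make_batch`), and it assumes each unit names one file and one row-group index.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a per-row-group unit: one file, one row group.
struct unit {
  std::string file;
  int row_group;
};

// Hypothetical stand-in for a multi-row-group slice of a single file.
struct slice {
  std::string file;
  std::vector<int> row_groups;
};

// Merge consecutive units that refer to the same file into one slice.
// Units of the same file that are not adjacent stay in separate slices.
std::vector<slice> merge_same_file(const std::vector<unit>& units) {
  std::vector<slice> out;
  for (const unit& u : units) {
    assert(u.row_group >= 0);  // row-group indices are non-negative
    if (!out.empty() && out.back().file == u.file) {
      out.back().row_groups.push_back(u.row_group);
    } else {
      out.push_back(slice{u.file, {u.row_group}});
    }
  }
  return out;
}
```

Only adjacency is considered here, which matches the "consecutive same-file units" wording; whether the real `make_batch` also reorders units is not stated in this PR description.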

Only the cargo differs between the two formats; the coalescing and balancing machinery is shared:

|                               | duckdb-native            | parquet                                |
|-------------------------------|--------------------------|----------------------------------------|
| producer emits 1 carrier per… | row-group range          | file-batch                             |
| unit payload                  | duckdb_native_payload    | parquet_payload (a row_group_slice)    |
| make_batch builds             | duckdb_native_split_info | parquet_split_info (+ same-file merge) |
| group_key                     | empty (unpartitioned)    | hive partition values                  |
| stamp site                    | emit_batch (base)        | emit_batch (base), same                |

Diagram 1 — how the multi-GPU policy is installed on the ingestible

The policy is installed once, at query-setup time, onto the ingestible itself (the consumer),
and then consumed per batch, at run time in emit_batch.

INSTALL TIME — sirius_scan_manager::prepare_for_query()        [sirius_scan_manager.cpp:269-282]

  for each GPU scan op:
      provider = split_provider(op.get_ingestible())        // provider borrows the ingestible
                        │
      dynamic_cast<coalescing_gpu_ingestible*>(&op.get_ingestible()) ?
                        │
        ┌───────────────┴────────────────┐
     succeeds                          fails
   (native, parquet)            (non-coalescing format)
        │                                │
  ingestible.install_balancing     provider.set_balancing_strategy
    (round_robin, pipeline_id)       (round_robin, pipeline_id)
        │                                │
  ┌─────▼──────────────────────┐   policy lives on the PRODUCER
  │  coalescing_gpu_ingestible │   (stamps final batches directly
  │    _balancing_strategy  ◄──┤    in split_provider::apply_balancing)
  │    _pipeline_id            │
  └────────────────────────────┘
   policy now lives on the INGESTIBLE (consumer side); the provider's
   strategy stays NULL, so apply_balancing(carrier) is a guaranteed no-op
   → carriers are never stamped → the round-robin cursor is never double-advanced


RUN TIME — coalescing_gpu_ingestible::emit_batch()            [coalescing_gpu_ingestible.cpp:66-80]

  batch = make_batch(units)                          // the real decode batch (scan_operator_input)
  if (...):
      gpu = _balancing_strategy->get_next_gpu(_pipeline_id)   // round_robin: cursor++ % N gpus
      batch.set_preferred_device_id(gpu)             // ★ the only stamp site
  return batch
        │
        ▼  honored downstream, format-blind:
  task_creator (preferred_device_id = highest-priority tier) → task_scheduler (exact device match)

Both native and parquet take the left branch, so they are stamped identically. The provider
arm (right branch) is now used only by non-coalescing formats.
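
For readers who prefer code to boxes, the install-time dispatch and the run-time stamp in Diagram 1 can be approximated like this. It is a simplified sketch, not the code in this PR: the types are reduced stand-ins, the pipeline id is dropped, and the `strategy != nullptr` guard is an assumption (the diagram elides the real condition in `emit_batch`).

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Simplified round-robin policy: hands out device ids 0..N-1 in turn.
class round_robin {
 public:
  explicit round_robin(std::size_t num_gpus) : _num_gpus(num_gpus) { assert(num_gpus > 0); }
  int get_next_gpu() { return static_cast<int>(_cursor++ % _num_gpus); }

 private:
  std::size_t _num_gpus;
  std::size_t _cursor = 0;
};

struct batch {
  std::optional<int> preferred_device_id;
};

// Polymorphic base so dynamic_cast can tell coalescing formats apart.
struct ingestible {
  virtual ~ingestible() = default;
};

struct coalescing_ingestible : ingestible {
  round_robin* strategy = nullptr;
  void install_balancing(round_robin* s) { strategy = s; }
  // The only stamp site: the device id goes on the final batch.
  batch emit_batch() {
    batch b;
    if (strategy != nullptr) {  // assumed guard; the real condition is elided above
      b.preferred_device_id = strategy->get_next_gpu();
    }
    return b;
  }
};

struct provider {
  round_robin* strategy = nullptr;  // stays null for coalescing formats
  void set_balancing_strategy(round_robin* s) { strategy = s; }
};

// Install on the consumer when the ingestible coalesces, else on the producer.
// Returns true when the policy landed on the ingestible.
bool install_policy(ingestible& ing, provider& prov, round_robin& rr) {
  if (auto* c = dynamic_cast<coalescing_ingestible*>(&ing)) {
    c->install_balancing(&rr);
    return true;
  }
  prov.set_balancing_strategy(&rr);
  return false;
}
```

Because exactly one of the two sides ever holds the strategy, the cursor advances once per emitted batch, which is the no-double-advance property the diagram calls out.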


Diagram 2 — pull-based coalescing on the consumer

Producers run in parallel on the scan pool and push carriers into a blocking split_connector.
The single consumer pulls carriers on demand, packs their units into the coalescer, and pulls
cap-sized batches back out — so early batches start decoding while later carriers are still being
walked.

PRODUCERS (scan-pool worker threads — PUSH)        CONSUMER (scan operator — PULL)
═══════════════════════════════════════════        ═══════════════════════════════════════════════

 split_provider::run, per claimed unit of work:      sirius_gpu_scan_operator
     carrier = run_batch(file_batch | rg_range)        ::get_next_task_input_data()      [:128]
                │   (1 carrier of N coalescing_units)        │ calls
     apply_balancing(carrier)                                ▼
        └─ returns early: provider strategy        consume_next_input(connector)   [coalescing_gpu_ingestible.cpp:30]
           is NULL for coalescing formats            ┌─ loop ───────────────────────────────────────┐
                │                                     │  if coalescer.has_ready():                    │
     connector.push_split(carrier) ──push──┐         │      return emit_batch(coalescer.pop_ready()) │ ◄─ ★ stamp GPU,
                                            │         │                                               │    return batch
                                            ▼         │  split = connector.get_next_split() ──pull──► │ BLOCKS until a
                                  ┌───────────────────┐│      │                                       │ carrier arrives
                                  │  split_connector  ││      ├─ nullopt (closed & drained):          │ (or close)
                                  │  (blocking queue) ├┼──────┤     tail = coalescer.flush()          │
                                  │                   ││      │     return emit_batch(tail) if any    │
                                  │  producers close()││      │     else return nullptr  (done)       │
                                  │  when all done    ││      │                                       │
                                  └───────────────────┘│      └─ carrier:                             │
                                                        │          for unit in carrier.units:         │
                                                        │              coalescer.push(unit)           │
                                                        │          // pack by byte-cap + group_key;   │
                                                        │          // closes a batch when the next    │
                                                        │          // unit would breach a cap or      │
                                                        │          // cross a group_key boundary      │
                                                        └─ continue loop (pull again) ────────────────┘

Why pull-based: the consumer never blocks the producers — get_next_split() is the only wait
point, and it returns as soon as any carrier is ready. has_ready() is checked first each
iteration, so queued batches drain before the consumer waits again. The final partial batch is
served by flush() only after the connector is closed and drained, so a single-split scan
never drops its one batch. Carriers are intermediate transport — discarded right after their units
are unpacked — which is exactly why the device id is stamped on the coalesced batch in emit_batch,
not on the carrier.
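
The control flow of the consumer loop in Diagram 2 can be sketched as below. This is a single-threaded approximation under stated assumptions: `connector` here is a non-blocking stand-in for an already-closed `split_connector`, units are plain `int`s, and the toy `coalescer` closes a batch every `cap` units instead of packing by byte cap and `group_key`. Device stamping is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>
#include <vector>

using unit = int;
using carrier = std::vector<unit>;
using batch = std::vector<unit>;

// Non-blocking stand-in for a closed split_connector: nullopt once drained.
struct connector {
  std::deque<carrier> queue;
  std::optional<carrier> get_next_split() {
    if (queue.empty()) return std::nullopt;
    carrier c = std::move(queue.front());
    queue.pop_front();
    return c;
  }
};

// Toy coalescer: closes a batch every `cap` units (the real one packs by bytes and group_key).
class coalescer {
 public:
  explicit coalescer(std::size_t cap) : _cap(cap) { assert(cap > 0); }
  void push(unit u) {
    _open.push_back(u);
    if (_open.size() == _cap) {
      _ready.push_back(std::move(_open));
      _open.clear();
    }
  }
  bool has_ready() const { return !_ready.empty(); }
  batch pop_ready() {
    batch b = std::move(_ready.front());
    _ready.pop_front();
    return b;
  }
  // Hand back the partial tail batch (possibly empty).
  batch flush() {
    batch b = std::move(_open);
    _open.clear();
    return b;
  }

 private:
  std::size_t _cap;
  batch _open;
  std::deque<batch> _ready;
};

// Serve ready batches first, then pull; flush the tail only once the connector is drained.
std::optional<batch> consume_next_input(connector& conn, coalescer& co) {
  for (;;) {
    if (co.has_ready()) return co.pop_ready();
    std::optional<carrier> split = conn.get_next_split();
    if (!split) {
      batch tail = co.flush();
      if (tail.empty()) return std::nullopt;  // done
      return tail;
    }
    for (unit u : *split) co.push(u);
  }
}
```

With a single one-unit carrier and a cap of 2, the first call still returns that unit through `flush()`, which mirrors the single-split case described above.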

🤖 Generated with Claude Code

…ible; port parquet + consumer-side multi-GPU

Extract the duckdb-native scan's batch coalescing into a reusable base so the
native and parquet scans share one path:

- New coalescing_gpu_ingestible base owns the sealed consume_next_input ->
  coalesce -> emit_batch pipeline over format-blind coalescing_unit / carrier /
  payload. Rename duckdb_native_batch_coalescer.hpp -> batch_coalescer.hpp.
- Add group_key partition affinity to the coalescer, and fix would_close to
  close a batch on group_key mismatch (it previously never did, making the
  affinity a no-op).
- Port parquet_gpu_ingestible onto the base: the producer emits one
  coalescing_carrier of per-row-group units (group_key = hive partition values),
  and make_batch rebuilds a parquet_split_info, merging consecutive same-file
  units back into multi-row-group slices.
- Multi-GPU: the device id is stamped consumer-side in emit_batch (the one site
  a real decode batch exists). prepare_for_query dispatches via
  dynamic_cast<coalescing_gpu_ingestible*>, so parquet gets round-robin GPU
  assignment for the first time and native's prior one-GPU default is fixed,
  with no producer-side double-stamp.
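
As a rough illustration of the would_close rule described in the list above (close on a byte-cap breach or a group_key mismatch), here is a minimal sketch. The `unit` and `open_batch` types and the `pack` helper are hypothetical, and the choice that an empty batch always accepts its first unit, even an oversized one, is an assumption rather than something this commit message states.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical unit: a size in bytes plus a partition key.
struct unit {
  std::size_t bytes;
  std::string group_key;
};

struct open_batch {
  std::vector<unit> units;
  std::size_t bytes = 0;
};

// True when adding `next` must start a new batch.
bool would_close(const open_batch& b, const unit& next, std::size_t byte_cap) {
  if (b.units.empty()) return false;                             // assumed: first unit always fits
  if (next.group_key != b.units.front().group_key) return true;  // group_key boundary
  return b.bytes + next.bytes > byte_cap;                        // byte-cap breach
}

// Pack units in arrival order into batches that respect both rules.
std::vector<open_batch> pack(const std::vector<unit>& units, std::size_t byte_cap) {
  assert(byte_cap > 0);
  std::vector<open_batch> closed;
  open_batch cur;
  for (const unit& u : units) {
    if (would_close(cur, u, byte_cap)) {
      closed.push_back(std::move(cur));
      cur = open_batch{};
    }
    cur.units.push_back(u);
    cur.bytes += u.bytes;
  }
  if (!cur.units.empty()) closed.push_back(std::move(cur));  // tail batch
  return closed;
}
```

Dropping the group_key check from `would_close` would let units from different partitions share a batch, which is the no-op affinity behaviour the fix above addresses.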

Tests: coalescer group_key cases, parquet same-file-merge and round-robin
device-id cases; full Catch2 suite (incl. TPC-H-parquet and hive-partition
integration tests) green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@kevkrist kevkrist force-pushed the scan-coalescing-gpu-ingestible branch from 5b682f9 to 1f39589 on June 15, 2026 17:58