[WIP] Internal threading + in-place codec API for sharded reads #265

Draft
habemus-papadum wants to merge 8 commits into JuliaIO:master from habemus-papadum:parallel-and-alloc-improvements

@habemus-papadum

Draft / WIP — depends on #264. The diff below currently includes that PR's commits because GitHub's base branch must live in JuliaIO/Zarr.jl. Once #264 is merged, this PR's diff will collapse to just the four threading/alloc commits; please do not review or merge until then.

What this changes

Builds on the partial-read fast path from #264. Two follow-up improvements that close the throughput gap to zarr-python:

  1. Internal threading for the sharded partial-read path. When Julia is started with -t > 1, inner-chunk decodes within one outer chunk and outer-chunk reads across one request are dispatched to Threads.@spawn — the same way zarr-python parallelizes inner chunks inside one __getitem__. User code is unchanged; the parallelism is transparent.

  2. In-place codec API to remove transient allocations on the hot path. pipeline_decode! for V3 now decodes zstd directly into the byte view of the caller's typed output array for the dominant [BytesCodec, ZstdV3Codec] pipeline shape, eliminating the chunk-sized scratch buffer + copyto! that ran on every inner-chunk decode.

Total of 4 commits on top of #264:

  • Parallelize sharded partial reads with @spawn — threading inside read_shard_partial_with_source! (over inner chunks) and _readblock_sharded_partial! (over outer chunks). New Zarr.enable_threaded_shard_decode[] toggle (default true).
  • In-place codec_decode! + cap inner-decode pool — adds codec_decode! API with specializations for BytesCodec (zero-copy reinterpret + bulk byte copy) and ZstdV3Codec (ChunkCodecCore.decode! straight into the caller's buffer), plus generic fallbacks for the three V3Codec In/Out tag pairs. Adds Zarr.max_concurrent_inner_decodes[] (Ref{Int}, default 8) capping the buffer pool independently of nthreads().
  • Skip the bytes scratch buffer when array_bytes is BytesCodec: pipeline_decode! for V3 detects [BytesCodec, one bytes_bytes codec] and decodes directly into reinterpret(UInt8, vec(output)), removing the chunk-sized scratch allocation entirely. Multi-step bytes-bytes pipelines and pipelines with array→array codecs keep the existing buffered fallback.
  • Test coverage + docs for the threading & in-place codec work — non-production: 34 new test cases and a Threading + In-place codec section in docs/src/UserGuide/partial_shard_reads.md.

Toggles

Three flags, all Ref types modeled on the existing Zarr.concurrent_io_tasks::Ref{Int}:

  • Zarr.enable_partial_shard_storage_reads[] (from Fast partial-read path for sharding_indexed codec #264)
  • Zarr.enable_threaded_shard_decode[] (Ref{Bool}, default true). Setting it to false forces the sequential decode path even with -t > 1.
  • Zarr.max_concurrent_inner_decodes[] (Ref{Int}, default 8). Modeled on zarr-python's async.concurrency = 10.
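
For A/B debugging it can help to flip one of these flags for a single call and restore it afterwards. The sketch below is only an illustration: it uses stand-in Refs with the same names and defaults as the toggles listed above rather than the real Zarr bindings, and `with_flag` is a hypothetical helper that is not part of Zarr.jl.

```julia
# Stand-ins for the package-level toggles (same types and defaults as listed above).
const enable_threaded_shard_decode = Ref(true)
const max_concurrent_inner_decodes = Ref(8)

# Hypothetical helper: set `flag[]` to `value` while `f` runs, then restore it.
function with_flag(f, flag::Ref, value)
    old = flag[]
    flag[] = value
    try
        return f()
    finally
        flag[] = old   # restored even if `f` throws
    end
end

# Run one "read" with threading disabled; the previous value comes back afterwards.
seen = with_flag(enable_threaded_shard_decode, false) do
    enable_threaded_shard_decode[]
end
```

Because the Refs are global, a temporary override like this is only safe while no other task is reading through the same flags.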

Tests

Three new testsets covering the new code (test/v3_codecs.jl, +34 cases):

  • codec_decode! in-place API — BytesCodec little-endian and big-endian (bswap) round-trips, dimension-mismatch error, ZstdV3Codec in-place decode, and the generic V3Codec{:bytes,:bytes} fallback via CRC32cV3Codec.
  • pipeline_decode! V3 paths — exercises every branch of the rewritten function: matching-endian fast path, endian-mismatch variant, no-bytes-bytes path, multi-bytes-bytes scratch-buffer branch, and the array_array fallback (TransposeCodec).
  • threading flags preserve correctness — flips enable_threaded_shard_decode[] and max_concurrent_inner_decodes[] across realistic combinations and verifies reads still match the written data on a sharded DirectoryStore-backed array.

Combined with #264's coverage, the suite is at 2555 / 2555 passing.

Performance

Measured on a (129 syms × 1.48 yr) symbol-major sharded archive, BTC × full quotes history × 6 vars:

| Configuration | Time | vs Python (zarr-python serial) |
|---|---|---|
| user-serial Julia (this branch) | 3.24s | 0.81× |
| user-level @threads-over-vars | 1.14s | 0.29× |
| Python (zarr-python 3, internal parallelism) | 4.0s | 1.0× |

Per-call allocations on the f64 quote variables dropped roughly 35-40% versus #264 alone, with chunk-sized transient allocations eliminated for the dominant pipeline shape.

🤖 Generated with Claude Code

habemus-papadum and others added 8 commits April 27, 2026 19:38
The current ShardingCodec read path always decodes the full outer
chunk: a slice that touches one inner shard still pays decompression
for every other inner shard in the outer chunk. On a 740-shard layout
(4y of seconds, daily inner shards) that's a ~700x decompression tax
on surgical slices.

This change closes the gap to zarr-python with two layered fast paths:

In-memory partial decode (src/Codecs/V3/V3.jl):
  - sharding_codec(p) detects a "pure" sharding pipeline (no
    array->array codecs before, no bytes->bytes codecs after) so
    readblock! can opt into the fast path conservatively.
  - read_shard_partial_with_source! takes a byte-source closure and
    only decodes inner chunks intersecting the requested slice.
  - read_shard_partial! is the in-memory wrapper: caller has the
    whole outer chunk in memory, we slice it for index + inner-chunk
    bytes.
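
The core of the partial decode is working out which inner chunks a requested slice touches. A minimal one-dimensional sketch of that index arithmetic follows; the function names are hypothetical, and the chunk length of 86_400 is only an illustrative choice (one day of one-second samples), not a value taken from the code.

```julia
# Indices (1-based) of the inner chunks that a 1-based element range touches,
# for inner chunks of `chunklen` elements laid out back to back.
function intersecting_inner_chunks(r::AbstractUnitRange{<:Integer}, chunklen::Integer)
    isempty(r) && return 1:0
    return (fld(first(r) - 1, chunklen) + 1):(fld(last(r) - 1, chunklen) + 1)
end

# For one intersecting chunk `k`, the part of the request that falls inside it.
function overlap_in_chunk(r::AbstractUnitRange{<:Integer}, chunklen::Integer, k::Integer)
    chunk_range = ((k - 1) * chunklen + 1):(k * chunklen)
    return intersect(r, chunk_range)
end

day = 86_400
touched  = intersecting_inner_chunks((10 * day + 1):(11 * day), day)   # exactly chunk 11
straddle = intersecting_inner_chunks((10 * day):(10 * day + 1), day)   # chunks 10 and 11
piece    = overlap_in_chunk((10 * day):(10 * day + 1), day, 10)        # last element of chunk 10
```

Only the chunks in the returned range need to be decompressed; every other inner chunk in the shard is skipped.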

Storage-aware partial reads:
  - supports_partial_reads / read_range / getsize on AbstractStore
    (defaults preserve existing behavior — fall back to full read +
    in-memory slice).
  - DirectoryStore opts in: open + seek + read for read_range,
    filesize for getsize. Other store types are unchanged until
    someone implements byte-range reads for them.
  - _readblock_sharded_partial! in ZArray.jl wires it together: per
    outer chunk, full reads stay on the existing path; partial reads
    fetch only the index + intersecting inner chunks via byte-range
    reads.
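
The "open + seek + read" pattern mentioned for DirectoryStore can be sketched with plain Base I/O. The function names, argument order, and the zero-based offset convention below are assumptions made for illustration; the actual read_range and getsize signatures are defined in the PR and may differ.

```julia
# Hypothetical stand-in for a store's byte-range read: return `nbytes` bytes
# starting at zero-based byte `offset` of the file at `path`.
function read_range_sketch(path::AbstractString, offset::Integer, nbytes::Integer)
    open(path, "r") do io
        seek(io, offset)          # seek positions are zero-based
        return read(io, nbytes)   # reads at most `nbytes` bytes
    end
end

# Stand-in for the size query.
getsize_sketch(path::AbstractString) = filesize(path)

# Demo on a temporary file holding the bytes 0x00 through 0x63.
path, io = mktemp()
write(io, UInt8.(0:99))
close(io)
chunk = read_range_sketch(path, 10, 5)                          # offsets 10..14
tail  = read_range_sketch(path, getsize_sketch(path) - 4, 4)    # last four bytes
rm(path)
```

The size query matters because a reader that wants bytes relative to the end of the object needs the total length before it can compute the offset.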

Toggle:
  - Zarr.enable_partial_shard_storage_reads[] (Ref{Bool}, default
    true), modeled on the existing concurrent_io_tasks Ref. Set to
    false to fall back to the in-memory partial-decode path for A/B
    debugging.

Measured speedups on a (67, 127M) sharded float64 archive:
  partial 1-day query:   665s -> 1.0s   (665x)
  partial week query:     97s -> 1.0s    (97x)

Existing test suite: 2482/2482 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Tests
  - test/v3_codecs.jl: new "sharding partial-read fast path" testset
    with 5 sub-testsets covering:
      * sharding_codec() detection on pure / wrapped / non-sharded
        pipelines and on a non-V3Pipeline argument
      * in-memory partial path via DictStore (single-inner-chunk,
        cross-inner, full-chunk, cross-outer, tail, whole-array slices)
      * storage-aware partial path via DirectoryStore (same slice
        patterns; results must match the in-memory path byte-for-byte)
      * Zarr.enable_partial_shard_storage_reads[] toggle preserves
        correctness in both states
      * fill_value over a partial slice of an empty/never-written
        outer chunk
  - test/storage.jl: new "Partial-read storage interface" testset with
    sub-testsets covering the AbstractStore defaults (DictStore) and
    the DirectoryStore overrides (supports_partial_reads, read_range,
    getsize, missing-key fallthrough).

Docs
  - docs/src/UserGuide/partial_shard_reads.md: explains when the fast
    path applies, the storage-interface methods stores opt into, and
    the toggle. References the existing module-level docstrings.

No production code changed in this commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sharding partial-read fast paths were single-threaded: a request that
touched many inner chunks of one shard, or many outer chunks across
the array, decoded them serially. Adds two layers of internal
threading so one readblock! call scales with available cores.

read_shard_partial_with_source! (Codecs/V3/V3.jl)
  - precomputes the work list of intersecting inner chunks (each
    writes to a disjoint region of `aout`, so safe to parallelize)
  - dispatches the decode loop with @sync / Threads.@spawn
  - bounded buffer pool (Channel of nthreads buffers) caps memory
    instead of allocating one full-shard buffer per task
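
The combination of @sync / Threads.@spawn with a bounded buffer pool can be sketched as follows. This is an illustration of the pattern under stated assumptions, not the PR's code: `decode_all!` is a hypothetical name, the "decode" is a stand-in that doubles each index, and `buflen` is assumed to be at least as long as the largest work item.

```julia
# Decode every work item on its own task, borrowing a scratch buffer from a
# fixed-size pool so that at most `poolsize` buffers ever exist.
function decode_all!(out::Vector{Int}, work::Vector{UnitRange{Int}}, buflen::Int;
                     poolsize::Int = Threads.nthreads())
    pool = Channel{Vector{Int}}(poolsize)
    for _ in 1:poolsize
        put!(pool, Vector{Int}(undef, buflen))
    end
    @sync for r in work
        Threads.@spawn begin
            buf = take!(pool)              # blocks while all buffers are in use
            try
                n = length(r)
                for i in 1:n               # stand-in for the real inner-chunk decode
                    buf[i] = 2 * r[i]
                end
                copyto!(out, first(r), buf, 1, n)   # each task writes a disjoint region
            finally
                put!(pool, buf)            # hand the buffer back, even on error
            end
        end
    end
    return out
end

out = zeros(Int, 12)
decode_all!(out, [1:4, 5:8, 9:12], 4)
```

The number of spawned tasks can exceed the pool size; tasks that find the pool empty wait in `take!`, which is what bounds peak memory.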

_readblock_sharded_partial! (ZArray.jl)
  - splits incoming blockr into full vs partial outer-chunk reads
  - full-chunk reads keep the existing serial path with one shared
    chunk buffer (already saturates one core)
  - partial-chunk reads dispatch with @sync / Threads.@spawn; each
    task reads its shard index + intersecting inner chunks
  - inner-chunk threading inside each task still applies, so
    multi-symbol multi-day queries get both axes of parallelism
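
The split into full and partial outer-chunk reads can be illustrated in one dimension. The helper below is a hypothetical sketch: it ignores truncated chunks at the array edge and is not taken from ZArray.jl.

```julia
# Split the outer chunks touched by the 1-based request `r` into those the
# request covers completely and those it only partly covers.
function split_outer_chunks(r::UnitRange{Int}, chunklen::Int)
    full, partial = Int[], Int[]
    isempty(r) && return (full = full, partial = partial)
    for k in (fld(first(r) - 1, chunklen) + 1):(fld(last(r) - 1, chunklen) + 1)
        chunk_range = ((k - 1) * chunklen + 1):(k * chunklen)
        if first(r) <= first(chunk_range) && last(chunk_range) <= last(r)
            push!(full, k)      # whole chunk requested: existing full-read path
        else
            push!(partial, k)   # candidate for the partial-read fast path
        end
    end
    return (full = full, partial = partial)
end

plan = split_outer_chunks(6:35, 10)   # chunks 1 and 4 are partial, 2 and 3 are full
```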

Toggle:
  - Zarr.enable_threaded_shard_decode[] (Ref{Bool}, default true).
    Falls back to sequential when nthreads()==1, |work|==1, or the
    flag is off.
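
The three fallback conditions amount to a single predicate. A minimal sketch, with a stand-in Ref and a hypothetical function name:

```julia
# Stand-in for the package-level flag (Ref{Bool}, default true).
const threaded_decode_flag = Ref(true)

# Use the threaded path only when the flag is on, Julia has more than one
# thread, and there is more than one work item to spread across them.
use_threads(nwork::Integer; nthreads::Integer = Threads.nthreads()) =
    threaded_decode_flag[] && nthreads > 1 && nwork > 1

many_items  = use_threads(4; nthreads = 8)
single_item = use_threads(1; nthreads = 8)
one_thread  = use_threads(4; nthreads = 1)
```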

Threading is opt-out via the flag, so single-threaded callers see no
behavior change. Both flag declarations moved above include() so
nested modules can import them.

Measured on a (129 syms × 1.48 yr) symbol-major archive, BTC × full
quotes history, all 6 vars:
  user-serial path:   23.9s → 10.3s   (2.3x from internal threading)
  user-level @threads over 6 vars:   12.7s → 5.2s
Python (zarr-python 3, internal parallelism) baseline: 4.0s.

Existing test suite: 2482/2482 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The V3 partial-shard path made two chunk-sized transient allocations
on every inner-chunk decode: one in zstd's `cc_decode` (returns a
fresh Vector{UInt8}) and one in BytesCodec's
`collect(reinterpret(...))`. pipeline_decode! then did a final
`copyto!(output, arr)`, a third copy. For a 187 MB inner chunk that's
GBs of waste per query.

Patch A: in-place codec_decode! API (Codecs/V3/V3.jl)
  - Generic fallbacks dispatched on V3Codec{In,Out} so any codec gets
    a working method (allocates + copies, but no caller-visible API
    change).
  - Specialized BytesCodec — reinterpret bytes as UInt8 view of the
    output array and `copyto!` straight in. No fresh Vector{T}.
  - Specialized ZstdV3Codec — calls ChunkCodecCore.decode! into the
    caller's buffer directly. No fresh decoded Vector{UInt8}.
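
The "generic fallback plus specialized method" structure relies on ordinary multiple dispatch. The sketch below shows the shape of it with toy codecs; every type and function name here is hypothetical, and the real codec_decode! argument order and type parameters are whatever the PR defines.

```julia
# Hypothetical toy codecs; the real package dispatches on its own codec types.
abstract type ToyCodec end
struct XorCodec <: ToyCodec          # decode = xor every byte with a key
    key::UInt8
end
struct ReverseCodec <: ToyCodec end  # decode = reverse the byte order

# Allocating decoders: each returns a fresh Vector{UInt8}.
decode(c::XorCodec, src::Vector{UInt8}) = xor.(src, c.key)
decode(::ReverseCodec, src::Vector{UInt8}) = reverse(src)

# Generic fallback: works for any codec, but allocates and then copies.
function decode_into!(dst::AbstractVector{UInt8}, c::ToyCodec, src::Vector{UInt8})
    tmp = decode(c, src)
    length(dst) == length(tmp) || throw(DimensionMismatch("output has wrong length"))
    return copyto!(dst, tmp)
end

# Specialization: XorCodec can write into the caller's buffer directly.
function decode_into!(dst::AbstractVector{UInt8}, c::XorCodec, src::Vector{UInt8})
    length(dst) == length(src) || throw(DimensionMismatch("output has wrong length"))
    dst .= xor.(src, c.key)           # fused broadcast, no temporary vector
    return dst
end

src = UInt8[0x01, 0x02, 0x03]
a = decode_into!(zeros(UInt8, 3), XorCodec(0xff), src)   # specialized method
b = decode_into!(zeros(UInt8, 3), ReverseCodec(), src)   # generic fallback
```

A codec that never defines a specialized method still works through the fallback, so callers can use the in-place entry point unconditionally.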

Patch B: pipeline_decode! threads in-place dispatch (pipeline.jl)
  - For the common case (no array_array codecs, ≥1 bytes_bytes codec),
    sizes a single intermediate buffer to the array-bytes step's
    expected output and reuses it across the chain via codec_decode!.
  - Final array-bytes step writes straight into `output` — eliminates
    the `arr` allocation + redundant copy.
  - Falls back to the old path when transpose codecs or other
    array-array steps are present.

Patch C: cap inner-decode buffer pool (Zarr.jl + V3.jl)
  - New Zarr.max_concurrent_inner_decodes Ref{Int} (default 8),
    modeled on zarr-python's async.concurrency = 10.
  - read_shard_partial_with_source! pool sized to
    min(nthreads, |work|, max_concurrent_inner_decodes[]).
  - On a `-t 32` run with ~187 MB chunks the pool no longer reserves
    ~6 GB upfront for buffers most tasks never use.
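
The memory figures follow from simple arithmetic on the pool size. In the sketch below, `pool_size` is a hypothetical name for the min(...) expression above, and the work-item count of 100 is an illustrative number rather than a measurement.

```julia
# Pool size as described above: never more buffers than threads, work items,
# or the configured cap.
pool_size(nthreads::Int, nwork::Int, cap::Int) = min(nthreads, nwork, cap)

chunk_mb = 187                      # approximate inner-chunk size from the text
uncapped = 32                       # one buffer per thread on a -t 32 run
capped   = pool_size(32, 100, 8)    # default cap of 8

uncapped_mb = uncapped * chunk_mb   # 5984 MB, about 6 GB
capped_mb   = capped * chunk_mb     # 1496 MB, about 1.5 GB
```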

Measured (HL BTC quotes full history × 6 vars on
hyperliquid_1s_symmajor.zarr, -t 32):
  user-serial:                          9.6s → 7.2s   (-25%)
  user @threads-over-vars:              5.1s → 3.3s   (-35%, beats Python's 4.0s)

Allocations dropped 35-40% across partial-read benches.

Existing test suite: 2482/2482 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
pipeline_decode! was still allocating a chunk-sized Vector{UInt8} on
every inner-chunk decode for the bytes-bytes step's output. For our
prod shape that's ~187 MB per inner chunk × ~7 outer chunks per query
= ~1.3 GB of transient allocations per arr[:, si] call.

The dominant pipeline shape is `[BytesCodec, ZstdV3Codec]` —
BytesCodec just reinterprets bytes as the array's element type, so
the bytes-bytes output IS the byte view of the typed output array.
Decoding zstd straight into `reinterpret(UInt8, vec(output))`
eliminates the scratch entirely.

Two new fast paths added:

  - matching-endian common case: zstd into output's byte view, return.
  - mismatched-endian variant: zstd into output's byte view, then
    in-place bswap via codec_decode!(::BytesCodec, ...).

For multi-step bytes-bytes chains or pipelines with array_array codecs
(transpose etc.), keep the existing scratch-buffer / fallback paths
unchanged.
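
Both fast paths can be sketched with Base alone. Here `fake_decompress!` is a stand-in for a bytes-to-bytes decoder that writes into a caller-supplied buffer (the role zstd plays above), and `decode_into_typed!` is a hypothetical name; neither is the PR's actual code.

```julia
# Stand-in for an in-place bytes-to-bytes decoder; here it is a plain copy.
function fake_decompress!(dst::AbstractVector{UInt8}, src::Vector{UInt8})
    length(dst) == length(src) || throw(DimensionMismatch("size mismatch"))
    return copyto!(dst, src)
end

# Decode `src` into `output` without a chunk-sized scratch buffer.
# `stored_little` says whether the stored bytes are little-endian.
function decode_into_typed!(output::Array{T}, src::Vector{UInt8};
                            stored_little::Bool = true) where {T}
    bytes = reinterpret(UInt8, vec(output))   # byte view sharing output's memory
    fake_decompress!(bytes, src)
    host_little = ENDIAN_BOM == 0x04030201
    if stored_little != host_little
        map!(bswap, output, output)           # in-place byte swap per element
    end
    return output
end

vals = Float64[1.5 3.5; 2.5 4.5]
le_bytes = collect(reinterpret(UInt8, htol.(vec(vals))))   # little-endian encoding
be_bytes = collect(reinterpret(UInt8, hton.(vec(vals))))   # big-endian encoding
from_le = decode_into_typed!(zeros(2, 2), le_bytes; stored_little = true)
from_be = decode_into_typed!(zeros(2, 2), be_bytes; stored_little = false)
```

In the matching-endian case the decoder's output buffer is the result array itself, so nothing is copied after decompression; the mismatched case adds one in-place pass over the elements.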

Measured (HL BTC quotes full history × 6 vars, -t 32):
  user-serial:                          7.2s → 3.2s   (Python 4.0s)
  user @threads-over-vars:              3.3s → 1.1s
  per-call alloc on f64 vars:           2.9 GB → 1.7 GB

Existing test suite: 2482/2482 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Tests
  - test/v3_codecs.jl: three new testsets, 34 cases:
      * "codec_decode! in-place API" — BytesCodec little-endian and
        big-endian (bswap) round-trips, dimension-mismatch error,
        ZstdV3Codec in-place decode, and the generic V3Codec{:bytes,
        :bytes} fallback via CRC32cV3Codec.
      * "pipeline_decode! V3 paths" — exercises each branch:
        matching-endian fast path (BytesCodec :little + Zstd),
        endian-mismatch variant (BytesCodec :big), no-bytes-bytes path
        (BytesCodec only), multi bytes-bytes scratch-buffer branch
        (Zstd + CRC32c), and the array_array fallback (TransposeCodec).
      * "threading flags preserve correctness" — flips
        enable_threaded_shard_decode[] and max_concurrent_inner_decodes[]
        across realistic combinations, verifies reads still match.

Docs
  - docs/src/UserGuide/partial_shard_reads.md: new "Threading" and
    "In-place codec API" sections describing the user-transparent
    parallelism, the two new toggles, and how downstream codecs can
    opt into in-place dispatch.

No production code changed in this commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Required by the upstream changelog-enforcer CI check. Describes the
new in-memory and storage-aware partial-decode paths, the optional
AbstractStore methods (supports_partial_reads, read_range, getsize),
DirectoryStore opt-in, and the enable_partial_shard_storage_reads[]
toggle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Describes the internal Threads.@spawn dispatch in
read_shard_partial_with_source! and _readblock_sharded_partial!, the
new enable_threaded_shard_decode[] and max_concurrent_inner_decodes[]
toggles, the in-place codec_decode! API, and the rewritten V3
pipeline_decode! that eliminates per-inner-chunk scratch allocations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>