Commit 46ffd0b

Merge pull request #70 from ArcInstitute/fix/multi-accession-prefetch-error-propagation
fix: propagate failures in multi-accession prefetch
2 parents 11155d6 + c9b14f0 commit 46ffd0b

4 files changed

Lines changed: 180 additions & 14 deletions

.gitignore

Lines changed: 20 additions & 2 deletions
@@ -1,7 +1,25 @@
-/target
+# Generated by Cargo / build output
+/target/
+**/*.rs.bk
+
+# Cargo lock file (kept ignored per existing project convention)
 Cargo.lock
 
+# MSVC debug info
+*.pdb
+
+# Backup and temporary files
+*.bk
+*~
+*.swp
+
+# Environment / editor
+.env
+.DS_Store
+
+# Project-specific
 vendor/ncbi-vdb/comp/**
 vendor/ncbi-vdb/reconfigure
 
-tests/fixtures/data/
+tests/fixtures/data/
+tasks/

CLAUDE.md

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Overview
+
+`xsra` is a Rust CLI that extracts sequences from NCBI SRA archives, intended as a faster,
+storage-efficient replacement for `fastq-dump`/`fasterq-dump`. It links the `ncbi_vdb` C
+library through the [`ncbi-vdb-sys`](https://github.com/arcinstitute/ncbi-vdb-sys) crate and
+outputs FASTA, FASTQ, or [BINSEQ](https://github.com/arcinstitute/binseq) (`.bq`/`.vbq`/`.cbq`).
+
+## Commands
+
+```bash
+cargo build # debug build
+cargo build --release # release build (use for any perf testing)
+cargo test # run all integration tests
+cargo test test_simple_fastq_dump_cli # run a single test by name
+cargo clippy --all-targets # lint
+cargo fmt # format
+```
+
+Runtime logging is controlled by the `XSRA_LOG` env var (e.g. `XSRA_LOG=debug`), parsed by
+`env_logger`; default level is `info`.
+
+### Tests require network access
+
+Integration tests in `tests/` use `TestFixtures` (`tests/fixtures/setup.rs`), which **downloads
+real SRA files from NCBI on first run** (`SRR5150787` ~1.7MB variable-length, `SRR1574235` ~17MB
+fixed-length) into `tests/fixtures/data/`. Subsequent runs reuse the cached files. Corrupt and
+invalid fixtures are synthesized locally. A first `cargo test` without internet will fail.
+
+## Domain model
+
+SRA terminology drives the whole codebase:
+- A **spot** (a.k.a. record) is one sequencing event, addressed by a 1-based index.
+- A spot contains one or more **segments** (reads), each with a zero-based segment id (`sid()`).
+  Segments may be **technical** (e.g. barcodes/adapters) or biological.
+- `-I/--include` selects segments by sid; `-t` skips technical segments; `-L` filters by length;
+  `-l/--limit` caps the number of *spots* (not reads).
+
+## Architecture
+
+The four subcommands are dispatched in `src/main.rs` from the clap tree in `src/cli/`. Only
+`prefetch` is fully async (tokio); the other commands are synchronous but may spin up a short-lived
+tokio runtime solely to resolve a remote accession URL.
+
+- **`cli/`** — clap arg structs, composed via `#[clap(flatten)]`. Reusable groups: `InputOptions`
+  / `AccessionOptions` (accession + provider + retry), `FilterOptions`, `RuntimeOptions` (threads).
+  `Provider` is `Https | Gcp | Aws` (gcp requires `-G <project>`). Each command has its own output
+  struct (`DumpOutput`, `RecodeOutput`, etc.).
+
+- **`dump/`** — multi-threaded extraction (`dump/mod.rs::launch_threads`). The spot range is split
+  evenly across N threads; each thread opens its *own* `SraReader` over its sub-range, fills
+  thread-local buffers, and flushes to a single shared writer (`Arc<Mutex<BoxedSegmentWriter>>`)
+  every `RECORD_CAPACITY` spots. **Spot output order is not deterministic** — it depends on thread
+  completion order; paired segments from one spot stay together. Per-segment statistics are summed
+  across threads at join. Empty per-segment files are deleted afterward unless `--keep-empty`.
+
+- **`recode/`** — writes BINSEQ directly from SRA without an intermediate FASTQ. Requires exactly
+  1 or 2 included segments (`include[0]` = primary, `include[1]` = extended/paired); validated in
+  `RecodeArgs::validate`. Flavor `c`=CBQ (default), `b`=BQ, `v`=VBQ.
+
+- **`prefetch/`** — async download of accessions to disk. `identify_url` (also used by dump/recode
+  to stream remote accessions) resolves an accession to a concrete URL via the NCBI SRA Data
+  Locator API, honoring provider and lite-vs-full quality preference, with retry/backoff.
+
+- **`describe/`** — reads a sample of spots and reports per-segment statistics.
+
+- **`output.rs`** (crate root) — `Compression` (uncompressed/gzip/bgzip/zstd; gzip & bgzip via
+  parallel `gzp`, zstd via `zstd`), `OutputFileType` (regular file / named pipe / stdout), path
+  construction, and FIFO creation. `dump/output.rs` builds the actual segment writers on top of it.
+
+### lib vs bin
+
+`src/lib.rs` re-exports all modules so integration tests can call internals directly (e.g.
+`xsra::dump::dump`, `xsra::prefetch::prefetch`). The constants `BUFFER_SIZE` and `RECORD_CAPACITY`
+are defined in **both** `main.rs` and `lib.rs` — keep them in sync if changed.
+
+## Conventions
+
+- Errors use `anyhow::Result` throughout; `bail!` for user-facing validation failures.
+- Structured logging via the `log` crate with key-value fields (`info!(url = ...; "msg")`).
+- The `ncbi_vdb` static library is built from bundled source at install time, so builds are
+  system-specific and binaries are not portable.
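
Reviewer note on the `dump/` entry in CLAUDE.md: the file says the spot range is "split evenly across N threads" but does not show how. A minimal sketch of one way to do it follows, assuming 1-based inclusive spot indices as the Domain model section describes. The function name `split_spots` and the choice to hand any remainder to the leading chunks are assumptions for illustration, not taken from `dump/mod.rs`.

```rust
/// Split the 1-based inclusive spot range `[first, last]` into at most `n`
/// contiguous sub-ranges whose lengths differ by at most one spot.
/// Hypothetical helper: the real `launch_threads` may divide work differently.
fn split_spots(first: u64, last: u64, n: u64) -> Vec<(u64, u64)> {
    if n == 0 || last < first {
        return Vec::new();
    }
    let total = last - first + 1;
    let base = total / n; // spots every chunk receives
    let rem = total % n; // the first `rem` chunks receive one extra spot
    let mut ranges = Vec::new();
    let mut start = first;
    for i in 0..n {
        let len = base + if i < rem { 1 } else { 0 };
        if len == 0 {
            break; // more threads than spots: remaining threads get nothing
        }
        ranges.push((start, start + len - 1));
        start += len;
    }
    ranges
}
```

With this sketch, `split_spots(1, 10, 3)` yields `[(1, 4), (5, 7), (8, 10)]`, and asking for more threads than spots simply returns fewer ranges.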

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [package]
 name = "xsra"
-version = "0.2.28"
+version = "0.2.29"
 edition = "2021"
 license = "MIT"
 authors = ["Noam Teyssier <noam.teyssier@arcinstitute.org>"]

src/prefetch/mod.rs

Lines changed: 74 additions & 11 deletions
@@ -1,5 +1,5 @@
 use crate::cli::{AccessionOptions, MultiInputOptions, Provider};
-use anyhow::{bail, Result};
+use anyhow::{anyhow, bail, Result};
 use futures::{future::join_all, stream::FuturesUnordered, StreamExt};
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use log::{debug, error, info, trace, warn};
@@ -242,12 +242,21 @@ pub async fn identify_urls(
     // Wait for all tasks to complete
     let results = join_all(tasks).await;
 
-    // Process results, handling any JoinError from the spawned tasks
-    let mut processed_results = Vec::new();
-    for result in results {
+    // Process results, handling any JoinError from the spawned tasks.
+    // `join_all` preserves task order, which matches the input `accessions`
+    // order, so a JoinError can be attributed to its accession rather than
+    // being silently dropped.
+    let mut processed_results = Vec::with_capacity(accessions.len());
+    for (accession, result) in accessions.iter().zip(results) {
         match result {
             Ok(res) => processed_results.push(res),
-            Err(e) => error!(error:% = e; "Task join error"),
+            Err(e) => {
+                error!(accession = accession.as_str(), error:% = e; "Task join error");
+                processed_results.push((
+                    accession.clone(),
+                    Err(anyhow!("task failed to complete: {e}")),
+                ));
+            }
         }
     }
 
@@ -372,6 +381,10 @@ pub async fn prefetch(input: &MultiInputOptions, output_dir: Option<&str>) -> Re
     // For GCP downloads, we'll use a separate Vec since gsutil has its own concurrency management
     let mut gcp_downloads = Vec::new();
 
+    // Accumulate per-accession failures so the command can fail loudly at the
+    // end while still attempting every accession (best-effort behavior).
+    let mut failures: Vec<String> = Vec::new();
+
     for (accession, url_result) in url_results {
         match url_result {
             Ok(url) => {
@@ -385,7 +398,12 @@ pub async fn prefetch(input: &MultiInputOptions, output_dir: Option<&str>) -> Re
 
                 match input.options.provider {
                     Provider::Https => {
-                        https_downloads.push(download_url(url, path, pb));
+                        // Carry the accession into the future so download
+                        // failures can be attributed back to it.
+                        https_downloads.push(async move {
+                            let result = download_url(url, path, pb).await;
+                            (accession, result)
+                        });
                     }
                     Provider::Gcp => {
                         let project_id = match &input.options.gcp_project_id {
@@ -395,43 +413,61 @@ pub async fn prefetch(input: &MultiInputOptions, output_dir: Option<&str>) -> Re
                                     accession = accession.as_str();
                                     "GCP project ID is required for GCP downloads"
                                 );
+                                failures.push(format!(
+                                    "{accession}: GCP project ID is required for GCP downloads"
+                                ));
                                 continue;
                             }
                         };
                         // We'll collect GCP downloads and process them separately
-                        gcp_downloads.push((url, path, project_id, pb));
+                        gcp_downloads.push((accession, url, path, project_id, pb));
                     }
                     _ => {
                         error!(
                             accession = accession.as_str(),
                             provider:? = input.options.provider;
                             "Unsupported provider"
                         );
+                        failures.push(format!(
+                            "{accession}: unsupported provider: {:?}",
+                            input.options.provider
+                        ));
                         continue;
                     }
                 }
             }
             Err(e) => {
                 error!(accession = accession.as_str(), error:% = e; "Failed to identify URL");
+                failures.push(format!("{accession}: URL resolution failed: {e}"));
             }
         }
     }
 
     // Process HTTPS downloads concurrently
-    while let Some(result) = https_downloads.next().await {
+    while let Some((accession, result)) = https_downloads.next().await {
         if let Err(e) = result {
-            error!(error:% = e; "HTTPS download failed");
+            error!(accession = accession.as_str(), error:% = e; "HTTPS download failed");
+            failures.push(format!("{accession}: download failed: {e}"));
         }
     }
 
     // Process GCP downloads - since gsutil has its own concurrency management,
     // we'll run them sequentially to avoid overwhelming the terminal output
-    for (url, path, project_id, pb) in gcp_downloads {
+    for (accession, url, path, project_id, pb) in gcp_downloads {
         if let Err(e) = download_url_gcp(url, path, project_id, pb).await {
-            error!(error:% = e; "GCP download failed");
+            error!(accession = accession.as_str(), error:% = e; "GCP download failed");
+            failures.push(format!("{accession}: download failed: {e}"));
         }
     }
 
+    if !failures.is_empty() {
+        bail!(
+            "prefetch failed for {} accession(s):\n{}",
+            failures.len(),
+            failures.join("\n")
+        );
+    }
+
     Ok(())
 }
 
@@ -844,4 +880,31 @@ mod tests {
         let err_msg = result.unwrap_err().to_string();
         assert!(err_msg.contains("GCP project ID is required for GCP downloads"));
     }
+
+    #[tokio::test]
+    async fn prefetch_multi_fails_when_any_url_resolution_fails() {
+        // Both accessions contain "INVALID", so the test `query_entrez` returns
+        // "no urls found" for each and resolution fails with no network access.
+        let input = MultiInputOptions {
+            accessions: vec!["INVALID_A".to_string(), "INVALID_B".to_string()],
+            options: create_test_accession_options(),
+        };
+
+        let result = prefetch(&input, None).await;
+        assert!(
+            result.is_err(),
+            "multi-accession prefetch must fail when an accession cannot be resolved"
+        );
+
+        // The summary error must identify every failed accession, not just the first.
+        let err_msg = result.unwrap_err().to_string();
+        assert!(
+            err_msg.contains("INVALID_A"),
+            "missing INVALID_A: {err_msg}"
+        );
+        assert!(
+            err_msg.contains("INVALID_B"),
+            "missing INVALID_B: {err_msg}"
+        );
+    }
 }
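
Reviewer note: the two ideas in this change (attributing a join failure to its accession by zipping results with the input order, and accumulating failures so the command fails once at the end) can be seen in isolation in the standalone sketch below. It is not the code from `src/prefetch/mod.rs`. It swaps tokio tasks and `join_all` for `std::thread` so it compiles without external crates, treats a panicked thread as the analogue of a `JoinError`, and uses `String` errors instead of `anyhow`. The names `fake_resolve`, `resolve_all`, and `prefetch_all` are hypothetical.

```rust
use std::thread;

/// Hypothetical stand-in for URL resolution: ids containing "INVALID" return
/// an error, and ids containing "PANIC" crash the worker thread.
fn fake_resolve(accession: &str) -> Result<String, String> {
    if accession.contains("PANIC") {
        panic!("simulated task crash");
    }
    if accession.contains("INVALID") {
        return Err("no urls found".to_string());
    }
    Ok(format!("https://example.invalid/{accession}"))
}

/// Resolve every accession on its own thread and pair each outcome with the
/// accession that produced it. Handles are joined in spawn order, so zipping
/// them with the input slice keeps the attribution correct.
fn resolve_all(accessions: &[String]) -> Vec<(String, Result<String, String>)> {
    let handles: Vec<_> = accessions
        .iter()
        .cloned()
        .map(|acc| thread::spawn(move || fake_resolve(&acc)))
        .collect();

    accessions
        .iter()
        .zip(handles)
        .map(|(acc, handle)| {
            let outcome = match handle.join() {
                Ok(result) => result,
                // A panicked thread plays the role of a tokio JoinError here.
                Err(_) => Err("task failed to complete".to_string()),
            };
            (acc.clone(), outcome)
        })
        .collect()
}

/// Attempt every accession, then fail once with a summary of all failures.
fn prefetch_all(accessions: &[String]) -> Result<Vec<String>, String> {
    let mut urls = Vec::new();
    let mut failures: Vec<String> = Vec::new();
    for (accession, outcome) in resolve_all(accessions) {
        match outcome {
            Ok(url) => urls.push(url),
            Err(e) => failures.push(format!("{accession}: {e}")),
        }
    }
    if failures.is_empty() {
        Ok(urls)
    } else {
        Err(format!(
            "prefetch failed for {} accession(s):\n{}",
            failures.len(),
            failures.join("\n")
        ))
    }
}
```

Calling `prefetch_all` with one valid id, one `INVALID_*` id, and one `PANIC_*` id returns a single error naming both failed accessions, while the valid one is still attempted. That is the best-effort behavior the new test in this commit checks for the URL-resolution case.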
