Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
/target
# Generated by Cargo / build output
/target/
**/*.rs.bk

# Cargo lock file (kept ignored per existing project convention)
Cargo.lock

# MSVC debug info
*.pdb

# Backup and temporary files
*.bk
*~
*.swp

# Environment / editor
.env
.DS_Store

# Project-specific
vendor/ncbi-vdb/comp/**
vendor/ncbi-vdb/reconfigure

tests/fixtures/data/
tests/fixtures/data/
tasks/
85 changes: 85 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Overview

`xsra` is a Rust CLI that extracts sequences from NCBI SRA archives, intended as a faster,
storage-efficient replacement for `fastq-dump`/`fasterq-dump`. It links the `ncbi_vdb` C
library through the [`ncbi-vdb-sys`](https://github.com/arcinstitute/ncbi-vdb-sys) crate and
outputs FASTA, FASTQ, or [BINSEQ](https://github.com/arcinstitute/binseq) (`.bq`/`.vbq`/`.cbq`).

## Commands

```bash
cargo build # debug build
cargo build --release # release build (use for any perf testing)
cargo test # run all integration tests
cargo test test_simple_fastq_dump_cli # run a single test by name
cargo clippy --all-targets # lint
cargo fmt # format
```

Runtime logging is controlled by the `XSRA_LOG` env var (e.g. `XSRA_LOG=debug`), parsed by
`env_logger`; default level is `info`.

### Tests require network access

Integration tests in `tests/` use `TestFixtures` (`tests/fixtures/setup.rs`), which **downloads
real SRA files from NCBI on first run** (`SRR5150787` ~1.7MB variable-length, `SRR1574235` ~17MB
fixed-length) into `tests/fixtures/data/`. Subsequent runs reuse the cached files. Corrupt and
invalid fixtures are synthesized locally. A first `cargo test` without internet will fail.

## Domain model

SRA terminology drives the whole codebase:
- A **spot** (a.k.a. record) is one sequencing event, addressed by a 1-based index.
- A spot contains one or more **segments** (reads), each with a zero-based segment id (`sid()`).
Segments may be **technical** (e.g. barcodes/adapters) or biological.
- `-I/--include` selects segments by sid; `-t` skips technical segments; `-L` filters by length;
`-l/--limit` caps the number of *spots* (not reads).

## Architecture

The four subcommands are dispatched in `src/main.rs` from the clap tree in `src/cli/`. Only
`prefetch` is fully async (tokio); the other commands are synchronous but may spin up a short-lived
tokio runtime solely to resolve a remote accession URL.

- **`cli/`** — clap arg structs, composed via `#[clap(flatten)]`. Reusable groups: `InputOptions`
/ `AccessionOptions` (accession + provider + retry), `FilterOptions`, `RuntimeOptions` (threads).
`Provider` is `Https | Gcp | Aws` (gcp requires `-G <project>`). Each command has its own output
struct (`DumpOutput`, `RecodeOutput`, etc.).

- **`dump/`** — multi-threaded extraction (`dump/mod.rs::launch_threads`). The spot range is split
evenly across N threads; each thread opens its *own* `SraReader` over its sub-range, fills
thread-local buffers, and flushes to a single shared writer (`Arc<Mutex<BoxedSegmentWriter>>`)
every `RECORD_CAPACITY` spots. **Spot output order is not deterministic** — it depends on thread
completion order; paired segments from one spot stay together. Per-segment statistics are summed
across threads at join. Empty per-segment files are deleted afterward unless `--keep-empty`.

- **`recode/`** — writes BINSEQ directly from SRA without an intermediate FASTQ. Requires exactly
1 or 2 included segments (`include[0]` = primary, `include[1]` = extended/paired); validated in
`RecodeArgs::validate`. Flavor `c`=CBQ (default), `b`=BQ, `v`=VBQ.

- **`prefetch/`** — async download of accessions to disk. `identify_url` (also used by dump/recode
to stream remote accessions) resolves an accession to a concrete URL via the NCBI SRA Data
Locator API, honoring provider and lite-vs-full quality preference, with retry/backoff.

- **`describe/`** — reads a sample of spots and reports per-segment statistics.

- **`output.rs`** (crate root) — `Compression` (uncompressed/gzip/bgzip/zstd; gzip & bgzip via
parallel `gzp`, zstd via `zstd`), `OutputFileType` (regular file / named pipe / stdout), path
construction, and FIFO creation. `dump/output.rs` builds the actual segment writers on top of it.

### lib vs bin

`src/lib.rs` re-exports all modules so integration tests can call internals directly (e.g.
`xsra::dump::dump`, `xsra::prefetch::prefetch`). The constants `BUFFER_SIZE` and `RECORD_CAPACITY`
are defined in **both** `main.rs` and `lib.rs` — keep them in sync if changed.

## Conventions

- Errors use `anyhow::Result` throughout; `bail!` for user-facing validation failures.
- Structured logging via the `log` crate with key-value fields (`info!(url = ...; "msg")`).
- The `ncbi_vdb` static library is built from bundled source at install time, so builds are
system-specific and binaries are not portable.
75 changes: 65 additions & 10 deletions src/prefetch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::cli::{AccessionOptions, MultiInputOptions, Provider};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use futures::{future::join_all, stream::FuturesUnordered, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use log::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -242,12 +242,19 @@ pub async fn identify_urls(
// Wait for all tasks to complete
let results = join_all(tasks).await;

// Process results, handling any JoinError from the spawned tasks
// Process results, handling any JoinError from the spawned tasks.
// `join_all` preserves task order, which matches the input `accessions`
// order, so a JoinError can be attributed to its accession rather than
// being silently dropped.
let mut processed_results = Vec::new();
Comment thread
nick-youngblut marked this conversation as resolved.
Outdated
for result in results {
for (accession, result) in accessions.iter().zip(results) {
match result {
Ok(res) => processed_results.push(res),
Err(e) => error!(error:% = e; "Task join error"),
Err(e) => {
error!(accession = accession.as_str(), error:% = e; "Task join error");
processed_results
.push((accession.clone(), Err(anyhow!("task failed to complete: {e}"))));
}
}
}

Expand Down Expand Up @@ -372,6 +379,10 @@ pub async fn prefetch(input: &MultiInputOptions, output_dir: Option<&str>) -> Re
// For GCP downloads, we'll use a separate Vec since gsutil has its own concurrency management
let mut gcp_downloads = Vec::new();

// Accumulate per-accession failures so the command can fail loudly at the
// end while still attempting every accession (best-effort behavior).
let mut failures: Vec<String> = Vec::new();

for (accession, url_result) in url_results {
match url_result {
Ok(url) => {
Expand All @@ -385,7 +396,12 @@ pub async fn prefetch(input: &MultiInputOptions, output_dir: Option<&str>) -> Re

match input.options.provider {
Provider::Https => {
https_downloads.push(download_url(url, path, pb));
// Carry the accession into the future so download
// failures can be attributed back to it.
https_downloads.push(async move {
let result = download_url(url, path, pb).await;
(accession, result)
});
}
Provider::Gcp => {
let project_id = match &input.options.gcp_project_id {
Expand All @@ -395,43 +411,61 @@ pub async fn prefetch(input: &MultiInputOptions, output_dir: Option<&str>) -> Re
accession = accession.as_str();
"GCP project ID is required for GCP downloads"
);
failures.push(format!(
"{accession}: GCP project ID is required for GCP downloads"
));
continue;
}
};
// We'll collect GCP downloads and process them separately
gcp_downloads.push((url, path, project_id, pb));
gcp_downloads.push((accession, url, path, project_id, pb));
}
_ => {
error!(
accession = accession.as_str(),
provider:? = input.options.provider;
"Unsupported provider"
);
failures.push(format!(
"{accession}: unsupported provider: {:?}",
input.options.provider
));
continue;
}
}
}
Err(e) => {
error!(accession = accession.as_str(), error:% = e; "Failed to identify URL");
failures.push(format!("{accession}: URL resolution failed: {e}"));
}
}
}

// Process HTTPS downloads concurrently
while let Some(result) = https_downloads.next().await {
while let Some((accession, result)) = https_downloads.next().await {
if let Err(e) = result {
error!(error:% = e; "HTTPS download failed");
error!(accession = accession.as_str(), error:% = e; "HTTPS download failed");
failures.push(format!("{accession}: download failed: {e}"));
}
}

// Process GCP downloads - since gsutil has its own concurrency management,
// we'll run them sequentially to avoid overwhelming the terminal output
for (url, path, project_id, pb) in gcp_downloads {
for (accession, url, path, project_id, pb) in gcp_downloads {
if let Err(e) = download_url_gcp(url, path, project_id, pb).await {
error!(error:% = e; "GCP download failed");
error!(accession = accession.as_str(), error:% = e; "GCP download failed");
failures.push(format!("{accession}: download failed: {e}"));
}
}

if !failures.is_empty() {
bail!(
"prefetch failed for {} accession(s):\n{}",
failures.len(),
failures.join("\n")
);
}

Ok(())
}

Expand Down Expand Up @@ -844,4 +878,25 @@ mod tests {
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("GCP project ID is required for GCP downloads"));
}

#[tokio::test]
async fn prefetch_multi_fails_when_any_url_resolution_fails() {
// Both accessions contain "INVALID", so the test `query_entrez` returns
// "no urls found" for each and resolution fails with no network access.
let input = MultiInputOptions {
accessions: vec!["INVALID_A".to_string(), "INVALID_B".to_string()],
options: create_test_accession_options(),
};

let result = prefetch(&input, None).await;
assert!(
result.is_err(),
"multi-accession prefetch must fail when an accession cannot be resolved"
);

// The summary error must identify every failed accession, not just the first.
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("INVALID_A"), "missing INVALID_A: {err_msg}");
assert!(err_msg.contains("INVALID_B"), "missing INVALID_B: {err_msg}");
}
}
Loading