Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 84 additions & 40 deletions src/dump/output.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::io::Write;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -93,69 +92,79 @@ pub fn build_segment_writer(
}
}

/// Shared state between a [`ThreadWriter`] and its worker thread.
///
/// Both the pending bytes and the shutdown flag live under a single mutex so the
/// worker can use one condition predicate. This avoids the data-loss race that
/// existed when shutdown was signalled on a separate channel from the buffer: the
/// worker exits only once the buffer is empty *and* no more data will arrive.
struct WriterState {
/// Owned reusable write buffer.
buffer: Vec<u8>,
/// Set to `true` once the owning [`ThreadWriter`] is dropped, signalling that
/// no further data will be ingested.
closed: bool,
}

/// A thead-local writer that owns a subprocess handling the actual writing
struct ThreadWriter {
/// Owned reusable write buffer with a conditional variable marking when it's been written to
buffer_pair: Arc<(Mutex<Vec<u8>>, Condvar)>,
/// The signal to end the subprocess
shutdown_sender: mpsc::Sender<()>,
/// Shared buffer + shutdown flag with a condition variable to coordinate the worker
state_pair: Arc<(Mutex<WriterState>, Condvar)>,
/// Handle to the owned subprocess
join_handle: Option<thread::JoinHandle<Result<()>>>,
}

impl ThreadWriter {
fn new(mut handle: BoxedWriter) -> Self {
let buffer_pair = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
let buffer_pair_clone = Arc::clone(&buffer_pair);

let (shutdown_sender, shutdown_receiver) = mpsc::channel();
let state_pair = Arc::new((
Mutex::new(WriterState {
buffer: Vec::new(),
closed: false,
}),
Condvar::new(),
));
let state_pair_clone = Arc::clone(&state_pair);

// Start the worker thread
let join_handle = thread::spawn(move || -> Result<()> {
let (buffer, cvar) = &*buffer_pair_clone;
let (state, cvar) = &*state_pair_clone;

loop {
// Wait for data or shutdown signal
let mut guard = buffer.lock();

while guard.is_empty() {
// Check for shutdown before waiting
if shutdown_receiver.try_recv().is_ok() {
return Ok(()); // Exit the thread
}
let mut guard = state.lock();

// Wait for notification that data is available
// Wait until there is data to write or shutdown has been requested
while guard.buffer.is_empty() && !guard.closed {
cvar.wait(&mut guard);
}

// Check again for shutdown after waking
if shutdown_receiver.try_recv().is_ok() {
return Ok(()); // Exit the thread
}
// Only exit once all buffered data has been drained. A non-empty
// buffer always takes the drain branch below, even after shutdown.
if guard.buffer.is_empty() && guard.closed {
return Ok(());
}

// We have data to process
let mut data = std::mem::take(&mut *guard);
let data = std::mem::take(&mut guard.buffer);
drop(guard); // Release lock before I/O

// Perform actual write (potentially blocking I/O)
handle.write_all(data.drain(..).as_slice())?;
handle.write_all(&data)?;
handle.flush()?;
Comment thread
nick-youngblut marked this conversation as resolved.
}
});

ThreadWriter {
buffer_pair,
shutdown_sender,
state_pair,
join_handle: Some(join_handle),
}
}

fn ingest(&self, data: &[u8]) {
let (buffer, cvar) = &*self.buffer_pair;
let (state, cvar) = &*self.state_pair;
loop {
let mut guard = buffer.lock();
if guard.len() <= MAXIMUM_BUFFER_SIZE {
guard.extend_from_slice(data);
let mut guard = state.lock();
if guard.buffer.len() <= MAXIMUM_BUFFER_SIZE {
guard.buffer.extend_from_slice(data);
cvar.notify_one();
break;
} else {
Comment thread
nick-youngblut marked this conversation as resolved.
Expand All @@ -167,16 +176,15 @@ impl ThreadWriter {

impl Drop for ThreadWriter {
fn drop(&mut self) {
// Signal thread to shut down
self.shutdown_sender
.send(())
.expect("Error in sending signal");

// notify the condition variable to wake up the worker thread
let (_buffer, cvar) = &*self.buffer_pair;
cvar.notify_all(); // make sure to wake the thread even if the buffer is empty
// Signal that no more data will arrive, then wake the worker. Setting the
// flag under the lock before notifying closes the wakeup gap.
{
let (state, cvar) = &*self.state_pair;
state.lock().closed = true;
cvar.notify_all();
}

// Wait for thread to finish
// Wait for thread to finish (it will drain any remaining buffered data first)
if let Some(handle) = self.join_handle.take() {
handle
.join()
Expand Down Expand Up @@ -365,4 +373,40 @@ mod tests {
assert_eq!(written1, b"ACGT");
assert!(written2.is_empty());
}

// ThreadWriter shutdown tests (finding #2: pending data must not be dropped)
#[test]
fn thread_writer_drains_pending_data_on_drop() {
let data = Arc::new(Mutex::new(Vec::new()));
let handle: BoxedWriter = Box::new(TestWriter { data: data.clone() });

let payload = b"ACGTACGTACGT";
{
let writer = ThreadWriter::new(handle);
writer.ingest(payload);
// `writer` is dropped here; all ingested bytes must be flushed first.
}

let written = data.lock().unwrap().clone();
assert_eq!(written, payload);
}

#[test]
fn thread_writer_drains_pending_data_under_shutdown_race() {
// The data-loss bug was race-sensitive: ingest immediately before drop,
// repeated many times, to make any regression observably flaky.
let payload = b"the-quick-brown-fox-jumps";
for _ in 0..1000 {
let data = Arc::new(Mutex::new(Vec::new()));
let handle: BoxedWriter = Box::new(TestWriter { data: data.clone() });

{
let writer = ThreadWriter::new(handle);
writer.ingest(payload);
}

let written = data.lock().unwrap().clone();
assert_eq!(written, payload);
}
}
}
Loading