Implement memory-efficient streaming query execution #64

@unclesp1d3r

Description

🚀 Implement Memory-Efficient Streaming Query Execution

📋 Overview

Gold Digger currently loads entire MySQL result sets into memory, causing potential out-of-memory errors with large datasets. This task implements streaming query execution to process rows incrementally, enabling the tool to handle multi-gigabyte result sets efficiently.

🎯 Requirements Traceability

Epic: #52 - Gold Digger Core Enhancements Epic

Requirement 6.1: Memory-Efficient Processing ⚡

WHEN processing large result sets THEN the system SHALL stream rows without loading all into memory

Requirement 6.2: Memory Usage Scaling 📊

WHEN streaming is active THEN memory usage SHALL scale with row width (O(row_width)), not row count (O(total_data))

Requirement 6.3: Robust Error Handling 🛡️

WHEN streaming fails THEN the system SHALL exit with code 4 (query execution error)
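The exit-code mapping for requirement 6.3 can be kept in one small function so streaming failures are classified consistently. A minimal sketch, assuming a hypothetical `StreamError` type (the variant names and the non-4 code are illustrative, not Gold Digger's actual error enum):

```rust
// Hypothetical error type for illustration; the real code would classify
// mysql::Error values instead. Only the query-execution -> 4 mapping is
// specified by requirement 6.3; other codes shown are placeholders.
#[derive(Debug)]
enum StreamError {
    Connection(String),
    QueryExecution(String),
}

fn exit_code_for(err: &StreamError) -> i32 {
    match err {
        // Requirement 6.3: streaming/query failures exit with code 4
        StreamError::QueryExecution(_) => 4,
        // Placeholder code for other failure classes
        StreamError::Connection(_) => 3,
    }
}

fn main() {
    let err = StreamError::QueryExecution("lost connection mid-stream".into());
    assert_eq!(exit_code_for(&err), 4);
    println!("exit code: {}", exit_code_for(&err));
}
```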

🔍 Current State Analysis

❌ Problem: Memory-Inefficient Implementation

// src/main.rs:67 - Loads ENTIRE result set into memory
let result: Vec<mysql::Row> = match conn.query(database_query) {
    Ok(result) => result,
    Err(e) => {
        eprintln!("Database query failed: {}", e);
        process::exit(4);
    }
};

📊 Impact Assessment

  • Memory Usage: O(total_result_set_size) - scales with row count ❌
  • Large Datasets: Multi-GB result sets cause OOM crashes ❌
  • Scalability: Cannot handle enterprise-scale data exports ❌
  • Resource Efficiency: Wastes server memory unnecessarily ❌

🏗️ Architecture Gap

Despite having comprehensive design specifications in .kiro/specs/gold-digger/design.md, the streaming components are not implemented:

  1. Missing RowStream: Core streaming iterator not implemented
  2. Non-Streaming Query Execution: Uses query() instead of query_iter()
  3. Batch Processing Writers: Format writers process all data at once
  4. Memory Accumulation: All data structures grow with dataset size

💡 Proposed Solution

Phase 1: Core Streaming Infrastructure 🏗️

/// Memory-efficient row streaming iterator
pub struct RowStream<'a> {
    // Text-protocol result; the three QueryResult lifetimes collapse to 'a in this sketch
    result: mysql::QueryResult<'a, 'a, 'a, mysql::Text>,
    columns: Vec<mysql::Column>,
    row_count: usize,
    current_row: usize,
}

impl<'a> Iterator for RowStream<'a> {
    type Item = Result<Vec<String>, mysql::Error>;
    
    fn next(&mut self) -> Option<Self::Item> {
        // Stream rows one at a time without memory accumulation
        match self.result.next() {
            Some(Ok(row)) => {
                self.current_row += 1;
                Some(mysql_row_to_strings(row))
            }
            Some(Err(e)) => Some(Err(e)),
            None => None,
        }
    }
}

Phase 2: Streaming Query Executor 🚀

pub struct StreamingQueryExecutor {
    conn: mysql::PooledConn,
}

impl StreamingQueryExecutor {
    pub fn execute_streaming(&mut self, query: &str) -> Result<RowStream<'_>, mysql::Error> {
        // query_iter() streams rows from the server instead of buffering them
        let result = self.conn.query_iter(query)?;
        let columns = result.columns().as_ref().to_vec();
        
        Ok(RowStream {
            result,
            columns,
            row_count: 0,
            current_row: 0,
        })
    }
}
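The memory invariant this executor is meant to guarantee can be illustrated without the mysql crate at all: with a lazy iterator, only the row currently being processed is alive. A self-contained sketch (the `FakeRowSource` type is invented for illustration):

```rust
// Self-contained illustration of the streaming invariant (no database needed):
// rows are produced lazily, so at most one row is materialized at any moment.
struct FakeRowSource {
    remaining: usize,
    width: usize,
}

impl Iterator for FakeRowSource {
    type Item = Vec<String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        // Each call allocates exactly one row of `width` cells
        Some(vec!["cell".to_string(); self.width])
    }
}

fn main() {
    let source = FakeRowSource { remaining: 1_000_000, width: 8 };
    let mut count = 0usize;
    for row in source {
        // `row` is dropped at the end of each iteration: memory stays O(row_width)
        debug_assert_eq!(row.len(), 8);
        count += 1;
    }
    println!("processed {count} rows");
}
```

The same shape holds for `RowStream`: because `next()` hands out one converted row at a time, total memory is bounded by the widest row, not the row count.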

Phase 3: Incremental Format Writers 📝

// CSV Streaming Writer
pub fn write_csv_streaming<W: Write>(
    writer: &mut W,
    row_stream: RowStream,
) -> Result<(), Box<dyn Error>> {
    // Write headers first
    write_csv_headers(writer, &row_stream.columns)?;
    
    // Process rows incrementally - constant memory usage
    for row_result in row_stream {
        let row = row_result?;
        write_csv_row(writer, &row)?;
        writer.flush()?; // Ensure incremental output
    }
    Ok(())
}
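The `write_csv_row` helper referenced above is not shown in the issue; a minimal sketch of what it could look like, applying RFC 4180 quoting per row so memory stays bounded by row width (the function name and signature are assumptions, not the crate's existing API):

```rust
use std::io::Write;

// Hypothetical sketch of the per-row CSV writer assumed above.
// Fields containing commas, quotes, or newlines are quoted per RFC 4180;
// embedded quotes are escaped by doubling.
fn write_csv_row<W: Write>(writer: &mut W, row: &[String]) -> std::io::Result<()> {
    let line: Vec<String> = row
        .iter()
        .map(|field| {
            if field.contains(',') || field.contains('"') || field.contains('\n') {
                format!("\"{}\"", field.replace('"', "\"\""))
            } else {
                field.clone()
            }
        })
        .collect();
    writeln!(writer, "{}", line.join(","))
}

fn main() -> std::io::Result<()> {
    let mut out = Vec::new();
    write_csv_row(&mut out, &["a".into(), "b,c".into(), "say \"hi\"".into()])?;
    // prints: a,"b,c","say ""hi"""
    print!("{}", String::from_utf8_lossy(&out));
    Ok(())
}
```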

🔧 Implementation Plan

🏃‍♂️ Sprint Tasks

  • T1: Implement RowStream<'a> iterator with MySQL result streaming
  • T2: Create StreamingQueryExecutor using mysql::query_iter()
  • T3: Refactor CSV writer for incremental row processing
  • T4: Refactor JSON writer for streaming array output
  • T5: Refactor TSV writer for row-by-row processing
  • T6: Update main execution flow to use streaming
  • T7: Implement safe MySQL type-to-string conversion for streaming
  • T8: Add streaming-specific error handling with exit code 4
  • T9: Create memory usage tests with large datasets
  • T10: Add performance benchmarks comparing streaming vs batch
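For T7, the conversion step could look like the sketch below. The `CellValue` enum mirrors a subset of `mysql::Value`'s variants for illustration only; the real implementation would match on `mysql::Value` itself and decide a project-wide policy for NULLs (empty string here, as an assumption):

```rust
// Sketch for T7: safe value-to-string conversion with NULL handling.
// `CellValue` is a stand-in for mysql::Value, which has analogous variants.
#[derive(Debug)]
enum CellValue {
    Null,
    Bytes(Vec<u8>),
    Int(i64),
    Double(f64),
}

fn cell_to_string(value: &CellValue) -> String {
    match value {
        // Assumed policy: NULL becomes an empty field (not the string "NULL")
        CellValue::Null => String::new(),
        // Lossy UTF-8 so invalid byte sequences never panic mid-stream
        CellValue::Bytes(b) => String::from_utf8_lossy(b).into_owned(),
        CellValue::Int(i) => i.to_string(),
        CellValue::Double(d) => d.to_string(),
    }
}

fn main() {
    assert_eq!(cell_to_string(&CellValue::Null), "");
    assert_eq!(cell_to_string(&CellValue::Bytes(b"hello".to_vec())), "hello");
    assert_eq!(cell_to_string(&CellValue::Int(-42)), "-42");
    println!("conversion ok");
}
```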

🛠️ Technical Dependencies

# Already available - no new dependencies required
[dependencies]
mysql = { version = "26.0.1", features = ["minimal"] }

🔄 Architecture Transformation

// BEFORE: Memory-inefficient batch processing
let result: Vec<mysql::Row> = conn.query(query)?;           // ❌ Loads everything
let rows = rows_to_strings(result)?;                        // ❌ Converts everything  
write_all_rows(rows, output)?;                              // ❌ Writes everything

// AFTER: Memory-efficient streaming processing  
let mut executor = StreamingQueryExecutor::new(conn);
let row_stream = executor.execute_streaming(query)?;        // ✅ Streams incrementally
write_streaming_rows(row_stream, output)?;                  // ✅ Processes row-by-row


🧪 Validation Strategy

📊 Memory Efficiency Testing

# Test with progressively larger datasets
./gold_digger --config large_test.toml "SELECT * FROM massive_table LIMIT 1000000"
# Memory usage should remain constant ~10-50MB regardless of row count

🏋️ Load Testing Scenarios

  1. 1M Row Test: Verify constant memory usage
  2. 10M Row Test: Ensure no OOM errors
  3. Multi-GB Dataset: Test real-world enterprise scenarios
  4. Network Interruption: Test streaming resilience
  5. Format Integrity: Ensure output remains RFC-compliant

⚡ Performance Benchmarking

  • Compare streaming vs batch processing execution times
  • Measure memory usage scaling with dataset size
  • Test output format correctness with large datasets
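The batch-vs-streaming comparison in T10 can be prototyped without a database. A toy model of the two strategies (all names invented for illustration): batch buffers every row before processing, streaming holds exactly one row at a time, and both produce the same result.

```rust
// Toy model for the T10 benchmark: compare peak row buffering of the two
// strategies. No database involved; rows are synthesized in memory.
fn batch_sum(rows: usize, width: usize) -> (u64, usize) {
    // Batch: materialize every row before processing (peak = all rows)
    let all: Vec<Vec<u64>> = (0..rows).map(|i| vec![i as u64; width]).collect();
    (all.iter().flatten().sum(), all.len())
}

fn streaming_sum(rows: usize, width: usize) -> (u64, usize) {
    // Streaming: each row is processed and dropped before the next (peak = 1)
    let mut sum = 0u64;
    for i in 0..rows {
        let row = vec![i as u64; width]; // only this row is alive
        sum += row.iter().sum::<u64>();
    }
    (sum, 1)
}

fn main() {
    let (b_sum, b_peak) = batch_sum(200_000, 4);
    let (s_sum, s_peak) = streaming_sum(200_000, 4);
    assert_eq!(b_sum, s_sum); // identical results either way
    println!("batch peak rows buffered: {b_peak}");
    println!("streaming peak rows buffered: {s_peak}");
}
```

A real benchmark would additionally time both paths and sample resident memory, but the buffering asymmetry above is the effect the memory tests need to demonstrate.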

✅ Definition of Done

🎯 Functional Requirements

  • R6.1: No loading of entire result sets into memory ✅
  • R6.2: Memory usage scales with row width, not row count ✅
  • R6.3: Streaming failures exit with code 4 ✅
  • All output formats (JSON, CSV, TSV) support streaming
  • Integration maintains existing TLS and configuration features
  • Backward compatibility with existing command-line interface

🧪 Quality Gates

  • Unit tests for streaming components (>90% coverage)
  • Integration tests with datasets >100MB
  • Memory usage tests demonstrating O(row_width) scaling
  • Performance tests showing acceptable streaming overhead
  • Error handling tests for streaming failure scenarios

📚 Technical References

🏗️ Architecture Documentation

  • Design Specification: .kiro/specs/gold-digger/design.md - Comprehensive streaming architecture
  • Requirements: .kiro/specs/gold-digger/requirements.md - Requirements 6.1-6.3 details
  • MySQL Rust Crate: docs.rs/mysql - query_iter() documentation

🔗 Related Issues

  • Epic: Epic: Core enhancements #52 - Gold Digger Core Enhancements Epic
  • Dependencies: None - standalone implementation
  • Blocked By: None - ready for immediate development

💪 Impact: Enables Gold Digger to handle enterprise-scale MySQL data exports efficiently
🎯 Priority: High - Required for production scalability
⏱️ Estimate: 2-3 sprints (streaming infrastructure + format writers + testing)
