Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions opentelemetry-sdk/src/logs/batch_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl LogProcessor for BatchLogProcessor {
// Successfully sent the log record to the data channel.
// Increment the current batch size and check if it has reached
// the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
if self.current_batch_size.fetch_add(1, Ordering::AcqRel) + 1
>= self.max_export_batch_size
{
// Check if the a control message for exporting logs is
Expand Down Expand Up @@ -372,25 +372,30 @@ impl BatchLogProcessor {
where
E: LogExporter + Send + Sync + 'static,
{
let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
let target = current_batch_size.load(Ordering::Acquire); // `target` is used to determine the stopping criteria for exporting logs.
let mut result = OTelSdkResult::Ok(());
let mut total_exported_logs: usize = 0;

while target > 0 && total_exported_logs < target {
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
let batch_limit = max_export_size.min(target - total_exported_logs);

// Get up to the remaining target batch size from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == max_export_size {
if logs.len() == batch_limit {
break;
}
}

let count_of_logs = logs.len(); // Count of logs that will be exported
if count_of_logs == 0 {
break;
}
total_exported_logs += count_of_logs;

result = export_batch_sync(exporter, logs, last_export_time); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
current_batch_size.fetch_sub(count_of_logs, Ordering::AcqRel);
}
result
}
Expand Down
68 changes: 60 additions & 8 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,25 +492,32 @@ impl BatchSpanProcessor {
where
E: SpanExporter + Send + Sync + 'static,
{
let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting spans.
let target = current_batch_size.load(Ordering::Acquire); // `target` is used to determine the stopping criteria for exporting spans.
let mut result = OTelSdkResult::Ok(());
let mut total_exported_spans: usize = 0;

while target > 0 && total_exported_spans < target {
// Get up to `max_export_batch_size` amount of spans from the channel and push them to the spans vec
let batch_limit = config
.max_export_batch_size
.min(target - total_exported_spans);

// Get up to the remaining target batch size from the channel and push them to the spans vec
while let Ok(span) = spans_receiver.try_recv() {
spans.push(span);
if spans.len() == config.max_export_batch_size {
if spans.len() == batch_limit {
break;
}
}

let count_of_spans = spans.len(); // Count of spans that will be exported
if count_of_spans == 0 {
break;
}
total_exported_spans += count_of_spans;

result = Self::export_batch_sync(exporter, spans, last_export_time); // This method clears the spans vec after exporting

current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
current_batch_size.fetch_sub(count_of_spans, Ordering::AcqRel);
}
result
}
Expand Down Expand Up @@ -568,7 +575,7 @@ impl SpanProcessor for BatchSpanProcessor {
// Successfully sent the span to the data channel.
// Increment the current batch size and check if it has reached
// the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
if self.current_batch_size.fetch_add(1, Ordering::AcqRel) + 1
>= self.max_export_batch_size
{
// Check if the a control message for exporting spans is
Expand Down Expand Up @@ -1179,9 +1186,12 @@ mod tests {

use crate::Resource;
use opentelemetry::{Key, KeyValue, Value};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Instant,
};

// Mock exporter to test functionality
Expand Down Expand Up @@ -1267,6 +1277,48 @@ mod tests {
assert_eq!(exported_spans[0].name, "force_flush_span");
}

#[test]
fn batchspanprocessor_does_not_overdrain_unaccounted_spans() {
let exporter = MockSpanExporter::new();
let exported_spans = exporter.exported_spans.clone();
let (sender, receiver) = std::sync::mpsc::sync_channel(4);
let current_batch_size = AtomicUsize::new(1);
let config = BatchConfigBuilder::default()
.with_max_queue_size(4)
.with_max_export_batch_size(4)
.build();
let mut spans = Vec::with_capacity(config.max_export_batch_size);
let mut last_export_time = Instant::now();

sender.send(create_test_span("counted")).unwrap();
sender.send(create_test_span("unaccounted")).unwrap();

let result = BatchSpanProcessor::get_spans_and_export(
&receiver,
&exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);

assert!(result.is_ok(), "export should succeed");
assert_eq!(
current_batch_size.load(Ordering::Relaxed),
0,
"helper should only subtract the counted span"
);
assert_eq!(
exported_spans.lock().unwrap().len(),
1,
"helper should export at most the target batch size snapshot"
);
assert!(
receiver.try_recv().is_ok(),
"one span should remain queued for a later export cycle"
);
}

#[test]
fn batchspanprocessor_shutdown() {
// Setup exporter and processor - following the same pattern as test_batch_shutdown from logs
Expand Down
Loading