Skip to content
14 changes: 14 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

## vNext

- **Added** `Counter::bind()` and `Histogram::bind()` SDK implementations that
return pre-bound measurement handles (`BoundCounter<T>`, `BoundHistogram<T>`).
Bound instruments resolve the attribute-to-aggregator mapping once at bind time
and cache the result, eliminating per-call HashMap lookups. Bound entries are
never evicted during delta collection — idle cycles produce no export but the
tracker persists. If `bind()` is called at the cardinality limit, the handle
transparently falls back to the unbound measurement path. Gated behind the
`experimental_metrics_bound_instruments` feature flag. Benchmarks show ~28x
speedup for counter operations and ~9x for histograms.
- Delta metrics collection now uses in-place eviction instead of draining the
HashMap on every collect cycle. Stale attribute sets that received no measurements
since the last collection are evicted. Note: recovery from cardinality overflow
now requires 2 collect cycles — the first marks entries as stale, the second
evicts them.
- **Breaking** The SDK `testing` feature is now runtime agnostic. [#3407][3407]
- `TokioSpanExporter` and `new_tokio_test_exporter` have been renamed to `TestSpanExporter` and `new_test_exporter`.
- The following transitive dependencies and features have been removed: `tokio/rt`, `tokio/time`, `tokio/macros`, `tokio/rt-multi-thread`, `tokio-stream`, `experimental_async_runtime`
Expand Down
8 changes: 7 additions & 1 deletion opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url", "experimental_async_runtime"]
logs = ["opentelemetry/logs"]
metrics = ["opentelemetry/metrics"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "tokio/sync"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread", "experimental_metrics_bound_instruments"]
experimental_async_runtime = []
rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
Expand All @@ -60,6 +60,7 @@ experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimenta
experimental_logs_concurrent_log_processor = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"]
experimental_metrics_disable_name_validation = ["metrics"]
experimental_metrics_bound_instruments = ["metrics", "opentelemetry/experimental_metrics_bound_instruments"]
bench_profiling = []

[[bench]]
Expand Down Expand Up @@ -125,6 +126,11 @@ name = "log"
harness = false
required-features = ["logs"]

[[bench]]
name = "bound_instruments"
harness = false
required-features = ["metrics", "experimental_metrics_custom_reader", "experimental_metrics_bound_instruments"]

[lib]
bench = false

Expand Down
102 changes: 102 additions & 0 deletions opentelemetry-sdk/benches/bound_instruments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider, Temporality};

// Run this benchmark with:
// cargo bench --bench bound_instruments --features metrics,experimental_metrics_custom_reader,experimental_metrics_bound_instruments
//
// Apple M4 Max, 16 cores (12 performance + 4 efficiency), macOS 15.4
//
// Results (3 attributes: method, status, path):
// Counter_Unbound_Delta time: [53.20 ns]
// Counter_Bound_Delta time: [1.87 ns] ~28x faster
// Histogram_Unbound_Delta time: [58.58 ns]
// Histogram_Bound_Delta time: [6.57 ns] ~8.9x faster
// Counter_Bound_Multithread/2 time: [22.19 µs] (100 adds/thread)
// Counter_Bound_Multithread/4 time: [35.32 µs] (100 adds/thread)
// Counter_Bound_Multithread/8 time: [66.49 µs] (100 adds/thread)

fn create_provider(temporality: Temporality) -> SdkMeterProvider {
let reader = ManualReader::builder()
.with_temporality(temporality)
.build();
SdkMeterProvider::builder().with_reader(reader).build()
}

fn bench_bound_instruments(c: &mut Criterion) {
let mut group = c.benchmark_group("BoundInstruments");
group.sample_size(100);

let attrs = [
KeyValue::new("method", "GET"),
KeyValue::new("status", "200"),
KeyValue::new("path", "/api/v1/users"),
];

// Counter: Unbound vs Bound (Delta)
{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let counter = meter.u64_counter("unbound").build();
group.bench_function("Counter_Unbound_Delta", |b| {
b.iter(|| counter.add(1, &attrs));
});
}

{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let counter = meter.u64_counter("bound").build();
let bound = counter.bind(&attrs);
group.bench_function("Counter_Bound_Delta", |b| {
b.iter(|| bound.add(1));
});
}

// Histogram: Unbound vs Bound (Delta)
{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let histogram = meter.f64_histogram("unbound_hist").build();
group.bench_function("Histogram_Unbound_Delta", |b| {
b.iter(|| histogram.record(1.5, &attrs));
});
}

{
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let histogram = meter.f64_histogram("bound_hist").build();
let bound = histogram.bind(&attrs);
group.bench_function("Histogram_Bound_Delta", |b| {
b.iter(|| bound.record(1.5));
});
}

// Multi-threaded bound counter
for num_threads in [2, 4, 8] {
let provider = create_provider(Temporality::Delta);
let meter = provider.meter("bench");
let counter = meter.u64_counter("mt_bound").build();
let bound = counter.bind(&attrs);

group.bench_function(format!("Counter_Bound_Multithread/{num_threads}"), |b| {
b.iter(|| {
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for _ in 0..100 {
bound.add(1);
}
});
}
});
});
});
}

group.finish();
}

criterion_group!(benches, bench_bound_instruments);
criterion_main!(benches);
30 changes: 30 additions & 0 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};

#[cfg(feature = "experimental_metrics_bound_instruments")]
use opentelemetry::metrics::BoundSyncInstrument;
use opentelemetry::{
metrics::{AsyncInstrument, SyncInstrument},
InstrumentationScope, Key, KeyValue,
};

#[cfg(feature = "experimental_metrics_bound_instruments")]
use crate::metrics::internal::BoundMeasure;
use crate::metrics::{aggregation::Aggregation, internal::Measure};

use super::meter::{
Expand Down Expand Up @@ -388,6 +392,32 @@ impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
measure.call(val, attrs)
}
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundSyncInstrument<T> + Send + Sync> {
let bound_measures: Vec<Box<dyn BoundMeasure<T>>> = self
.measures
.iter()
.map(|m| m.bind(attrs, Arc::clone(m)))
.collect();
Box::new(ResolvedBoundMeasures {
measures: bound_measures,
})
}
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
pub(crate) struct ResolvedBoundMeasures<T> {
measures: Vec<Box<dyn BoundMeasure<T>>>,
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T: Copy + 'static> BoundSyncInstrument<T> for ResolvedBoundMeasures<T> {
fn measure(&self, val: T) {
for measure in &self.measures {
measure.call(val);
}
}
}

#[derive(Clone)]
Expand Down
31 changes: 31 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,37 @@ use super::{
/// Receives measurements to be aggregated.
pub(crate) trait Measure<T>: Send + Sync + 'static {
fn call(&self, measurement: T, attrs: &[KeyValue]);

#[cfg(feature = "experimental_metrics_bound_instruments")]
fn bind(&self, attrs: &[KeyValue], fallback: Arc<dyn Measure<T>>) -> Box<dyn BoundMeasure<T>>;
}

/// A pre-bound measurement handle that bypasses attribute lookup.
#[cfg(feature = "experimental_metrics_bound_instruments")]
pub(crate) trait BoundMeasure<T>: Send + Sync + 'static {
fn call(&self, measurement: T);
}

/// Fallback bound handle for aggregator types that don't support direct binding.
/// Delegates every call to the unbound `Measure::call()` path.
#[cfg(feature = "experimental_metrics_bound_instruments")]
pub(crate) struct BoundFallbackHandle<T> {
measure: Arc<dyn Measure<T>>,
attrs: Vec<KeyValue>,
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T: Send + Sync + 'static> BoundMeasure<T> for BoundFallbackHandle<T> {
fn call(&self, measurement: T) {
self.measure.call(measurement, &self.attrs);
}
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T> BoundFallbackHandle<T> {
pub(crate) fn new(measure: Arc<dyn Measure<T>>, attrs: Vec<KeyValue>) -> Self {
Self { measure, attrs }
}
}

/// Stores the aggregate of measurements into the aggregation and returns the number
Expand Down
17 changes: 14 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "experimental_metrics_bound_instruments")]
use std::sync::Arc;
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};

use opentelemetry::{otel_debug, KeyValue};
Expand All @@ -12,6 +14,8 @@ use super::{
aggregate::{AggregateTimeInitiator, AttributeSetFilter},
Aggregator, ComputeAggregation, Measure, Number, ValueMap,
};
#[cfg(feature = "experimental_metrics_bound_instruments")]
use super::{BoundFallbackHandle, BoundMeasure};

pub(crate) const EXPO_MAX_SCALE: i8 = 20;
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
Expand Down Expand Up @@ -412,9 +416,11 @@ impl<T: Number> ExpoHistogram<T> {
h.start_time = time.start;
h.time = time.current;

let config = *self.value_map.config();
self.value_map
.collect_and_reset(&mut h.data_points, |attributes, attr| {
let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
let reset = attr.clone_and_reset(&config);
let b = reset.into_inner().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
count: b.count,
Expand Down Expand Up @@ -524,6 +530,11 @@ where
self.value_map.measure(measurement, filtered);
})
}

#[cfg(feature = "experimental_metrics_bound_instruments")]
fn bind(&self, attrs: &[KeyValue], fallback: Arc<dyn Measure<T>>) -> Box<dyn BoundMeasure<T>> {
Box::new(BoundFallbackHandle::new(fallback, attrs.to_vec()))
}
}

impl<T> ComputeAggregation for ExpoHistogram<T>
Expand Down Expand Up @@ -721,7 +732,7 @@ mod tests {
for v in test.values {
Measure::call(&h, v, &[]);
}
let dp = h.value_map.no_attribute_tracker.lock().unwrap();
let dp = h.value_map.no_attribute_tracker.aggregator.lock().unwrap();

assert_eq!(test.expected.max, dp.max);
assert_eq!(test.expected.min, dp.min);
Expand Down Expand Up @@ -778,7 +789,7 @@ mod tests {
for v in test.values {
Measure::call(&h, v, &[]);
}
let dp = h.value_map.no_attribute_tracker.lock().unwrap();
let dp = h.value_map.no_attribute_tracker.aggregator.lock().unwrap();

assert_eq!(test.expected.max, dp.max);
assert_eq!(test.expected.min, dp.min);
Expand Down
Loading
Loading