feat(health): Experimental streaming log support via SSE (and its messy implications) #711

Open
mkoci wants to merge 44 commits into NVIDIA:main from mkoci:feat_health_streaming_sse_logs

Conversation

@mkoci
Contributor

@mkoci mkoci commented Mar 24, 2026

Description
Adds support for streaming Redfish LogEntries through SSE (Server-Sent Events). Introduces an async OTLP log export pipeline that batches and ships collected log events to an OpenTelemetry collector over gRPC.

But Why?

Periodic collection is not well suited to Redfish LogEntries or Events. On BMCs, relying entirely on periodic collection can cause resource-intensive, bursty behavior as logs and events accumulate behind a seemingly harmless Redfish endpoint. Once this accumulation occurs, a periodic scrape of the endpoint stresses network I/O on the BMC, causing red-lining behavior and downtime.

This change attempts to address this by employing SSE and modifying the existing periodic, synchronous behavior of the Health crate. To ensure the Health service is not overwhelmed, we have implemented backpressure in an async pipeline so that in times of high load the Health service distributes the stress across BMCs rather than dropping entries. The same DBus journal that created load issues under periodic collection now works in our favor: we lean on it to buffer load away from the Health service, a kind of leaky-bucket approach that makes log/event flow more consistent.

Dude, where's my sync Sink?

To make an omelette, sometimes you have to break some eggs. In this case, the eggs were the synchronous nature of our pipeline. SSE coming from nv-redfish is already async, and likewise the collectors themselves are tokio::spawn tasks. If we feed streaming logs/events into our synchronous pipeline, we have no way to relieve pressure from the BMCs or from an overwhelmed OTLP client on the other end of the OTLPSink without dropping information. I really didn't want to make such impactful changes, but here is a non-exhaustive list of things I attempted:

Why not use try_send in the DataSink?
Does not block and drops logs - no backpressure.

Isn't there some way to do this with an mpsc::SyncSender?
Not without blocking the OS thread and stalling the tokio worker, along with every task assigned to that worker.

What about something like the OverrideQueue?
The OverrideQueue uses a unique key to loosely bound Overrides. This works because there is a finite number of MachineIds and ReportSources and we only care about the latest. For logs/events this leads to dropping, not to mention the queue size for logs/events has much more freedom compared to the bounded machine/report uniqueness dimension.
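To illustrate the try_send concern above, here is a minimal, self-contained sketch using std::sync::mpsc as a stand-in for the actual channel type in the pipeline (capacity and counts are illustrative):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Simulates a bounded sink input of capacity 2 with nothing consuming.
/// `try_send` never blocks, so once the buffer fills, entries are lost.
fn try_send_drops() -> usize {
    let (tx, _rx) = sync_channel::<u32>(2); // keep _rx alive so the channel stays open
    let mut dropped = 0;
    for i in 0..5 {
        if let Err(TrySendError::Full(_)) = tx.try_send(i) {
            dropped += 1;
        }
    }
    dropped
}

fn main() {
    // Capacity 2, 5 sends, nothing consumed: 3 entries are dropped.
    println!("entries dropped: {}", try_send_drops());
}
```

The blocking alternative, `tx.send(i)`, provides backpressure but parks the OS thread, which is exactly the tokio-worker stall described above; hence the move to an async `send().await` in this PR.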

Breaking Changes

  • This PR contains breaking changes

Type of Change

  • Add - New feature or capability
  • Change - Changes in existing functionality
  • Fix - Bug fixes
  • Remove - Removed features or deprecated functionality
  • Internal - Internal changes (refactoring, tests, docs, etc.)

Testing

  • Unit tests added/updated
  • Integration tests added/updated
  • Manual testing performed
  • No testing required (docs, internal refactor, etc.)

Additional Notes

  • The workspace pins tonic to 0.14 globally. In Health, we need the tonic 0.14 Channel. The problem arises because opentelemetry-proto pins tonic to 0.12, whose Channel is incompatible with 0.14's. I looked into upgrading opentelemetry-proto to 0.31, which pins tonic 0.14, and saw a refactor spilling into 8 or more crates, likely out of scope for this PR, but it needs to be done at some point.
  • In addition to unit tests, I've tested on rack. Added benches, but there is certainly room for optimization.
  • OtlpDrainTask currently flushes batches on count (512) or time (2s), whichever comes first. It retries with exponential backoff on transient gRPC errors. This will likely need tuning.
  • Periodic log collection is preserved and accessible through config, carved into its own logs/periodic.rs module.
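The retry behavior described above can be sketched as a capped exponential backoff. The base delay and cap below are illustrative, not the PR's actual values:

```rust
use std::time::Duration;

/// Hypothetical backoff: doubles from `base` up to `cap` on each retry.
struct Backoff {
    base: Duration,
    cap: Duration,
    attempt: u32,
}

impl Backoff {
    fn new(base: Duration, cap: Duration) -> Self {
        Self { base, cap, attempt: 0 }
    }

    /// Returns the delay before the next retry, doubling each call
    /// (shift is clamped to avoid overflow on long outages).
    fn next_delay(&mut self) -> Duration {
        let delay = self
            .base
            .checked_mul(1u32 << self.attempt.min(16))
            .map(|d| d.min(self.cap))
            .unwrap_or(self.cap);
        self.attempt += 1;
        delay
    }

    /// Called after a successful send so the next failure starts small again.
    fn reset(&mut self) {
        self.attempt = 0;
    }
}

fn main() {
    let mut b = Backoff::new(Duration::from_millis(100), Duration::from_secs(5));
    for _ in 0..7 {
        println!("{:?}", b.next_delay()); // 100ms, 200ms, ... capped at 5s
    }
    b.reset();
    println!("after reset: {:?}", b.next_delay());
}
```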

mkoci added 28 commits March 13, 2026 03:59
…ueRest. Update example and add doc comments.
…ing patterns in PeriodicCollector with injection/config
…ssingPipeline to avoid sync/async conv with DataSink+streaming
@copy-pr-bot

copy-pr-bot bot commented Mar 24, 2026

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@mkoci mkoci changed the title Feat health streaming sse logs feat(health): Experimental streaming log support via SSE (and it's messy implications) Mar 24, 2026
Contributor

@Matthias247 Matthias247 left a comment

I have not looked at everything so far.

But one high level question upfront: Is there a specific reason that SSE connections need to make the internal event handling asynchronous? My understanding is it could still be synchronous (the events are written to channels synchronously and can then be processed on the other side at arbitrary speed).

Is the reasoning here backpressure? If yes, it could likely be handled in other ways too. E.g. we could implement a channel where we just drop the newest (or oldest) events if the processing infrastructure can't keep up. But @yoks is certainly the expert on how it's designed so far and can provide more input.


/// Trait for collectors that maintain a long-lived stream (SSE, gRPC, etc.)
/// runtime.rs creates the BMC client and injects it, the collector opens the stream and maps payloads to events
pub trait StreamingCollector<B: Bmc>: Send + 'static {
Contributor

Is there any reason this isn't using async_trait? That's much more ergonomic

Contributor

There was a very specific reason I was unable to use async_trait (which I forgot); not sure if that's the case here. I think we need to re-evaluate that.

Contributor Author

Yes, I've updated the description with a bit more detail on the why of backpressure. If the BMCs already buffer naturally with DBus, what's the upside of dropping logs/events?

I don't see anything blocking swapping in async_trait; let me know and I'll add it to this PR.

Contributor

I think there is no value in backpressure from the source, as you do not control the source (you could not backpressure it). So the only solution for a slow sink is to buffer, and drop the tail if the buffer is full. This removes the whole async pipeline and makes things easier.
Or do I fail to see the bigger picture here?

Contributor

Also, SSE->File does not require backpressure, nor does SSE->Tracing. So I think it is a characteristic of the sink (OTEL).

Contributor

Yes I think you are right. And in a system with 2 variable ends that you don't control (neither the frequency at which logs are produced nor the frequency at which the sink accepts logs), there is pretty much no way besides dropping something at some point in time.

So I think keeping the pipeline synchronous for the moment is ok. We can still change it in the future once we learn more about the problem.

Contributor Author

@yoks I can understand what you're saying about not controlling the source.

At the same time, regarding "you could not backpressure it": I partially agree... OpenBMC buffers 10MB on the HTTP sender side. I have no idea whether HPE, Dell, or others that support SSE (some only support POST) have buffers as generous as OpenBMC's. Point taken, and at the same time this backpressure does work for OpenBMC and follows normal HTTP and underlying TCP patterns for sender/receiver blocking.

Big question: are we supporting Redfish servers that support SSE but don't follow proper async guidelines?
HTTP/2 is not new; am I wrong in thinking that client-side blocking is unsupported for SSE on Redfish servers? We are not talking about primitive Redfish servers that offer push support via POST; we're talking about applying backpressure to Redfish servers that support SSE subscriptions only, correct? That is the only place I'm applying any backpressure.

@Matthias247 So we are totally fine keeping health synchronous and just dropping events? If so, this is much easier. Note, some of these events could be leak events, not just OOB logs 😂

The OTLPSink side is a completely different animal. Depending on how sharding is implemented in the health crate and what type of scaling happens on the OTLP client side, there are knobs that can be tuned there, no?

Contributor

@yoks yoks Apr 4, 2026

There are several points here. First, backpressure from the client buffer could work, but it does not really solve our problem. A log event is something that already happened; you still need to consume it. If you apply backpressure while your consumer is slower than the producer, you will lose data, and worse, you will lose recent data rather than old data (depending on how it is built). So backpressure is not really possible here.

Second point: your async pipeline can be split into log/tracing/OTEL. Only OTEL is a slow consumer, so you are attempting async backpressure propagation just because of OTEL.

I believe building the same buffer in front of the slow sink gives us more control over how we work with a slow consumer. One solution is a very simple buffer which drops the tail (oldest message) when it is at capacity. Another solution, which we already kind of have, is to write everything to a file and launch the OTEL exporter as a sidecar (or separate process) which reads from the file and publishes it down the line. The file then becomes this buffer.
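The drop-tail (drop-oldest) buffer described here can be sketched as a simple single-threaded wrapper around VecDeque; the type names are illustrative, not the actual implementation:

```rust
use std::collections::VecDeque;

/// Bounded buffer in front of a slow sink: when at capacity, the
/// oldest entry is evicted so the newest data is always kept.
struct DropOldestBuffer<T> {
    buf: VecDeque<T>,
    capacity: usize,
}

impl<T> DropOldestBuffer<T> {
    fn new(capacity: usize) -> Self {
        Self { buf: VecDeque::with_capacity(capacity), capacity }
    }

    /// Pushes an entry, evicting the oldest one if full.
    /// Returns the evicted (lost) entry, if any, so callers can count drops.
    fn push(&mut self, item: T) -> Option<T> {
        let evicted = if self.buf.len() == self.capacity {
            self.buf.pop_front()
        } else {
            None
        };
        self.buf.push_back(item);
        evicted
    }

    /// Consumer side: drains in FIFO order.
    fn pop(&mut self) -> Option<T> {
        self.buf.pop_front()
    }
}

fn main() {
    let mut q = DropOldestBuffer::new(2);
    assert_eq!(q.push("a"), None);
    assert_eq!(q.push("b"), None);
    assert_eq!(q.push("c"), Some("a")); // oldest entry dropped
    assert_eq!(q.pop(), Some("b"));
}
```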

And finally, while HTTP/2 smells like something you can do backpressure with, it is not flow-aware in that sense; it works at the TCP level. So you would not be able to build a real backpressure system.

To make backpressure work properly you want a closed-loop system where the producer can wait, usually with a lossless producer (Kafka is a simple example). Logs over HTTP/2 are lossy by definition, so backpressure brings more problems than it is worth.

Contributor

If you want to make your logs durable and not lossy, you need to write them to durable storage as soon as possible and transfer them to the logging system from there. But my impression is that we are not treating them as critical (a leak event is a good example, but we do not act on the event; we act on sensor data).

]);

let registry = collector_registry.registry();
let metrics = StreamMetrics::new(registry, collector_registry.prefix(), const_labels)?;
Contributor

Creating new metric objects per stream would lead to an accumulation of many similar objects, which would then conflict during metric scraping, unless there is a per-endpoint label.
It would also leak the metric objects when the endpoint is no longer monitored (host force-deleted).

We should find a way to create the metric registries only once upfront, and make it possible for the collectors to update the latest data.

Contributor

Collectors are created per endpoint, so this should be fine.

There are two types of metrics: Prometheus sink ones and general service ones. I already started moving them into ComponentMetrics. Matt added this StreamMetrics; I think we should consolidate them and make clear the distinction between service-level metrics and sink-level metrics.

Contributor Author

@Matthias247 agreed, it's not ideal.

@yoks Perhaps I could track this as a GitHub issue to be addressed later, or would you rather I just tackle it in this PR?

Contributor

Let's create an issue. Too much to tackle in a single PR.

let connection_state = Gauge::with_opts(
Opts::new(
format!("{prefix}_stream_connection_state"),
"Stream connection state per SSE readyState: 0=CONNECTING, 1=OPEN, 2=CLOSED",
Contributor

We shouldn't be bound to what the browser and JS are doing, and could just use the labels in text form?
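A sketch of the text-label idea, assuming a hypothetical StreamState enum rather than the PR's actual types: the label string would feed a state label on the metric instead of the JS-style 0/1/2 gauge values:

```rust
/// Connection state of an SSE stream, reported as a text label
/// rather than the browser's numeric readyState codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    Connecting,
    Open,
    Closed,
}

impl StreamState {
    /// Value for a `state` label on the metric, e.g.
    /// `stream_connection_state{state="open"} 1`.
    fn as_label(self) -> &'static str {
        match self {
            StreamState::Connecting => "connecting",
            StreamState::Open => "open",
            StreamState::Closed => "closed",
        }
    }
}

fn main() {
    println!("state=\"{}\"", StreamState::Open.as_label());
}
```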

Contributor

I'm not sure we even need this metric. Seems oddly specific

Contributor Author

@mkoci mkoci Apr 2, 2026

Oof, this was an artifact of testing on rack and trying to validate what was going on with connections (after pulling my hair out before discovering this)

Will remove.

format!("log_collector_{}", endpoint.addr.hash_key()),
metrics_prefix,
)?);
match create_log_file_writer(
Contributor

I don't have the historical background on this, but I'm surprised we create the file writer in only one of the branches (periodic).

My take is that whether we collect logs via polling or SSE is an implementation detail on the BMC side => we do whatever is more efficient, quicker, and supported. In any case, the result will be logs in our pipeline.

And these logs can then be written to sinks (e.g. files).

==> I'd decouple the sinks, set them up once, and make sure the logs from both paths somehow get there.

Contributor

Agree, a file writer sink is a good way to separate them. And then stream logs via the periodic or SSE collector (to both file/OTEL etc.).

Contributor Author

Makes sense to me

@yoks
Contributor

yoks commented Mar 31, 2026

I think OTLP should be modelled as a sink; since it is async and can have backpressure, I think you can reuse OverrideQueue for that. Very similar to how HealthOverrides works.

@yoks
Contributor

yoks commented Mar 31, 2026

I have not looked at everything so far.

But one high level question upfront: Is there a specific reason that SSE connections need to make the internal event handling asynchronous? My understanding is it could still be synchronous (the events are written to channels synchronously and can then be processed on the other side at arbitrary speed).

Is the reasoning here backpressure? If yes, it could likely be handled other ways too. E.g. we could implement a channel where we just drop the newest (or oldest) events if the processing infrastructure can't keep up. But @yoks is certainly the expert on how its designed so far and can provide more input.

Yes, they should be synchronous as long as they are not hitting a slow sink (e.g. API/OTEL). I think the File sink should be fine as well. Answered above, but a design similar to how HealthOverrides are handled looks like the way to go.

@mkoci mkoci marked this pull request as ready for review April 2, 2026 02:54
@mkoci mkoci requested a review from a team as a code owner April 2, 2026 02:54
Copilot AI review requested due to automatic review settings April 2, 2026 02:54
@github-actions

github-actions bot commented Apr 2, 2026

🔐 TruffleHog Secret Scan

No secrets or credentials found!

Your code has been scanned for 700+ types of secrets and credentials. All clear! 🎉

🔗 View scan details

🕐 Last updated: 2026-04-02 02:55:54 UTC | Commit: 616afc7


Copilot AI left a comment


Pull request overview

Adds experimental Redfish LogEntries streaming via SSE to reduce bursty periodic log scrapes, and introduces an async OTLP (gRPC) export path with bounded-channel backpressure to protect the health service under load.

Changes:

  • Introduces an EventPipeline that preserves the existing synchronous sink behavior while optionally forwarding OTLP-relevant events through a bounded async channel for backpressure.
  • Adds a streaming-collector runtime abstraction + SSE log collector, with logs collection now configurable as sse (default) or periodic.
  • Implements OTLP log export (proto generation, event→OTLP conversion, drain task with batching/flush/retry).
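The drain task's flush condition (a count of 512 or a 2s interval, per the PR notes) can be expressed as a pure decision function; the constants are taken from the description and may well be tuned later:

```rust
use std::time::Duration;

const BATCH_SIZE: usize = 512; // flush threshold from the PR notes
const FLUSH_INTERVAL: Duration = Duration::from_secs(2);

/// Pure form of the drain-task decision: flush when the batch is full,
/// or when the flush interval has elapsed with anything pending.
fn should_flush(batch_len: usize, since_last_flush: Duration) -> bool {
    batch_len >= BATCH_SIZE || (batch_len > 0 && since_last_flush >= FLUSH_INTERVAL)
}

fn main() {
    assert!(should_flush(512, Duration::ZERO));             // count reached
    assert!(should_flush(1, Duration::from_secs(2)));       // interval elapsed
    assert!(!should_flush(0, Duration::from_secs(10)));     // nothing to send
    assert!(!should_flush(10, Duration::from_millis(500))); // neither yet
    println!("flush decision logic ok");
}
```

Factoring the decision out of the tokio::select! loop like this keeps the time/count policy unit-testable without sleeping.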

Reviewed changes

Copilot reviewed 24 out of 26 changed files in this pull request and generated 5 comments.

File Description
Makefile.toml Formatting adjustments for env vars and rustfmt tasks.
crates/ssh-console/Cargo.toml Reformat tokio features list.
crates/health/src/processor/mod.rs Adds handle_and_collect to capture original + derived events for downstream forwarding.
crates/health/src/pipeline.rs New async EventPipeline that forwards OTLP-relevant events via bounded channel.
crates/health/src/otlp/mod.rs New OTLP module with generated proto includes + re-exports.
crates/health/src/otlp/drain.rs New OTLP drain task: connect, batch, flush, retry w/ backoff.
crates/health/src/otlp/convert.rs Converts internal events/logs into OTLP ExportLogsServiceRequest.
crates/health/src/lib.rs Wires EventPipeline, OTLP drain lifecycle, and SSE-related error type.
crates/health/src/discovery/spawn.rs Spawns logs collectors based on mode (SSE vs periodic) and injects pipeline.
crates/health/src/discovery/iteration.rs Threads EventPipeline through discovery iteration/spawn.
crates/health/src/config.rs Adds OTLP sink config + logs mode config/validation and updates tests.
crates/health/src/collectors/sensors.rs Switches sensor collector emission to async pipeline.
crates/health/src/collectors/runtime.rs Adds streaming collector trait/runtime, SSE open helper, and backoff utilities.
crates/health/src/collectors/nvue/rest/collector.rs Switches NVUE collector emission to async pipeline.
crates/health/src/collectors/nmxt.rs Switches NMX-T collector emission to async pipeline.
crates/health/src/collectors/mod.rs Re-exports streaming runtime types and new SSE log collector.
crates/health/src/collectors/logs/sse.rs New SSE log collector mapping Redfish EventService payloads into log events.
crates/health/src/collectors/logs/periodic.rs Periodic logs updated to use pipeline + optional file writer + rotation tweak.
crates/health/src/collectors/logs/mod.rs New logs module split (periodic + sse).
crates/health/src/collectors/firmware.rs Switches firmware collector emission to async pipeline.
crates/health/example/config.example.toml Updates example to document collectors.logs.mode and periodic sub-table.
crates/health/Cargo.toml Adds prost/tonic-prost deps and build dep for proto compilation.
crates/health/build.rs New build script fetching/compiling OTLP protos for generated gRPC client.
crates/health/benches/processor_pipeline.rs Adds bench for handle_and_collect overhead.
crates/bmc-explorer/Cargo.toml Reformat nv-redfish feature list.
Cargo.lock Locks new prost/tonic-prost deps (and related lockfile churn).
Comments suppressed due to low confidence (1)

crates/health/src/collectors/logs/periodic.rs:612

  • last_seen_ids is updated even when writing the log batch to disk fails. If the log file is a required output, this can permanently skip entries on the next iteration (data loss on disk). Consider only advancing last_seen_ids (and total_log_count) after a successful write, or otherwise making the failure semantics explicit (e.g., retry on next run when the writer errors).


Comment on lines +198 to +204
if sinks.is_empty() {
return Ok(None);
}

let composite_sink: Arc<dyn DataSink> =
Arc::new(CompositeDataSink::new(sinks, metrics_manager.clone()));
let inner = EventProcessingPipeline::new(processors, composite_sink, metrics_manager);

Copilot AI Apr 2, 2026


build_event_pipeline returns Ok(None) when sinks is empty, before considering sinks.otlp. This makes OTLP-only deployments impossible (events will never be forwarded to the OTLP drain when all synchronous sinks are disabled). Consider treating OTLP as a consumer for pipeline creation (e.g., build the pipeline when OTLP is enabled even if sinks is empty, using a no-op/composite sink for the synchronous side).

dyn futures::TryStream<
Ok = EventStreamPayload,
Error = HealthError,
Item = Result<EventStreamPayload, HealthError>,

Copilot AI Apr 2, 2026


SseStream is defined with dyn futures::TryStream<..., Item = ...>, but TryStream doesn’t have an Item associated type (it inherits Stream<Item = Result<Ok, Error>>). This will not compile. Drop the Item = ... constraint (or express it via an additional Stream<Item = ...> bound if needed).

Suggested change
Item = Result<EventStreamPayload, HealthError>,

Comment on lines +32 to +60
// production CI/Docker builds should set OTLP_PROTO_DIR to a pre-fetched Dockerfile layer to avoid runtime network deps
// the curl fallback below should only be used for local development.
fn fetch_otlp_protos(out_dir: &Path) -> PathBuf {
if let Ok(dir) = std::env::var("OTLP_PROTO_DIR") {
let path = PathBuf::from(dir);
if path.exists() {
return path;
}
}

let proto_dir = out_dir.join("otlp-proto");

for proto_file in OTLP_PROTO_FILES {
let dest = proto_dir.join(proto_file);
if dest.exists() {
continue;
}

std::fs::create_dir_all(dest.parent().unwrap()).expect("create proto parent dirs");

let url = format!("{OTLP_PROTO_BASE_URL}/{OTLP_PROTO_VERSION}/{proto_file}");

let status = Command::new("curl")
.args(["-sSfL", "--create-dirs", "-o"])
.arg(&dest)
.arg(&url)
.status()
.expect("curl must be available to download OTLP proto files");


Copilot AI Apr 2, 2026


The build script falls back to downloading proto files via curl at build time and only declares rerun-if-env-changed=OTLP_PROTO_DIR. This can make builds non-reproducible/offline-hostile and also won’t re-run when the contents of OTLP_PROTO_DIR change. Consider (a) emitting cargo:rerun-if-changed= for the proto inputs, and (b) gating the network download behind an explicit opt-in (or vendoring the protos) so CI/release builds never depend on external network/curl availability.

loop {
tokio::select! {
_ = self.cancel.cancelled() => return None,
result = endpoint.connect() => {

Copilot AI Apr 2, 2026


endpoint.connect() is called inside a retry loop on a single endpoint value created by Channel::from_shared(...). In tonic, connect() typically consumes the endpoint/connector, so this pattern is likely to fail to compile or only allow a single attempt. Consider constructing/cloning the endpoint per attempt (e.g., keep an Endpoint/builder and call endpoint.clone().connect().await) so retries don’t move the original value.

Suggested change
result = endpoint.connect() => {
result = endpoint.clone().connect() => {

Contributor

In tonic, connect() typically consumes the endpoint/connector

No copilot, no. If it consumed it, this would fail to compile. bad bot.

Comment on lines +396 to +403
/// SSE is the preferred mode for real-time log streaming.
/// Periodic polling is retained as a fallback for BMCs that lack SSE support.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogCollectionMode {
Sse,
Periodic,
}

Copilot AI Apr 2, 2026


The docs on LogCollectionMode suggest periodic polling is a fallback for BMCs that lack SSE support, but the implementation selects a single global mode (collectors.logs.mode) and does not appear to auto-fallback per-endpoint when SSE is unavailable (e.g., HealthError::SseNotAvailable). Consider clarifying the comment (manual/operator fallback) or implementing an automatic downgrade path when SSE isn’t supported.

@mkoci
Contributor Author

mkoci commented Apr 4, 2026

I think OTLP should be modelled as a sink; since it is async and can have backpressure, I think you can reuse OverrideQueue for that. Very similar to how HealthOverrides works.

From the PR description I added after this comment:

What about something like the OverrideQueue?
The OverrideQueue uses a unique key to loosely bound Overrides. This works because there is a finite number of MachineIds and ReportSources and we only care about the latest. For logs/events this leads to dropping, not to mention the queue size for logs/events has much more freedom compared to the bounded machine/report uniqueness dimension.

If we don't care about dropping, this is much easier to implement, similar to the OverrideQueue. Is your expectation that we only hold the latest events in the queue per MachineId to push to the OTLPSink and drop the rest?
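The latest-only, keyed semantics under discussion can be sketched with a plain HashMap (the key and value types here are illustrative, not the actual OverrideQueue API): a second entry for the same key silently replaces the first, which is fine for overrides but lossy for a log stream:

```rust
use std::collections::HashMap;

/// One slot per (machine, source) key, keeping only the latest value.
/// Returns the previous entry, i.e. the one silently dropped.
fn insert_latest(
    queue: &mut HashMap<(u32, &'static str), String>,
    machine_id: u32,
    source: &'static str,
    value: String,
) -> Option<String> {
    queue.insert((machine_id, source), value)
}

fn main() {
    let mut queue = HashMap::new();
    assert_eq!(insert_latest(&mut queue, 1, "redfish", "log-1".into()), None);
    // A second log for the same machine/source overwrites the first.
    let dropped = insert_latest(&mut queue, 1, "redfish", "log-2".into());
    assert_eq!(dropped, Some("log-1".to_string()));
    assert_eq!(queue.len(), 1); // bounded by key cardinality, not event volume
    println!("latest-per-key semantics demonstrated");
}
```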

Contributor

@kensimon kensimon left a comment

Haven't looked at the code yet but this really stuck out to me, I think we ought to vendor these files rather than shelling out to curl on every build.


const OTLP_PROTO_VERSION: &str = "v1.5.0";
const OTLP_PROTO_BASE_URL: &str =
"https://raw.githubusercontent.com/open-telemetry/opentelemetry-proto";
Contributor

@kensimon kensimon Apr 6, 2026

Can we put these in the repo rather than downloading them as part of build.rs? I feel like builds should be as "sealed" as possible and external dependency management should all be in one place (i.e. Cargo.lock and the cargo cache)... shelling out to curl in a build.rs is super surprising behavior, especially if it's just to avoid vendoring 4 proto files...

Contributor

@kensimon kensimon left a comment

These are all nitpicks and can be ignored, only the curl-in-build-rs issue is a "request changes" from me.

Comment on lines +61 to +89
let sleep = tokio::time::sleep(self.flush_interval);
tokio::pin!(sleep);

loop {
tokio::select! {
_ = self.cancel.cancelled() => {
self.flush(&mut client, &mut batch).await;
break;
}
maybe_event = self.receiver.recv() => {
match maybe_event {
Some(event) => {
batch.push(event);
if batch.len() >= self.batch_size {
self.flush(&mut client, &mut batch).await;
sleep.as_mut().reset(tokio::time::Instant::now() + self.flush_interval);
}
}
None => {
self.flush(&mut client, &mut batch).await;
break;
}
}
}
_ = &mut sleep => {
if !batch.is_empty() {
self.flush(&mut client, &mut batch).await;
}
sleep.as_mut().reset(tokio::time::Instant::now() + self.flush_interval);
Contributor

Nit: Consider using tokio::time's Interval here rather than hand-rolling one with tokio::time::sleep():

Suggested change
let sleep = tokio::time::sleep(self.flush_interval);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
self.flush(&mut client, &mut batch).await;
break;
}
maybe_event = self.receiver.recv() => {
match maybe_event {
Some(event) => {
batch.push(event);
if batch.len() >= self.batch_size {
self.flush(&mut client, &mut batch).await;
sleep.as_mut().reset(tokio::time::Instant::now() + self.flush_interval);
}
}
None => {
self.flush(&mut client, &mut batch).await;
break;
}
}
}
_ = &mut sleep => {
if !batch.is_empty() {
self.flush(&mut client, &mut batch).await;
}
sleep.as_mut().reset(tokio::time::Instant::now() + self.flush_interval);
let mut interval = tokio::time::interval(self.flush_interval);
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
self.flush(&mut client, &mut batch).await;
break;
}
maybe_event = self.receiver.recv() => {
match maybe_event {
Some(event) => {
batch.push(event);
if batch.len() >= self.batch_size {
self.flush(&mut client, &mut batch).await;
interval.reset();
}
}
None => {
self.flush(&mut client, &mut batch).await;
break;
}
}
}
_ = interval.tick() => {
if !batch.is_empty() {
self.flush(&mut client, &mut batch).await;
}


Comment on lines +113 to +138
```rust
loop {
    tokio::select! {
        _ = self.cancel.cancelled() => return None,
        result = endpoint.connect() => {
            match result {
                Ok(channel) => {
                    tracing::info!(endpoint = %self.endpoint, "connected to otlp collector");
                    return Some(LogsServiceClient::new(channel));
                }
                Err(error) => {
                    let delay = backoff.next_delay();
                    tracing::warn!(
                        ?error,
                        endpoint = %self.endpoint,
                        retry_in = ?delay,
                        "failed to connect to otlp collector"
                    );
                    tokio::select! {
                        _ = self.cancel.cancelled() => return None,
                        _ = tokio::time::sleep(delay) => {}
                    }
                }
            }
        }
    }
}
```

Nit: tokio::select! is a bit of a footgun sometimes due to cancel-safety issues... this blog post covers a lot of them.

This code doesn't have any problems, but it's nice to avoid the select altogether if you can, since it's simpler to audit select! calls if there are fewer of them:

Suggested change

Before:

```rust
loop {
    tokio::select! {
        _ = self.cancel.cancelled() => return None,
        result = endpoint.connect() => {
            match result {
                Ok(channel) => {
                    tracing::info!(endpoint = %self.endpoint, "connected to otlp collector");
                    return Some(LogsServiceClient::new(channel));
                }
                Err(error) => {
                    let delay = backoff.next_delay();
                    tracing::warn!(
                        ?error,
                        endpoint = %self.endpoint,
                        retry_in = ?delay,
                        "failed to connect to otlp collector"
                    );
                    tokio::select! {
                        _ = self.cancel.cancelled() => return None,
                        _ = tokio::time::sleep(delay) => {}
                    }
                }
            }
        }
    }
}
```

After:

```rust
loop {
    match self.cancel.run_until_cancelled(endpoint.connect()).await? {
        Ok(channel) => {
            tracing::info!(endpoint = %self.endpoint, "connected to otlp collector");
            return Some(LogsServiceClient::new(channel));
        }
        Err(error) => {
            let delay = backoff.next_delay();
            tracing::warn!(
                ?error,
                endpoint = %self.endpoint,
                retry_in = ?delay,
                "failed to connect to otlp collector"
            );
            self.cancel
                .run_until_cancelled(tokio::time::sleep(delay))
                .await?;
        }
    };
}
```
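As an aside, the `backoff.next_delay()` helper these snippets call isn't shown in this hunk. A minimal std-only sketch of what such a capped exponential backoff might look like (the `Backoff` name, fields, and values here are hypothetical; the PR's real helper may differ, e.g. by adding jitter):

```rust
use std::time::Duration;

/// Hypothetical capped exponential backoff, sketched for illustration.
struct Backoff {
    current: Duration,
    max: Duration,
}

impl Backoff {
    fn new(initial: Duration, max: Duration) -> Self {
        Self { current: initial, max }
    }

    /// Return the delay to sleep for now, then double it for next time,
    /// saturating at `max` so reconnect storms stay bounded.
    fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        self.current = (self.current * 2).min(self.max);
        delay
    }
}

fn main() {
    let mut backoff = Backoff::new(Duration::from_millis(100), Duration::from_secs(5));
    // Yields 100ms, 200ms, 400ms, ... and eventually sticks at the 5s cap.
    let delays: Vec<_> = (0..7).map(|_| backoff.next_delay()).collect();
    assert_eq!(delays[0], Duration::from_millis(100));
    assert_eq!(delays[6], Duration::from_secs(5));
    println!("first delay: {:?}, capped delay: {:?}", delays[0], delays[6]);
}
```

A monotonically growing, capped delay like this keeps a flapping collector endpoint from being hammered while still reconnecting promptly after short blips.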

Comment on lines +185 to +188
```rust
tokio::select! {
    _ = self.cancel.cancelled() => break,
    _ = tokio::time::sleep(delay) => continue,
}
```

Nit: this can be:

Suggested change

Before:

```rust
tokio::select! {
    _ = self.cancel.cancelled() => break,
    _ = tokio::time::sleep(delay) => continue,
}
```

After:

```rust
if self
    .cancel
    .run_until_cancelled(tokio::time::sleep(delay))
    .await
    .is_none()
{
    break;
}
```

But if you disagree, feel free to ignore; I don't feel strongly about it.

```rust
        return;
    }

    let request = build_export_request(batch);
```

Nit: It looks like all code paths are calling `batch.clear()` before returning... maybe we should just do that here after building the request, so that we don't have to do it in multiple return paths below?

In fact, you could have `build_export_request(batch)` take the batch by value instead of by reference, and just do `.drain(..)` here. Something like:

```rust
pub fn build_export_request(
    batch: impl IntoIterator<Item = (EventContext, CollectorEvent)>,
) -> ExportLogsServiceRequest {
    // ...
}
```

and here:

```rust
let request = build_export_request(batch.drain(..));
```

Then `convert_event()` and `resource_attributes()` can take an owned `CollectorEvent` and `EventContext`, and you can avoid some of the clones.
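To illustrate the ownership point with std-only stand-ins (the `EventContext`/`CollectorEvent` structs below are placeholders, and the body just counts items rather than building a real `ExportLogsServiceRequest`): passing `batch.drain(..)` into a by-value `impl IntoIterator` parameter both empties the buffer and hands each event over without cloning.

```rust
// Stand-ins for the PR's EventContext / CollectorEvent types.
#[derive(Debug)]
struct EventContext(u64);
#[derive(Debug)]
struct CollectorEvent(String);

// Taking `impl IntoIterator` by value means the caller transfers ownership,
// so each event can be converted without a clone. The counting body is a
// placeholder for the real OTLP request construction.
fn build_export_request(
    batch: impl IntoIterator<Item = (EventContext, CollectorEvent)>,
) -> usize {
    batch.into_iter().count()
}

fn main() {
    let mut batch = vec![
        (EventContext(1), CollectorEvent("boot".into())),
        (EventContext(2), CollectorEvent("fan fault".into())),
    ];
    // `drain(..)` moves the entries out and leaves the Vec empty, which
    // replaces the explicit `batch.clear()` on every return path.
    let n = build_export_request(batch.drain(..));
    assert_eq!(n, 2);
    assert!(batch.is_empty());
    println!("exported {n} events");
}
```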

Comment on lines +477 to +483
```rust
let stream = tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        return;
    }
    result = collector.connect() => result,
};
```

@kensimon kensimon Apr 6, 2026


Nit: Perhaps more readable and avoids the select:

Suggested change

Before:

```rust
let stream = tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        return;
    }
    result = collector.connect() => result,
};
```

After:

```rust
let Some(stream) = cancel_clone.run_until_cancelled(collector.connect()).await
else {
    metrics.connection_state.set(STREAM_STATE_CLOSED);
    return;
};
```

Comment on lines +505 to +511
```rust
tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        return;
    }
    _ = tokio::time::sleep(delay) => continue,
}
```

Nit

Suggested change

Before:

```rust
tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        return;
    }
    _ = tokio::time::sleep(delay) => continue,
}
```

After:

```rust
if cancel_clone
    .run_until_cancelled(tokio::time::sleep(delay))
    .await
    .is_none()
{
    metrics.connection_state.set(STREAM_STATE_CLOSED);
    return;
}
```

Comment on lines +516 to +527
```rust
let item = tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        tracing::info!(
            collector_type,
            endpoint = ?endpoint.addr,
            "streaming collector shutting down"
        );
        return;
    }
    item = stream.next() => item,
};
```

Nit

Suggested change

Before:

```rust
let item = tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        tracing::info!(
            collector_type,
            endpoint = ?endpoint.addr,
            "streaming collector shutting down"
        );
        return;
    }
    item = stream.next() => item,
};
```

After:

```rust
let Some(item) = cancel_clone.run_until_cancelled(stream.next()).await else {
    metrics.connection_state.set(STREAM_STATE_CLOSED);
    tracing::info!(
        collector_type,
        endpoint = ?endpoint.addr,
        "streaming collector shutting down"
    );
    return;
};
```

Comment on lines +559 to +565
```rust
tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        return;
    }
    _ = tokio::time::sleep(delay) => {}
}
```

Nit

Suggested change

Before:

```rust
tokio::select! {
    _ = cancel_clone.cancelled() => {
        metrics.connection_state.set(STREAM_STATE_CLOSED);
        return;
    }
    _ = tokio::time::sleep(delay) => {}
}
```

After:

```rust
if cancel_clone
    .run_until_cancelled(tokio::time::sleep(delay))
    .await
    .is_none()
{
    metrics.connection_state.set(STREAM_STATE_CLOSED);
    return;
}
```

```rust
const STREAM_STATE_CLOSED: f64 = 2.0;

pub struct StreamMetrics {
    connection_state: Gauge,
```

Nit: This can be an `IntGauge`, and then you don't need the awkwardness of representing discrete states as an `f64` (they can at least be integers).
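For what that could look like, a std-only sketch (the `StreamState` enum and its variant values are made up for illustration, not names from the PR): encode the discrete states as enum discriminants and hand the gauge an `i64`, e.g. via `prometheus::IntGauge::set`.

```rust
// Illustrative only: explicit discriminants map the discrete connection
// states onto the i64 an integer gauge expects, with no f64 constants.
#[derive(Clone, Copy, Debug, PartialEq)]
enum StreamState {
    Connecting = 0,
    Open = 1,
    Closed = 2,
}

impl StreamState {
    /// Value to pass to an integer gauge, e.g. IntGauge::set(i64).
    fn as_gauge(self) -> i64 {
        self as i64
    }
}

fn main() {
    // e.g. metrics.connection_state.set(StreamState::Closed.as_gauge());
    assert_eq!(StreamState::Connecting.as_gauge(), 0);
    assert_eq!(StreamState::Closed.as_gauge(), 2);
    println!("connection_state = {}", StreamState::Closed.as_gauge());
}
```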
