feat(audit): stream proxy network events to NDJSON live #831

Open
sikor111 wants to merge 3 commits into always-further:main from sikor111:feat/streaming-network-audit

@sikor111 commented May 5, 2026

Closes #798

Disclosure: This pull request is being opened by Claude (an AI coding agent) operating on behalf of @sikor111, per the Coding Agent Contribution Policy in CLAUDE.md.

Summary

Network audit events from nono-proxy were buffered in Arc<Mutex<Vec<NetworkAuditEvent>>> and only persisted to audit-events.ndjson at session exit. That meant any crash/OOM/hard kill of the supervisor lost every event of the session, long sessions silently dropped events past MAX_AUDIT_EVENTS = 4096, and operators could not tail -f to watch decisions live.

This PR routes each event through an optional NetworkAuditSink as it occurs. The in-memory Vec is kept solely to populate SessionMetadata.network_events at exit; the durable file is what the sink writes.

Approach

  • nono-proxy::audit: new NetworkAuditSink trait. SharedAuditLog evolves to Arc<AuditLog> holding the existing buffer, OnceLock<Arc<dyn NetworkAuditSink>>, and AtomicBool close flag. push_event streams first, then buffers; respects the close flag.
  • nono-proxy::server::ProxyHandle: set_audit_streaming_sink, audit_streaming_active, close_audit_log.
  • nono-cli::audit_integrity: RecorderStreamingSink wraps Arc<Mutex<AuditRecorder>> and forwards each event through record_network_event. Errors are logged via tracing::warn, never propagated — audit recording must not break an in-flight network request.
  • nono-cli::supervised_runtime: attaches the sink after the recorder is created. Recorder is now Arc<Mutex<...>> so the sink can hold a clone.
  • nono-cli::rollback_runtime::finalize_supervised_exit: closes the audit log before draining (prevents post-finalize events from extending the file past the committed Merkle root) and skips the per-event recorder push when streaming was active (no duplicates).

No on-disk format change. No new CLI flag — default-on whenever the audit recorder is present. Standalone proxy use without a sink is unchanged.
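
The push path described above can be sketched roughly as follows. This is a minimal illustration and not the PR's actual code: the event fields, the `set_sink` and `buffered_len` helpers, and `CollectingSink` are hypothetical stand-ins, and the real `AuditLog` may differ in detail. It only shows the ordering the description names: check the close flag, stream to the sink if one is attached, then buffer.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, OnceLock};

/// Simplified stand-in for the real event type (fields are hypothetical).
#[derive(Clone, Debug, PartialEq)]
pub struct NetworkAuditEvent {
    pub host: String,
    pub allowed: bool,
}

/// Receives each event as it occurs, for example to append it to a file.
pub trait NetworkAuditSink: Send + Sync {
    fn record(&self, event: &NetworkAuditEvent);
}

/// Buffer cap, mirroring MAX_AUDIT_EVENTS = 4096 from the PR text.
const MAX_AUDIT_EVENTS: usize = 4096;

#[derive(Default)]
pub struct AuditLog {
    buffer: Mutex<Vec<NetworkAuditEvent>>,
    sink: OnceLock<Arc<dyn NetworkAuditSink>>,
    closed: AtomicBool,
}

impl AuditLog {
    /// Attach a sink once; returns false if one was already set.
    pub fn set_sink(&self, sink: Arc<dyn NetworkAuditSink>) -> bool {
        self.sink.set(sink).is_ok()
    }

    /// After this call, push_event ignores new events.
    pub fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
    }

    /// Streams to the sink first, then buffers (up to the cap).
    pub fn push_event(&self, event: NetworkAuditEvent) {
        if self.closed.load(Ordering::SeqCst) {
            return;
        }
        if let Some(sink) = self.sink.get() {
            sink.record(&event);
        }
        if let Ok(mut buf) = self.buffer.lock() {
            if buf.len() < MAX_AUDIT_EVENTS {
                buf.push(event);
            }
        }
    }

    pub fn buffered_len(&self) -> usize {
        self.buffer.lock().map(|b| b.len()).unwrap_or(0)
    }
}

/// Hypothetical in-memory sink used only to make the sketch observable.
#[derive(Default)]
pub struct CollectingSink {
    lines: Mutex<Vec<String>>,
}

impl CollectingSink {
    pub fn seen(&self) -> usize {
        self.lines.lock().map(|l| l.len()).unwrap_or(0)
    }
}

impl NetworkAuditSink for CollectingSink {
    fn record(&self, event: &NetworkAuditEvent) {
        if let Ok(mut lines) = self.lines.lock() {
            lines.push(format!("{} {}", event.host, event.allowed));
        }
    }
}
```

In this sketch an event pushed after `close()` reaches neither the sink nor the buffer, which is the property the finalize step relies on.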

Files consulted

  • crates/nono-proxy/src/audit.rs: SharedAuditLog, push_event, MAX_AUDIT_EVENTS
  • crates/nono-proxy/src/server.rs: ProxyHandle, drain_audit_events
  • crates/nono-cli/src/audit_integrity.rs: AuditRecorder::record_network_event, append_event, verify_audit_log
  • crates/nono-cli/src/supervised_runtime.rs: recorder construction
  • crates/nono-cli/src/rollback_runtime.rs: finalize_supervised_exit

Test plan

  • CI (make ci) green
  • Manual: long-running supervised session, kill -9 mid-flight, confirm audit-events.ndjson contains pre-kill events
  • Manual: tail -f audit-events.ndjson during a session shows events live
  • Manual: verify_audit_log still succeeds end-of-session (no events past Merkle root)

Agent Compliance Check

  • I am not prohibited from contributing under this policy
  • An issue already exists (Stream proxy network audit events to NDJSON as they occur #798)
  • I disclosed that I am an agent in the issue discussion
  • I described my intent and approach in the issue discussion
  • I reviewed repository coding and security rules for the affected area
  • I provided required attribution for reused or adapted code
  • I did not use forbidden patterns such as unwrap/expect
  • I used NonoError where required
  • I validated and canonicalized all relevant paths (no new path handling in this PR)
  • This PR matches the approved or disclosed issue scope

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@sikor111 force-pushed the feat/streaming-network-audit branch from c1e381c to c6e2638 on May 5, 2026 06:50

@gemini-code-assist (bot) left a comment

Contributor

Code Review

This pull request implements live streaming of network audit events to prevent data loss during crashes and avoid in-memory buffer limits. It introduces a NetworkAuditSink trait and a RecorderStreamingSink implementation, along with logic to close the audit log to ensure Merkle root integrity. Feedback was provided regarding the synchronous file I/O in the record implementation, which may block asynchronous request handlers and should be offloaded to a background task.

Comment on lines +190 to +206
impl nono_proxy::audit::NetworkAuditSink for RecorderStreamingSink {
    fn record(&self, event: &NetworkAuditEvent) {
        match self.recorder.lock() {
            Ok(mut recorder) => {
                if let Err(e) = recorder.record_network_event(event.clone()) {
                    tracing::warn!("Failed to stream network audit event: {}", e);
                }
            }
            Err(e) => {
                tracing::warn!(
                    "Audit recorder mutex poisoned while streaming network event: {}",
                    e
                );
            }
        }
    }
}

Contributor

medium

The record implementation performs synchronous file I/O and a flush() call (via record_network_event -> append_event) while holding a Mutex lock on the AuditRecorder. Since this is called from the proxy's asynchronous request handlers, it may introduce significant latency and become a performance bottleneck under high network load. Consider offloading the recording to a background task or using an asynchronous channel to avoid blocking the proxy's worker threads.

Author

Thanks for flagging this — pushing back on implementing it now, with reasoning:

  1. Not a new pattern. push_event already took a std::sync::Mutex from the same async handlers before this PR (crates/nono-proxy/src/audit.rs pre-PR). The sink call sits in front of that existing lock; the threading model is unchanged.

  2. Granularity is per-request, not per-byte. One event = one proxy decision. The proxy is already doing the TLS handshake and body relay around each event; a ~1–2 KB NDJSON line + flush() is on the order of 100 µs on any modern SSD, well below the network round-trip the proxy already adds.

  3. Ordering is load-bearing. append_event computes chain_hash = H(prev_chain ‖ leaf_hash) and writes records strictly serially — verify_audit_log walks that chain. spawn_blocking per event still serializes on the recorder mutex (plus task-hop overhead). A single-writer task + channel just relocates the lock; it does not remove it.

  4. A channel reintroduces the bug this PR fixes. Losing buffered events on a crash is exactly the gap that #798 (Stream proxy network audit events to NDJSON as they occur) set out to close. Preserving durability would require a bounded channel with backpressure (producer blocks when full = the same blocking, just renamed) and an explicit drain in close_audit_log. Net: more complexity for a theoretical win.

  5. The issue body called this out explicitly: "if proxy throughput becomes a bottleneck this should move to spawn_blocking". The plan is to defer until benchmark data shows the sink dominates, then redesign with ordering + durability constraints in mind — not speculatively.

Happy to revisit if you have a profile/bench showing the sink is the bottleneck under realistic load.
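
To make point 3 concrete, here is a small sketch of why write order matters for a hash chain of the form chain_hash = H(prev_chain ‖ leaf_hash). It is an illustration under assumptions, not the recorder's code: `DefaultHasher` stands in for whatever cryptographic hash the recorder uses (it is not collision resistant), and `build_chain` and `verify_chain` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in for a cryptographic hash over a record's bytes.
fn leaf_hash(record: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    record.hash(&mut hasher);
    hasher.finish()
}

/// chain_hash = H(prev_chain || leaf_hash), with the same stand-in hash.
fn chain_step(prev_chain: u64, leaf: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    prev_chain.hash(&mut hasher);
    leaf.hash(&mut hasher);
    hasher.finish()
}

/// Appends records strictly serially and returns the chain value after each.
fn build_chain(records: &[&str]) -> Vec<u64> {
    let mut prev_chain = 0u64;
    records
        .iter()
        .map(|record| {
            prev_chain = chain_step(prev_chain, leaf_hash(record));
            prev_chain
        })
        .collect()
}

/// Walks the chain again from the records and compares with stored values.
fn verify_chain(records: &[&str], stored: &[u64]) -> bool {
    build_chain(records).as_slice() == stored
}
```

Because every chain value depends on all earlier records, two writers that swap the order of a pair of events produce a file whose stored chain values no longer match a recomputation, so verification fails.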

@lukehinds left a comment

Contributor

Thanks @sikor111. My only concern is that this introduces synchronous I/O on async Tokio worker threads. The Mutex<AuditRecorder> lock plus the disk write in RecorderStreamingSink::record would run for every proxied network event before push_event returns. Because push_event is called from an async Tokio task (handle_connection), that blocking I/O will occupy an async worker thread for the duration of each write. This would impact streaming downloads (which we plan to introduce), as each proxied connection generates at least one audit event, and that disk write now sits on the hot path. It could add measurable latency per connection under a busy agent.

One way to address this is to offload the sink call in push_event with tokio::task::spawn_blocking, but this may require push_event to become async or to fire-and-forget.

@sikor111 force-pushed the feat/streaming-network-audit branch from c6e2638 to 7ebae11 on May 11, 2026 18:45
sikor111 added a commit to sikor111/nono that referenced this pull request May 11, 2026
Addresses maintainer feedback on always-further#831: `RecorderStreamingSink::record`
took `Mutex<AuditRecorder>` and called `file.write_all + flush` while
on a Tokio async worker thread. Every proxied connection went through
that critical section, so concurrent traffic serialized on one lock
and each async worker was blocked for the duration of a sync disk
write — measurable latency under a busy agent and a blocker for the
planned streaming downloads.

Move the write off the hot path:

- `NetworkAuditSink` gains a default `flush()` hook so sinks that
  buffer asynchronously can drain on demand.
- `AuditLog::close()` invokes `sink.flush()` after flipping `closed`,
  so any further append by `finalize_supervised_exit` (e.g.
  `session_ended`) sees the queue fully drained.
- `RecorderStreamingSink` is rebuilt around a bounded
  `sync_channel(16_384)` drained by a dedicated `nono-audit-writer`
  OS thread. `record()` is a non-blocking `try_send` (lock-free hot
  path); overflow increments a counter and warns at exponential
  intervals. The writer is the sole holder of the recorder mutex on
  the audit path, so concurrent proxy tasks no longer contend.

Order preservation is mandatory because `AuditRecorder::append_event`
builds a hash chain (`prev_chain -> chain_hash`) and a Merkle
accumulator over `leaf_hashes`. A single-consumer channel preserves
FIFO, so `verify_audit_log` continues to pass — fire-and-forget
`spawn_blocking` would have raced and broken the chain. A new
concurrent-writers test exercises 8 threads x 100 events and asserts
chain verification succeeds end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Maciej Sikora <itmaciej.sikora@gmail.com>
@sikor111

Author

Thanks for the review @lukehinds. You're right that the original RecorderStreamingSink::record was sitting on a Tokio worker through both a Mutex<AuditRecorder> acquire and the sync file.write_all + flush. With L7 interception now generating extra audit events per intercepted CONNECT (per 149abde), that critical section would have been even hotter than it was at the time of your comment.

I considered the tokio::task::spawn_blocking route you suggested but ruled out the fire-and-forget variant: AuditRecorder::append_event builds a per-event hash chain (prev_chain → chain_hash) and a Merkle accumulator over leaf_hashes. Racing spawn_blocking tasks would reorder writes and break verify_audit_log. The awaited variant would fix worker-thread blocking but leave the Mutex<AuditRecorder> contention on the hot path under high concurrency.

Instead I rebuilt the sink around a bounded sync_channel(16_384) drained by a single dedicated OS thread (nono-audit-writer). record() is now a lock-free try_send on the hot path; the writer is the sole holder of the recorder mutex, so the FIFO order required by the hash chain is preserved by construction. Overflow drops with a rate-limited warn! and a dropped_events counter that is logged on Drop.
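
The shape of that design can be sketched with the standard library alone. This is a simplified illustration under assumptions, not the PR's `RecorderStreamingSink`: `ChannelSink` is a hypothetical name, events are plain `String`s, and a shared `Vec` stands in for the recorder and its file. It shows a bounded `sync_channel`, a single writer thread that preserves FIFO order, a non-blocking `try_send` on the producer side, and a counter for dropped events.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical sink: producers enqueue, one writer thread appends in order.
pub struct ChannelSink {
    tx: Option<SyncSender<String>>,
    writer: Option<JoinHandle<()>>,
    dropped: AtomicU64,
}

impl ChannelSink {
    /// Spawns the writer thread; `out` stands in for the audit recorder.
    pub fn spawn(capacity: usize, out: Arc<Mutex<Vec<String>>>) -> std::io::Result<Self> {
        let (tx, rx) = sync_channel::<String>(capacity);
        let writer = thread::Builder::new()
            .name("audit-writer".into())
            .spawn(move || {
                // Single consumer: events are appended in the order received.
                for line in rx {
                    if let Ok(mut lines) = out.lock() {
                        lines.push(line);
                    }
                }
            })?;
        Ok(Self {
            tx: Some(tx),
            writer: Some(writer),
            dropped: AtomicU64::new(0),
        })
    }

    /// Never blocks the caller: a full or disconnected queue counts a drop.
    pub fn record(&self, line: String) {
        let Some(tx) = self.tx.as_ref() else { return };
        match tx.try_send(line) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    pub fn dropped_events(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}

impl Drop for ChannelSink {
    fn drop(&mut self) {
        // Dropping the sender closes the channel, which ends the writer loop.
        drop(self.tx.take());
        if let Some(handle) = self.writer.take() {
            let _ = handle.join();
        }
    }
}
```

The trade-off is the one named above: producers never wait on the disk, at the cost of dropping events (and counting them) when the queue is full.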

While doing this I noticed a subtler race in the original PR: a producer that has already passed is_closed() in push_event can still enqueue an event after close() returns. That event would land in the file after record_session_ended, extending the chain past the Merkle root committed to SessionMetadata. The trait now documents that flush() may seal the sink; the writer flips a sticky done flag during Flush processing and silently drops post-flush events. A new test (recorder_streaming_sink_drops_post_flush_events) pins the recorder length across a 50 ms sleep after flush.
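
The seal described here can be sketched as a writer loop over a small command enum. The names and shapes below are assumptions for illustration (the PR mentions `WriterCmd`, `Flush`, and a sticky `done` flag, but not their exact definitions): events are `String`s, a shared `Vec` stands in for the recorder, and the acknowledgement is a one-slot channel.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};

/// Hypothetical command type sent to the writer thread.
enum WriterCmd {
    /// One audit event to append.
    Event(String),
    /// Drain marker; the sender is used to acknowledge once it is reached.
    Flush(SyncSender<()>),
}

/// Runs on the writer thread until every sender has been dropped.
fn writer_loop(rx: Receiver<WriterCmd>, out: Arc<Mutex<Vec<String>>>) {
    let mut done = false; // sticky: never reset once a Flush has been seen
    for cmd in rx {
        match cmd {
            WriterCmd::Event(line) => {
                if done {
                    continue; // post-flush events are silently dropped
                }
                if let Ok(mut lines) = out.lock() {
                    lines.push(line);
                }
            }
            WriterCmd::Flush(ack) => {
                // Everything queued before this Flush has been written (FIFO).
                done = true;
                let _ = ack.send(());
            }
        }
    }
}

/// Sends Flush and blocks until the writer acknowledges it.
fn flush(tx: &SyncSender<WriterCmd>) {
    let (ack_tx, ack_rx) = sync_channel(1);
    if tx.send(WriterCmd::Flush(ack_tx)).is_ok() {
        let _ = ack_rx.recv();
    }
}
```

Since the flag lives inside the single consumer, a producer that races past the close check can still enqueue an event, but that event is processed after the `Flush` and is discarded, so the recorded length stays fixed.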

If the writer thread itself fails to spawn, supervised_runtime now warn!s and falls back to the legacy "drain at `session_ended`" path rather than aborting the session — streaming durability stays best-effort.

Coverage added (crates/nono-cli/src/audit_integrity.rs):

  • recorder_streaming_sink_preserves_order_under_concurrent_writers — 8 threads × 100 events, then verifies the chain end-to-end.
  • recorder_streaming_sink_drops_events_when_queue_full — fills a capped queue while the writer is parked on the recorder; asserts `recorded + dropped == burst`.
  • recorder_streaming_sink_drops_post_flush_events — seal regression test.
  • recorder_streaming_sink_flush_is_idempotent — extra flushes are no-ops.
  • recorder_streaming_sink_drop_joins_writer_cleanly — `Drop` joins the writer and recorder `Arc` strong count returns to 1.

Rebased onto main (latest f77e0e3), make ci green, 2049 tests pass. Ready for another look when you have a moment.

sikor111 and others added 3 commits May 11, 2026 22:07
Network audit events from nono-proxy were buffered in
Arc<Mutex<Vec<NetworkAuditEvent>>> and only persisted to
audit-events.ndjson at session exit. This dropped all events on
crash, silently lost events past MAX_AUDIT_EVENTS = 4096 on long
sessions, and prevented live observability via tail -f.

Route each event through an optional NetworkAuditSink as it occurs.
The in-memory Vec is kept only to populate
SessionMetadata.network_events at exit; the durable file is what
the sink writes.

- nono-proxy::audit: new NetworkAuditSink trait; SharedAuditLog
  evolves to Arc<AuditLog> holding the buffer, an
  OnceLock<Arc<dyn NetworkAuditSink>>, and an AtomicBool close
  flag. push_event streams first, then buffers.
- nono-proxy::server::ProxyHandle: set_audit_streaming_sink,
  audit_streaming_active, close_audit_log.
- nono-cli::audit_integrity: RecorderStreamingSink wraps
  Arc<Mutex<AuditRecorder>>, forwards each event through
  record_network_event. Errors are logged via tracing::warn,
  never propagated; audit must not break network requests.
- nono-cli::supervised_runtime: attach the sink after the recorder
  is created. Recorder is now Arc<Mutex<...>> so the sink can hold
  a clone.
- nono-cli::rollback_runtime::finalize_supervised_exit: close the
  audit log before draining (prevents post-finalize events from
  extending the file past the committed Merkle root) and skip the
  per-event recorder push when streaming was active (no duplicates).

No on-disk format change. No new CLI flag — default-on whenever the
audit recorder is present. Standalone proxy use without a sink is
unchanged.

Refs: always-further#798

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Maciej Sikora <itmaciej.sikora@gmail.com>
Addresses open review items on the streaming-audit sink:

Race fix: post-flush events could previously slip past `AuditLog::close`
and land in `audit-events.ndjson` after `record_session_ended`, extending
the file past the Merkle root committed to `SessionMetadata`. The race
window is small but real: a producer can pass `is_closed()` and only
reach `sink.record()` after `close()` has flushed and returned. The
writer thread now sets a sticky `done` flag while processing the
`Flush` command and silently drops any subsequent `Event`, so the seal
holds regardless of producer/close interleaving. `verify_audit_log`
keeps verifying end-to-end.

Lifecycle and observability:

- `RecorderStreamingSink` exposes `dropped_events()` and logs the final
  count on `Drop`, so the operator never silently loses count of
  queue-full or writer-gone drops.
- `Disconnected` send errors now rate-limit their `warn!` (exponential
  backoff) to match the `Full` path; a crashed writer no longer floods
  the log.
- `flush()` is idempotent: extra calls re-set `done`, ack, and return —
  exercised by a new test.
- If the writer thread itself fails to spawn, `supervised_runtime`
  degrades to the legacy "drain at session_ended" path with a `warn!`
  instead of failing the run. Streaming durability is best-effort.
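
One simple way to get exponentially spaced warnings is to warn only when the running drop count is a power of two. The commit message does not state the exact schedule, so the rule below is an assumption, and `DropCounter` and `note_drop` are hypothetical names:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter that decides when a drop is worth a log line.
#[derive(Default)]
pub struct DropCounter {
    dropped: AtomicU64,
}

impl DropCounter {
    /// Records one drop; returns Some(total) when a warning should be logged
    /// (at totals 1, 2, 4, 8, ...), and None otherwise.
    pub fn note_drop(&self) -> Option<u64> {
        let total = self.dropped.fetch_add(1, Ordering::Relaxed) + 1;
        total.is_power_of_two().then_some(total)
    }

    /// Final count, for example to log when the sink is dropped.
    pub fn total(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}
```

With this rule, 100 consecutive drops produce seven warnings instead of 100, while `total()` still reports the exact count.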

Tests (+4):

- queue-full overflow drops events and bumps the counter; total
  (recorded + dropped) equals the burst submitted.
- post-flush events are silently dropped and the recorder length stays
  pinned across a sleep that would otherwise let the writer append.
- `Drop` on the sink joins the writer cleanly: recorder `Arc` strong
  count drops to 1 and the recorder remains fully usable.
- `flush()` is idempotent and the seal stays sticky across repeated
  flushes.

Docstrings: `NetworkAuditSink::flush`, `AuditLog::close`, and
`RecorderStreamingSink` now spell out the seal contract and the
`close`/`flush` race that motivates it. `WriterCmd` variants have
docstrings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Maciej Sikora <itmaciej.sikora@gmail.com>
@sikor111 force-pushed the feat/streaming-network-audit branch from 9faf7c8 to e7d5c0a on May 11, 2026 20:08