Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .pi/todos/58e24853.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
id: "58e24853"
title: "Integrate turmoil with feature-gated type replacement"
tags:
- "turmoil"
- "feature-gate"
- "network-simulation"
status: "completed"
created_at: "2026-04-16T08:02:46.705Z"
---
Add turmoil as a feature-gated dependency that replaces tokio::net types with turmoil::net types when enabled.

## Tasks
- [x] Add turmoil to workspace dependencies
- [x] Add turmoil feature to msg-transport Cargo.toml
- [x] Create type alias module for feature-gated type resolution
- [x] Update TCP transport to use type aliases with channel-based accept for turmoil
- [x] QUIC uses real UDP sockets (documented limitation)
- [x] IPC uses Unix sockets which turmoil doesn't support (expected)
- [x] Test the integration with cargo check - all builds pass
4 changes: 4 additions & 0 deletions .pi/todos/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"gc": true,
"gcDays": 7
}
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ tokio = { version = "1", features = [
"io-util",
"macros",
"fs",
"sync",
] }
tokio-util = { version = "0.7", features = ["codec"] }
tokio-stream = { version = "0.1", features = ["sync"] }
Expand Down Expand Up @@ -77,6 +78,10 @@ quinn = "0.11.9"
rcgen = "0.14"
openssl = { version = "0.10" }

# turmoil simulation
# Note: version must match tokio version for compatibility
turmoil = { version = "0.7" }

# benchmarking & profiling
criterion = { version = "0.5", features = ["async_tokio"] }
pprof = { version = "0.15", features = ["flamegraph", "criterion"] }
Expand Down
8 changes: 6 additions & 2 deletions libmsg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ tikv-jemallocator = { version = "0.6.1", features = ["profiling"] }

[features]
default = []
quic = ["msg-transport/quic"]
tcp-tls = ["msg-transport/tcp-tls"]
quic = ["msg-transport/quic", "msg-socket/quic"]
tcp-tls = ["msg-transport/tcp-tls", "msg-socket/tcp-tls"]
# Enables turmoil-based network simulation across the transport and socket
# layers. TCP and TCP-TLS traffic both flow through `turmoil::net`, so TLS
# integration tests can run inside a deterministic simulation.
turmoil = ["msg-transport/turmoil", "msg-socket/turmoil"]

[[bench]]
name = "reqrep"
Expand Down
15 changes: 14 additions & 1 deletion msg-socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,25 @@ tracing.workspace = true
tokio-stream.workspace = true
parking_lot.workspace = true
arc-swap.workspace = true
turmoil = { workspace = true, optional = true }

derive_more = { workspace = true, features = ["deref"] }

[dev-dependencies]
rand.workspace = true
msg-transport = { workspace = true, features = ["quic", "tcp-tls"] }
# Transport features are forwarded through this crate's own feature flags below,
# so tests pick them up via `--features` rather than being hardcoded here.
msg-transport.workspace = true
openssl.workspace = true
turmoil.workspace = true

tracing-subscriber = "0.3"

[features]
default = []
# Transport forwarding. Downstream users (and this crate's integration tests)
# opt into each transport via these flags without depending on `msg-transport`
# directly. `turmoil` composes with `tcp-tls` and `quic`.
tcp-tls = ["msg-transport/tcp-tls"]
quic = ["msg-transport/quic"]
turmoil = ["dep:turmoil", "msg-transport/turmoil"]
2 changes: 2 additions & 0 deletions msg-socket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub use sub::*;
mod connection;
pub use connection::*;

mod resolve;

/// The default buffer size for a socket.
pub const DEFAULT_BUFFER_SIZE: usize = 8192;

Expand Down
8 changes: 6 additions & 2 deletions msg-socket/src/pub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,14 @@ impl<S: Default> Default for SocketState<S> {
}
}

#[cfg(test)]
#[cfg(all(test, not(feature = "turmoil")))]
mod tests {
use std::time::Duration;

use futures::StreamExt;
use msg_transport::{quic::Quic, tcp::Tcp};
#[cfg(feature = "quic")]
use msg_transport::quic::Quic;
use msg_transport::tcp::Tcp;
use msg_wire::compression::GzipCompressor;
use tracing::info;

Expand Down Expand Up @@ -291,6 +293,7 @@ mod tests {
assert_eq!("WORLD", msg.payload());
}

#[cfg(feature = "quic")]
#[tokio::test]
async fn pubsub_auth_quic() {
let _ = tracing_subscriber::fmt::try_init();
Expand Down Expand Up @@ -408,6 +411,7 @@ mod tests {
assert_eq!("WORLD", msg.payload());
}

#[cfg(feature = "quic")]
#[tokio::test]
async fn pubsub_durable_quic() {
let _ = tracing_subscriber::fmt::try_init();
Expand Down
11 changes: 5 additions & 6 deletions msg-socket/src/pub/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use arc_swap::Guard;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use tokio::{
net::{ToSocketAddrs, lookup_host},
sync::broadcast,
task::JoinSet,
};
use tokio::{sync::broadcast, task::JoinSet};
use tracing::{debug, trace, warn};

use super::{PubError, PubMessage, PubOptions, SocketState, driver::PubDriver, stats::PubStats};
use crate::{ConnectionHook, ConnectionHookErased};
use crate::{
ConnectionHook, ConnectionHookErased,
resolve::{ToSocketAddrs, lookup_host},
};

use msg_transport::{Address, Transport};
use msg_wire::compression::Compressor;
Expand Down
2 changes: 1 addition & 1 deletion msg-socket/src/rep/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<A: Address> Request<A> {
}
}

#[cfg(test)]
#[cfg(all(test, not(feature = "turmoil")))]
mod tests {
use std::{net::SocketAddr, time::Duration};

Expand Down
2 changes: 1 addition & 1 deletion msg-socket/src/rep/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{

use futures::{Stream, stream::FuturesUnordered};
use tokio::{
net::{ToSocketAddrs, lookup_host},
sync::mpsc,
task::{JoinHandle, JoinSet},
};
Expand All @@ -18,6 +17,7 @@ use tracing::{debug, warn};
use crate::{
ConnectionHook, ConnectionHookErased, DEFAULT_QUEUE_SIZE, RepOptions, Request,
rep::{RepError, SocketState, driver::RepDriver},
resolve::{ToSocketAddrs, lookup_host},
};

use msg_transport::{Address, Transport};
Expand Down
6 changes: 2 additions & 4 deletions msg-socket/src/req/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use std::{marker::PhantomData, net::SocketAddr, path::PathBuf, sync::Arc};
use arc_swap::Guard;
use bytes::Bytes;
use rustc_hash::FxHashMap;
use tokio::{
net::{ToSocketAddrs, lookup_host},
sync::{mpsc, mpsc::error::TrySendError, oneshot},
};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot};
use tokio_util::codec::Framed;

use msg_common::span::WithSpan;
Expand All @@ -23,6 +20,7 @@ use crate::{
driver::ReqDriver,
stats::ReqStats,
},
resolve::{ToSocketAddrs, lookup_host},
stats::SocketStats,
};
use std::sync::atomic::Ordering;
Expand Down
26 changes: 26 additions & 0 deletions msg-socket/src/resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::{io, net::SocketAddr};

#[cfg(not(feature = "turmoil"))]
pub(crate) use tokio::net::ToSocketAddrs;
#[cfg(feature = "turmoil")]
pub(crate) use turmoil::ToSocketAddrs;

#[cfg(not(feature = "turmoil"))]
pub(crate) async fn lookup_host(
addr: impl ToSocketAddrs,
) -> io::Result<impl Iterator<Item = SocketAddr>> {
tokio::net::lookup_host(addr).await
}

#[cfg(feature = "turmoil")]
pub(crate) async fn lookup_host(
addr: impl ToSocketAddrs,
) -> io::Result<impl Iterator<Item = SocketAddr>> {
if !turmoil::in_simulation() {
return Err(io::Error::other(
"hostname resolution under the `turmoil` feature requires a running turmoil simulation",
));
}

turmoil::net::lookup_host(addr).await
}
2 changes: 1 addition & 1 deletion msg-socket/src/sub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<A: Address> Default for SocketState<A> {
}
}

#[cfg(test)]
#[cfg(all(test, not(feature = "turmoil")))]
mod tests {
use std::net::SocketAddr;

Expand Down
7 changes: 2 additions & 5 deletions msg-socket/src/sub/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ use std::{

use futures::Stream;
use rustc_hash::FxHashMap;
use tokio::{
net::{ToSocketAddrs, lookup_host},
sync::mpsc,
task::JoinSet,
};
use tokio::{sync::mpsc, task::JoinSet};

use msg_common::{IpAddrExt, JoinMap};
use msg_transport::{Address, Transport};

use crate::{
ConnectionHook, ConnectionHookErased,
resolve::{ToSocketAddrs, lookup_host},
sub::{
Command, DEFAULT_BUFFER_SIZE, PubMessage, SocketState, SubDriver, SubError, SubOptions,
stats::SubStats,
Expand Down
4 changes: 4 additions & 0 deletions msg-socket/tests/it/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#[cfg(not(feature = "turmoil"))]
mod pubsub;
#[cfg(not(feature = "turmoil"))]
mod reqrep;
#[cfg(feature = "turmoil")]
mod turmoil;

fn main() {}
Loading
Loading