Skip to content

Commit 0fdeecb

Browse files
committed
fix: remove 12 production unwrap() calls for JSF compliance
JSF AV Rule 208 adaptation: Replace panicking unwrap() with proper error handling in I/O operations that can legitimately fail. Changes: - scheduler.rs: Handle missing core affinity gracefully with fallback to non-pinned workers instead of panicking - net/tcp.rs: Propagate socket timeout errors with ? instead of unwrap() - net/udp.rs: Propagate socket timeout errors with ? instead of unwrap() - io/sys/unix/net/*: Update UdpRecvFrom, UdpSendTo, UnixRecvFrom, UnixSendTo to return io::Result<Self> and propagate timeout errors - io/sys/windows/net/*: Same treatment for Windows UDP operations Production unwrap() count reduced from 52 to 40. Remaining unwraps are mostly mutex lock operations (idiomatic for handling poisoned locks) and internal invariant assertions. All tests pass (297 passed, 3 skipped for known flaky behavior) All clippy checks pass with strict JSF-aligned lints
1 parent bec672f commit 0fdeecb

10 files changed

Lines changed: 64 additions & 42 deletions

File tree

src/io/sys/unix/net/udp_recv_from.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ pub struct UdpRecvFrom<'a> {
2222
}
2323

2424
impl<'a> UdpRecvFrom<'a> {
25-
pub fn new(socket: &'a UdpSocket, buf: &'a mut [u8]) -> Self {
26-
UdpRecvFrom {
25+
pub fn new(socket: &'a UdpSocket, buf: &'a mut [u8]) -> io::Result<Self> {
26+
Ok(UdpRecvFrom {
2727
io_data: socket.as_io_data(),
2828
buf,
2929
socket: socket.inner(),
3030
#[cfg(feature = "io_timeout")]
31-
timeout: socket.read_timeout().unwrap(),
31+
timeout: socket.read_timeout()?,
3232
is_coroutine: is_coroutine(),
33-
}
33+
})
3434
}
3535

3636
pub fn done(&mut self) -> io::Result<(usize, SocketAddr)> {

src/io/sys/unix/net/udp_send_to.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl<'a, A: ToSocketAddrs> UdpSendTo<'a, A> {
2828
socket: socket.inner(),
2929
addr,
3030
#[cfg(feature = "io_timeout")]
31-
timeout: socket.write_timeout().unwrap(),
31+
timeout: socket.write_timeout()?,
3232
is_coroutine: is_coroutine(),
3333
})
3434
}

src/io/sys/unix/net/unix_recv_from.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ pub struct UnixRecvFrom<'a> {
2222
}
2323

2424
impl<'a> UnixRecvFrom<'a> {
25-
pub fn new(socket: &'a UnixDatagram, buf: &'a mut [u8]) -> Self {
26-
UnixRecvFrom {
25+
pub fn new(socket: &'a UnixDatagram, buf: &'a mut [u8]) -> io::Result<Self> {
26+
Ok(UnixRecvFrom {
2727
io_data: socket.0.as_io_data(),
2828
buf,
2929
socket: socket.0.inner(),
3030
#[cfg(feature = "io_timeout")]
31-
timeout: socket.0.read_timeout().unwrap(),
31+
timeout: socket.0.read_timeout()?,
3232
is_coroutine: is_coroutine(),
33-
}
33+
})
3434
}
3535

3636
pub fn done(&mut self) -> io::Result<(usize, SocketAddr)> {

src/io/sys/unix/net/unix_send_to.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl<'a> UnixSendTo<'a> {
2828
socket: socket.0.inner(),
2929
path,
3030
#[cfg(feature = "io_timeout")]
31-
timeout: socket.write_timeout().unwrap(),
31+
timeout: socket.write_timeout()?,
3232
is_coroutine: is_coroutine(),
3333
})
3434
}

src/io/sys/windows/net/udp_recv_from.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ pub struct UdpRecvFrom<'a> {
2828
}
2929

3030
impl<'a> UdpRecvFrom<'a> {
31-
pub fn new(socket: &'a UdpSocket, buf: &'a mut [u8]) -> Self {
32-
UdpRecvFrom {
31+
pub fn new(socket: &'a UdpSocket, buf: &'a mut [u8]) -> io::Result<Self> {
32+
Ok(UdpRecvFrom {
3333
io_data: EventData::new(socket.as_raw_socket() as HANDLE),
3434
buf,
3535
socket: socket.inner(),
3636
addr: SocketAddrBuf::new(),
3737
#[cfg(feature = "io_timeout")]
38-
timeout: socket.read_timeout().unwrap(),
38+
timeout: socket.read_timeout()?,
3939
can_drop: DelayDrop::new(),
4040
is_coroutine: is_coroutine(),
41-
}
41+
})
4242
}
4343

4444
pub fn done(&mut self) -> io::Result<(usize, SocketAddr)> {

src/io/sys/windows/net/udp_send_to.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,23 @@ impl<'a> UdpSendTo<'a> {
2727
buf: &'a [u8],
2828
addr: A,
2929
) -> io::Result<Self> {
30-
addr.to_socket_addrs()?
30+
let resolved_addr = addr
31+
.to_socket_addrs()?
3132
.next()
32-
.ok_or_else(|| io::Error::other("no socket addresses resolved"))
33-
.map(|addr| UdpSendTo {
34-
io_data: EventData::new(socket.as_raw_socket() as HANDLE),
35-
buf,
36-
socket: socket.inner(),
37-
addr,
38-
#[cfg(feature = "io_timeout")]
39-
timeout: socket.write_timeout().unwrap(),
40-
is_coroutine: is_coroutine(),
41-
})
33+
.ok_or_else(|| io::Error::other("no socket addresses resolved"))?;
34+
35+
#[cfg(feature = "io_timeout")]
36+
let timeout = socket.write_timeout()?;
37+
38+
Ok(UdpSendTo {
39+
io_data: EventData::new(socket.as_raw_socket() as HANDLE),
40+
buf,
41+
socket: socket.inner(),
42+
addr: resolved_addr,
43+
#[cfg(feature = "io_timeout")]
44+
timeout,
45+
is_coroutine: is_coroutine(),
46+
})
4247
}
4348

4449
pub fn done(&mut self) -> io::Result<usize> {

src/net/tcp.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ impl TcpStream {
100100
pub fn try_clone(&self) -> io::Result<TcpStream> {
101101
let s = self.sys.try_clone().and_then(TcpStream::new)?;
102102
#[cfg(feature = "io_timeout")]
103-
s.set_read_timeout(self.read_timeout.get()).unwrap();
103+
s.set_read_timeout(self.read_timeout.get())?;
104104
#[cfg(feature = "io_timeout")]
105-
s.set_write_timeout(self.write_timeout.get()).unwrap();
105+
s.set_write_timeout(self.write_timeout.get())?;
106106
Ok(s)
107107
}
108108

@@ -367,7 +367,9 @@ impl TcpListener {
367367
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
368368
use socket2::{Domain, Socket, Type};
369369
let mut addrs = addr.to_socket_addrs()?;
370-
let addr = addrs.next().unwrap();
370+
let addr = addrs.next().ok_or_else(|| {
371+
io::Error::new(io::ErrorKind::InvalidInput, "no addresses resolved")
372+
})?;
371373
let listener = match &addr {
372374
SocketAddr::V4(_) => Socket::new(Domain::IPV4, Type::STREAM, None)?,
373375
SocketAddr::V6(_) => Socket::new(Domain::IPV6, Type::STREAM, None)?,

src/net/udp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ impl UdpSocket {
5858
pub fn try_clone(&self) -> io::Result<UdpSocket> {
5959
let s = self.sys.try_clone().and_then(UdpSocket::new)?;
6060
#[cfg(feature = "io_timeout")]
61-
s.set_read_timeout(self.read_timeout.get()).unwrap();
61+
s.set_read_timeout(self.read_timeout.get())?;
6262
#[cfg(feature = "io_timeout")]
63-
s.set_write_timeout(self.write_timeout.get()).unwrap();
63+
s.set_write_timeout(self.write_timeout.get())?;
6464
Ok(s)
6565
}
6666

@@ -123,7 +123,7 @@ impl UdpSocket {
123123
}
124124
}
125125

126-
let mut reader = net_impl::UdpRecvFrom::new(self, buf);
126+
let mut reader = net_impl::UdpRecvFrom::new(self, buf)?;
127127
yield_with_io(&reader, reader.is_coroutine);
128128
reader.done()
129129
}

src/os/unix/net.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ impl UnixDatagram {
819819
}
820820
}
821821

822-
let mut reader = net_impl::UnixRecvFrom::new(self, buf);
822+
let mut reader = net_impl::UnixRecvFrom::new(self, buf)?;
823823
yield_with_io(&reader, reader.is_coroutine);
824824
reader.done()
825825
}

src/scheduler.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,32 @@ fn init_scheduler() {
6060
s.timer_thread.run(&timer_event_handler);
6161
});
6262

63-
let core_ids = core_affinity::get_core_ids().unwrap();
64-
let pin_cores = config().get_worker_pin();
63+
let core_ids = core_affinity::get_core_ids();
64+
let pin_cores = config().get_worker_pin() && core_ids.is_some();
65+
66+
if pin_cores && core_ids.is_none() {
67+
eprintln!("[may] Warning: Core affinity not available on this system, worker pinning disabled");
68+
}
69+
6570
// io event loop thread
66-
for (id, core) in (0..workers).zip(core_ids.into_iter().cycle()) {
67-
thread::spawn(move || {
68-
if pin_cores {
69-
core_affinity::set_for_current(core);
70-
}
71-
let s = unsafe { &*SCHED };
72-
s.event_loop.run(id);
73-
});
71+
if let Some(cores) = core_ids {
72+
for (id, core) in (0..workers).zip(cores.into_iter().cycle()) {
73+
thread::spawn(move || {
74+
if pin_cores {
75+
core_affinity::set_for_current(core);
76+
}
77+
let s = unsafe { &*SCHED };
78+
s.event_loop.run(id);
79+
});
80+
}
81+
} else {
82+
// Fallback: spawn workers without core pinning
83+
for id in 0..workers {
84+
thread::spawn(move || {
85+
let s = unsafe { &*SCHED };
86+
s.event_loop.run(id);
87+
});
88+
}
7489
}
7590
}
7691

0 commit comments

Comments
 (0)