Skip to content

Commit e05c3c0

Browse files
committed
Refactor Emitter to use Mutex for callbacks management
1 parent ae6c258 commit e05c3c0

1 file changed

Lines changed: 21 additions & 11 deletions

File tree

src/lib.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,36 @@ use napi::{
99
};
1010
use napi_derive::napi;
1111
use nusb::{hotplug::HotplugEvent, MaybeFuture};
12-
use std::sync::Arc;
13-
use tokio::{sync::RwLock, task::JoinHandle};
12+
use std::sync::{Arc, Mutex, MutexGuard};
13+
use tokio::task::JoinHandle;
1414
use webusb_device::{run_blocking, UsbDevice};
1515

1616
struct Callbacks {
1717
attach: Option<ThreadsafeFunction<UsbDevice, (), UsbDevice, napi::Status, false>>,
1818
detach: Option<ThreadsafeFunction<String, (), String, napi::Status, false>>,
1919
}
2020

21+
fn callbacks_guard(callbacks: &Mutex<Callbacks>) -> MutexGuard<'_, Callbacks> {
22+
callbacks
23+
.lock()
24+
.unwrap_or_else(|poisoned| poisoned.into_inner())
25+
}
26+
2127
#[napi]
2228
pub struct Emitter {
23-
callbacks: Arc<RwLock<Callbacks>>,
29+
callbacks: Arc<Mutex<Callbacks>>,
2430
watch_task: Option<JoinHandle<()>>,
2531
}
2632

2733
#[napi]
2834
impl Emitter {
35+
fn callbacks(&self) -> MutexGuard<'_, Callbacks> {
36+
callbacks_guard(&self.callbacks)
37+
}
38+
2939
#[napi(constructor)]
3040
pub fn new() -> Self {
31-
let callbacks = Arc::new(RwLock::new(Callbacks {
41+
let callbacks = Arc::new(Mutex::new(Callbacks {
3242
attach: None,
3343
detach: None,
3444
}));
@@ -58,7 +68,7 @@ impl Emitter {
5868
while let Some(ev) = watch_stream.next().await {
5969
match ev {
6070
HotplugEvent::Connected(info) => {
61-
let guard = callbacks.read().await;
71+
let guard = callbacks_guard(&callbacks);
6272
if let Some(cb) = guard.attach.as_ref() {
6373
cb.call(
6474
UsbDevice::new(info),
@@ -67,7 +77,7 @@ impl Emitter {
6777
}
6878
}
6979
HotplugEvent::Disconnected(id) => {
70-
let guard = callbacks.read().await;
80+
let guard = callbacks_guard(&callbacks);
7181
if let Some(cb) = guard.detach.as_ref() {
7282
cb.call(format!("{:?}", id), ThreadsafeFunctionCallMode::NonBlocking);
7383
}
@@ -81,7 +91,7 @@ impl Emitter {
8191

8292
async fn stop_watching(&mut self) {
8393
let has_listeners = {
84-
let cb = self.callbacks.read().await;
94+
let cb = self.callbacks();
8595
cb.attach.is_some() || cb.detach.is_some()
8696
};
8797

@@ -98,15 +108,15 @@ impl Emitter {
98108
callback: ThreadsafeFunction<UsbDevice, (), UsbDevice, napi::Status, false>,
99109
) -> Result<()> {
100110
{
101-
self.callbacks.write().await.attach = Some(callback);
111+
self.callbacks().attach = Some(callback);
102112
}
103113
self.start_watching().await
104114
}
105115

106116
#[napi]
107117
pub async unsafe fn removeAttach(&mut self) {
108118
{
109-
self.callbacks.write().await.attach = None;
119+
self.callbacks().attach = None;
110120
}
111121
self.stop_watching().await;
112122
}
@@ -117,15 +127,15 @@ impl Emitter {
117127
callback: ThreadsafeFunction<String, (), String, napi::Status, false>,
118128
) -> Result<()> {
119129
{
120-
self.callbacks.write().await.detach = Some(callback);
130+
self.callbacks().detach = Some(callback);
121131
}
122132
self.start_watching().await
123133
}
124134

125135
#[napi]
126136
pub async unsafe fn removeDetach(&mut self) {
127137
{
128-
self.callbacks.write().await.detach = None;
138+
self.callbacks().detach = None;
129139
}
130140
self.stop_watching().await;
131141
}

0 commit comments

Comments
 (0)