Skip to content

Commit 05095f6

Browse files
authored
Merge pull request #18 from node-usb/listener-watch
Refactor Emitter to manage device watching with a task handle
2 parents 290f01d + e05c3c0 commit 05095f6

2 files changed

Lines changed: 67 additions & 64 deletions

File tree

src/lib.rs

Lines changed: 67 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,139 +9,143 @@ use napi::{
99
};
1010
use napi_derive::napi;
1111
use nusb::{hotplug::HotplugEvent, MaybeFuture};
12-
use std::sync::atomic::{AtomicBool, Ordering};
13-
use std::sync::Arc;
14-
use tokio::sync::{watch, RwLock};
12+
use std::sync::{Arc, Mutex, MutexGuard};
13+
use tokio::task::JoinHandle;
1514
use webusb_device::{run_blocking, UsbDevice};
1615

1716
struct Callbacks {
1817
attach: Option<ThreadsafeFunction<UsbDevice, (), UsbDevice, napi::Status, false>>,
1918
detach: Option<ThreadsafeFunction<String, (), String, napi::Status, false>>,
2019
}
2120

21+
fn callbacks_guard(callbacks: &Mutex<Callbacks>) -> MutexGuard<'_, Callbacks> {
22+
callbacks
23+
.lock()
24+
.unwrap_or_else(|poisoned| poisoned.into_inner())
25+
}
26+
2227
#[napi]
2328
pub struct Emitter {
24-
callbacks: Arc<RwLock<Callbacks>>,
25-
listeners_tx: watch::Sender<bool>,
26-
initialized: AtomicBool,
29+
callbacks: Arc<Mutex<Callbacks>>,
30+
watch_task: Option<JoinHandle<()>>,
2731
}
2832

2933
#[napi]
3034
impl Emitter {
35+
fn callbacks(&self) -> MutexGuard<'_, Callbacks> {
36+
callbacks_guard(&self.callbacks)
37+
}
38+
3139
#[napi(constructor)]
3240
pub fn new() -> Self {
33-
let callbacks = Arc::new(RwLock::new(Callbacks {
41+
let callbacks = Arc::new(Mutex::new(Callbacks {
3442
attach: None,
3543
detach: None,
3644
}));
37-
let (listeners_tx, _listeners_rx) = watch::channel(false);
3845
Self {
3946
callbacks,
40-
listeners_tx,
41-
initialized: AtomicBool::new(false),
47+
watch_task: None,
4248
}
4349
}
4450

45-
#[napi]
46-
pub async fn init(&self) -> Result<()> {
47-
if self.initialized.swap(true, Ordering::AcqRel) {
51+
async fn start_watching(&mut self) -> Result<()> {
52+
if matches!(self.watch_task.as_ref(), Some(task) if !task.is_finished()) {
4853
return Ok(());
4954
}
5055

56+
self.watch_task = None;
5157
let callbacks = self.callbacks.clone();
52-
let mut listeners_rx = self.listeners_tx.subscribe();
5358
let mut watch_stream = match nusb::watch_devices() {
5459
Ok(watch_stream) => watch_stream,
5560
Err(e) => {
56-
self.initialized.store(false, Ordering::Release);
57-
return Err(napi::Error::from_reason(format!("init error: {e}")));
61+
return Err(napi::Error::from_reason(format!(
62+
"watch devices error: {e}"
63+
)));
5864
}
5965
};
6066

61-
tokio::spawn(async move {
62-
loop {
63-
// Async-wait until at least one listener is attached
64-
if listeners_rx.wait_for(|v| *v).await.is_err() {
65-
return;
66-
}
67-
68-
loop {
69-
tokio::select! {
70-
_ = listeners_rx.changed() => {
71-
if !*listeners_rx.borrow() {
72-
// No listeners attached, stop watching for device events
73-
break;
74-
}
67+
self.watch_task = Some(tokio::spawn(async move {
68+
while let Some(ev) = watch_stream.next().await {
69+
match ev {
70+
HotplugEvent::Connected(info) => {
71+
let guard = callbacks_guard(&callbacks);
72+
if let Some(cb) = guard.attach.as_ref() {
73+
cb.call(
74+
UsbDevice::new(info),
75+
ThreadsafeFunctionCallMode::NonBlocking,
76+
);
7577
}
76-
ev = watch_stream.next() => {
77-
match ev {
78-
Some(HotplugEvent::Connected(info)) => {
79-
let guard = callbacks.read().await;
80-
if let Some(cb) = guard.attach.as_ref() {
81-
cb.call(UsbDevice::new(info), ThreadsafeFunctionCallMode::NonBlocking);
82-
}
83-
}
84-
Some(HotplugEvent::Disconnected(id)) => {
85-
let guard = callbacks.read().await;
86-
if let Some(cb) = guard.detach.as_ref() {
87-
cb.call(format!("{:?}", id), ThreadsafeFunctionCallMode::NonBlocking);
88-
}
89-
}
90-
None => break,
91-
}
78+
}
79+
HotplugEvent::Disconnected(id) => {
80+
let guard = callbacks_guard(&callbacks);
81+
if let Some(cb) = guard.detach.as_ref() {
82+
cb.call(format!("{:?}", id), ThreadsafeFunctionCallMode::NonBlocking);
9283
}
9384
}
9485
}
9586
}
96-
});
87+
}));
9788

9889
Ok(())
9990
}
10091

92+
async fn stop_watching(&mut self) {
93+
let has_listeners = {
94+
let cb = self.callbacks();
95+
cb.attach.is_some() || cb.detach.is_some()
96+
};
97+
98+
if !has_listeners {
99+
if let Some(task) = self.watch_task.take() {
100+
task.abort();
101+
}
102+
}
103+
}
104+
101105
#[napi]
102106
pub async unsafe fn addAttach(
103107
&mut self,
104108
callback: ThreadsafeFunction<UsbDevice, (), UsbDevice, napi::Status, false>,
105-
) {
109+
) -> Result<()> {
106110
{
107-
self.callbacks.write().await.attach = Some(callback);
111+
self.callbacks().attach = Some(callback);
108112
}
109-
self.publishState().await;
113+
self.start_watching().await
110114
}
111115

112116
#[napi]
113117
pub async unsafe fn removeAttach(&mut self) {
114118
{
115-
self.callbacks.write().await.attach = None;
119+
self.callbacks().attach = None;
116120
}
117-
self.publishState().await;
121+
self.stop_watching().await;
118122
}
119123

120124
#[napi]
121125
pub async unsafe fn addDetach(
122126
&mut self,
123127
callback: ThreadsafeFunction<String, (), String, napi::Status, false>,
124-
) {
128+
) -> Result<()> {
125129
{
126-
self.callbacks.write().await.detach = Some(callback);
130+
self.callbacks().detach = Some(callback);
127131
}
128-
self.publishState().await;
132+
self.start_watching().await
129133
}
130134

131135
#[napi]
132136
pub async unsafe fn removeDetach(&mut self) {
133137
{
134-
self.callbacks.write().await.detach = None;
138+
self.callbacks().detach = None;
135139
}
136-
self.publishState().await;
140+
self.stop_watching().await;
137141
}
142+
}
138143

139-
async fn publishState(&self) {
140-
let listeners = {
141-
let cb = self.callbacks.read().await;
142-
cb.attach.is_some() || cb.detach.is_some()
143-
};
144-
let _ = self.listeners_tx.send(listeners);
144+
impl Drop for Emitter {
145+
fn drop(&mut self) {
146+
if let Some(task) = self.watch_task.take() {
147+
task.abort();
148+
}
145149
}
146150
}
147151

tsc/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ class WebUSB extends EventTarget implements USB {
142142

143143
constructor(private options: USBOptions = {}) {
144144
super();
145-
this.nativeEmitter.init();
146145
}
147146

148147
private deviceConnectCallback = async (device: UsbDevice) => {

0 commit comments

Comments
 (0)