Skip to content

Commit ec21000

Browse files
authored
GATT: UnixStream -> UnixDatagram (#172)
GATT communication with bluetoothd happens over a SOCK_SEQPACKET socket, however bluer uses UnixStream socket for those sockets, this leads to issues with tokio/mio/epoll, when receiving notification data from from a remote GATT characteristic. The problem works as follows: - the bluer application does initialization and subscribes to GATT notification events, tokio/mio puts the UnixStream socket into epoll using edge trigger mode. - Remote service send multiple notification event(at least 3?) - The bluer application wakes from epoll and receives the first event, it then begins processing the first event. - While the application processes the event, more events are received(at least 2 more) - The application finishes processing of the event and call epoll_wait, as there has been more new events since last epoll the application is woken immediately to process the second event. - When the process enter epoll_wait this time it will not get woken again, even if there are still events to process, as there are no NEW events since last epoll. The application will only receive the next event when an additional new event is received, however this new event will remain in the queue and not be processed. This happens because tokio/mio assumes that on a UnixStream socket there is no more data available if the read does not fill the whole buffer. However the correct thing to do on a SOCK_SEQPACKET socket is to read from the socket until it returns EAGAIN processing every packet in the queue. This is what UnixDatagram socket does. This commit uses UnixDatagram sockets instead of UnixStream sockets for GATT Characteristic communication.
1 parent 45d0837 commit ec21000

3 files changed

Lines changed: 36 additions & 36 deletions

File tree

bluer/src/gatt/local.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -618,9 +618,9 @@ impl CharacteristicWriteIoRequest {
618618
/// Accept the write request.
619619
pub fn accept(self) -> Result<CharacteristicReader> {
620620
let CharacteristicWriteIoRequest { adapter_name, device_address, mtu, tx, .. } = self;
621-
let (fd, stream) = make_socket_pair(false)?;
621+
let (fd, socket) = make_socket_pair(false)?;
622622
let _ = tx.send(Ok(fd));
623-
Ok(CharacteristicReader { adapter_name, device_address, mtu: mtu.into(), stream, buf: Vec::new() })
623+
Ok(CharacteristicReader { adapter_name, device_address, mtu: mtu.into(), socket, buf: Vec::new() })
624624
}
625625

626626
/// Reject the write request.
@@ -941,13 +941,13 @@ impl RegisteredCharacteristic {
941941
Some(CharacteristicNotify { method: CharacteristicNotifyMethod::Io, .. }) => {
942942
// BlueZ has already confirmed the start of the notification session.
943943
// So there is no point in making this fail-able by our users.
944-
let (fd, stream) = make_socket_pair(true).map_err(|_| ReqError::Failed)?;
944+
let (fd, socket) = make_socket_pair(true).map_err(|_| ReqError::Failed)?;
945945
let mtu = mtu_workaround(options.mtu.into());
946946
let writer = CharacteristicWriter {
947947
adapter_name: options.adapter_name.clone(),
948948
device_address: options.device_address,
949949
mtu,
950-
stream,
950+
socket,
951951
};
952952
let _ = reg
953953
.c

bluer/src/gatt/mod.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::{
1313
use strum::{Display, EnumString};
1414
use tokio::{
1515
io::{AsyncRead, AsyncWrite, ReadBuf},
16-
net::UnixStream,
16+
net::UnixDatagram,
1717
};
1818

1919
use crate::Address;
@@ -112,7 +112,7 @@ pub struct CharacteristicReader {
112112
device_address: Address,
113113
mtu: usize,
114114
#[pin]
115-
stream: UnixStream,
115+
socket: UnixDatagram,
116116
buf: Vec<u8>,
117117
}
118118

@@ -134,15 +134,15 @@ impl CharacteristicReader {
134134

135135
/// Wait for a new characteristic value to become available.
136136
pub async fn recvable(&self) -> std::io::Result<()> {
137-
self.stream.readable().await
137+
self.socket.readable().await
138138
}
139139

140140
/// Try to receive the characteristic value from a single notify or write operation.
141141
///
142142
/// Does not wait for new data to arrive.
143143
pub fn try_recv(&self) -> std::io::Result<Vec<u8>> {
144144
let mut buf = Vec::with_capacity(self.mtu);
145-
let n = self.stream.try_read_buf(&mut buf)?;
145+
let n = self.socket.try_recv_buf(&mut buf)?;
146146
buf.truncate(n);
147147
Ok(buf)
148148
}
@@ -162,7 +162,7 @@ impl CharacteristicReader {
162162

163163
/// Consumes this object, returning the raw underlying file descriptor.
164164
pub fn into_raw_fd(self) -> std::io::Result<RawFd> {
165-
Ok(self.stream.into_std()?.into_raw_fd())
165+
Ok(self.socket.into_std()?.into_raw_fd())
166166
}
167167
}
168168

@@ -189,7 +189,7 @@ impl AsyncRead for CharacteristicReader {
189189
// If provided buffer is too small, read into temporary buffer.
190190
let mut mtu_buf: Vec<MaybeUninit<u8>> = vec![MaybeUninit::uninit(); *this.mtu];
191191
let mut mtu_read_buf = ReadBuf::uninit(&mut mtu_buf);
192-
ready!(this.stream.poll_read(cx, &mut mtu_read_buf))?;
192+
ready!(this.socket.poll_recv(cx, &mut mtu_read_buf))?;
193193
let n = mtu_read_buf.filled().len();
194194
mtu_buf.truncate(n);
195195
let mut mtu_buf: Vec<u8> = mtu_buf.into_iter().map(|v| unsafe { v.assume_init() }).collect();
@@ -201,14 +201,14 @@ impl AsyncRead for CharacteristicReader {
201201

202202
Poll::Ready(Ok(()))
203203
} else {
204-
self.project().stream.poll_read(cx, buf)
204+
self.project().socket.poll_recv(cx, buf)
205205
}
206206
}
207207
}
208208

209209
impl AsRawFd for CharacteristicReader {
210210
fn as_raw_fd(&self) -> RawFd {
211-
self.stream.as_raw_fd()
211+
self.socket.as_raw_fd()
212212
}
213213
}
214214

@@ -226,7 +226,7 @@ pub struct CharacteristicWriter {
226226
device_address: Address,
227227
mtu: usize,
228228
#[pin]
229-
stream: UnixStream,
229+
socket: UnixDatagram,
230230
}
231231

232232
impl CharacteristicWriter {
@@ -247,13 +247,13 @@ impl CharacteristicWriter {
247247

248248
/// Waits for the remote device to stop the notification session.
249249
pub async fn closed(&self) -> std::io::Result<()> {
250-
self.stream.readable().await
250+
self.socket.readable().await
251251
}
252252

253253
/// Checks if the remote device has stopped the notification session.
254254
pub fn is_closed(&self) -> std::io::Result<bool> {
255255
let mut buf = [0u8];
256-
match self.stream.try_read(&mut buf) {
256+
match self.socket.try_recv(&mut buf) {
257257
Ok(_) => Ok(true),
258258
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
259259
Err(err) => Err(err),
@@ -262,7 +262,7 @@ impl CharacteristicWriter {
262262

263263
/// Waits for send space to become available.
264264
pub async fn sendable(&self) -> std::io::Result<()> {
265-
self.stream.writable().await
265+
self.socket.writable().await
266266
}
267267

268268
/// Tries to send the characteristic value using a single write or notify operation.
@@ -274,7 +274,7 @@ impl CharacteristicWriter {
274274
if buf.len() > self.mtu {
275275
return Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "data length exceeds MTU"));
276276
}
277-
match self.stream.try_write(buf) {
277+
match self.socket.try_send(buf) {
278278
Ok(n) if n == buf.len() => Ok(()),
279279
Ok(_) => Err(std::io::Error::new(std::io::ErrorKind::Other, "partial write occured")),
280280
Err(err) => Err(err),
@@ -298,7 +298,7 @@ impl CharacteristicWriter {
298298

299299
/// Consumes this object, returning the raw underlying file descriptor.
300300
pub fn into_raw_fd(self) -> std::io::Result<RawFd> {
301-
Ok(self.stream.into_std()?.into_raw_fd())
301+
Ok(self.socket.into_std()?.into_raw_fd())
302302
}
303303
}
304304

@@ -310,21 +310,21 @@ impl AsyncWrite for CharacteristicWriter {
310310
fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context, buf: &[u8]) -> Poll<std::io::Result<usize>> {
311311
let max_len = buf.len().min(self.mtu);
312312
let buf = &buf[..max_len];
313-
self.project().stream.poll_write(cx, buf)
313+
self.project().socket.poll_send(cx, buf)
314314
}
315315

316-
fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
317-
self.project().stream.poll_flush(cx)
316+
fn poll_flush(self: Pin<&mut Self>, _cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
317+
Poll::Ready(Ok(()))
318318
}
319319

320-
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
321-
self.project().stream.poll_shutdown(cx)
320+
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
321+
Poll::Ready(Ok(()))
322322
}
323323
}
324324

325325
impl AsRawFd for CharacteristicWriter {
326326
fn as_raw_fd(&self) -> RawFd {
327-
self.stream.as_raw_fd()
327+
self.socket.as_raw_fd()
328328
}
329329
}
330330

@@ -335,7 +335,7 @@ impl IntoRawFd for CharacteristicWriter {
335335
}
336336

337337
/// Creates a UNIX socket pair for communication with bluetoothd.
338-
pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, UnixStream)> {
338+
pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, UnixDatagram)> {
339339
let mut sv: [RawFd; 2] = [0; 2];
340340
let mut ty = SOCK_SEQPACKET | SOCK_CLOEXEC;
341341
if non_block {
@@ -347,10 +347,10 @@ pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, Uni
347347
let [fd1, fd2] = sv;
348348

349349
let fd1 = unsafe { OwnedFd::new(fd1) };
350-
let us = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd2) };
350+
let us = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd2) };
351351

352352
us.set_nonblocking(true)?;
353-
let us = UnixStream::from_std(us)?;
353+
let us = UnixDatagram::from_std(us)?;
354354

355355
Ok((fd1, us))
356356
}

bluer/src/gatt/remote.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use dbus::{
77
};
88
use futures::{Stream, StreamExt};
99
use std::{fmt, os::unix::prelude::FromRawFd, sync::Arc};
10-
use tokio::net::UnixStream;
10+
use tokio::net::UnixDatagram;
1111
use uuid::Uuid;
1212

1313
use super::{
@@ -351,15 +351,15 @@ impl Characteristic {
351351
pub async fn write_io(&self) -> Result<CharacteristicWriter> {
352352
let options = PropMap::new();
353353
let (fd, mtu): (OwnedFd, u16) = self.call_method("AcquireWrite", (options,)).await?;
354-
let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd.into_fd()) };
355-
stream.set_nonblocking(true)?;
356-
let stream = UnixStream::from_std(stream)?;
354+
let socket = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd.into_fd()) };
355+
socket.set_nonblocking(true)?;
356+
let socket = UnixDatagram::from_std(socket)?;
357357
let mtu = mtu_workaround(mtu.into());
358358
Ok(CharacteristicWriter {
359359
adapter_name: self.adapter_name().to_string(),
360360
device_address: self.device_address,
361361
mtu,
362-
stream,
362+
socket,
363363
})
364364
}
365365

@@ -431,14 +431,14 @@ impl Characteristic {
431431
pub async fn notify_io(&self) -> Result<CharacteristicReader> {
432432
let options = PropMap::new();
433433
let (fd, mtu): (OwnedFd, u16) = self.call_method("AcquireNotify", (options,)).await?;
434-
let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd.into_fd()) };
435-
stream.set_nonblocking(true)?;
436-
let stream = UnixStream::from_std(stream)?;
434+
let socket = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd.into_fd()) };
435+
socket.set_nonblocking(true)?;
436+
let socket = UnixDatagram::from_std(socket)?;
437437
Ok(CharacteristicReader {
438438
adapter_name: self.adapter_name().to_string(),
439439
device_address: self.device_address,
440440
mtu: mtu.into(),
441-
stream,
441+
socket,
442442
buf: Vec::new(),
443443
})
444444
}

0 commit comments

Comments
 (0)