From dac6f52c27e05477aca7cd5dd6c434c51cf23d52 Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Sat, 3 May 2025 16:05:31 -0600 Subject: [PATCH 1/3] Remove atomic-waker --- src/platform/linux_usbfs/events.rs | 31 ++++++++++++-------------- src/platform/macos_iokit/hotplug.rs | 11 +++++---- src/platform/windows_winusb/hotplug.rs | 15 ++++++++----- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/platform/linux_usbfs/events.rs b/src/platform/linux_usbfs/events.rs index 6c4ab4b..93eade0 100644 --- a/src/platform/linux_usbfs/events.rs +++ b/src/platform/linux_usbfs/events.rs @@ -19,15 +19,7 @@ use rustix::{ io::Errno, }; use slab::Slab; -use std::{ - io, - mem::MaybeUninit, - sync::{Arc, Mutex}, - task::Waker, - thread, -}; - -use atomic_waker::AtomicWaker; +use std::{io, mem::MaybeUninit, sync::Mutex, task::Waker, thread}; use super::Device; @@ -104,8 +96,10 @@ fn event_loop() { Tag::Device(id) => Device::handle_usb_epoll(id), Tag::DeviceTimer(id) => Device::handle_timer_epoll(id), Tag::Waker(id) => { - if let Some(waker) = WAKERS.lock().unwrap().get(id) { - waker.wake(); + if let Some(waker) = WAKERS.lock().unwrap().get_mut(id) { + if let Some(w) = waker.take() { + w.wake(); + } } } } @@ -113,24 +107,27 @@ fn event_loop() { } } -static WAKERS: Mutex>> = Mutex::new(Slab::new()); +static WAKERS: Mutex>> = Mutex::new(Slab::new()); pub(crate) struct Async { pub(crate) inner: T, - waker: Arc, id: usize, } impl Async { pub fn new(inner: T) -> Result { - let waker = Arc::new(AtomicWaker::new()); - let id = WAKERS.lock().unwrap().insert(waker.clone()); + let id = WAKERS.lock().unwrap().insert(None); register_fd(inner.as_fd(), Tag::Waker(id), EventFlags::empty())?; - Ok(Async { inner, id, waker }) + Ok(Async { inner, id }) } pub fn register(&self, waker: &Waker) -> Result<(), io::Error> { - self.waker.register(waker); + WAKERS + .lock() + .unwrap() + .get_mut(self.id) + .unwrap() + .replace(waker.clone()); let epoll_fd = EPOLL_FD.get().unwrap(); epoll::modify( epoll_fd, diff --git a/src/platform/macos_iokit/hotplug.rs b/src/platform/macos_iokit/hotplug.rs index 175b9be..86440b6 100644 --- a/src/platform/macos_iokit/hotplug.rs +++ b/src/platform/macos_iokit/hotplug.rs @@ -6,7 +6,6 @@ use std::{ task::{Context, Poll, Waker}, }; -use atomic_waker::AtomicWaker; use core_foundation::{base::TCFType, dictionary::CFDictionary, runloop::CFRunLoopSource}; use io_kit_sys::{ kIOMasterPortDefault, @@ -29,18 +28,18 @@ use super::{ }; // Wakers are owned by a global slab to avoid race conditions when freeing them -static WAKERS: Mutex> = Mutex::new(Slab::new()); +static WAKERS: Mutex>> = Mutex::new(Slab::new()); /// An AtomicWaker registered with `WAKERS` struct SlabWaker(usize); impl SlabWaker { fn new() -> SlabWaker { - SlabWaker(WAKERS.lock().unwrap().insert(AtomicWaker::new())) + SlabWaker(WAKERS.lock().unwrap().insert(None)) } fn register(&self, w: &Waker) { - WAKERS.lock().unwrap()[self.0].register(w); + WAKERS.lock().unwrap()[self.0].replace(w.clone()); } } @@ -181,7 +180,7 @@ fn register_notification( unsafe extern "C" fn callback(refcon: *mut c_void, _iterator: io_iterator_t) { debug!("hotplug event callback"); let id = refcon as usize; - if let Some(waker) = WAKERS.lock().unwrap().get(id) { - waker.wake() + if let Some(waker) = WAKERS.lock().unwrap().get_mut(id) { + waker.take().map(|w| w.wake()); } } diff --git a/src/platform/windows_winusb/hotplug.rs b/src/platform/windows_winusb/hotplug.rs index 3a52843..355783f 100644 --- a/src/platform/windows_winusb/hotplug.rs +++ b/src/platform/windows_winusb/hotplug.rs @@ -5,10 +5,9 @@ use std::{ mem::size_of, ptr::{self, addr_of}, sync::Mutex, - task::{Context, Poll}, + task::{Context, Poll, Waker}, }; -use atomic_waker::AtomicWaker; use log::{debug, error}; use windows_sys::Win32::{ Devices::{ @@ -40,7 +39,7 @@ pub(crate) struct WindowsHotplugWatch { } struct HotplugInner { - waker: AtomicWaker, + waker: Mutex>, events: Mutex>, } @@ -54,7 +53,7 @@ impl WindowsHotplugWatch { pub fn new() -> Result { let inner = Box::into_raw(Box::new(HotplugInner { events: Mutex::new(VecDeque::new()), - waker: AtomicWaker::new(), + waker: Mutex::new(None), })); let mut registration = ptr::null_mut(); @@ -98,7 +97,11 @@ impl WindowsHotplugWatch { } pub fn poll_next(&mut self, cx: &mut Context) -> Poll { - self.inner().waker.register(cx.waker()); + self.inner() + .waker + .lock() + .unwrap() + .replace(cx.waker().clone()); let event = self.inner().events.lock().unwrap().pop_front(); match event { Some((Action::Connect, devinst)) => { @@ -162,6 +165,6 @@ unsafe extern "system" fn hotplug_callback( debug!("Hotplug callback: action={action:?}, instance={device_instance}"); inner.events.lock().unwrap().push_back((action, devinst)); - inner.waker.wake(); + inner.waker.lock().unwrap().take().map(|w| w.wake()); return ERROR_SUCCESS; } From 4a9c3e5bbc32e7c2bdcaf62b2e82f1d128bf1e5c Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Sun, 4 May 2025 09:37:31 -0600 Subject: [PATCH 2/3] Prevent dropping a device while a control transfer is pending --- src/platform/linux_usbfs/device.rs | 14 ++++++++------ src/platform/macos_iokit/device.rs | 14 ++++++++------ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/platform/linux_usbfs/device.rs b/src/platform/linux_usbfs/device.rs index cacf954..488f537 100644 --- a/src/platform/linux_usbfs/device.rs +++ b/src/platform/linux_usbfs/device.rs @@ -323,24 +323,26 @@ impl LinuxDevice { } pub fn control_in( - &self, + self: Arc, data: ControlIn, timeout: Duration, ) -> impl MaybeFuture, TransferError>> { let t = TransferData::new_control_in(data); - TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(|t| { + TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(move |t| { + drop(self); // ensure device stays alive t.status()?; Ok(t.control_in_data().to_owned()) }) } pub fn control_out( - &self, + self: Arc, data: ControlOut, timeout: Duration, ) -> impl MaybeFuture> { let t = TransferData::new_control_out(data); - TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(|t| { + TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(move |t| { + drop(self); // ensure device stays alive t.status()?; Ok(()) }) @@ -570,7 +572,7 @@ impl LinuxInterface { data: ControlIn, timeout: Duration, ) -> impl MaybeFuture, TransferError>> { - self.device.control_in(data, timeout) + self.device.clone().control_in(data, timeout) } pub fn control_out( @@ -578,7 +580,7 @@ impl LinuxInterface { data: ControlOut, timeout: Duration, ) -> impl MaybeFuture> { - self.device.control_out(data, timeout) + self.device.clone().control_out(data, timeout) } pub fn get_alt_setting(&self) -> u8 { diff --git a/src/platform/macos_iokit/device.rs b/src/platform/macos_iokit/device.rs index 264b699..b1e52a7 100644 --- a/src/platform/macos_iokit/device.rs +++ b/src/platform/macos_iokit/device.rs @@ -222,7 +222,7 @@ impl MacDevice { } pub fn control_in( - self: &Arc, + self: Arc, data: ControlIn, timeout: Duration, ) -> impl MaybeFuture, TransferError>> { @@ -244,7 +244,8 @@ impl MacDevice { noDataTimeout: timeout, }; - TransferFuture::new(t, |t| self.submit_control(Direction::In, t, req)).map(|t| { + TransferFuture::new(t, |t| self.submit_control(Direction::In, t, req)).map(move |t| { + drop(self); // ensure device stays alive t.status()?; let t = ManuallyDrop::new(t); Ok(unsafe { Vec::from_raw_parts(t.buf, t.actual_len as usize, t.capacity as usize) }) @@ -252,7 +253,7 @@ impl MacDevice { } pub fn control_out( - self: &Arc, + self: Arc, data: ControlOut, timeout: Duration, ) -> impl MaybeFuture> { @@ -273,7 +274,8 @@ impl MacDevice { noDataTimeout: timeout, }; - TransferFuture::new(t, |t| self.submit_control(Direction::Out, t, req)).map(|t| { + TransferFuture::new(t, |t| self.submit_control(Direction::Out, t, req)).map(move |t| { + drop(self); // ensure device stays alive t.status()?; Ok(()) }) @@ -382,7 +384,7 @@ impl MacInterface { data: ControlIn, timeout: Duration, ) -> impl MaybeFuture, TransferError>> { - self.device.control_in(data, timeout) + self.device.clone().control_in(data, timeout) } pub fn control_out( @@ -390,7 +392,7 @@ impl MacInterface { data: ControlOut, timeout: Duration, ) -> impl MaybeFuture> { - self.device.control_out(data, timeout) + self.device.clone().control_out(data, timeout) } pub fn endpoint( From 5a23b845a4021ea277e5ea384c0b8039e64141e3 Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Sat, 10 May 2025 13:00:42 -0600 Subject: [PATCH 3/3] Fix memory ordering for dropping transfers --- src/transfer/internal.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transfer/internal.rs b/src/transfer/internal.rs index bc45bc4..515c633 100644 --- a/src/transfer/internal.rs +++ b/src/transfer/internal.rs @@ -198,7 +198,7 @@ pub fn take_completed_from_option

(option: &mut Option>) -> Option< impl

Drop for Pending

{ fn drop(&mut self) { - match self.state().swap(STATE_ABANDONED, Ordering::Acquire) { + match self.state().swap(STATE_ABANDONED, Ordering::AcqRel) { STATE_PENDING => { /* handler responsible for dropping */ } STATE_IDLE => { // SAFETY: state means there is no concurrent access @@ -217,7 +217,7 @@ pub(crate) unsafe fn notify_completion

(transfer: *mut P) { unsafe { let transfer = transfer as *mut TransferInner

; let notify = (*transfer).notify.clone(); - match (*transfer).state.swap(STATE_IDLE, Ordering::Release) { + match (*transfer).state.swap(STATE_IDLE, Ordering::AcqRel) { STATE_PENDING => (*notify).as_ref().notify(), STATE_ABANDONED => { drop(Box::from_raw(transfer));