Merge pull request #131 from kevinmehall/fixes

Clean up barely-used dependency, fix potential synchronization issues
This commit is contained in:
Kevin Mehall 2025-05-10 13:21:13 -06:00 committed by GitHub
commit 19e540fdd7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 46 additions and 43 deletions

View file

@ -323,24 +323,26 @@ impl LinuxDevice {
}
pub fn control_in(
&self,
self: Arc<Self>,
data: ControlIn,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<Vec<u8>, TransferError>> {
let t = TransferData::new_control_in(data);
TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(|t| {
TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(move |t| {
drop(self); // ensure device stays alive
t.status()?;
Ok(t.control_in_data().to_owned())
})
}
pub fn control_out(
&self,
self: Arc<Self>,
data: ControlOut,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<(), TransferError>> {
let t = TransferData::new_control_out(data);
TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(|t| {
TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(move |t| {
drop(self); // ensure device stays alive
t.status()?;
Ok(())
})
@ -570,7 +572,7 @@ impl LinuxInterface {
data: ControlIn,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<Vec<u8>, TransferError>> {
self.device.control_in(data, timeout)
self.device.clone().control_in(data, timeout)
}
pub fn control_out(
@ -578,7 +580,7 @@ impl LinuxInterface {
data: ControlOut,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<(), TransferError>> {
self.device.control_out(data, timeout)
self.device.clone().control_out(data, timeout)
}
pub fn get_alt_setting(&self) -> u8 {

View file

@ -19,15 +19,7 @@ use rustix::{
io::Errno,
};
use slab::Slab;
use std::{
io,
mem::MaybeUninit,
sync::{Arc, Mutex},
task::Waker,
thread,
};
use atomic_waker::AtomicWaker;
use std::{io, mem::MaybeUninit, sync::Mutex, task::Waker, thread};
use super::Device;
@ -104,8 +96,10 @@ fn event_loop() {
Tag::Device(id) => Device::handle_usb_epoll(id),
Tag::DeviceTimer(id) => Device::handle_timer_epoll(id),
Tag::Waker(id) => {
if let Some(waker) = WAKERS.lock().unwrap().get(id) {
waker.wake();
if let Some(waker) = WAKERS.lock().unwrap().get_mut(id) {
if let Some(w) = waker.take() {
w.wake();
}
}
}
}
@ -113,24 +107,27 @@ fn event_loop() {
}
}
static WAKERS: Mutex<Slab<Arc<AtomicWaker>>> = Mutex::new(Slab::new());
static WAKERS: Mutex<Slab<Option<Waker>>> = Mutex::new(Slab::new());
pub(crate) struct Async<T: AsFd> {
pub(crate) inner: T,
waker: Arc<AtomicWaker>,
id: usize,
}
impl<T: AsFd> Async<T> {
pub fn new(inner: T) -> Result<Self, io::Error> {
let waker = Arc::new(AtomicWaker::new());
let id = WAKERS.lock().unwrap().insert(waker.clone());
let id = WAKERS.lock().unwrap().insert(None);
register_fd(inner.as_fd(), Tag::Waker(id), EventFlags::empty())?;
Ok(Async { inner, id, waker })
Ok(Async { inner, id })
}
pub fn register(&self, waker: &Waker) -> Result<(), io::Error> {
self.waker.register(waker);
WAKERS
.lock()
.unwrap()
.get_mut(self.id)
.unwrap()
.replace(waker.clone());
let epoll_fd = EPOLL_FD.get().unwrap();
epoll::modify(
epoll_fd,

View file

@ -222,7 +222,7 @@ impl MacDevice {
}
pub fn control_in(
self: &Arc<Self>,
self: Arc<Self>,
data: ControlIn,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<Vec<u8>, TransferError>> {
@ -244,7 +244,8 @@ impl MacDevice {
noDataTimeout: timeout,
};
TransferFuture::new(t, |t| self.submit_control(Direction::In, t, req)).map(|t| {
TransferFuture::new(t, |t| self.submit_control(Direction::In, t, req)).map(move |t| {
drop(self); // ensure device stays alive
t.status()?;
let t = ManuallyDrop::new(t);
Ok(unsafe { Vec::from_raw_parts(t.buf, t.actual_len as usize, t.capacity as usize) })
@ -252,7 +253,7 @@ impl MacDevice {
}
pub fn control_out(
self: &Arc<Self>,
self: Arc<Self>,
data: ControlOut,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<(), TransferError>> {
@ -273,7 +274,8 @@ impl MacDevice {
noDataTimeout: timeout,
};
TransferFuture::new(t, |t| self.submit_control(Direction::Out, t, req)).map(|t| {
TransferFuture::new(t, |t| self.submit_control(Direction::Out, t, req)).map(move |t| {
drop(self); // ensure device stays alive
t.status()?;
Ok(())
})
@ -382,7 +384,7 @@ impl MacInterface {
data: ControlIn,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<Vec<u8>, TransferError>> {
self.device.control_in(data, timeout)
self.device.clone().control_in(data, timeout)
}
pub fn control_out(
@ -390,7 +392,7 @@ impl MacInterface {
data: ControlOut,
timeout: Duration,
) -> impl MaybeFuture<Output = Result<(), TransferError>> {
self.device.control_out(data, timeout)
self.device.clone().control_out(data, timeout)
}
pub fn endpoint(

View file

@ -6,7 +6,6 @@ use std::{
task::{Context, Poll, Waker},
};
use atomic_waker::AtomicWaker;
use core_foundation::{base::TCFType, dictionary::CFDictionary, runloop::CFRunLoopSource};
use io_kit_sys::{
kIOMasterPortDefault,
@ -29,18 +28,18 @@ use super::{
};
// Wakers are owned by a global slab to avoid race conditions when freeing them
static WAKERS: Mutex<Slab<AtomicWaker>> = Mutex::new(Slab::new());
static WAKERS: Mutex<Slab<Option<Waker>>> = Mutex::new(Slab::new());
/// An AtomicWaker registered with `WAKERS`
struct SlabWaker(usize);
impl SlabWaker {
fn new() -> SlabWaker {
SlabWaker(WAKERS.lock().unwrap().insert(AtomicWaker::new()))
SlabWaker(WAKERS.lock().unwrap().insert(None))
}
fn register(&self, w: &Waker) {
WAKERS.lock().unwrap()[self.0].register(w);
WAKERS.lock().unwrap()[self.0].replace(w.clone());
}
}
@ -181,7 +180,7 @@ fn register_notification(
unsafe extern "C" fn callback(refcon: *mut c_void, _iterator: io_iterator_t) {
debug!("hotplug event callback");
let id = refcon as usize;
if let Some(waker) = WAKERS.lock().unwrap().get(id) {
waker.wake()
if let Some(waker) = WAKERS.lock().unwrap().get_mut(id) {
waker.take().map(|w| w.wake());
}
}

View file

@ -5,10 +5,9 @@ use std::{
mem::size_of,
ptr::{self, addr_of},
sync::Mutex,
task::{Context, Poll},
task::{Context, Poll, Waker},
};
use atomic_waker::AtomicWaker;
use log::{debug, error};
use windows_sys::Win32::{
Devices::{
@ -40,7 +39,7 @@ pub(crate) struct WindowsHotplugWatch {
}
struct HotplugInner {
waker: AtomicWaker,
waker: Mutex<Option<Waker>>,
events: Mutex<VecDeque<(Action, DevInst)>>,
}
@ -54,7 +53,7 @@ impl WindowsHotplugWatch {
pub fn new() -> Result<WindowsHotplugWatch, Error> {
let inner = Box::into_raw(Box::new(HotplugInner {
events: Mutex::new(VecDeque::new()),
waker: AtomicWaker::new(),
waker: Mutex::new(None),
}));
let mut registration = ptr::null_mut();
@ -98,7 +97,11 @@ impl WindowsHotplugWatch {
}
pub fn poll_next(&mut self, cx: &mut Context) -> Poll<HotplugEvent> {
self.inner().waker.register(cx.waker());
self.inner()
.waker
.lock()
.unwrap()
.replace(cx.waker().clone());
let event = self.inner().events.lock().unwrap().pop_front();
match event {
Some((Action::Connect, devinst)) => {
@ -162,6 +165,6 @@ unsafe extern "system" fn hotplug_callback(
debug!("Hotplug callback: action={action:?}, instance={device_instance}");
inner.events.lock().unwrap().push_back((action, devinst));
inner.waker.wake();
inner.waker.lock().unwrap().take().map(|w| w.wake());
return ERROR_SUCCESS;
}

View file

@ -198,7 +198,7 @@ pub fn take_completed_from_option<P>(option: &mut Option<Pending<P>>) -> Option<
impl<P> Drop for Pending<P> {
fn drop(&mut self) {
match self.state().swap(STATE_ABANDONED, Ordering::Acquire) {
match self.state().swap(STATE_ABANDONED, Ordering::AcqRel) {
STATE_PENDING => { /* handler responsible for dropping */ }
STATE_IDLE => {
// SAFETY: state means there is no concurrent access
@ -217,7 +217,7 @@ pub(crate) unsafe fn notify_completion<P>(transfer: *mut P) {
unsafe {
let transfer = transfer as *mut TransferInner<P>;
let notify = (*transfer).notify.clone();
match (*transfer).state.swap(STATE_IDLE, Ordering::Release) {
match (*transfer).state.swap(STATE_IDLE, Ordering::AcqRel) {
STATE_PENDING => (*notify).as_ref().notify(),
STATE_ABANDONED => {
drop(Box::from_raw(transfer));