diff --git a/src/platform/linux_usbfs/device.rs b/src/platform/linux_usbfs/device.rs index 95ad0de..3a0585e 100644 --- a/src/platform/linux_usbfs/device.rs +++ b/src/platform/linux_usbfs/device.rs @@ -1,5 +1,5 @@ use std::io::{ErrorKind, Seek}; -use std::sync::Mutex; +use std::sync::{Mutex, Weak}; use std::{ffi::c_void, time::Duration}; use std::{ fs::File, @@ -20,6 +20,7 @@ use rustix::{ fs::{Mode, OFlags}, io::Errno, }; +use slab::Slab; use super::{ errno_to_transfer_error, events, @@ -28,7 +29,6 @@ use super::{ }; use crate::descriptors::{ConfigurationDescriptor, DeviceDescriptor}; use crate::maybe_future::{blocking::Blocking, MaybeFuture}; -use crate::platform::linux_usbfs::events::Watch; use crate::transfer::{ControlType, Recipient}; use crate::{ descriptors::{parse_concatenated_config_descriptors, DESCRIPTOR_LEN_DEVICE}, @@ -38,6 +38,8 @@ use crate::{ DeviceInfo, Error, Speed, }; +static DEVICES: Mutex>> = Mutex::new(Slab::new()); + pub(crate) struct LinuxDevice { fd: OwnedFd, events_id: usize, @@ -62,12 +64,7 @@ impl LinuxDevice { let path = PathBuf::from(format!("/dev/bus/usb/{busnum:03}/{devnum:03}")); let fd = rustix::fs::open(&path, OFlags::RDWR | OFlags::CLOEXEC, Mode::empty()) .inspect_err(|e| warn!("Failed to open device {path:?}: {e}"))?; - - let inner = Self::create_inner(fd, Some(sysfs_path), Some(active_config)); - if inner.is_ok() { - debug!("Opened device bus={busnum} addr={devnum}",); - } - inner + Self::create_inner(fd, Some(sysfs_path), Some(active_config)) }) } @@ -105,19 +102,8 @@ impl LinuxDevice { Self::get_config(&descriptors, &fd)? }; - // because there's no Arc::try_new_cyclic - let mut events_err = None; let arc = Arc::new_cyclic(|weak| { - let res = events::register( - fd.as_fd(), - Watch::Device(weak.clone()), - epoll::EventFlags::OUT, - ); - let events_id = *res.as_ref().unwrap_or(&usize::MAX); - events_err = res.err(); - if events_err.is_none() { - debug!("Opened device fd={} with id {}", fd.as_raw_fd(), events_id,); - } + let events_id = DEVICES.lock().unwrap().insert(weak.clone()); LinuxDevice { fd, events_id, @@ -127,15 +113,29 @@ impl LinuxDevice { } }); - if let Some(err) = events_err { - error!("Failed to initialize event loop: {err}"); - Err(err) - } else { - Ok(crate::Device::wrap(arc)) + debug!( + "Opened device fd={} with id {}", + arc.fd.as_raw_fd(), + arc.events_id + ); + + events::register_fd( + arc.fd.as_fd(), + events::Tag::Device(arc.events_id), + epoll::EventFlags::OUT, + )?; + + Ok(crate::Device::wrap(arc)) + } + + pub(crate) fn handle_usb_epoll(id: usize) { + let device = DEVICES.lock().unwrap().get(id).and_then(|w| w.upgrade()); + if let Some(device) = device { + device.handle_events(); } } - pub(crate) fn handle_events(&self) { + fn handle_events(&self) { debug!("Handling events for device {}", self.events_id); match usbfs::reap_urb_ndelay(&self.fd) { Ok(urb_ptr) => { @@ -457,7 +457,8 @@ impl LinuxDevice { impl Drop for LinuxDevice { fn drop(&mut self) { debug!("Closing device {}", self.events_id); - events::unregister(self.fd.as_fd(), self.events_id) + events::unregister_fd(self.fd.as_fd()); + DEVICES.lock().unwrap().remove(self.events_id); } } diff --git a/src/platform/linux_usbfs/events.rs b/src/platform/linux_usbfs/events.rs index c717970..ef9d1b0 100644 --- a/src/platform/linux_usbfs/events.rs +++ b/src/platform/linux_usbfs/events.rs @@ -1,16 +1,17 @@ -use atomic_waker::AtomicWaker; -/// Epoll based event loop for Linux. -/// -/// Launches a thread when opening the first device that polls -/// for events on usbfs devices and arbitrary file descriptors -/// (used for udev hotplug). -/// -/// ### Why not share an event loop with `tokio` or `async-io`? -/// -/// This event loop will call USBFS_REAP_URB on the event thread and -/// dispatch to the transfer's waker directly. Since all USB transfers -/// on a device use the same file descriptor, putting USB-specific -/// dispatch in the event loop avoids additonal synchronization. +//! Epoll based event loop for Linux. +//! +//! Launches a thread when opening the first device that polls +//! for events on usbfs devices and arbitrary file descriptors +//! (used for udev hotplug). +//! +//! ### Why not share an event loop with `tokio` or `async-io`? +//! +//! This event loop will call USBFS_REAP_URB on the event thread and +//! dispatch to the transfer's waker directly. Since all USB transfers +//! on a device use the same file descriptor, putting USB-specific +//! dispatch in the event loop avoids additonal synchronization. + +use crate::Error; use once_cell::sync::OnceCell; use rustix::{ event::epoll::{self, EventData, EventFlags}, @@ -20,42 +21,63 @@ use rustix::{ use slab::Slab; use std::{ io, - sync::{Arc, Mutex, Weak}, + sync::{Arc, Mutex}, task::Waker, thread, }; -use crate::Error; +use atomic_waker::AtomicWaker; use super::Device; static EPOLL_FD: OnceCell = OnceCell::new(); -static WATCHES: Mutex> = Mutex::new(Slab::new()); -pub(super) enum Watch { - Device(Weak), - Fd(Arc), +pub(crate) enum Tag { + Device(usize), + Waker(usize), } -pub(super) fn register(fd: BorrowedFd, watch: Watch, flags: EventFlags) -> Result { +impl Tag { + const DEVICE: u64 = 1; + const WAKER: u64 = 3; + + fn as_event_data(&self) -> EventData { + let (tag, id) = match *self { + Tag::Device(id) => (Self::DEVICE, id), + Tag::Waker(id) => (Self::WAKER, id), + }; + EventData::new_u64((id as u64) << 3 | tag) + } + + fn from_event_data(data: EventData) -> Self { + let id = (data.u64() >> 3) as usize; + let tag = data.u64() & 0b111; + match (tag, id as usize) { + (Self::DEVICE, id) => Tag::Device(id), + (Self::WAKER, id) => Tag::Waker(id), + _ => panic!("Invalid event data"), + } + } +} + +pub(super) fn register_fd(fd: BorrowedFd, tag: Tag, flags: EventFlags) -> Result<(), Error> { let mut start_thread = false; let epoll_fd = EPOLL_FD.get_or_try_init(|| { start_thread = true; - epoll::create(epoll::CreateFlags::CLOEXEC) + epoll::create(epoll::CreateFlags::CLOEXEC).inspect_err(|e| { + log::error!("Failed to initialize epoll: {e}"); + }) })?; - let id = { - let mut watches = WATCHES.lock().unwrap(); - watches.insert(watch) - }; - if start_thread { thread::spawn(event_loop); } - let data = EventData::new_u64(id as u64); - epoll::add(epoll_fd, fd, data, flags)?; - Ok(id) + epoll::add(epoll_fd, fd, tag.as_event_data(), flags).inspect_err(|e| { + log::error!("Failed to add epoll watch: {e}"); + })?; + + Ok(()) } pub(super) fn unregister_fd(fd: BorrowedFd) { @@ -63,39 +85,27 @@ pub(super) fn unregister_fd(fd: BorrowedFd) { epoll::delete(epoll_fd, fd).ok(); } -pub(super) fn unregister(fd: BorrowedFd, events_id: usize) { - let epoll_fd = EPOLL_FD.get().unwrap(); - epoll::delete(epoll_fd, fd).ok(); - WATCHES.lock().unwrap().remove(events_id); -} - fn event_loop() { let epoll_fd = EPOLL_FD.get().unwrap(); let mut event_list = epoll::EventVec::with_capacity(4); loop { retry_on_intr(|| epoll::wait(epoll_fd, &mut event_list, -1)).unwrap(); for event in &event_list { - let key = event.data.u64() as usize; - log::trace!("event on {key}"); - let lock = WATCHES.lock().unwrap(); - let Some(watch) = lock.get(key) else { continue }; - - match watch { - Watch::Device(w) => { - if let Some(device) = w.upgrade() { - drop(lock); - device.handle_events(); - // `device` gets dropped here. if it was the last reference, the LinuxDevice will be dropped. - // That will unregister its fd, so it's important that WATCHES is unlocked here, or we'd deadlock. + match Tag::from_event_data(event.data) { + Tag::Device(id) => Device::handle_usb_epoll(id), + Tag::Waker(id) => { + if let Some(waker) = WAKERS.lock().unwrap().get(id) { + waker.wake(); } } - Watch::Fd(waker) => waker.wake(), } } } } -pub(crate) struct Async { +static WAKERS: Mutex>> = Mutex::new(Slab::new()); + +pub(crate) struct Async { pub(crate) inner: T, waker: Arc, id: usize, @@ -104,7 +114,8 @@ pub(crate) struct Async { impl Async { pub fn new(inner: T) -> Result { let waker = Arc::new(AtomicWaker::new()); - let id = register(inner.as_fd(), Watch::Fd(waker.clone()), EventFlags::empty())?; + let id = WAKERS.lock().unwrap().insert(waker.clone()); + register_fd(inner.as_fd(), Tag::Waker(id), EventFlags::empty())?; Ok(Async { inner, id, waker }) } @@ -114,9 +125,16 @@ impl Async { epoll::modify( epoll_fd, self.inner.as_fd(), - EventData::new_u64(self.id as u64), + Tag::Waker(self.id).as_event_data(), EventFlags::ONESHOT | EventFlags::IN, )?; Ok(()) } } + +impl Drop for Async { + fn drop(&mut self) { + unregister_fd(self.inner.as_fd()); + WAKERS.lock().unwrap().remove(self.id); + } +}