linux: Refactor events slab

This commit is contained in:
Kevin Mehall 2025-02-01 19:33:37 -07:00
parent 99eaf768fd
commit 8dfb3e5f1d
2 changed files with 97 additions and 78 deletions

View file

@ -1,5 +1,5 @@
use std::io::{ErrorKind, Seek};
use std::sync::Mutex;
use std::sync::{Mutex, Weak};
use std::{ffi::c_void, time::Duration};
use std::{
fs::File,
@ -20,6 +20,7 @@ use rustix::{
fs::{Mode, OFlags},
io::Errno,
};
use slab::Slab;
use super::{
errno_to_transfer_error, events,
@ -28,7 +29,6 @@ use super::{
};
use crate::descriptors::{ConfigurationDescriptor, DeviceDescriptor};
use crate::maybe_future::{blocking::Blocking, MaybeFuture};
use crate::platform::linux_usbfs::events::Watch;
use crate::transfer::{ControlType, Recipient};
use crate::{
descriptors::{parse_concatenated_config_descriptors, DESCRIPTOR_LEN_DEVICE},
@ -38,6 +38,8 @@ use crate::{
DeviceInfo, Error, Speed,
};
static DEVICES: Mutex<Slab<Weak<LinuxDevice>>> = Mutex::new(Slab::new());
pub(crate) struct LinuxDevice {
fd: OwnedFd,
events_id: usize,
@ -62,12 +64,7 @@ impl LinuxDevice {
let path = PathBuf::from(format!("/dev/bus/usb/{busnum:03}/{devnum:03}"));
let fd = rustix::fs::open(&path, OFlags::RDWR | OFlags::CLOEXEC, Mode::empty())
.inspect_err(|e| warn!("Failed to open device {path:?}: {e}"))?;
let inner = Self::create_inner(fd, Some(sysfs_path), Some(active_config));
if inner.is_ok() {
debug!("Opened device bus={busnum} addr={devnum}",);
}
inner
Self::create_inner(fd, Some(sysfs_path), Some(active_config))
})
}
@ -105,19 +102,8 @@ impl LinuxDevice {
Self::get_config(&descriptors, &fd)?
};
// because there's no Arc::try_new_cyclic
let mut events_err = None;
let arc = Arc::new_cyclic(|weak| {
let res = events::register(
fd.as_fd(),
Watch::Device(weak.clone()),
epoll::EventFlags::OUT,
);
let events_id = *res.as_ref().unwrap_or(&usize::MAX);
events_err = res.err();
if events_err.is_none() {
debug!("Opened device fd={} with id {}", fd.as_raw_fd(), events_id,);
}
let events_id = DEVICES.lock().unwrap().insert(weak.clone());
LinuxDevice {
fd,
events_id,
@ -127,15 +113,29 @@ impl LinuxDevice {
}
});
if let Some(err) = events_err {
error!("Failed to initialize event loop: {err}");
Err(err)
} else {
Ok(crate::Device::wrap(arc))
debug!(
"Opened device fd={} with id {}",
arc.fd.as_raw_fd(),
arc.events_id
);
events::register_fd(
arc.fd.as_fd(),
events::Tag::Device(arc.events_id),
epoll::EventFlags::OUT,
)?;
Ok(crate::Device::wrap(arc))
}
pub(crate) fn handle_usb_epoll(id: usize) {
let device = DEVICES.lock().unwrap().get(id).and_then(|w| w.upgrade());
if let Some(device) = device {
device.handle_events();
}
}
pub(crate) fn handle_events(&self) {
fn handle_events(&self) {
debug!("Handling events for device {}", self.events_id);
match usbfs::reap_urb_ndelay(&self.fd) {
Ok(urb_ptr) => {
@ -457,7 +457,8 @@ impl LinuxDevice {
impl Drop for LinuxDevice {
fn drop(&mut self) {
debug!("Closing device {}", self.events_id);
events::unregister(self.fd.as_fd(), self.events_id)
events::unregister_fd(self.fd.as_fd());
DEVICES.lock().unwrap().remove(self.events_id);
}
}

View file

@ -1,16 +1,17 @@
use atomic_waker::AtomicWaker;
/// Epoll based event loop for Linux.
///
/// Launches a thread when opening the first device that polls
/// for events on usbfs devices and arbitrary file descriptors
/// (used for udev hotplug).
///
/// ### Why not share an event loop with `tokio` or `async-io`?
///
/// This event loop will call USBFS_REAP_URB on the event thread and
/// dispatch to the transfer's waker directly. Since all USB transfers
/// on a device use the same file descriptor, putting USB-specific
/// dispatch in the event loop avoids additonal synchronization.
//! Epoll based event loop for Linux.
//!
//! Launches a thread when opening the first device that polls
//! for events on usbfs devices and arbitrary file descriptors
//! (used for udev hotplug).
//!
//! ### Why not share an event loop with `tokio` or `async-io`?
//!
//! This event loop will call USBFS_REAP_URB on the event thread and
//! dispatch to the transfer's waker directly. Since all USB transfers
//! on a device use the same file descriptor, putting USB-specific
//! dispatch in the event loop avoids additonal synchronization.
use crate::Error;
use once_cell::sync::OnceCell;
use rustix::{
event::epoll::{self, EventData, EventFlags},
@ -20,42 +21,63 @@ use rustix::{
use slab::Slab;
use std::{
io,
sync::{Arc, Mutex, Weak},
sync::{Arc, Mutex},
task::Waker,
thread,
};
use crate::Error;
use atomic_waker::AtomicWaker;
use super::Device;
static EPOLL_FD: OnceCell<OwnedFd> = OnceCell::new();
static WATCHES: Mutex<Slab<Watch>> = Mutex::new(Slab::new());
pub(super) enum Watch {
Device(Weak<Device>),
Fd(Arc<AtomicWaker>),
pub(crate) enum Tag {
Device(usize),
Waker(usize),
}
pub(super) fn register(fd: BorrowedFd, watch: Watch, flags: EventFlags) -> Result<usize, Error> {
impl Tag {
const DEVICE: u64 = 1;
const WAKER: u64 = 3;
fn as_event_data(&self) -> EventData {
let (tag, id) = match *self {
Tag::Device(id) => (Self::DEVICE, id),
Tag::Waker(id) => (Self::WAKER, id),
};
EventData::new_u64((id as u64) << 3 | tag)
}
fn from_event_data(data: EventData) -> Self {
let id = (data.u64() >> 3) as usize;
let tag = data.u64() & 0b111;
match (tag, id as usize) {
(Self::DEVICE, id) => Tag::Device(id),
(Self::WAKER, id) => Tag::Waker(id),
_ => panic!("Invalid event data"),
}
}
}
pub(super) fn register_fd(fd: BorrowedFd, tag: Tag, flags: EventFlags) -> Result<(), Error> {
let mut start_thread = false;
let epoll_fd = EPOLL_FD.get_or_try_init(|| {
start_thread = true;
epoll::create(epoll::CreateFlags::CLOEXEC)
epoll::create(epoll::CreateFlags::CLOEXEC).inspect_err(|e| {
log::error!("Failed to initialize epoll: {e}");
})
})?;
let id = {
let mut watches = WATCHES.lock().unwrap();
watches.insert(watch)
};
if start_thread {
thread::spawn(event_loop);
}
let data = EventData::new_u64(id as u64);
epoll::add(epoll_fd, fd, data, flags)?;
Ok(id)
epoll::add(epoll_fd, fd, tag.as_event_data(), flags).inspect_err(|e| {
log::error!("Failed to add epoll watch: {e}");
})?;
Ok(())
}
pub(super) fn unregister_fd(fd: BorrowedFd) {
@ -63,39 +85,27 @@ pub(super) fn unregister_fd(fd: BorrowedFd) {
epoll::delete(epoll_fd, fd).ok();
}
pub(super) fn unregister(fd: BorrowedFd, events_id: usize) {
let epoll_fd = EPOLL_FD.get().unwrap();
epoll::delete(epoll_fd, fd).ok();
WATCHES.lock().unwrap().remove(events_id);
}
fn event_loop() {
let epoll_fd = EPOLL_FD.get().unwrap();
let mut event_list = epoll::EventVec::with_capacity(4);
loop {
retry_on_intr(|| epoll::wait(epoll_fd, &mut event_list, -1)).unwrap();
for event in &event_list {
let key = event.data.u64() as usize;
log::trace!("event on {key}");
let lock = WATCHES.lock().unwrap();
let Some(watch) = lock.get(key) else { continue };
match watch {
Watch::Device(w) => {
if let Some(device) = w.upgrade() {
drop(lock);
device.handle_events();
// `device` gets dropped here. if it was the last reference, the LinuxDevice will be dropped.
// That will unregister its fd, so it's important that WATCHES is unlocked here, or we'd deadlock.
match Tag::from_event_data(event.data) {
Tag::Device(id) => Device::handle_usb_epoll(id),
Tag::Waker(id) => {
if let Some(waker) = WAKERS.lock().unwrap().get(id) {
waker.wake();
}
}
Watch::Fd(waker) => waker.wake(),
}
}
}
}
pub(crate) struct Async<T> {
static WAKERS: Mutex<Slab<Arc<AtomicWaker>>> = Mutex::new(Slab::new());
pub(crate) struct Async<T: AsFd> {
pub(crate) inner: T,
waker: Arc<AtomicWaker>,
id: usize,
@ -104,7 +114,8 @@ pub(crate) struct Async<T> {
impl<T: AsFd> Async<T> {
pub fn new(inner: T) -> Result<Self, io::Error> {
let waker = Arc::new(AtomicWaker::new());
let id = register(inner.as_fd(), Watch::Fd(waker.clone()), EventFlags::empty())?;
let id = WAKERS.lock().unwrap().insert(waker.clone());
register_fd(inner.as_fd(), Tag::Waker(id), EventFlags::empty())?;
Ok(Async { inner, id, waker })
}
@ -114,9 +125,16 @@ impl<T: AsFd> Async<T> {
epoll::modify(
epoll_fd,
self.inner.as_fd(),
EventData::new_u64(self.id as u64),
Tag::Waker(self.id).as_event_data(),
EventFlags::ONESHOT | EventFlags::IN,
)?;
Ok(())
}
}
impl<T: AsFd> Drop for Async<T> {
fn drop(&mut self) {
unregister_fd(self.inner.as_fd());
WAKERS.lock().unwrap().remove(self.id);
}
}