From 4dc0c511f62204d0b2c33c29e50dd00674577059 Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Sun, 1 Oct 2023 11:34:42 -0600 Subject: [PATCH] Refactor transfers, add control transfers --- examples/control.rs | 35 +++ src/control.rs | 119 +++++++++ src/device.rs | 39 ++- src/lib.rs | 12 +- src/platform/linux_usbfs/device.rs | 27 +- src/platform/linux_usbfs/enumeration.rs | 4 - src/platform/linux_usbfs/mod.rs | 3 - src/platform/linux_usbfs/transfer.rs | 326 ++++++++++-------------- src/platform/linux_usbfs/usbfs.rs | 10 +- src/transfer.rs | 48 +++- src/transfer_internal.rs | 227 +++++++++++++++++ 11 files changed, 616 insertions(+), 234 deletions(-) create mode 100644 examples/control.rs create mode 100644 src/control.rs create mode 100644 src/transfer_internal.rs diff --git a/examples/control.rs b/examples/control.rs new file mode 100644 index 0000000..6e5c2f7 --- /dev/null +++ b/examples/control.rs @@ -0,0 +1,35 @@ +use futures_lite::future::block_on; +use nusb::{ControlIn, ControlOut, ControlType, Recipient}; + +fn main() { + env_logger::init(); + let di = nusb::list_devices() + .unwrap() + .find(|d| d.vendor_id() == 0x59e3 && d.product_id() == 0x0a23) + .expect("device should be connected"); + + println!("Device info: {di:?}"); + + let device = di.open().unwrap(); + let interface = device.claim_interface(0).unwrap(); + + let result = block_on(interface.control_transfer_out(ControlOut { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 0x81, + value: 0x9999, + index: 0x9999, + data: &[1, 2, 3, 4], + })); + println!("{result:?}"); + + let result = block_on(interface.control_transfer_in(ControlIn { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 0x81, + value: 0x9999, + index: 0x9999, + length: 256, + })); + println!("{result:?}"); +} diff --git a/src/control.rs b/src/control.rs new file mode 100644 index 0000000..ac5f4a8 --- /dev/null +++ b/src/control.rs @@ -0,0 +1,119 @@ +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[repr(u8)] +pub enum Direction { + /// Host to device + Out = 0, + + /// Device to host + In = 1, +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[repr(u8)] +pub enum ControlType { + Standard = 0, + Class = 1, + Vendor = 2, +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[repr(u8)] +pub enum Recipient { + Device = 0, + Interface = 1, + Endpoint = 2, + Other = 3, +} + +pub struct ControlOut<'a> { + #[doc(alias = "bmRequestType")] + pub control_type: ControlType, + + #[doc(alias = "bmRequestType")] + pub recipient: Recipient, + + #[doc(alias = "bRequest")] + pub request: u8, + + #[doc(alias = "windex")] + pub value: u16, + + #[doc(alias = "wIndex")] + pub index: u16, + + #[doc(alias = "wLength")] + pub data: &'a [u8], +} + +impl<'a> ControlOut<'a> { + pub(crate) fn setup_packet(&self) -> Result<[u8; SETUP_PACKET_SIZE], ()> { + Ok(pack_setup( + Direction::Out, + self.control_type, + self.recipient, + self.request, + self.value, + self.index, + self.data.len().try_into().map_err(|_| ())?, + )) + } +} + +pub struct ControlIn { + #[doc(alias = "bmRequestType")] + pub control_type: ControlType, + + #[doc(alias = "bmRequestType")] + pub recipient: Recipient, + + #[doc(alias = "bRequest")] + pub request: u8, + + #[doc(alias = "windex")] + pub value: u16, + + #[doc(alias = "wIndex")] + pub index: u16, + + #[doc(alias = "wLength")] + pub length: u16, +} + +impl ControlIn { + pub(crate) fn setup_packet(&self) -> [u8; SETUP_PACKET_SIZE] { + pack_setup( + Direction::In, + self.control_type, + self.recipient, + self.request, + self.value, + self.index, + self.length, + ) + } +} + +pub(crate) const SETUP_PACKET_SIZE: usize = 8; + +fn pack_setup( + direction: Direction, + control_type: ControlType, + recipient: Recipient, + request: u8, + value: u16, + index: u16, + length: u16, +) -> [u8; SETUP_PACKET_SIZE] { + let bmrequesttype = ((direction as u8) << 7) | ((control_type as u8) << 5) | (recipient as u8); + + [ + bmrequesttype, + request, + (value & 0xFF) as u8, + (value >> 8) as u8, + (index & 0xFF) as u8, + (index >> 8) as u8, + (length & 0xFF) as u8, + (length >> 8) as u8, + ] +} diff --git a/src/device.rs b/src/device.rs index 1254cfa..549ad0d 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1,6 +1,11 @@ use std::{collections::VecDeque, sync::Arc, time::Duration}; -use crate::{transfer::EndpointType, Completion, DeviceInfo, Error, Transfer}; +use crate::{ + control::{ControlIn, ControlOut}, + platform, + transfer_internal::TransferHandle, + Completion, DeviceInfo, EndpointType, Error, TransferFuture, +}; type TransferError = Error; type Buffer = Vec; @@ -39,22 +44,34 @@ impl Interface { todo!() } - pub fn bulk_transfer(&self, endpoint: u8, buf: Vec) -> Transfer { - let mut t = Transfer::new(self.backend.clone(), endpoint, EndpointType::Bulk); - t.submit(buf); - t + pub fn control_transfer_in(&self, data: ControlIn) -> TransferFuture { + let mut t = TransferHandle::new(self.backend.clone(), 0, EndpointType::Control); + t.submit::(data); + TransferFuture::new(t) } - pub fn interrupt_transfer(&self, endpoint: u8, buf: Vec) -> Transfer { - let mut t = Transfer::new(self.backend.clone(), endpoint, EndpointType::Interrupt); + pub fn control_transfer_out(&self, data: ControlOut) -> TransferFuture { + let mut t = TransferHandle::new(self.backend.clone(), 0, EndpointType::Control); + t.submit::(data); + TransferFuture::new(t) + } + + pub fn bulk_transfer(&self, endpoint: u8, buf: Vec) -> TransferFuture> { + let mut t = TransferHandle::new(self.backend.clone(), endpoint, EndpointType::Bulk); t.submit(buf); - t + TransferFuture::new(t) + } + + pub fn interrupt_transfer(&self, endpoint: u8, buf: Vec) -> TransferFuture> { + let mut t = TransferHandle::new(self.backend.clone(), endpoint, EndpointType::Interrupt); + t.submit(buf); + TransferFuture::new(t) } } struct Queue { - pending: VecDeque, - cached: Option, + pending: VecDeque>, + cached: Option>, } impl Queue { @@ -79,7 +96,7 @@ impl Queue { /// /// For an OUT endpoint, the buffer is unmodified, but can be /// reused for another transfer. - pub fn complete(&mut self, timeout: Option) -> Option { + pub fn complete(&mut self, timeout: Option) -> Option>> { todo!() } diff --git a/src/lib.rs b/src/lib.rs index ff31726..1d60a57 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,20 @@ -use std::{fmt::Display, io, str::FromStr}; +use std::io; pub mod platform; - -use device::Device; pub use platform::list_devices; +mod control; +pub use control::{ControlIn, ControlOut, ControlType, Direction, Recipient}; + mod enumeration; pub use enumeration::{DeviceInfo, Speed, UnknownValue}; mod device; +use device::Device; mod transfer; -pub use transfer::{Completion, Transfer, TransferStatus}; +pub use transfer::{Completion, EndpointType, TransferFuture, TransferStatus}; + +mod transfer_internal; pub type Error = io::Error; diff --git a/src/platform/linux_usbfs/device.rs b/src/platform/linux_usbfs/device.rs index 2549e6e..4bf74c7 100644 --- a/src/platform/linux_usbfs/device.rs +++ b/src/platform/linux_usbfs/device.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{ffi::c_void, path::PathBuf, sync::Arc}; use log::{debug, error}; use rustix::{ @@ -7,11 +7,11 @@ use rustix::{ io::Errno, }; -use super::{events, usbfs}; -use crate::{ - platform::linux_usbfs::transfer::{Transfer, TransferInner}, - DeviceInfo, Error, +use super::{ + events, + usbfs::{self, Urb}, }; +use crate::{transfer_internal, DeviceInfo, Error}; pub(crate) struct LinuxDevice { fd: OwnedFd, @@ -53,14 +53,17 @@ impl LinuxDevice { debug!("Handling events for device {}", self.events_id); match usbfs::reap_urb_ndelay(&self.fd) { Ok(urb_ptr) => { - { + let user_data = { let urb = unsafe { &*urb_ptr }; debug!( "URB {:?} for ep {:x} completed, status={} actual_length={}", urb_ptr, urb.endpoint, urb.status, urb.actual_length ); - } - unsafe { Transfer::notify_completion(urb_ptr as *mut TransferInner) } + urb.usercontext + }; + + // SAFETY: pointer came from submit via kernel an we're now done with it + unsafe { transfer_internal::notify_completion::(user_data) } } Err(Errno::AGAIN) => {} Err(Errno::NODEV) => { @@ -117,8 +120,7 @@ pub(crate) struct LinuxInterface { } impl LinuxInterface { - pub(crate) unsafe fn submit_transfer(&self, transfer: *mut TransferInner) { - let urb = transfer as *mut usbfs::Urb; + pub(crate) unsafe fn submit_urb(&self, urb: *mut Urb) { let ep = unsafe { (&mut *urb).endpoint }; if let Err(e) = usbfs::submit_urb(&self.device.fd, urb) { // SAFETY: Transfer was not submitted. We still own the transfer @@ -130,16 +132,15 @@ impl LinuxInterface { u.actual_length = 0; u.status = e.raw_os_error(); } - Transfer::notify_completion(transfer) + transfer_internal::notify_completion::(urb as *mut c_void) } } else { debug!("Submitted URB {urb:?} on ep {ep:x}"); } } - pub(crate) unsafe fn cancel_transfer(&self, transfer: *mut TransferInner) { + pub(crate) unsafe fn cancel_urb(&self, urb: *mut Urb) { unsafe { - let urb = transfer as *mut usbfs::Urb; if let Err(e) = usbfs::discard_urb(&self.device.fd, urb) { debug!("Failed to cancel URB {urb:?}: {e}"); } diff --git a/src/platform/linux_usbfs/enumeration.rs b/src/platform/linux_usbfs/enumeration.rs index d46c2f8..c1b754e 100644 --- a/src/platform/linux_usbfs/enumeration.rs +++ b/src/platform/linux_usbfs/enumeration.rs @@ -81,7 +81,3 @@ pub fn probe_device(path: SysfsPath) -> Result { path: path, }) } -/// Returns the path of a device in usbfs -fn usb_devfs_path(busnum: u8, devnum: u8) -> PathBuf { - PathBuf::from(format!("/dev/bus/usb/{busnum:03}/{devnum:03}")) -} diff --git a/src/platform/linux_usbfs/mod.rs b/src/platform/linux_usbfs/mod.rs index 7779628..9e47419 100644 --- a/src/platform/linux_usbfs/mod.rs +++ b/src/platform/linux_usbfs/mod.rs @@ -1,8 +1,5 @@ -use std::path::PathBuf; - mod transfer; mod usbfs; -pub use transfer::Transfer; mod enumeration; mod events; diff --git a/src/platform/linux_usbfs/transfer.rs b/src/platform/linux_usbfs/transfer.rs index 4847ac6..a363fd7 100644 --- a/src/platform/linux_usbfs/transfer.rs +++ b/src/platform/linux_usbfs/transfer.rs @@ -1,109 +1,77 @@ use std::{ - cell::UnsafeCell, - future::Future, + ffi::c_void, mem::{self, ManuallyDrop}, - ptr::{null_mut, NonNull}, - sync::{ - atomic::{AtomicU8, Ordering}, - Arc, - }, - task::{Context, Poll}, + ptr::null_mut, }; -use atomic_waker::AtomicWaker; use rustix::io::Errno; -use crate::{transfer::EndpointType, Completion, TransferStatus}; - -use super::{ - usbfs::{ - Urb, USBDEVFS_URB_TYPE_BULK, USBDEVFS_URB_TYPE_CONTROL, USBDEVFS_URB_TYPE_INTERRUPT, - USBDEVFS_URB_TYPE_ISO, - }, - Interface, +use crate::{ + control::{ControlIn, ControlOut, SETUP_PACKET_SIZE}, + transfer_internal, Completion, EndpointType, TransferStatus, }; +use super::usbfs::{ + Urb, USBDEVFS_URB_TYPE_BULK, USBDEVFS_URB_TYPE_CONTROL, USBDEVFS_URB_TYPE_INTERRUPT, + USBDEVFS_URB_TYPE_ISO, +}; + +/// Linux-specific transfer state. +/// +/// This logically contains a `Vec` with urb.buffer and capacity. +/// It also owns the `urb` allocation itself, which is stored out-of-line +/// to avoid violating noalias when submitting the transfer while holding +/// `&mut TransferData`. #[repr(C)] -pub(crate) struct TransferInner { - urb: UnsafeCell, - state: AtomicU8, - waker: AtomicWaker, - interface: Arc, +pub(crate) struct TransferData { + urb: *mut Urb, + capacity: usize, } -impl TransferInner { - /// Transfer ownership of `buf` into the transfer's `urb`. - /// SAFETY: requires that there is no concurrent access to `urb` - unsafe fn put_buffer(&self, buf: Vec) { - unsafe { - let mut buf = ManuallyDrop::new(buf); - let urb = &mut *self.urb.get(); - urb.buffer = buf.as_mut_ptr(); - assert!(buf.len() < i32::MAX as usize, "Buffer too large"); - urb.actual_length = buf.len() as i32; - assert!(buf.capacity() < i32::MAX as usize, "Buffer too large"); - urb.buffer_length = buf.capacity() as i32; - } +unsafe impl Send for TransferData {} + +impl TransferData { + fn urb_mut(&mut self) -> &mut Urb { + // SAFETY: if we have `&mut`, the transfer is not pending + unsafe { &mut *self.urb } } - /// Transfer ownership of transfer's `urb` buffer back to a `Vec`. - /// SAFETY: requires that the the buffer is present and there is no concurrent - /// access to `urb`. Invalidates the buffer. - unsafe fn take_buffer(&self) -> Vec { - unsafe { - let urb = &mut *self.urb.get(); - Vec::from_raw_parts( - mem::replace(&mut urb.buffer, null_mut()), - urb.actual_length as usize, - urb.buffer_length as usize, - ) - } + fn fill(&mut self, v: Vec, len: usize, user_data: *mut c_void) { + let mut v = ManuallyDrop::new(v); + let urb = self.urb_mut(); + urb.buffer = v.as_mut_ptr(); + urb.buffer_length = len.try_into().expect("buffer size should fit in i32"); + urb.usercontext = user_data; + urb.actual_length = 0; + self.capacity = v.capacity(); } - /// Get the transfer status - /// SAFETY: requires that there is no concurrent access to `urb` - unsafe fn status(&self) -> TransferStatus { - let status = unsafe { (&*self.urb.get()).status }; + /// SAFETY: requires that the transfer has completed and `length` bytes are initialized + unsafe fn take_buf(&mut self, length: usize) -> Vec { + let urb = self.urb_mut(); + assert!(!urb.buffer.is_null()); + let ptr = mem::replace(&mut urb.buffer, null_mut()); + let capacity = mem::replace(&mut self.capacity, 0); + assert!(length <= capacity); + Vec::from_raw_parts(ptr, length, capacity) + } +} - if status == 0 { - return TransferStatus::Complete; - } - - // It's sometimes positive, sometimes negative, but rustix panics if negative. - match Errno::from_raw_os_error(status.abs()) { - Errno::NODEV | Errno::SHUTDOWN => TransferStatus::Disconnected, - Errno::PIPE => TransferStatus::Stall, - Errno::NOENT | Errno::CONNRESET => TransferStatus::Cancelled, - Errno::PROTO | Errno::ILSEQ | Errno::OVERFLOW | Errno::COMM | Errno::TIME => { - TransferStatus::Fault +impl Drop for TransferData { + fn drop(&mut self) { + unsafe { + if !self.urb_mut().buffer.is_null() { + drop(Vec::from_raw_parts(self.urb_mut().buffer, 0, self.capacity)); } - _ => TransferStatus::UnknownError, + drop(Box::from_raw(self.urb)); } } } -pub struct Transfer { - ptr: NonNull, -} +impl transfer_internal::Platform for super::Interface { + type TransferData = TransferData; -/// The transfer has not been submitted. The buffer is not valid. -const STATE_IDLE: u8 = 0; - -/// The transfer has been submitted to the kernel and completion has not yet -/// been handled. The buffer points to valid memory but cannot be accessed by -/// userspace. There is a future or queue waiting for it completion. -const STATE_PENDING: u8 = 1; - -/// Like PENDING, but there is no one waiting for completion. The completion -/// handler will drop the buffer and transfer. -const STATE_ABANDONED: u8 = 3; - -/// The transfer completion has been handled. The buffer is valid and may -/// be accessed. -const STATE_COMPLETED: u8 = 3; - -impl Transfer { - pub(crate) fn new(interface: Arc, endpoint: u8, ep_type: EndpointType) -> Transfer { + fn make_transfer_data(&self, endpoint: u8, ep_type: crate::EndpointType) -> TransferData { let ep_type = match ep_type { EndpointType::Control => USBDEVFS_URB_TYPE_CONTROL, EndpointType::Interrupt => USBDEVFS_URB_TYPE_INTERRUPT, @@ -111,8 +79,8 @@ impl Transfer { EndpointType::Isochronous => USBDEVFS_URB_TYPE_ISO, }; - let b = Box::new(TransferInner { - urb: UnsafeCell::new(Urb { + TransferData { + urb: Box::into_raw(Box::new(Urb { ep_type, endpoint, status: 0, @@ -125,118 +93,102 @@ impl Transfer { error_count: 0, signr: 0, usercontext: null_mut(), - }), - state: AtomicU8::new(STATE_IDLE), - waker: AtomicWaker::new(), - interface, - }); - - Transfer { - ptr: Box::leak(b).into(), + })), + capacity: 0, } } - fn inner(&self) -> &TransferInner { - // Safety: while Transfer is alive, its TransferInner is alive - unsafe { self.ptr.as_ref() } + fn cancel(&self, data: &TransferData) { + unsafe { + self.cancel_urb(data.urb); + } + } +} + +impl transfer_internal::PlatformSubmit> for super::Interface { + unsafe fn submit(&self, data: Vec, transfer: &mut TransferData, user_data: *mut c_void) { + let ep = transfer.urb_mut().endpoint; + let len = if ep & 0x80 == 0 { + data.len() + } else { + data.capacity() + }; + transfer.fill(data, len, user_data); + + // SAFETY: we just properly filled the buffer and it is not already pending + unsafe { self.submit_urb(transfer.urb) } } - /// Prepare the transfer for submission by filling the buffer fields - /// and setting the state to PENDING. Returns a `*mut TransferInner` - /// that must later be passed to `complete`. - /// - /// Panics if the transfer has already been submitted. - pub(crate) fn submit(&mut self, data: Vec) { - let inner = self.inner(); - assert_eq!( - inner.state.load(Ordering::Acquire), - STATE_IDLE, - "Transfer should be idle when submitted" + unsafe fn take_completed(transfer: &mut TransferData) -> Completion> { + let status = urb_status(transfer.urb_mut()); + let len = transfer.urb_mut().actual_length as usize; + + // SAFETY: transfer is completed (precondition) and `actual_length` bytes were initialized. + let data = unsafe { transfer.take_buf(len) }; + Completion { data, status } + } +} + +impl transfer_internal::PlatformSubmit for super::Interface { + unsafe fn submit(&self, data: ControlIn, transfer: &mut TransferData, user_data: *mut c_void) { + let buf_len = SETUP_PACKET_SIZE + data.length as usize; + let mut buf = Vec::with_capacity(buf_len); + buf.extend_from_slice(&data.setup_packet()); + transfer.fill(buf, buf_len, user_data); + + // SAFETY: we just properly filled the buffer and it is not already pending + unsafe { self.submit_urb(transfer.urb) } + } + + unsafe fn take_completed(transfer: &mut TransferData) -> Completion> { + let status = urb_status(transfer.urb_mut()); + let len = transfer.urb_mut().actual_length as usize; + + // SAFETY: transfer is completed (precondition) and `actual_length` + // bytes were initialized with setup buf in front + let mut data = unsafe { transfer.take_buf(SETUP_PACKET_SIZE + len) }; + data.splice(0..SETUP_PACKET_SIZE, []); + Completion { data, status } + } +} + +impl transfer_internal::PlatformSubmit> for super::Interface { + unsafe fn submit(&self, data: ControlOut, transfer: &mut TransferData, user_data: *mut c_void) { + let buf_len = SETUP_PACKET_SIZE + data.data.len(); + let mut buf = Vec::with_capacity(buf_len); + buf.extend_from_slice( + &data + .setup_packet() + .expect("data length should fit in setup packet's u16"), ); - unsafe { - // SAFETY: invariants guaranteed by being in state IDLE - inner.put_buffer(data); - } - inner.state.store(STATE_PENDING, Ordering::Release); - unsafe { - inner.interface.submit_transfer(self.ptr.as_ptr()); - } + buf.extend_from_slice(data.data); + transfer.fill(buf, buf_len, user_data); + + // SAFETY: we just properly filled the buffer and it is not already pending + unsafe { self.submit_urb(transfer.urb) } } - pub(crate) fn cancel(&mut self) { - let inner = self.inner(); - unsafe { - inner.interface.cancel_transfer(self.ptr.as_ptr()); - } - } - - pub fn poll_completion(&self, cx: &Context) -> Poll { - let inner = self.inner(); - inner.waker.register(cx.waker()); - match inner.state.load(Ordering::Acquire) { - STATE_PENDING => Poll::Pending, - STATE_COMPLETED => { - // SAFETY: state means we have exclusive access - // and the buffer is valid. - inner.state.store(STATE_IDLE, Ordering::Relaxed); - unsafe { - let data = inner.take_buffer(); - let status = inner.status(); - Poll::Ready(Completion { data, status }) - } - } - s => panic!("Polling transfer in unexpected state {s}"), - } - } - - pub(crate) unsafe fn notify_completion(transfer: *mut TransferInner) { - unsafe { - let waker = (*transfer).waker.take(); - match (*transfer).state.swap(STATE_COMPLETED, Ordering::Release) { - STATE_PENDING => { - if let Some(waker) = waker { - waker.wake() - } - } - STATE_ABANDONED => { - let b = Box::from_raw(transfer); - drop(b.take_buffer()); - drop(b); - } - s => panic!("Completing transfer in unexpected state {s}"), - } - } + unsafe fn take_completed(transfer: &mut TransferData) -> Completion { + let status = urb_status(transfer.urb_mut()); + let len = transfer.urb_mut().actual_length as usize; + drop(transfer.take_buf(0)); + Completion { data: len, status } } } -impl Drop for Transfer { - fn drop(&mut self) { - match self.inner().state.swap(STATE_ABANDONED, Ordering::Acquire) { - STATE_PENDING => { - self.cancel(); - /* handler responsible for dropping */ - } - STATE_IDLE => { - // SAFETY: state means there is no concurrent access - unsafe { drop(Box::from_raw(self.ptr.as_ptr())) } - } - STATE_COMPLETED => { - // SAFETY: state means buffer is valid and there is no concurrent access - unsafe { - let b = Box::from_raw(self.ptr.as_ptr()); - drop(b.take_buffer()); - drop(b); - } - } - s => panic!("Dropping transfer in unexpected state {s}"), +fn urb_status(urb: &Urb) -> TransferStatus { + if urb.status == 0 { + return TransferStatus::Complete; + } + + // It's sometimes positive, sometimes negative, but rustix panics if negative. + match Errno::from_raw_os_error(urb.status.abs()) { + Errno::NODEV | Errno::SHUTDOWN => TransferStatus::Disconnected, + Errno::PIPE => TransferStatus::Stall, + Errno::NOENT | Errno::CONNRESET => TransferStatus::Cancelled, + Errno::PROTO | Errno::ILSEQ | Errno::OVERFLOW | Errno::COMM | Errno::TIME => { + TransferStatus::Fault } - } -} - -impl Future for Transfer { - type Output = Completion; - - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().poll_completion(cx) + _ => TransferStatus::UnknownError, } } diff --git a/src/platform/linux_usbfs/usbfs.rs b/src/platform/linux_usbfs/usbfs.rs index 8b40e38..968823f 100644 --- a/src/platform/linux_usbfs/usbfs.rs +++ b/src/platform/linux_usbfs/usbfs.rs @@ -1,13 +1,13 @@ +//! Wrappers for the [usbfs] character device ioctls, translated from the +//! [C structures and ioctl definitions][uapi]. +//! [usbfs]: https://www.kernel.org/doc/html/latest/driver-api/usb/usb.html#the-usb-character-device-nodes +//! [uapi]: https://github.com/torvalds/linux/blob/master/tools/include/uapi/linux/usbdevice_fs.h + use std::{ ffi::{c_int, c_uchar, c_uint, c_void}, marker::PhantomData, }; -/// Wrappers for the [usbfs] character device ioctls, translated from the -/// [C structures and ioctl definitions][uapi]. -/// -/// [usbfs]: https://www.kernel.org/doc/html/latest/driver-api/usb/usb.html#the-usb-character-device-nodes -/// [uapi]: https://github.com/torvalds/linux/blob/master/tools/include/uapi/linux/usbdevice_fs.h use rustix::{ fd::AsFd, io, diff --git a/src/transfer.rs b/src/transfer.rs index e7ab4c8..e785b2d 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -1,11 +1,19 @@ -pub use crate::platform::Transfer; +use crate::{ + platform, + transfer_internal::{PlatformSubmit, TransferHandle, TransferRequest}, +}; +use std::{ + future::Future, + marker::PhantomData, + task::{Context, Poll}, +}; #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum EndpointType { - Control, - Interrupt, - Bulk, - Isochronous, + Control = 0, + Isochronous = 1, + Bulk = 2, + Interrupt = 3, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -19,7 +27,33 @@ pub enum TransferStatus { } #[derive(Debug, Clone)] -pub struct Completion { - pub data: Vec, +pub struct Completion { + pub data: T, pub status: TransferStatus, } + +pub struct TransferFuture { + transfer: TransferHandle, + ty: PhantomData, +} + +impl TransferFuture { + pub(crate) fn new(transfer: TransferHandle) -> TransferFuture { + TransferFuture { + transfer, + ty: PhantomData, + } + } +} + +impl Future for TransferFuture +where + platform::Interface: PlatformSubmit, + D::Response: Unpin, +{ + type Output = Completion; + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().transfer.poll_completion::(cx) + } +} diff --git a/src/transfer_internal.rs b/src/transfer_internal.rs new file mode 100644 index 0000000..e81c378 --- /dev/null +++ b/src/transfer_internal.rs @@ -0,0 +1,227 @@ +use std::{ + cell::UnsafeCell, + ffi::c_void, + ptr::NonNull, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; + +use atomic_waker::AtomicWaker; + +use crate::{ + control::{ControlIn, ControlOut}, + Completion, EndpointType, +}; + +pub(crate) trait Platform { + /// Platform-specific per-transfer data. + type TransferData: Send; + + /// Get a `TransferData`. + fn make_transfer_data(&self, endpoint: u8, ep_type: EndpointType) -> Self::TransferData; + + /// Request cancellation of a transfer that may or may not currently be + /// pending. + fn cancel(&self, transfer: &Self::TransferData); +} + +pub trait TransferRequest { + type Response; +} + +pub(crate) trait PlatformSubmit: Platform { + /// Fill the transfer with the data from `data` and submit it to the kernel. + /// Arrange for `notify_completion(transfer)` to be called once the transfer + /// has completed. + /// + /// SAFETY(caller): transfer is in an idle state + unsafe fn submit(&self, data: D, platform: &mut Self::TransferData, transfer: *mut c_void); + + /// SAFETY(caller): `transfer` is in a completed state + unsafe fn take_completed(transfer: &mut Self::TransferData) -> Completion; +} + +impl TransferRequest for Vec { + type Response = Vec; +} + +impl TransferRequest for ControlIn { + type Response = Vec; +} + +impl TransferRequest for ControlOut<'_> { + type Response = usize; +} + +struct TransferInner { + /// Platform-specific data. + /// + /// In an `UnsafeCell` because we provide `&mut` when the + /// state gurantees us exclusive access + platform_data: UnsafeCell, + + /// One of the `STATE_*` constants below, used to synchronize + /// the state. + state: AtomicU8, + + /// Waker that is notified when transfer completes. + waker: AtomicWaker, + + /// Platform + interface: Arc

, +} + +/// Handle to a transfer. +/// +/// Cancels the transfer and arranges for memory to be freed +/// when dropped. +pub(crate) struct TransferHandle { + ptr: NonNull>, +} + +unsafe impl Send for TransferHandle

{} +unsafe impl Sync for TransferHandle

{} + +/// The transfer has not been submitted. The buffer is not valid. +const STATE_IDLE: u8 = 0; + +/// The transfer has been or is about to be submitted to the kernel and +/// completion has not yet been handled. The buffer points to valid memory but +/// cannot necessarily be accessed by userspace. There is a future or queue +/// waiting for it completion. +const STATE_PENDING: u8 = 1; + +/// Like PENDING, but there is no one waiting for completion. The completion +/// handler will drop the buffer and transfer. +const STATE_ABANDONED: u8 = 3; + +/// The transfer completion has been handled on the event loop thread. The +/// buffer is valid and may be accessed by the `TransferHandle`. +const STATE_COMPLETED: u8 = 3; + +impl TransferHandle

{ + /// Create a new transfer and get a handle. + pub(crate) fn new(interface: Arc

, endpoint: u8, ep_type: EndpointType) -> TransferHandle

{ + let b = Box::new(TransferInner { + platform_data: UnsafeCell::new(interface.make_transfer_data(endpoint, ep_type)), + state: AtomicU8::new(STATE_IDLE), + waker: AtomicWaker::new(), + interface, + }); + + TransferHandle { + ptr: Box::leak(b).into(), + } + } + + fn inner(&self) -> &TransferInner

{ + // SAFETY: while `TransferHandle` is alive, its `TransferInner` is alive + // (it may be shared by `notify_completion` on the event thread, so can't be &mut) + unsafe { self.ptr.as_ref() } + } + + fn platform_data(&self) -> &P::TransferData { + // SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data` + // is via this `TransferHandle`. + unsafe { &*self.inner().platform_data.get() } + } + + pub(crate) fn submit(&mut self, data: D) + where + D: TransferRequest, + P: PlatformSubmit, + { + let inner = self.inner(); + + // It's the syscall that submits the transfer that actually performs the + // release ordering. + let prev = self.inner().state.swap(STATE_PENDING, Ordering::Relaxed); + assert_eq!(prev, STATE_IDLE, "Transfer should be idle when submitted"); + + // SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data` + // is via this `TransferHandle`. Verified that it is idle. + unsafe { + inner.interface.submit( + data, + &mut *inner.platform_data.get(), + self.ptr.as_ptr() as *mut c_void, + ); + } + } + + pub(crate) fn cancel(&mut self) { + self.inner().interface.cancel(self.platform_data()); + } + + fn poll_completion_generic(&mut self, cx: &Context) -> Poll<&mut P::TransferData> { + let inner = self.inner(); + inner.waker.register(cx.waker()); + match inner.state.load(Ordering::Acquire) { + STATE_PENDING => Poll::Pending, + STATE_COMPLETED => { + // Relaxed because this doesn't synchronize with anything, + // just marks that we no longer need to drop the buffer + inner.state.store(STATE_IDLE, Ordering::Relaxed); + + // SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data` + // is via this `TransferHandle`. + Poll::Ready(unsafe { &mut *inner.platform_data.get() }) + } + s => panic!("Polling transfer in unexpected state {s}"), + } + } + + pub fn poll_completion( + &mut self, + cx: &Context, + ) -> Poll> + where + D: TransferRequest, + P: PlatformSubmit, + { + // SAFETY: `poll_completion_generic` checks that it is completed + self.poll_completion_generic(cx) + .map(|u| unsafe { P::take_completed(u) }) + } +} + +impl Drop for TransferHandle

{ + fn drop(&mut self) { + match self.inner().state.swap(STATE_ABANDONED, Ordering::Acquire) { + STATE_PENDING => { + self.cancel(); + /* handler responsible for dropping */ + } + STATE_IDLE | STATE_COMPLETED => { + // SAFETY: state means there is no concurrent access + unsafe { drop(Box::from_raw(self.ptr.as_ptr())) } + } + s => panic!("Dropping transfer in unexpected state {s}"), + } + } +} + +/// Notify that a transfer has completed. +/// +/// SAFETY: `transfer` must be a pointer previously passed to `submit`, and +/// the caller / kernel must no longer dereference it or its buffer. +pub(crate) unsafe fn notify_completion(transfer: *mut c_void) { + unsafe { + let transfer = transfer as *mut TransferInner

; + let waker = (*transfer).waker.take(); + match (*transfer).state.swap(STATE_COMPLETED, Ordering::Release) { + STATE_PENDING => { + if let Some(waker) = waker { + waker.wake() + } + } + STATE_ABANDONED => { + drop(Box::from_raw(transfer)); + } + s => panic!("Completing transfer in unexpected state {s}"), + } + } +}