diff --git a/Cargo.toml b/Cargo.toml index b7d5706..06976a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ env_logger = "0.10.0" futures-lite = "1.13.0" [target.'cfg(any(target_os="linux", target_os="android"))'.dependencies] -rustix = { version = "1.0.1", features = ["fs", "event", "net"] } +rustix = { version = "1.0.1", features = ["fs", "event", "net", "time"] } linux-raw-sys = { version = "0.9.2", features = ["ioctl"] } [target.'cfg(target_os="windows")'.dependencies] diff --git a/examples/blocking.rs b/examples/blocking.rs deleted file mode 100644 index 7e3f9c1..0000000 --- a/examples/blocking.rs +++ /dev/null @@ -1,87 +0,0 @@ -use std::time::Duration; - -use nusb::{ - transfer::{Control, ControlType, Recipient}, - MaybeFuture, -}; - -fn main() { - env_logger::init(); - let di = nusb::list_devices() - .wait() - .unwrap() - .find(|d| d.vendor_id() == 0x59e3 && d.product_id() == 0x0a23) - .expect("device should be connected"); - - println!("Device info: {di:?}"); - - let device = di.open().wait().unwrap(); - - // Linux can make control transfers without claiming an interface - #[cfg(any(target_os = "linux", target_os = "macos"))] - { - let result = device.control_out_blocking( - Control { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - }, - &[1, 2, 3, 4], - Duration::from_secs(1), - ); - println!("{result:?}"); - - let mut buf = [0; 64]; - - let len = device - .control_in_blocking( - Control { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - }, - &mut buf, - Duration::from_secs(1), - ) - .unwrap(); - - println!("{result:?}, {data:?}", data = &buf[..len]); - } - - // but we also provide an API on the `Interface` to support Windows - let interface = device.claim_interface(0).wait().unwrap(); - - let result = interface.control_out_blocking( - Control { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - }, - &[1, 2, 3, 4, 5], - Duration::from_secs(1), - ); - println!("{result:?}"); - - let mut buf = [0; 64]; - - let len = interface - .control_in_blocking( - Control { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - }, - &mut buf, - Duration::from_secs(1), - ) - .unwrap(); - println!("{data:?}", data = &buf[..len]); -} diff --git a/examples/bulk.rs b/examples/bulk.rs index 97a9a36..2b45821 100644 --- a/examples/bulk.rs +++ b/examples/bulk.rs @@ -1,5 +1,8 @@ use futures_lite::future::block_on; -use nusb::{transfer::RequestBuffer, MaybeFuture}; +use nusb::{ + transfer::{Bulk, In, Out}, + MaybeFuture, +}; fn main() { env_logger::init(); @@ -13,20 +16,22 @@ fn main() { let device = di.open().wait().unwrap(); let interface = device.claim_interface(0).wait().unwrap(); + let mut ep_out = interface.endpoint::(0x02).unwrap(); + let mut ep_in = interface.endpoint::(0x81).unwrap(); - block_on(interface.bulk_out(0x02, Vec::from([1, 2, 3, 4, 5]))) - .into_result() - .unwrap(); - - let mut queue = interface.bulk_in_queue(0x81); + let mut transfer = ep_out.allocate(64); + transfer.extend_from_slice(&[1, 2, 3, 4, 5]); + ep_out.submit(transfer); + block_on(ep_out.next_complete()).status().unwrap(); loop { - while queue.pending() < 8 { - queue.submit(RequestBuffer::new(256)); + while ep_in.pending() < 8 { + let transfer = ep_in.allocate(256); + ep_in.submit(transfer); } - let result = block_on(queue.next_complete()); + let result = block_on(ep_in.next_complete()); println!("{result:?}"); - if result.status.is_err() { + if result.status().is_err() { break; } } diff --git a/examples/control.rs b/examples/control.rs index a129fe5..ced08fd 100644 --- a/examples/control.rs +++ b/examples/control.rs @@ -1,4 +1,5 @@ -use futures_lite::future::block_on; +use std::time::Duration; + use nusb::{ transfer::{ControlIn, ControlOut, ControlType, Recipient}, MaybeFuture, @@ -19,47 +20,67 @@ fn main() { // Linux can make control transfers without claiming an interface #[cfg(any(target_os = "linux", target_os = "macos"))] { - let result = block_on(device.control_out(ControlOut { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - data: &[1, 2, 3, 4], - })); + let result = device + .control_out( + ControlOut { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 0x81, + value: 0x9999, + index: 0x9999, + data: &[1, 2, 3, 4], + }, + Duration::from_millis(100), + ) + .wait(); println!("{result:?}"); - let result = block_on(device.control_in(ControlIn { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - length: 256, - })); + let result = device + .control_in( + ControlIn { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 0x81, + value: 0x9999, + index: 0x9999, + length: 256, + }, + Duration::from_millis(100), + ) + .wait(); println!("{result:?}"); } // but we also provide an API on the `Interface` to support Windows let interface = device.claim_interface(0).wait().unwrap(); - let result = block_on(interface.control_out(ControlOut { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - data: &[1, 2, 3, 4], - })); + let result = interface + .control_out( + ControlOut { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 0x81, + value: 0x9999, + index: 0x9999, + data: &[1, 2, 3, 4], + }, + Duration::from_millis(100), + ) + .wait(); println!("{result:?}"); - let result = block_on(interface.control_in(ControlIn { - control_type: ControlType::Vendor, - recipient: Recipient::Device, - request: 0x81, - value: 0x9999, - index: 0x9999, - length: 256, - })); + let result = interface + .control_in( + ControlIn { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 0x81, + value: 0x9999, + index: 0x9999, + length: 256, + }, + Duration::from_millis(100), + ) + .wait(); println!("{result:?}"); } diff --git a/examples/string_descriptors.rs b/examples/string_descriptors.rs index ba397f9..bb6cd93 100644 --- a/examples/string_descriptors.rs +++ b/examples/string_descriptors.rs @@ -33,6 +33,7 @@ fn inspect_device(dev: DeviceInfo) { let languages: Vec = dev .get_string_descriptor_supported_languages(timeout) + .wait() .map(|i| i.collect()) .unwrap_or_default(); println!(" Languages: {languages:02x?}"); @@ -40,17 +41,23 @@ fn inspect_device(dev: DeviceInfo) { let language = languages.first().copied().unwrap_or(US_ENGLISH); if let Some(i_manufacturer) = dev_descriptor.manufacturer_string_index() { - let s = dev.get_string_descriptor(i_manufacturer, language, timeout); + let s = dev + .get_string_descriptor(i_manufacturer, language, timeout) + .wait(); println!(" Manufacturer({i_manufacturer}): {s:?}"); } if let Some(i_product) = dev_descriptor.product_string_index() { - let s = dev.get_string_descriptor(i_product, language, timeout); + let s = dev + .get_string_descriptor(i_product, language, timeout) + .wait(); println!(" Product({i_product}): {s:?}"); } if let Some(i_serial) = dev_descriptor.serial_number_string_index() { - let s = dev.get_string_descriptor(i_serial, language, timeout); + let s = dev + .get_string_descriptor(i_serial, language, timeout) + .wait(); println!(" Serial({i_serial}): {s:?}"); } diff --git a/src/bitset.rs b/src/bitset.rs new file mode 100644 index 0000000..ac9dce1 --- /dev/null +++ b/src/bitset.rs @@ -0,0 +1,26 @@ +/// Bitset capable of storing 0x00..=0x0f and 0x80..=0x8f +#[derive(Default, Clone, Copy)] +pub struct EndpointBitSet(u32); + +impl EndpointBitSet { + pub fn mask(ep: u8) -> u32 { + let bit = ((ep & 0x0f) << 1) | (ep >> 7); + 1 << bit + } + + pub fn is_set(&self, bit: u8) -> bool { + self.0 & Self::mask(bit) != 0 + } + + pub fn is_empty(&self) -> bool { + self.0 == 0 + } + + pub fn set(&mut self, bit: u8) { + self.0 |= Self::mask(bit) + } + + pub fn clear(&mut self, bit: u8) { + self.0 &= !Self::mask(bit) + } +} diff --git a/src/descriptors.rs b/src/descriptors.rs index 17dec44..b819277 100644 --- a/src/descriptors.rs +++ b/src/descriptors.rs @@ -13,10 +13,7 @@ use std::{ use log::warn; -use crate::{ - transfer::{Direction, TransferType}, - Error, -}; +use crate::{transfer::Direction, Error}; pub(crate) const DESCRIPTOR_TYPE_DEVICE: u8 = 0x01; pub(crate) const DESCRIPTOR_LEN_DEVICE: u8 = 18; @@ -696,6 +693,23 @@ impl<'a> Debug for EndpointDescriptor<'a> { } } +/// Endpoint type. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[allow(dead_code)] +pub enum TransferType { + /// Control endpoint. + Control = 0, + + /// Isochronous endpoint. + Isochronous = 1, + + /// Bulk endpoint. + Bulk = 2, + + /// Interrupt endpoint. + Interrupt = 3, +} + /// Error from [`crate::Device::active_configuration`] #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct ActiveConfigurationError { diff --git a/src/device.rs b/src/device.rs index 9eff6ad..d88d2c8 100644 --- a/src/device.rs +++ b/src/device.rs @@ -5,13 +5,26 @@ use crate::{ }, platform, transfer::{ - Control, ControlIn, ControlOut, Queue, RequestBuffer, TransferError, TransferFuture, - TransferType, + BulkOrInterrupt, ControlIn, ControlOut, Direction, EndpointDirection, EndpointType, In, + Out, TransferError, }, + util::write_copy_of_slice, DeviceInfo, Error, MaybeFuture, Speed, }; +use core::slice; use log::error; -use std::{io::ErrorKind, num::NonZeroU8, sync::Arc, time::Duration}; +use std::{ + fmt::Debug, + future::{poll_fn, Future}, + io::ErrorKind, + marker::PhantomData, + mem::MaybeUninit, + num::NonZeroU8, + ops::{Deref, DerefMut}, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; /// An opened USB device. /// @@ -173,11 +186,12 @@ impl Device { desc_index: u8, language_id: u16, timeout: Duration, - ) -> Result, Error> { + ) -> impl MaybeFuture, Error>> { #[cfg(target_os = "windows")] { let _ = timeout; self.backend + .clone() .get_descriptor(desc_type, desc_index, language_id) } @@ -186,21 +200,17 @@ impl Device { const STANDARD_REQUEST_GET_DESCRIPTOR: u8 = 0x06; use crate::transfer::{ControlType, Recipient}; - let mut buf = vec![0; 4096]; - let len = self.control_in_blocking( - Control { + self.control_in( + ControlIn { control_type: ControlType::Standard, recipient: Recipient::Device, request: STANDARD_REQUEST_GET_DESCRIPTOR, value: ((desc_type as u16) << 8) | desc_index as u16, index: language_id, + length: 4096, }, - &mut buf, timeout, - )?; - - buf.truncate(len); - Ok(buf) + ) } } @@ -212,22 +222,24 @@ impl Device { pub fn get_string_descriptor_supported_languages( &self, timeout: Duration, - ) -> Result, Error> { - let data = self.get_descriptor(DESCRIPTOR_TYPE_STRING, 0, 0, timeout)?; + ) -> impl MaybeFuture, Error>> { + self.get_descriptor(DESCRIPTOR_TYPE_STRING, 0, 0, timeout) + .map(move |r| { + let data = r?; + if !validate_string_descriptor(&data) { + error!("String descriptor language list read {data:?}, not a valid string descriptor"); + return Err(Error::new( + ErrorKind::InvalidData, + "string descriptor data was invalid", + )); + } - if !validate_string_descriptor(&data) { - error!("String descriptor language list read {data:?}, not a valid string descriptor"); - return Err(Error::new( - ErrorKind::InvalidData, - "string descriptor data was invalid", - )); - } - - //TODO: Use array_chunks once stable - let mut iter = data.into_iter().skip(2); - Ok(std::iter::from_fn(move || { - Some(u16::from_le_bytes([iter.next()?, iter.next()?])) - })) + //TODO: Use array_chunks once stable + let mut iter = data.into_iter().skip(2); + Ok(std::iter::from_fn(move || { + Some(u16::from_le_bytes([iter.next()?, iter.next()?])) + })) + }) } /// Request a string descriptor from the device. @@ -244,16 +256,19 @@ impl Device { desc_index: NonZeroU8, language_id: u16, timeout: Duration, - ) -> Result { - let data = self.get_descriptor( + ) -> impl MaybeFuture> { + self.get_descriptor( DESCRIPTOR_TYPE_STRING, desc_index.get(), language_id, timeout, - )?; - - decode_string_descriptor(&data) - .map_err(|_| Error::new(ErrorKind::InvalidData, "string descriptor data was invalid")) + ) + .map(|r| { + let data = r?; + decode_string_descriptor(&data).map_err(|_| { + Error::new(ErrorKind::InvalidData, "string descriptor data was invalid") + }) + }) } /// Reset the device, forcing it to re-enumerate. @@ -267,47 +282,12 @@ impl Device { self.backend.clone().reset() } - /// Synchronously perform a single **IN (device-to-host)** transfer on the default **control** endpoint. - /// - /// ### Platform-specific notes - /// - /// * Not supported on Windows. You must [claim an interface][`Device::claim_interface`] - /// and use the interface handle to submit transfers. - /// * On Linux, this takes a device-wide lock, so if you have multiple threads, you - /// are better off using the async methods. - #[cfg(any(target_os = "linux", target_os = "macos", target_os = "android"))] - pub fn control_in_blocking( - &self, - control: Control, - data: &mut [u8], - timeout: Duration, - ) -> Result { - self.backend.control_in_blocking(control, data, timeout) - } - - /// Synchronously perform a single **OUT (host-to-device)** transfer on the default **control** endpoint. - /// - /// ### Platform-specific notes - /// - /// * Not supported on Windows. You must [claim an interface][`Device::claim_interface`] - /// and use the interface handle to submit transfers. - /// * On Linux, this takes a device-wide lock, so if you have multiple threads, you - /// are better off using the async methods. - #[cfg(any(target_os = "linux", target_os = "macos", target_os = "android"))] - pub fn control_out_blocking( - &self, - control: Control, - data: &[u8], - timeout: Duration, - ) -> Result { - self.backend.control_out_blocking(control, data, timeout) - } - /// Asynchronously submit a single **IN (device-to-host)** transfer on the default **control** endpoint. /// /// ### Example /// /// ```no_run + /// use std::time::Duration; /// use futures_lite::future::block_on; /// use nusb::transfer::{ ControlIn, ControlType, Recipient }; /// # use nusb::MaybeFuture; @@ -315,14 +295,14 @@ impl Device { /// # let di = nusb::list_devices().wait().unwrap().next().unwrap(); /// # let device = di.open().wait().unwrap(); /// - /// let data: Vec = block_on(device.control_in(ControlIn { + /// let data: Vec = device.control_in(ControlIn { /// control_type: ControlType::Vendor, /// recipient: Recipient::Device, /// request: 0x30, /// value: 0x0, /// index: 0x0, /// length: 64, - /// })).into_result()?; + /// }, Duration::from_millis(100)).wait()?; /// # Ok(()) } /// ``` /// @@ -331,10 +311,12 @@ impl Device { /// * Not supported on Windows. You must [claim an interface][`Device::claim_interface`] /// and use the interface handle to submit transfers. #[cfg(any(target_os = "linux", target_os = "macos", target_os = "android"))] - pub fn control_in(&self, data: ControlIn) -> TransferFuture { - let mut t = self.backend.make_control_transfer(); - t.submit::(data); - TransferFuture::new(t) + pub fn control_in( + &self, + data: ControlIn, + timeout: Duration, + ) -> impl MaybeFuture, Error>> { + self.backend.clone().control_in(data, timeout) } /// Submit a single **OUT (host-to-device)** transfer on the default **control** endpoint. @@ -342,6 +324,7 @@ impl Device { /// ### Example /// /// ```no_run + /// use std::time::Duration; /// use futures_lite::future::block_on; /// use nusb::transfer::{ ControlOut, ControlType, Recipient }; /// # use nusb::MaybeFuture; @@ -349,14 +332,14 @@ impl Device { /// # let di = nusb::list_devices().wait().unwrap().next().unwrap(); /// # let device = di.open().wait().unwrap(); /// - /// block_on(device.control_out(ControlOut { + /// device.control_out(ControlOut { /// control_type: ControlType::Vendor, /// recipient: Recipient::Device, /// request: 0x32, /// value: 0x0, /// index: 0x0, /// data: &[0x01, 0x02, 0x03, 0x04], - /// })).into_result()?; + /// }, Duration::from_millis(100)).wait()?; /// # Ok(()) } /// ``` /// @@ -365,10 +348,12 @@ impl Device { /// * Not supported on Windows. You must [claim an interface][`Device::claim_interface`] /// and use the interface handle to submit transfers. #[cfg(any(target_os = "linux", target_os = "macos", target_os = "android"))] - pub fn control_out(&self, data: ControlOut) -> TransferFuture { - let mut t = self.backend.make_control_transfer(); - t.submit::(data); - TransferFuture::new(t) + pub fn control_out( + &self, + data: ControlOut, + timeout: Duration, + ) -> impl MaybeFuture> { + self.backend.clone().control_out(data, timeout) } } @@ -388,6 +373,7 @@ impl Interface { pub(crate) fn wrap(backend: Arc) -> Self { Interface { backend } } + /// Select the alternate setting of this interface. /// /// An alternate setting is a mode of the interface that makes particular endpoints available @@ -402,51 +388,12 @@ impl Interface { self.backend.get_alt_setting() } - /// Synchronously perform a single **IN (device-to-host)** transfer on the default **control** endpoint. - /// - /// ### Platform-specific notes - /// - /// * On Linux, this takes a device-wide lock, so if you have multiple - /// threads, you are better off using the async methods. - /// * On Windows, if the `recipient` is `Interface`, the WinUSB driver sends - /// the interface number in the least significant byte of `index`, - /// overriding any value passed. A warning is logged if the passed `index` - /// least significant byte differs from the interface number, and this may - /// become an error in the future. - pub fn control_in_blocking( - &self, - control: Control, - data: &mut [u8], - timeout: Duration, - ) -> Result { - self.backend.control_in_blocking(control, data, timeout) - } - - /// Synchronously perform a single **OUT (host-to-device)** transfer on the default **control** endpoint. - /// - /// ### Platform-specific notes - /// - /// * On Linux, this takes a device-wide lock, so if you have multiple - /// threads, you are better off using the async methods. - /// * On Windows, if the `recipient` is `Interface`, the WinUSB driver sends - /// the interface number in the least significant byte of `index`, - /// overriding any value passed. A warning is logged if the passed `index` - /// least significant byte differs from the interface number, and this may - /// become an error in the future. - pub fn control_out_blocking( - &self, - control: Control, - data: &[u8], - timeout: Duration, - ) -> Result { - self.backend.control_out_blocking(control, data, timeout) - } - /// Submit a single **IN (device-to-host)** transfer on the default **control** endpoint. /// /// ### Example /// /// ```no_run + /// use std::time::Duration; /// use futures_lite::future::block_on; /// use nusb::transfer::{ ControlIn, ControlType, Recipient }; /// # use nusb::MaybeFuture; @@ -455,14 +402,14 @@ impl Interface { /// # let device = di.open().wait().unwrap(); /// # let interface = device.claim_interface(0).wait().unwrap(); /// - /// let data: Vec = block_on(interface.control_in(ControlIn { + /// let data: Vec = interface.control_in(ControlIn { /// control_type: ControlType::Vendor, /// recipient: Recipient::Device, /// request: 0x30, /// value: 0x0, /// index: 0x0, /// length: 64, - /// })).into_result()?; + /// }, Duration::from_millis(100)).wait()?; /// # Ok(()) } /// ``` /// @@ -472,10 +419,12 @@ impl Interface { /// overriding any value passed. A warning is logged if the passed `index` /// least significant byte differs from the interface number, and this may /// become an error in the future. - pub fn control_in(&self, data: ControlIn) -> TransferFuture { - let mut t = self.backend.make_transfer(0, TransferType::Control); - t.submit::(data); - TransferFuture::new(t) + pub fn control_in( + &self, + data: ControlIn, + timeout: Duration, + ) -> impl MaybeFuture, Error>> { + self.backend.clone().control_in(data, timeout) } /// Submit a single **OUT (host-to-device)** transfer on the default **control** endpoint. @@ -483,6 +432,7 @@ impl Interface { /// ### Example /// /// ```no_run + /// use std::time::Duration; /// use futures_lite::future::block_on; /// use nusb::transfer::{ ControlOut, ControlType, Recipient }; /// # use nusb::MaybeFuture; @@ -491,14 +441,14 @@ impl Interface { /// # let device = di.open().wait().unwrap(); /// # let interface = device.claim_interface(0).wait().unwrap(); /// - /// block_on(interface.control_out(ControlOut { + /// interface.control_out(ControlOut { /// control_type: ControlType::Vendor, /// recipient: Recipient::Device, /// request: 0x32, /// value: 0x0, /// index: 0x0, /// data: &[0x01, 0x02, 0x03, 0x04], - /// })).into_result()?; + /// }, Duration::from_millis(100)).wait()?; /// # Ok(()) } /// ``` /// @@ -508,94 +458,12 @@ impl Interface { /// overriding any value passed. A warning is logged if the passed `index` /// least significant byte differs from the interface number, and this may /// become an error in the future. - pub fn control_out(&self, data: ControlOut) -> TransferFuture { - let mut t = self.backend.make_transfer(0, TransferType::Control); - t.submit::(data); - TransferFuture::new(t) - } - - /// Submit a single **IN (device-to-host)** transfer on the specified **bulk** endpoint. - /// - /// * The requested length must be a multiple of the endpoint's maximum packet size - /// * An IN endpoint address must have the top (`0x80`) bit set. - pub fn bulk_in(&self, endpoint: u8, buf: RequestBuffer) -> TransferFuture { - let mut t = self.backend.make_transfer(endpoint, TransferType::Bulk); - t.submit(buf); - TransferFuture::new(t) - } - - /// Submit a single **OUT (host-to-device)** transfer on the specified **bulk** endpoint. - /// - /// * An OUT endpoint address must have the top (`0x80`) bit clear. - pub fn bulk_out(&self, endpoint: u8, buf: Vec) -> TransferFuture> { - let mut t = self.backend.make_transfer(endpoint, TransferType::Bulk); - t.submit(buf); - TransferFuture::new(t) - } - - /// Create a queue for managing multiple **IN (device-to-host)** transfers on a **bulk** endpoint. - /// - /// * An IN endpoint address must have the top (`0x80`) bit set. - pub fn bulk_in_queue(&self, endpoint: u8) -> Queue { - Queue::new(self.backend.clone(), endpoint, TransferType::Bulk) - } - - /// Create a queue for managing multiple **OUT (host-to-device)** transfers on a **bulk** endpoint. - /// - /// * An OUT endpoint address must have the top (`0x80`) bit clear. - pub fn bulk_out_queue(&self, endpoint: u8) -> Queue> { - Queue::new(self.backend.clone(), endpoint, TransferType::Bulk) - } - - /// Submit a single **IN (device-to-host)** transfer on the specified **interrupt** endpoint. - /// - /// * The requested length must be a multiple of the endpoint's maximum packet size - /// * An IN endpoint address must have the top (`0x80`) bit set. - pub fn interrupt_in(&self, endpoint: u8, buf: RequestBuffer) -> TransferFuture { - let mut t = self - .backend - .make_transfer(endpoint, TransferType::Interrupt); - t.submit(buf); - TransferFuture::new(t) - } - - /// Submit a single **OUT (host-to-device)** transfer on the specified **interrupt** endpoint. - /// - /// * An OUT endpoint address must have the top (`0x80`) bit clear. - pub fn interrupt_out(&self, endpoint: u8, buf: Vec) -> TransferFuture> { - let mut t = self - .backend - .make_transfer(endpoint, TransferType::Interrupt); - t.submit(buf); - TransferFuture::new(t) - } - - /// Create a queue for managing multiple **IN (device-to-host)** transfers on an **interrupt** endpoint. - /// - /// * An IN endpoint address must have the top (`0x80`) bit set. - pub fn interrupt_in_queue(&self, endpoint: u8) -> Queue { - Queue::new(self.backend.clone(), endpoint, TransferType::Interrupt) - } - - /// Create a queue for managing multiple **OUT (device-to-host)** transfers on an **interrupt** endpoint. - /// - /// * An OUT endpoint address must have the top (`0x80`) bit clear. - pub fn interrupt_out_queue(&self, endpoint: u8) -> Queue> { - Queue::new(self.backend.clone(), endpoint, TransferType::Interrupt) - } - - /// Clear a bulk or interrupt endpoint's halt / stall condition. - /// - /// Sends a `CLEAR_FEATURE` `ENDPOINT_HALT` control transfer to tell the - /// device to reset the endpoint's data toggle and clear the halt / stall - /// condition, and resets the host-side data toggle. - /// - /// Use this after receiving [`TransferError::Stall`] to clear the error and - /// resume use of the endpoint. - /// - /// This should not be called when transfers are pending on the endpoint. - pub fn clear_halt(&self, endpoint: u8) -> impl MaybeFuture> { - self.backend.clone().clear_halt(endpoint) + pub fn control_out( + &self, + data: ControlOut, + timeout: Duration, + ) -> impl MaybeFuture> { + self.backend.clone().control_out(data, timeout) } /// Get the interface number. @@ -626,11 +494,395 @@ impl Interface { self.descriptors() .find(|i| i.alternate_setting() == self.get_alt_setting()) } + + /// Open an endpoint. + pub fn endpoint( + &self, + address: u8, + ) -> Result, ClaimEndpointError> { + let intf_desc = self.descriptor(); + let ep_desc = + intf_desc.and_then(|desc| desc.endpoints().find(|ep| ep.address() == address)); + let Some(ep_desc) = ep_desc else { + return Err(ClaimEndpointError::InvalidAddress); + }; + + if ep_desc.transfer_type() != EpType::TYPE || address & Direction::MASK != Dir::DIR as u8 { + return Err(ClaimEndpointError::InvalidType); + } + + let backend = self.backend.endpoint(ep_desc)?; + Ok(Endpoint { + backend, + ep_type: PhantomData, + ep_dir: PhantomData, + }) + } +} + +/// Error from [`Interface::endpoint`]. +#[non_exhaustive] +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ClaimEndpointError { + /// The specified address does not exist on this interface and alternate setting + InvalidAddress, + + /// The type or direction does not match the endpoint descriptor for this address + InvalidType, + + /// The endpoint is already open + Busy, +} + +/// Exclusive access to an endpoint of a USB device. +/// +/// Obtain an `Endpoint` with the [`Interface::endpoint`] method. +pub struct Endpoint { + backend: platform::Endpoint, + ep_type: PhantomData, + ep_dir: PhantomData, +} + +impl Endpoint { + /// Get the endpoint address. + pub fn endpoint_address(&self) -> u8 { + self.backend.endpoint_address() + } +} + +/// Methods for Bulk and Interrupt endpoints. +impl Endpoint { + /// Get the maximum packet size for this endpoint. + /// + /// Transfers can consist of multiple packets, but are split into packets + /// of this size when transmitted. + pub fn max_packet_size(&self) -> usize { + self.backend.max_packet_size + } + + /// Create a transfer with a buffer of `len` bytes. + /// + /// `len` is rounded up to a multiple of `max_packet_size`. + /// + /// For an `IN` endpoint, the request length defaults to `len`. For an `OUT` + /// endpoint, `len` is the capacity which can be written to the `Request` + /// before submitting it. + pub fn allocate(&mut self, len: usize) -> Request { + let len = len.div_ceil(self.max_packet_size()) * self.max_packet_size(); + Request { + transfer: self.backend.make_transfer(len), + _phantom: PhantomData, + } + } + + /// Begin a transfer on the endpoint. + /// + /// Submitted transfers are queued and completed in order. Once the transfer + /// completes, it will be returned from [`Self::next_complete`]. Any error + /// in submitting or performing the transfer is deferred until + /// [`next_complete`][`Self::next_complete`]. + pub fn submit(&mut self, transfer: Request) { + self.backend.submit(transfer.transfer) + } + + /// Return a `Future` that waits for the next pending transfer to complete. + /// + /// This future is cancel-safe: it can be cancelled and re-created without + /// side effects, enabling its use in `select!{}` or similar. + /// + /// ## Panics + /// * if there are no transfers pending (that is, if [`Self::pending()`] + /// would return 0). + pub fn next_complete( + &mut self, + ) -> impl Future> + Send + Sync + '_ { + poll_fn(|cx| self.poll_next_complete(cx)) + } + + /// Poll for a pending transfer completion. + /// + /// Returns a completed transfer if one is available, or arranges for the + /// context's waker to be notified when a transfer completes. + /// + /// ## Panics + /// * if there are no transfers pending (that is, if [`Self::pending()`] + /// would return 0). + pub fn poll_next_complete(&mut self, cx: &mut Context<'_>) -> Poll> { + self.backend + .poll_next_complete(cx) + .map(|transfer| Completion { + transfer, + _phantom: PhantomData, + }) + } + + /// Get the number of transfers that have been submitted with `submit` that + /// have not yet been returned from `next_complete`. + pub fn pending(&self) -> usize { + self.backend.pending() + } + + /// Request cancellation of all pending transfers. + /// + /// The transfers are cancelled asynchronously. Once cancelled, they will be + /// returned from calls to `next_complete` so you can tell which were + /// completed, partially-completed, or cancelled. + pub fn cancel_all(&mut self) { + self.backend.cancel_all() + } + + /// Clear the endpoint's halt / stall condition. + /// + /// Sends a `CLEAR_FEATURE` `ENDPOINT_HALT` control transfer to tell the + /// device to reset the endpoint's data toggle and clear the halt / stall + /// condition, and resets the host-side data toggle. + /// + /// Use this after receiving + /// [`TransferError::Stall`][crate::transfer::TransferError::Stall] to clear + /// the error and resume use of the endpoint. + /// + /// This should not be called when transfers are pending on the endpoint. + pub fn clear_halt(&mut self) -> impl MaybeFuture> { + self.backend.clear_halt() + } +} + +/// A transfer that has not yet been submitted. +/// +/// A request contains of a fixed-size buffer and other platform-specific +/// resources used to perform the transfer. +/// +/// Create a `Request` with [`Endpoint::allocate`], or turn a [`Completion`] +/// back into a `Request` with [`Completion::reuse`]. +pub struct Request { + transfer: platform::Transfer, + _phantom: PhantomData<(EpType, Dir)>, +} + +impl Request { + /// Get the allocated buffer length. + #[inline] + pub fn capacity(&self) -> usize { + self.transfer.buffer().len() + } + + /// Get the number of bytes requested by this transfer. + #[inline] + pub fn len(&self) -> usize { + self.transfer.request_len() + } + + /// Set the number of bytes requested by this transfer. + /// + /// ## Panics + /// * If `len` is greater than the buffer [capacity][`Self::capacity`]. + #[inline] + pub fn set_len(&mut self, len: usize) { + assert!(len <= self.capacity()); + unsafe { + self.transfer.set_request_len(len); + } + } +} + +impl Debug for Request { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Request") + .field( + "endpoint", + &format_args!("0x{:02X}", self.transfer.endpoint()), + ) + .field("len", &self.len()) + .finish() + } +} + +impl Request { + /// Get the number of initialized bytes which will be sent if the transfer is submitted. + #[inline] + pub fn len(&self) -> usize { + self.transfer.request_len() + } + + /// Get the allocated capacity of the buffer. + #[inline] + pub fn capacity(&self) -> usize { + self.transfer.buffer().len() + } + + /// Get the number of bytes that can be written to the buffer. + /// + /// This is a convenience method for `capacity() - len()`. + #[inline] + pub fn remaining_capacity(&self) -> usize { + self.capacity() - self.len() + } + + /// Immutable access to the full allocated buffer, which may be uninitialized. + #[inline] + pub fn buffer(&self) -> &[MaybeUninit] { + self.transfer.buffer() + } + + /// Mutable access to the full allocated buffer, which may be uninitialized. + #[inline] + pub fn buffer_mut(&mut self) -> &mut [MaybeUninit] { + self.transfer.buffer_mut() + } + + /// Set the transfer length, assuming that it has been manually initialized. + /// + /// ## Safety + /// * The buffer must be initialized up to `len`. + /// * `len` must be less than or equal to the buffer capacity. + #[inline] + pub unsafe fn set_len(&mut self, len: usize) { + self.transfer.set_request_len(len); + } + + /// Clear the data by setting the length to zero. + #[inline] + pub fn clear(&mut self) { + unsafe { + self.set_len(0); + } + } + + /// Append a slice of bytes to the transfer. + /// + /// ## Panics + /// * If the buffer capacity is exceeded (`len() + slice.len() > capacity()`). + #[inline] + pub fn extend_from_slice<'a>(&mut self, slice: &'a [u8]) { + unsafe { + let prev_len = self.len(); + let dest = self + .buffer_mut() + .get_mut(prev_len..prev_len + slice.len()) + .expect("capacity exceeded"); + write_copy_of_slice(dest, slice); + self.set_len(prev_len + slice.len()) + } + } +} + +impl Deref for Request { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + unsafe { slice::from_raw_parts(self.buffer().as_ptr().cast::(), self.len()) } + } +} + +impl DerefMut for Request { + fn deref_mut(&mut self) -> &mut [u8] { + unsafe { + slice::from_raw_parts_mut(self.buffer_mut().as_mut_ptr().cast::(), self.len()) + } + } +} + +impl Debug for Request { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Request") + .field( + "endpoint", + &format_args!("0x{:02X}", self.transfer.endpoint()), + ) + .field("len", &self.len()) + .field("data", &&self[..]) + .finish() + } +} + +/// A completed transfer returned from [`Endpoint::next_complete`]. +/// +/// A transfer can partially complete even in the case of failure or +/// cancellation, thus the [`actual_len`][`Self::actual_len`] may be nonzero +/// even if the [`status`][`Self::status`] is an error. +/// +/// An `IN` transfer's received data is accessed by accessing the Completion +/// as a slice of bytes via `Deref`. +pub struct Completion { + transfer: platform::Transfer, + _phantom: PhantomData<(EpType, D)>, +} + +impl Completion { + /// Get the status of the transfer. + pub fn status(&self) -> Result<(), TransferError> { + self.transfer.status() + } + + /// Get the number of bytes transferred. + pub fn actual_len(&self) -> usize { + self.transfer.actual_len() + } + + /// Turn the transfer back into a `Request`, reusing the buffer. + /// + /// An `OUT` `Request`'s length is reset to zero so new data can be written to + /// the `Request`. An `IN` `Request`'s length is unchanged. + pub fn reuse(mut self) -> Request { + if Dir::DIR == Direction::In { + unsafe { + self.transfer.set_request_len(0); + } + } + Request { + transfer: self.transfer, + _phantom: PhantomData, + } + } +} + +impl From> + for Request +{ + fn from(value: Completion) -> Self { + value.reuse() + } +} + +impl Debug for Completion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Completion") + .field("status", &self.status()) + .field("len", &self.actual_len()) + .finish() + } +} + +impl Deref for Completion { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + unsafe { slice::from_raw_parts(self.transfer.buffer().as_ptr().cast(), self.actual_len()) } + } +} + +impl Debug for Completion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Completion") + .field("status", &self.status()) + .field("data", &&self[..]) + .finish() + } } #[test] fn assert_send_sync() { + use crate::transfer::{Bulk, Interrupt}; + fn require_send_sync() {} require_send_sync::(); require_send_sync::(); + require_send_sync::>(); + require_send_sync::>(); + require_send_sync::>(); + require_send_sync::>(); + require_send_sync::>(); + require_send_sync::>(); + require_send_sync::>(); + require_send_sync::>(); } diff --git a/src/lib.rs b/src/lib.rs index bd2f10d..c04a092 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -139,10 +139,11 @@ mod platform; pub mod descriptors; mod enumeration; +mod util; pub use enumeration::{BusInfo, DeviceId, DeviceInfo, InterfaceInfo, Speed, UsbControllerType}; mod device; -pub use device::{Device, Interface}; +pub use device::{ClaimEndpointError, Completion, Device, Endpoint, Interface, Request}; pub mod transfer; @@ -151,6 +152,8 @@ pub mod hotplug; mod maybe_future; pub use maybe_future::MaybeFuture; +mod bitset; + /// OS error returned from operations other than transfers. pub type Error = io::Error; diff --git a/src/platform/linux_usbfs/device.rs b/src/platform/linux_usbfs/device.rs index e55d2fc..6379864 100644 --- a/src/platform/linux_usbfs/device.rs +++ b/src/platform/linux_usbfs/device.rs @@ -1,42 +1,56 @@ -use std::io::{ErrorKind, Seek}; -use std::sync::{Mutex, Weak}; -use std::{ffi::c_void, time::Duration}; use std::{ + collections::{BTreeMap, VecDeque}, + ffi::c_void, fs::File, - io::Read, + io::{ErrorKind, Read, Seek}, mem::ManuallyDrop, path::PathBuf, sync::{ atomic::{AtomicU8, Ordering}, - Arc, + Arc, Mutex, MutexGuard, Weak, }, + task::{Context, Poll}, + time::{Duration, Instant}, }; use log::{debug, error, warn}; -use rustix::event::epoll; -use rustix::fd::AsFd; use rustix::{ - fd::{AsRawFd, FromRawFd, OwnedFd}, - fs::{Mode, OFlags}, + event::epoll::EventFlags, + fd::{AsFd, AsRawFd, FromRawFd, OwnedFd}, + fs::{Mode, OFlags, Timespec}, io::Errno, + time::{timerfd_create, timerfd_settime, Itimerspec, TimerfdFlags, TimerfdTimerFlags}, }; use slab::Slab; use super::{ errno_to_transfer_error, events, usbfs::{self, Urb}, - SysfsPath, + SysfsPath, TransferData, }; -use crate::descriptors::{ConfigurationDescriptor, DeviceDescriptor}; +use crate::bitset::EndpointBitSet; +use crate::descriptors::{ + parse_concatenated_config_descriptors, ConfigurationDescriptor, DeviceDescriptor, + EndpointDescriptor, TransferType, DESCRIPTOR_LEN_DEVICE, +}; +use crate::device::ClaimEndpointError; use crate::maybe_future::{blocking::Blocking, MaybeFuture}; -use crate::transfer::{ControlType, Recipient}; -use crate::{ - descriptors::{parse_concatenated_config_descriptors, DESCRIPTOR_LEN_DEVICE}, - transfer::{ - notify_completion, Control, Direction, TransferError, TransferHandle, TransferType, +use crate::transfer::{ + internal::{ + notify_completion, take_completed_from_queue, Idle, Notify, Pending, TransferFuture, }, - DeviceInfo, Error, Speed, + request_type, ControlIn, ControlOut, ControlType, Direction, Recipient, }; +use crate::{DeviceInfo, Error, Speed}; + +#[derive(PartialEq, Eq, PartialOrd, Ord)] +struct TimeoutEntry { + deadline: Instant, + urb: *mut Urb, +} + +unsafe impl Send for TimeoutEntry {} +unsafe impl Sync for TimeoutEntry {} static DEVICES: Mutex>> = Mutex::new(Slab::new()); @@ -49,6 +63,9 @@ pub(crate) struct LinuxDevice { sysfs: Option, active_config: AtomicU8, + + timerfd: OwnedFd, + timeouts: Mutex>, } impl LinuxDevice { @@ -104,6 +121,12 @@ impl LinuxDevice { Self::get_config(&descriptors, &fd)? }; + let timerfd = timerfd_create( + rustix::time::TimerfdClockId::Monotonic, + TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK, + ) + .inspect_err(|e| log::error!("Failed to create timerfd: {e}"))?; + let arc = Arc::new_cyclic(|weak| { let events_id = DEVICES.lock().unwrap().insert(weak.clone()); LinuxDevice { @@ -112,6 +135,8 @@ impl LinuxDevice { descriptors, sysfs, active_config: AtomicU8::new(active_config), + timerfd, + timeouts: Mutex::new(BTreeMap::new()), } }); @@ -124,7 +149,13 @@ impl LinuxDevice { events::register_fd( arc.fd.as_fd(), events::Tag::Device(arc.events_id), - epoll::EventFlags::OUT, + EventFlags::OUT, + )?; + + events::register_fd( + arc.timerfd.as_fd(), + events::Tag::DeviceTimer(arc.events_id), + EventFlags::IN, )?; Ok(arc) @@ -140,18 +171,29 @@ impl LinuxDevice { fn handle_events(&self) { debug!("Handling events for device {}", self.events_id); match usbfs::reap_urb_ndelay(&self.fd) { - Ok(urb_ptr) => { - let user_data = { - let urb = unsafe { &*urb_ptr }; + Ok(urb) => { + let transfer_data: *mut TransferData = unsafe { &(*urb) }.usercontext.cast(); + + { + let transfer = unsafe { &*transfer_data }; + debug_assert!(transfer.urb_ptr() == urb); debug!( "URB {:?} for ep {:x} completed, status={} actual_length={}", - urb_ptr, urb.endpoint, urb.status, urb.actual_length + transfer.urb_ptr(), + transfer.urb().endpoint, + transfer.urb().status, + transfer.urb().actual_length ); - urb.usercontext + + if let Some(deadline) = transfer.deadline { + let mut timeouts = self.timeouts.lock().unwrap(); + timeouts.remove(&TimeoutEntry { deadline, urb }); + self.update_timeouts(timeouts, Instant::now()); + } }; - // SAFETY: pointer came from submit via kernel an we're now done with it - unsafe { notify_completion::(user_data) } + // SAFETY: pointer came from submit via kernel and we're now done with it + unsafe { notify_completion::(transfer_data) } } Err(Errno::AGAIN) => {} Err(Errno::NODEV) => { @@ -169,6 +211,71 @@ impl LinuxDevice { } } + pub(crate) fn handle_timer_epoll(id: usize) { + let device = DEVICES.lock().unwrap().get(id).and_then(|w| w.upgrade()); + if let Some(device) = device { + device.handle_timeouts(); + } + } + + fn handle_timeouts(&self) { + debug!("Handling timeouts for device {}", self.events_id); + let now = Instant::now(); + + rustix::io::read(self.timerfd.as_fd(), &mut [0u8; 8]).ok(); + + let mut timeouts = self.timeouts.lock().unwrap(); + while let Some(entry) = timeouts.first_entry() { + if entry.key().deadline > now { + break; + } + + let urb = entry.remove_entry().0.urb; + + unsafe { + match usbfs::discard_urb(&self.fd, urb) { + Ok(()) => debug!("Cancelled URB {urb:?} after timeout"), + Err(e) => debug!("Failed to cancel timed out URB {urb:?}: {e}"), + } + } + } + + self.update_timeouts(timeouts, now); + } + + fn update_timeouts(&self, timeouts: MutexGuard>, now: Instant) { + const TIMESPEC_ZERO: Timespec = Timespec { + tv_sec: 0, + tv_nsec: 0, + }; + + let next = if let Some((TimeoutEntry { deadline, .. }, _)) = timeouts.first_key_value() { + let duration = deadline + .checked_duration_since(now) + .unwrap_or(Duration::from_nanos(1)); + log::debug!("Next timeout in {duration:?}"); + Timespec { + tv_sec: duration.as_secs() as i64, + tv_nsec: duration.subsec_nanos() as i64, + } + } else { + TIMESPEC_ZERO + }; + + timerfd_settime( + self.timerfd.as_fd(), + TimerfdTimerFlags::empty(), + &Itimerspec { + it_interval: TIMESPEC_ZERO, + it_value: next, + }, + ) + .inspect_err(|e| { + log::error!("Failed to set timerfd: {e}"); + }) + .ok(); + } + pub(crate) fn device_descriptor(&self) -> DeviceDescriptor { DeviceDescriptor::new(&self.descriptors).unwrap() } @@ -212,75 +319,28 @@ impl LinuxDevice { }) } - /// SAFETY: `data` must be valid for `len` bytes to read or write, depending on `Direction` - unsafe fn control_blocking( + pub fn control_in( &self, - direction: Direction, - control: Control, - data: *mut u8, - len: usize, + data: ControlIn, timeout: Duration, - ) -> Result { - let r = usbfs::control( - &self.fd, - usbfs::CtrlTransfer { - bRequestType: control.request_type(direction), - bRequest: control.request, - wValue: control.value, - wIndex: control.index, - wLength: len.try_into().expect("length must fit in u16"), - timeout: timeout - .as_millis() - .try_into() - .expect("timeout must fit in u32 ms"), - data: data as *mut c_void, - }, - ); - - r.map_err(errno_to_transfer_error) + ) -> impl MaybeFuture, Error>> { + let t = TransferData::new_control_in(data); + TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(|t| { + t.status()?; + Ok(t.control_in_data().to_owned()) + }) } - pub fn control_in_blocking( + pub fn control_out( &self, - control: Control, - data: &mut [u8], + data: ControlOut, timeout: Duration, - ) -> Result { - unsafe { - self.control_blocking( - Direction::In, - control, - data.as_mut_ptr(), - data.len(), - timeout, - ) - } - } - - pub fn control_out_blocking( - &self, - control: Control, - data: &[u8], - timeout: Duration, - ) -> Result { - unsafe { - self.control_blocking( - Direction::Out, - control, - data.as_ptr() as *mut u8, - data.len(), - timeout, - ) - } - } - - pub(crate) fn make_control_transfer(self: &Arc) -> TransferHandle { - TransferHandle::new(super::TransferData::new( - self.clone(), - None, - 0, - TransferType::Control, - )) + ) -> impl MaybeFuture> { + let t = TransferData::new_control_out(data); + TransferFuture::new(t, |t| self.submit_timeout(t, timeout)).map(|t| { + t.status()?; + Ok(()) + }) } pub(crate) fn claim_interface( @@ -342,27 +402,59 @@ impl LinuxDevice { usbfs::attach_kernel_driver(&self.fd, interface_number).map_err(|e| e.into()) } - pub(crate) unsafe fn submit_urb(&self, urb: *mut Urb) { - let ep = unsafe { (*urb).endpoint }; - if let Err(e) = usbfs::submit_urb(&self.fd, urb) { - // SAFETY: Transfer was not submitted. We still own the transfer - // and can write to the URB and complete it in place of the handler. - unsafe { - let user_data = { - let u = &mut *urb; - debug!("Failed to submit URB {urb:?} on ep {ep:x}: {e} {u:?}"); - u.actual_length = 0; - u.status = e.raw_os_error(); - u.usercontext - }; - notify_completion::(user_data) + pub(crate) fn submit(&self, transfer: Idle) -> Pending { + let pending = transfer.pre_submit(); + let urb = pending.urb_ptr(); + + // SAFETY: We got the urb from `Idle`, which always points to + // a valid URB with valid buffers, which is not already pending + unsafe { + let ep = (*urb).endpoint; + (*urb).usercontext = pending.as_ptr().cast(); + if let Err(e) = usbfs::submit_urb(&self.fd, urb) { + // SAFETY: Transfer was not submitted. We still own the transfer + // and can write to the URB and complete it in place of the handler. + let u = &mut *urb; + debug!("Failed to submit URB {urb:?} on ep {ep:x}: {e} {u:?}"); + u.actual_length = 0; + u.status = e.raw_os_error(); + notify_completion::(pending.as_ptr().cast()); + } else { + debug!("Submitted URB {urb:?} on ep {ep:x}"); } - } else { - debug!("Submitted URB {urb:?} on ep {ep:x}"); - } + }; + + pending } - pub(crate) unsafe fn cancel_urb(&self, urb: *mut Urb) { + fn submit_timeout( + &self, + mut transfer: Idle, + timeout: Duration, + ) -> Pending { + let urb = transfer.urb_ptr(); + let now = Instant::now(); + let deadline = now + timeout; + transfer.deadline = Some(deadline); + + // Hold the lock across `submit`, so that it can't complete before we + // insert the timeout entry. + let mut timeouts = self.timeouts.lock().unwrap(); + + let r = self.submit(transfer); + + // This can only be false if submit failed, because we hold the timeouts lock + // and would block the completion handler. + if !r.is_complete() { + timeouts.insert(TimeoutEntry { deadline, urb }, ()); + self.update_timeouts(timeouts, now); + } + + r + } + + pub(crate) fn cancel(&self, transfer: &mut Pending) { + let urb = transfer.urb_ptr(); unsafe { if let Err(e) = usbfs::discard_urb(&self.fd, urb) { debug!("Failed to cancel URB {urb:?}: {e}"); @@ -374,22 +466,13 @@ impl LinuxDevice { const REQUEST_GET_CONFIGURATION: u8 = 0x08; let mut dst = [0u8; 1]; - - let control = Control { - control_type: ControlType::Standard, - recipient: Recipient::Device, - request: REQUEST_GET_CONFIGURATION, - value: 0, - index: 0, - }; - let r = usbfs::control( &fd, usbfs::CtrlTransfer { - bRequestType: control.request_type(Direction::In), - bRequest: control.request, - wValue: control.value, - wIndex: control.index, + bRequestType: request_type(Direction::In, ControlType::Standard, Recipient::Device), + bRequest: REQUEST_GET_CONFIGURATION, + wValue: 0, + wIndex: 0, wLength: dst.len() as u16, timeout: Duration::from_millis(50) .as_millis() @@ -460,6 +543,7 @@ impl Drop for LinuxDevice { fn drop(&mut self) { debug!("Closing device {}", self.events_id); events::unregister_fd(self.fd.as_fd()); + events::unregister_fd(self.timerfd.as_fd()); DEVICES.lock().unwrap().remove(self.events_id); } } @@ -473,39 +557,25 @@ pub(crate) struct LinuxInterface { #[derive(Default)] struct InterfaceState { + endpoints: EndpointBitSet, alt_setting: u8, } impl LinuxInterface { - pub(crate) fn make_transfer( - self: &Arc, - endpoint: u8, - ep_type: TransferType, - ) -> TransferHandle { - TransferHandle::new(super::TransferData::new( - self.device.clone(), - Some(self.clone()), - endpoint, - ep_type, - )) + pub fn control_in( + &self, + data: ControlIn, + timeout: Duration, + ) -> impl MaybeFuture, Error>> { + self.device.control_in(data, timeout) } - pub fn control_in_blocking( + pub fn control_out( &self, - control: Control, - data: &mut [u8], + data: ControlOut, timeout: Duration, - ) -> Result { - self.device.control_in_blocking(control, data, timeout) - } - - pub fn control_out_blocking( - &self, - control: Control, - data: &[u8], - timeout: Duration, - ) -> Result { - self.device.control_out_blocking(control, data, timeout) + ) -> impl MaybeFuture> { + self.device.control_out(data, timeout) } pub fn get_alt_setting(&self) -> u8 { @@ -518,6 +588,13 @@ impl LinuxInterface { ) -> impl MaybeFuture> { Blocking::new(move || { let mut state = self.state.lock().unwrap(); + if !state.endpoints.is_empty() { + // TODO: Use ErrorKind::ResourceBusy once compatible with MSRV + return Err(Error::new( + ErrorKind::Other, + "must drop endpoints before changing alt setting", + )); + } debug!( "Set interface {} alt setting to {alt_setting}", self.interface_number @@ -528,13 +605,30 @@ impl LinuxInterface { }) } - pub fn clear_halt( - self: Arc, - endpoint: u8, - ) -> impl MaybeFuture> { - Blocking::new(move || { - debug!("Clear halt, endpoint {endpoint:02x}"); - Ok(usbfs::clear_halt(&self.device.fd, endpoint)?) + pub fn endpoint( + self: &Arc, + descriptor: EndpointDescriptor, + ) -> Result { + let address = descriptor.address(); + let ep_type = descriptor.transfer_type(); + let max_packet_size = descriptor.max_packet_size(); + + let mut state = self.state.lock().unwrap(); + + if state.endpoints.is_set(address) { + return Err(ClaimEndpointError::Busy); + } + state.endpoints.set(address); + + Ok(LinuxEndpoint { + inner: Arc::new(EndpointInner { + address, + ep_type, + interface: self.clone(), + notify: Notify::new(), + }), + max_packet_size, + pending: VecDeque::new(), }) } } @@ -556,3 +650,90 @@ impl Drop for LinuxInterface { } } } + +pub(crate) struct LinuxEndpoint { + inner: Arc, + + pub(crate) max_packet_size: usize, + + /// A queue of pending transfers, expected to complete in order + pending: VecDeque>, +} + +struct EndpointInner { + interface: Arc, + address: u8, + ep_type: TransferType, + notify: Notify, +} + +impl LinuxEndpoint { + pub(crate) fn endpoint_address(&self) -> u8 { + self.inner.address + } + + pub(crate) fn pending(&self) -> usize { + self.pending.len() + } + + pub(crate) fn cancel_all(&mut self) { + // Cancel transfers in reverse order to ensure subsequent transfers + // can't complete out of order while we're going through them. + for transfer in self.pending.iter_mut().rev() { + self.inner.interface.device.cancel(transfer); + } + } + + pub(crate) fn make_transfer(&mut self, len: usize) -> Idle { + Idle::new( + self.inner.clone(), + super::TransferData::new(self.inner.address, self.inner.ep_type, len), + ) + } + + pub(crate) fn submit(&mut self, transfer: Idle) { + assert!( + transfer.notify_eq(&self.inner), + "transfer can only be submitted on the same endpoint" + ); + self.pending + .push_back(self.inner.interface.device.submit(transfer)); + } + + pub(crate) fn poll_next_complete(&mut self, cx: &mut Context) -> Poll> { + self.inner.notify.subscribe(cx); + if let Some(transfer) = take_completed_from_queue(&mut self.pending) { + Poll::Ready(transfer) + } else { + Poll::Pending + } + } + + pub(crate) fn clear_halt(&self) -> impl MaybeFuture> { + let inner = self.inner.clone(); + Blocking::new(move || { + let endpoint = inner.address; + debug!("Clear halt, endpoint {endpoint:02x}"); + Ok(usbfs::clear_halt(&inner.interface.device.fd, endpoint)?) + }) + } +} + +impl Drop for LinuxEndpoint { + fn drop(&mut self) { + self.cancel_all(); + } +} + +impl AsRef for EndpointInner { + fn as_ref(&self) -> &Notify { + &self.notify + } +} + +impl Drop for EndpointInner { + fn drop(&mut self) { + let mut state = self.interface.state.lock().unwrap(); + state.endpoints.clear(self.address); + } +} diff --git a/src/platform/linux_usbfs/events.rs b/src/platform/linux_usbfs/events.rs index a74e438..6c4ab4b 100644 --- a/src/platform/linux_usbfs/events.rs +++ b/src/platform/linux_usbfs/events.rs @@ -35,16 +35,19 @@ static EPOLL_FD: OnceCell = OnceCell::new(); pub(crate) enum Tag { Device(usize), + DeviceTimer(usize), Waker(usize), } impl Tag { const DEVICE: u64 = 1; + const DEVICE_TIMER: u64 = 2; const WAKER: u64 = 3; fn as_event_data(&self) -> EventData { let (tag, id) = match *self { Tag::Device(id) => (Self::DEVICE, id), + Tag::DeviceTimer(id) => (Self::DEVICE_TIMER, id), Tag::Waker(id) => (Self::WAKER, id), }; EventData::new_u64((id as u64) << 3 | tag) @@ -53,8 +56,9 @@ impl Tag { fn from_event_data(data: EventData) -> Self { let id = (data.u64() >> 3) as usize; let tag = data.u64() & 0b111; - match (tag, id as usize) { + match (tag, id) { (Self::DEVICE, id) => Tag::Device(id), + (Self::DEVICE_TIMER, id) => Tag::DeviceTimer(id), (Self::WAKER, id) => Tag::Waker(id), _ => panic!("Invalid event data"), } @@ -98,6 +102,7 @@ fn event_loop() { for event in events { match Tag::from_event_data(event.data) { Tag::Device(id) => Device::handle_usb_epoll(id), + Tag::DeviceTimer(id) => Device::handle_timer_epoll(id), Tag::Waker(id) => { if let Some(waker) = WAKERS.lock().unwrap().get(id) { waker.wake(); diff --git a/src/platform/linux_usbfs/mod.rs b/src/platform/linux_usbfs/mod.rs index 80566e6..30a28df 100644 --- a/src/platform/linux_usbfs/mod.rs +++ b/src/platform/linux_usbfs/mod.rs @@ -9,12 +9,14 @@ pub use enumeration::{list_buses, list_devices, SysfsPath}; mod device; pub(crate) use device::LinuxDevice as Device; +pub(crate) use device::LinuxEndpoint as Endpoint; pub(crate) use device::LinuxInterface as Interface; +pub(crate) type Transfer = Idle; mod hotplug; pub(crate) use hotplug::LinuxHotplugWatch as HotplugWatch; -use crate::transfer::TransferError; +use crate::transfer::{internal::Idle, TransferError}; #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] pub struct DeviceId { diff --git a/src/platform/linux_usbfs/transfer.rs b/src/platform/linux_usbfs/transfer.rs index 106f766..395bdfc 100644 --- a/src/platform/linux_usbfs/transfer.rs +++ b/src/platform/linux_usbfs/transfer.rs @@ -1,16 +1,16 @@ use std::{ - ffi::c_void, - mem::{self, ManuallyDrop}, - ptr::null_mut, - sync::Arc, + mem::{ManuallyDrop, MaybeUninit}, + ptr::{addr_of_mut, null_mut}, + slice, + time::Instant, }; use rustix::io::Errno; use crate::transfer::{ - Completion, ControlIn, ControlOut, PlatformSubmit, PlatformTransfer, RequestBuffer, - ResponseBuffer, TransferError, TransferType, SETUP_PACKET_SIZE, + internal::Pending, ControlIn, ControlOut, Direction, TransferError, SETUP_PACKET_SIZE, }; +use crate::{descriptors::TransferType, util::write_copy_of_slice}; use super::{ errno_to_transfer_error, @@ -24,27 +24,19 @@ use super::{ /// /// This logically contains a `Vec` with urb.buffer and capacity. /// It also owns the `urb` allocation itself, which is stored out-of-line -/// to avoid violating noalias when submitting the transfer while holding -/// `&mut TransferData`. +/// to enable isochronous transfers to allocate the variable-length +/// `iso_packet_desc` array. pub struct TransferData { urb: *mut Urb, capacity: usize, - device: Arc, - - /// Not directly used, exists just to keep the interface from being released - /// while active. - _interface: Option>, + pub(crate) deadline: Option, } unsafe impl Send for TransferData {} +unsafe impl Sync for TransferData {} impl TransferData { - pub(super) fn new( - device: Arc, - interface: Option>, - endpoint: u8, - ep_type: TransferType, - ) -> TransferData { + pub(super) fn new(endpoint: u8, ep_type: TransferType, capacity: usize) -> TransferData { let ep_type = match ep_type { TransferType::Control => USBDEVFS_URB_TYPE_CONTROL, TransferType::Interrupt => USBDEVFS_URB_TYPE_INTERRUPT, @@ -52,14 +44,21 @@ impl TransferData { TransferType::Isochronous => USBDEVFS_URB_TYPE_ISO, }; + let request_len: i32 = match Direction::from_address(endpoint) { + Direction::Out => 0, + Direction::In => capacity.try_into().unwrap(), + }; + + let mut v = ManuallyDrop::new(Vec::with_capacity(capacity)); + TransferData { urb: Box::into_raw(Box::new(Urb { ep_type, endpoint, status: 0, flags: 0, - buffer: null_mut(), - buffer_length: 0, + buffer: v.as_mut_ptr(), + buffer_length: request_len, actual_length: 0, start_frame: 0, number_of_packets_or_stream_id: 0, @@ -67,156 +66,127 @@ impl TransferData { signr: 0, usercontext: null_mut(), })), - capacity: 0, - device, - _interface: interface, + capacity: v.capacity(), + deadline: None, } } - fn urb_mut(&mut self) -> &mut Urb { - // SAFETY: if we have `&mut`, the transfer is not pending + pub(super) fn new_control_out(data: ControlOut) -> TransferData { + let len = SETUP_PACKET_SIZE + data.data.len(); + let mut t = TransferData::new(0x00, TransferType::Control, len); + + write_copy_of_slice( + &mut t.buffer_mut()[..SETUP_PACKET_SIZE], + &data.setup_packet(), + ); + write_copy_of_slice( + &mut t.buffer_mut()[SETUP_PACKET_SIZE..SETUP_PACKET_SIZE + data.data.len()], + &data.data, + ); + unsafe { + t.set_request_len(len); + } + + t + } + + pub(super) fn new_control_in(data: ControlIn) -> TransferData { + let len = SETUP_PACKET_SIZE + data.length as usize; + let mut t = TransferData::new(0x80, TransferType::Control, len); + write_copy_of_slice( + &mut t.buffer_mut()[..SETUP_PACKET_SIZE], + &data.setup_packet(), + ); + unsafe { + t.set_request_len(len); + } + t + } + + #[inline] + pub fn endpoint(&self) -> u8 { + unsafe { (*self.urb).endpoint } + } + + #[inline] + pub(super) fn urb(&self) -> &Urb { + unsafe { &*self.urb } + } + + #[inline] + pub(super) fn urb_mut(&mut self) -> &mut Urb { unsafe { &mut *self.urb } } - fn fill(&mut self, v: Vec, len: usize, user_data: *mut c_void) { - let mut v = ManuallyDrop::new(v); - let urb = self.urb_mut(); - urb.buffer = v.as_mut_ptr(); - urb.buffer_length = len.try_into().expect("buffer size should fit in i32"); - urb.usercontext = user_data; - urb.actual_length = 0; - self.capacity = v.capacity(); + #[inline] + pub(super) fn urb_ptr(&self) -> *mut Urb { + self.urb } - /// SAFETY: requires that the transfer has completed and `length` bytes are initialized - unsafe fn take_buf(&mut self, length: usize) -> Vec { - let urb = self.urb_mut(); - assert!(!urb.buffer.is_null()); - let ptr = mem::replace(&mut urb.buffer, null_mut()); - let capacity = mem::replace(&mut self.capacity, 0); - assert!(length <= capacity); - Vec::from_raw_parts(ptr, length, capacity) + #[inline] + pub fn buffer(&self) -> &[MaybeUninit] { + unsafe { slice::from_raw_parts(self.urb().buffer.cast(), self.capacity) } + } + + #[inline] + pub fn buffer_mut(&mut self) -> &mut [MaybeUninit] { + unsafe { slice::from_raw_parts_mut(self.urb().buffer.cast(), self.capacity) } + } + + #[inline] + pub fn request_len(&self) -> usize { + self.urb().buffer_length as usize + } + + #[inline] + pub unsafe fn set_request_len(&mut self, len: usize) { + assert!(len <= self.capacity); + self.urb_mut().buffer_length = len.try_into().unwrap(); + } + + #[inline] + pub fn actual_len(&self) -> usize { + self.urb().actual_length as usize + } + + #[inline] + pub fn status(&self) -> Result<(), TransferError> { + if self.urb().status == 0 { + return Ok(()); + } + + // It's sometimes positive, sometimes negative, but rustix panics if negative. + Err(errno_to_transfer_error(Errno::from_raw_os_error( + self.urb().status.abs(), + ))) + } + + #[inline] + pub fn control_in_data(&self) -> &[u8] { + debug_assert!(self.urb().endpoint == 0x80); + let urb = self.urb(); + unsafe { + slice::from_raw_parts( + urb.buffer.add(SETUP_PACKET_SIZE), + urb.actual_length as usize, + ) + } + } +} + +impl Pending { + pub fn urb_ptr(&self) -> *mut Urb { + // Get urb pointer without dereferencing as `TransferData`, because + // it may be mutably aliased. + unsafe { *addr_of_mut!((*self.as_ptr()).urb) } } } impl Drop for TransferData { fn drop(&mut self) { unsafe { - if !self.urb_mut().buffer.is_null() { - drop(Vec::from_raw_parts(self.urb_mut().buffer, 0, self.capacity)); - } + drop(Vec::from_raw_parts((*self.urb).buffer, 0, self.capacity)); drop(Box::from_raw(self.urb)); } } } - -impl PlatformTransfer for TransferData { - fn cancel(&self) { - unsafe { - self.device.cancel_urb(self.urb); - } - } -} - -impl PlatformSubmit> for TransferData { - unsafe fn submit(&mut self, data: Vec, user_data: *mut c_void) { - let ep = self.urb_mut().endpoint; - assert!(ep & 0x80 == 0); - let len = data.len(); - self.fill(data, len, user_data); - - // SAFETY: we just properly filled the buffer and it is not already pending - unsafe { self.device.submit_urb(self.urb) } - } - - unsafe fn take_completed(&mut self) -> Completion { - let status = urb_status(self.urb_mut()); - let len = self.urb_mut().actual_length as usize; - - // SAFETY: self is completed (precondition) - let data = ResponseBuffer::from_vec(self.take_buf(0), len); - Completion { data, status } - } -} - -impl PlatformSubmit for TransferData { - unsafe fn submit(&mut self, data: RequestBuffer, user_data: *mut c_void) { - let ep = self.urb_mut().endpoint; - let ty = self.urb_mut().ep_type; - assert!(ep & 0x80 == 0x80); - assert!(ty == USBDEVFS_URB_TYPE_BULK || ty == USBDEVFS_URB_TYPE_INTERRUPT); - - let (data, len) = data.into_vec(); - self.fill(data, len, user_data); - - // SAFETY: we just properly filled the buffer and it is not already pending - unsafe { self.device.submit_urb(self.urb) } - } - - unsafe fn take_completed(&mut self) -> Completion> { - let status = urb_status(self.urb_mut()); - let len = self.urb_mut().actual_length as usize; - - // SAFETY: self is completed (precondition) and `actual_length` bytes were initialized. - let data = unsafe { self.take_buf(len) }; - Completion { data, status } - } -} - -impl PlatformSubmit for TransferData { - unsafe fn submit(&mut self, data: ControlIn, user_data: *mut c_void) { - let buf_len = SETUP_PACKET_SIZE + data.length as usize; - let mut buf = Vec::with_capacity(buf_len); - buf.extend_from_slice(&data.setup_packet()); - self.fill(buf, buf_len, user_data); - - // SAFETY: we just properly filled the buffer and it is not already pending - unsafe { self.device.submit_urb(self.urb) } - } - - unsafe fn take_completed(&mut self) -> Completion> { - let status = urb_status(self.urb_mut()); - let len = self.urb_mut().actual_length as usize; - - // SAFETY: transfer is completed (precondition) and `actual_length` - // bytes were initialized with setup buf in front - let mut data = unsafe { self.take_buf(SETUP_PACKET_SIZE + len) }; - data.splice(0..SETUP_PACKET_SIZE, []); - Completion { data, status } - } -} - -impl PlatformSubmit> for TransferData { - unsafe fn submit(&mut self, data: ControlOut, user_data: *mut c_void) { - let buf_len = SETUP_PACKET_SIZE + data.data.len(); - let mut buf = Vec::with_capacity(buf_len); - buf.extend_from_slice( - &data - .setup_packet() - .expect("data length should fit in setup packet's u16"), - ); - buf.extend_from_slice(data.data); - self.fill(buf, buf_len, user_data); - - // SAFETY: we just properly filled the buffer and it is not already pending - unsafe { self.device.submit_urb(self.urb) } - } - - unsafe fn take_completed(&mut self) -> Completion { - let status = urb_status(self.urb_mut()); - let len = self.urb_mut().actual_length as usize; - let data = ResponseBuffer::from_vec(self.take_buf(0), len); - Completion { data, status } - } -} - -fn urb_status(urb: &Urb) -> Result<(), TransferError> { - if urb.status == 0 { - return Ok(()); - } - - // It's sometimes positive, sometimes negative, but rustix panics if negative. - Err(errno_to_transfer_error(Errno::from_raw_os_error( - urb.status.abs(), - ))) -} diff --git a/src/platform/macos_iokit/device.rs b/src/platform/macos_iokit/device.rs index 67c5a74..4af43dd 100644 --- a/src/platform/macos_iokit/device.rs +++ b/src/platform/macos_iokit/device.rs @@ -1,20 +1,30 @@ use std::{ - collections::BTreeMap, + collections::VecDeque, ffi::c_void, io::ErrorKind, sync::{ atomic::{AtomicU8, AtomicUsize, Ordering}, Arc, Mutex, }, + task::{Context, Poll}, time::Duration, }; +use io_kit_sys::ret::{kIOReturnSuccess, IOReturn}; use log::{debug, error}; use crate::{ - descriptors::{ConfigurationDescriptor, DeviceDescriptor}, + bitset::EndpointBitSet, + descriptors::{ConfigurationDescriptor, DeviceDescriptor, EndpointDescriptor}, + device::ClaimEndpointError, maybe_future::blocking::Blocking, - transfer::{Control, Direction, TransferError, TransferHandle, TransferType}, + transfer::{ + internal::{ + notify_completion, take_completed_from_queue, Idle, Notify, Pending, TransferFuture, + }, + ControlIn, ControlOut, Direction, + }, + util::write_copy_of_slice, DeviceInfo, Error, MaybeFuture, Speed, }; @@ -23,8 +33,8 @@ use super::{ events::{add_event_source, EventRegistration}, iokit::{call_iokit_function, check_iokit_return}, iokit_c::IOUSBDevRequestTO, - iokit_usb::{EndpointInfo, IoKitDevice, IoKitInterface}, - status_to_transfer_result, + iokit_usb::{IoKitDevice, IoKitInterface}, + TransferData, }; pub(crate) struct MacDevice { @@ -168,71 +178,6 @@ impl MacDevice { }) } - /// SAFETY: `data` must be valid for `len` bytes to read or write, depending on `Direction` - unsafe fn control_blocking( - &self, - direction: Direction, - control: Control, - data: *mut u8, - len: usize, - timeout: Duration, - ) -> Result { - let timeout_ms = timeout.as_millis().min(u32::MAX as u128) as u32; - let mut req = IOUSBDevRequestTO { - bmRequestType: control.request_type(direction), - bRequest: control.request, - wValue: control.value, - wIndex: control.index, - wLength: len.try_into().expect("length must fit in u16"), - pData: data.cast::(), - wLenDone: 0, - noDataTimeout: timeout_ms, - completionTimeout: timeout_ms, - }; - - let r = unsafe { call_iokit_function!(self.device.raw, DeviceRequestTO(&mut req)) }; - - status_to_transfer_result(r).map(|()| req.wLenDone as usize) - } - - pub fn control_in_blocking( - &self, - control: Control, - data: &mut [u8], - timeout: Duration, - ) -> Result { - unsafe { - self.control_blocking( - Direction::In, - control, - data.as_mut_ptr(), - data.len(), - timeout, - ) - } - } - - pub fn control_out_blocking( - &self, - control: Control, - data: &[u8], - timeout: Duration, - ) -> Result { - unsafe { - self.control_blocking( - Direction::Out, - control, - data.as_ptr() as *mut u8, - data.len(), - timeout, - ) - } - } - - pub(crate) fn make_control_transfer(self: &Arc) -> TransferHandle { - TransferHandle::new(super::TransferData::new_control(self.clone())) - } - pub(crate) fn claim_interface( self: Arc, interface_number: u8, @@ -257,17 +202,12 @@ impl MacDevice { let _event_registration = add_event_source(interface.create_async_event_source()?); interface.open()?; - - let endpoints = interface.endpoints()?; - debug!("Found endpoints: {endpoints:?}"); - self.claimed_interfaces.fetch_add(1, Ordering::Acquire); Ok(Arc::new(MacInterface { device: self.clone(), interface_number, interface, - endpoints: Mutex::new(endpoints), state: Mutex::new(InterfaceState::default()), _event_registration, })) @@ -280,6 +220,92 @@ impl MacDevice { ) -> impl MaybeFuture, Error>> { self.claim_interface(interface) } + + pub fn control_in( + self: &Arc, + data: ControlIn, + timeout: Duration, + ) -> impl MaybeFuture, Error>> { + let timeout = timeout.as_millis().try_into().expect("timeout too long"); + let t = TransferData::new(0x80, data.length as usize); + + let req = IOUSBDevRequestTO { + bmRequestType: data.request_type(), + bRequest: data.request, + wValue: data.value, + wIndex: data.index, + wLength: data.length, + pData: t.buf as *mut c_void, + wLenDone: 0, + completionTimeout: timeout, + noDataTimeout: timeout, + }; + + TransferFuture::new(t, |t| self.submit_control(t, req)).map(|mut t| { + t.status()?; + Ok(unsafe { t.take_vec() }) + }) + } + + pub fn control_out( + self: &Arc, + data: ControlOut, + timeout: Duration, + ) -> impl MaybeFuture> { + let timeout = timeout.as_millis().try_into().expect("timeout too long"); + let mut t = TransferData::new(0, data.data.len()); + write_copy_of_slice(t.buffer_mut(), &data.data); + + let req = IOUSBDevRequestTO { + bmRequestType: data.request_type(), + bRequest: data.request, + wValue: data.value, + wIndex: data.index, + wLength: u16::try_from(data.data.len()).expect("request too long"), + pData: t.buf as *mut c_void, + wLenDone: 0, + completionTimeout: timeout, + noDataTimeout: timeout, + }; + + TransferFuture::new(t, |t| self.submit_control(t, req)).map(|t| { + t.status()?; + Ok(()) + }) + } + + fn submit_control( + &self, + mut t: Idle, + mut req: IOUSBDevRequestTO, + ) -> Pending { + t.actual_len = 0; + let dir = Direction::from_address(t.endpoint_addr); + assert!(req.pData == t.buf.cast()); + + let t = t.pre_submit(); + let ptr = t.as_ptr(); + + let res = unsafe { + call_iokit_function!( + self.device.raw, + DeviceRequestAsyncTO(&mut req, transfer_callback, ptr as *mut c_void) + ) + }; + + if res == kIOReturnSuccess { + debug!("Submitted control {dir:?} {ptr:?}"); + } else { + error!("Failed to submit control {dir:?} {ptr:?}: {res:x}"); + unsafe { + // Complete the transfer in the place of the callback + (*ptr).status = res; + notify_completion::(ptr); + } + } + + t + } } impl Drop for MacDevice { @@ -298,71 +324,31 @@ pub(crate) struct MacInterface { _event_registration: EventRegistration, pub(crate) interface: IoKitInterface, pub(crate) device: Arc, - /// Map from address to a structure that contains the `pipe_ref` used by iokit - pub(crate) endpoints: Mutex>, state: Mutex, } #[derive(Default)] struct InterfaceState { alt_setting: u8, + endpoints_used: EndpointBitSet, } impl MacInterface { - pub(crate) fn make_transfer( - self: &Arc, - endpoint: u8, - ep_type: TransferType, - ) -> TransferHandle { - if ep_type == TransferType::Control { - assert!(endpoint == 0); - TransferHandle::new(super::TransferData::new_control(self.device.clone())) - } else { - let endpoints = self.endpoints.lock().unwrap(); - - // This function can't fail, so if the endpoint is not found, use an invalid - // pipe_ref that will fail when submitting the transfer. - let pipe_ref = endpoints.get(&endpoint).map(|e| e.pipe_ref).unwrap_or(0); - - TransferHandle::new(super::TransferData::new( - self.device.clone(), - self.clone(), - endpoint, - pipe_ref, - )) - } - } - - pub fn control_in_blocking( - &self, - control: Control, - data: &mut [u8], - timeout: Duration, - ) -> Result { - self.device.control_in_blocking(control, data, timeout) - } - - pub fn control_out_blocking( - &self, - control: Control, - data: &[u8], - timeout: Duration, - ) -> Result { - self.device.control_out_blocking(control, data, timeout) - } - pub fn set_alt_setting( self: Arc, alt_setting: u8, ) -> impl MaybeFuture> { Blocking::new(move || { let mut state = self.state.lock().unwrap(); - debug!( - "Set interface {} alt setting to {alt_setting}", - self.interface_number - ); - let mut endpoints = self.endpoints.lock().unwrap(); + if !state.endpoints_used.is_empty() { + // TODO: Use ErrorKind::ResourceBusy once compatible with MSRV + + return Err(Error::new( + ErrorKind::Other, + "must drop endpoints before changing alt setting", + )); + } unsafe { check_iokit_return(call_iokit_function!( @@ -371,8 +357,11 @@ impl MacInterface { ))?; } - *endpoints = self.interface.endpoints()?; - debug!("Found endpoints: {endpoints:?}"); + debug!( + "Set interface {} alt setting to {alt_setting}", + self.interface_number + ); + state.alt_setting = alt_setting; Ok(()) @@ -383,27 +372,50 @@ impl MacInterface { self.state.lock().unwrap().alt_setting } - pub fn clear_halt( - self: Arc, - endpoint: u8, + pub fn control_in( + self: &Arc, + data: ControlIn, + timeout: Duration, + ) -> impl MaybeFuture, Error>> { + self.device.control_in(data, timeout) + } + + pub fn control_out( + self: &Arc, + data: ControlOut, + timeout: Duration, ) -> impl MaybeFuture> { - Blocking::new(move || { - debug!("Clear halt, endpoint {endpoint:02x}"); + self.device.control_out(data, timeout) + } - let pipe_ref = { - let endpoints = self.endpoints.lock().unwrap(); - let ep = endpoints - .get(&endpoint) - .ok_or_else(|| Error::new(ErrorKind::NotFound, "Endpoint not found"))?; - ep.pipe_ref - }; + pub fn endpoint( + self: &Arc, + descriptor: EndpointDescriptor, + ) -> Result { + let address = descriptor.address(); + let max_packet_size = descriptor.max_packet_size(); - unsafe { - check_iokit_return(call_iokit_function!( - self.interface.raw, - ClearPipeStallBothEnds(pipe_ref) - )) - } + let mut state = self.state.lock().unwrap(); + + let Some(pipe_ref) = self.interface.find_pipe_ref(address) else { + debug!("Endpoint {address:02X} not found in iokit"); + return Err(ClaimEndpointError::InvalidAddress); + }; + + if state.endpoints_used.is_set(address) { + return Err(ClaimEndpointError::Busy); + } + state.endpoints_used.set(address); + + Ok(MacEndpoint { + inner: Arc::new(EndpointInner { + pipe_ref, + address, + interface: self.clone(), + notify: Notify::new(), + }), + max_packet_size, + pending: VecDeque::new(), }) } } @@ -418,3 +430,156 @@ impl Drop for MacInterface { .fetch_sub(1, Ordering::Release); } } + +pub(crate) struct MacEndpoint { + inner: Arc, + pub(crate) max_packet_size: usize, + + /// A queue of pending transfers, expected to complete in order + pending: VecDeque>, +} + +struct EndpointInner { + interface: Arc, + pipe_ref: u8, + address: u8, + notify: Notify, +} + +impl MacEndpoint { + pub(crate) fn endpoint_address(&self) -> u8 { + self.inner.address + } + + pub(crate) fn pending(&self) -> usize { + self.pending.len() + } + + pub(crate) fn cancel_all(&mut self) { + let r = unsafe { + call_iokit_function!( + self.inner.interface.interface.raw, + AbortPipe(self.inner.pipe_ref) + ) + }; + debug!( + "Cancelled all transfers on endpoint {ep:02x}. status={r:x}", + ep = self.inner.address + ); + } + + pub(crate) fn make_transfer(&mut self, len: usize) -> Idle { + Idle::new( + self.inner.clone(), + TransferData::new(self.inner.address, len), + ) + } + + pub(crate) fn submit(&mut self, mut t: Idle) { + assert!( + t.notify_eq(&self.inner), + "transfer can only be submitted on the same endpoint" + ); + let endpoint = t.endpoint_addr; + let dir = Direction::from_address(endpoint); + let pipe_ref = self.inner.pipe_ref; + let len = t.request_len; + let buf = t.buf; + t.actual_len = 0; + + let t = t.pre_submit(); + let ptr = t.as_ptr(); + + let res = unsafe { + match dir { + Direction::Out => call_iokit_function!( + self.inner.interface.interface.raw, + WritePipeAsync( + pipe_ref, + buf as *mut c_void, + u32::try_from(len).expect("request too large"), + transfer_callback, + ptr as *mut c_void + ) + ), + Direction::In => call_iokit_function!( + self.inner.interface.interface.raw, + ReadPipeAsync( + pipe_ref, + buf as *mut c_void, + u32::try_from(len).expect("request too large"), + transfer_callback, + ptr as *mut c_void + ) + ), + } + }; + + if res == kIOReturnSuccess { + debug!("Submitted {dir:?} transfer {ptr:?} on endpoint {endpoint:02X}, {len} bytes"); + } else { + error!("Failed to submit transfer {ptr:?} on endpoint {endpoint:02X}: {res:x}"); + unsafe { + // Complete the transfer in the place of the callback + (*ptr).status = res; + notify_completion::(ptr); + } + } + + self.pending.push_back(t); + } + + pub(crate) fn poll_next_complete(&mut self, cx: &mut Context) -> Poll> { + self.inner.notify.subscribe(cx); + if let Some(transfer) = take_completed_from_queue(&mut self.pending) { + Poll::Ready(transfer) + } else { + Poll::Pending + } + } + + pub(crate) fn clear_halt(&mut self) -> impl MaybeFuture> { + let inner = self.inner.clone(); + Blocking::new(move || { + debug!("Clear halt, endpoint {:02x}", inner.address); + + unsafe { + check_iokit_return(call_iokit_function!( + inner.interface.interface.raw, + ClearPipeStallBothEnds(inner.pipe_ref) + )) + } + }) + } +} + +impl Drop for MacEndpoint { + fn drop(&mut self) { + self.cancel_all(); + } +} + +impl AsRef for EndpointInner { + fn as_ref(&self) -> &Notify { + &self.notify + } +} + +impl Drop for EndpointInner { + fn drop(&mut self) { + let mut state = self.interface.state.lock().unwrap(); + state.endpoints_used.clear(self.address); + } +} + +extern "C" fn transfer_callback(refcon: *mut c_void, result: IOReturn, len: *mut c_void) { + let len = len as usize; + let transfer: *mut TransferData = refcon.cast(); + debug!("Completion for transfer {transfer:?}, status={result:x}, len={len}"); + + unsafe { + (*transfer).actual_len = len; + (*transfer).status = result; + notify_completion::(transfer) + } +} diff --git a/src/platform/macos_iokit/iokit_usb.rs b/src/platform/macos_iokit/iokit_usb.rs index 68758dd..a51da9e 100644 --- a/src/platform/macos_iokit/iokit_usb.rs +++ b/src/platform/macos_iokit/iokit_usb.rs @@ -3,7 +3,7 @@ //! Based on Kate Temkin's [usrs](https://github.com/ktemkin/usrs) //! licensed under MIT OR Apache-2.0. -use std::{collections::BTreeMap, io::ErrorKind, ptr, slice, time::Duration}; +use std::{io::ErrorKind, ptr, slice, time::Duration}; use core_foundation::{base::TCFType, runloop::CFRunLoopSource}; use core_foundation_sys::runloop::CFRunLoopSourceRef; @@ -191,30 +191,6 @@ impl Drop for IoKitDevice { unsafe impl Send for IoKitDevice {} unsafe impl Sync for IoKitDevice {} -#[derive(Debug)] -#[allow(dead_code)] -pub(crate) struct EndpointInfo { - pub(crate) pipe_ref: u8, - pub(crate) direction: u8, - pub(crate) number: u8, - pub(crate) transfer_type: u8, - pub(crate) max_packet_size: u16, - pub(crate) interval: u8, - pub(crate) max_burst: u8, - pub(crate) mult: u8, - pub(crate) bytes_per_interval: u16, -} - -impl EndpointInfo { - pub(crate) fn address(&self) -> u8 { - if self.direction == 0 { - self.number - } else { - self.number | 0x80 - } - } -} - /// Wrapper around an IOKit UsbInterface pub(crate) struct IoKitInterface { pub(crate) raw: *mut *mut iokit::UsbInterface, @@ -287,11 +263,10 @@ impl IoKitInterface { } } - pub(crate) fn endpoints(&self) -> Result, Error> { + pub(crate) fn find_pipe_ref(&self, endpoint_addr: u8) -> Option { unsafe { - let mut endpoints = BTreeMap::new(); let mut count = 0; - check_iokit_return(call_iokit_function!(self.raw, GetNumEndpoints(&mut count)))?; + check_iokit_return(call_iokit_function!(self.raw, GetNumEndpoints(&mut count))).ok()?; // Pipe references are 1-indexed for pipe_ref in 1..=count { @@ -300,40 +275,26 @@ impl IoKitInterface { let mut transfer_type: u8 = 0; let mut max_packet_size: u16 = 0; let mut interval: u8 = 0; - let mut max_burst: u8 = 0; - let mut mult: u8 = 0; - let mut bytes_per_interval: u16 = 0; - check_iokit_return(call_iokit_function!( + let Ok(()) = check_iokit_return(call_iokit_function!( self.raw, - GetPipePropertiesV2( + GetPipeProperties( pipe_ref, &mut direction, &mut number, &mut transfer_type, &mut max_packet_size, - &mut interval, - &mut max_burst, - &mut mult, - &mut bytes_per_interval + &mut interval ) - ))?; - - let endpoint = EndpointInfo { - pipe_ref, - direction, - number, - transfer_type, - max_packet_size, - interval, - max_burst, - mult, - bytes_per_interval, + )) else { + continue; }; - endpoints.insert(endpoint.address(), endpoint); + if number | (((direction != 0) as u8) << 7) == endpoint_addr { + return Some(pipe_ref); + } } - Ok(endpoints) + None } } } diff --git a/src/platform/macos_iokit/mod.rs b/src/platform/macos_iokit/mod.rs index 317f8be..1b4edba 100644 --- a/src/platform/macos_iokit/mod.rs +++ b/src/platform/macos_iokit/mod.rs @@ -1,3 +1,5 @@ +use crate::transfer::{internal::Idle, TransferError}; + mod transfer; use io_kit_sys::ret::IOReturn; pub(crate) use transfer::TransferData; @@ -8,13 +10,13 @@ pub use enumeration::{list_buses, list_devices}; mod device; pub(crate) use device::MacDevice as Device; +pub(crate) use device::MacEndpoint as Endpoint; pub(crate) use device::MacInterface as Interface; +pub(crate) type Transfer = Idle; mod hotplug; pub(crate) use hotplug::MacHotplugWatch as HotplugWatch; -use crate::transfer::TransferError; - mod iokit; mod iokit_c; mod iokit_usb; diff --git a/src/platform/macos_iokit/transfer.rs b/src/platform/macos_iokit/transfer.rs index c8c5ec0..580637f 100644 --- a/src/platform/macos_iokit/transfer.rs +++ b/src/platform/macos_iokit/transfer.rs @@ -1,316 +1,94 @@ use std::{ - ffi::c_void, - mem::{self, ManuallyDrop}, - ptr::null_mut, - sync::Arc, + mem::{ManuallyDrop, MaybeUninit}, + slice, }; use io_kit_sys::ret::{kIOReturnSuccess, IOReturn}; -use log::{error, info}; -use crate::{ - platform::macos_iokit::iokit_c::IOUSBDevRequest, - transfer::{ - notify_completion, Completion, ControlIn, ControlOut, PlatformSubmit, PlatformTransfer, - RequestBuffer, ResponseBuffer, TransferError, - }, -}; +use crate::transfer::{Direction, TransferError}; -use super::{iokit::call_iokit_function, status_to_transfer_result}; - -extern "C" fn transfer_callback(refcon: *mut c_void, result: IOReturn, len: *mut c_void) { - info!( - "Completion callback for transfer {refcon:?}, status={result:x}, len={len}", - len = len as usize - ); - - unsafe { - let callback_data = { - let inner = &mut *(refcon as *mut TransferDataInner); - inner.actual_len = len as usize; - inner.status = result; - inner.callback_data - }; - notify_completion::(callback_data) - } -} +use super::status_to_transfer_result; pub struct TransferData { - endpoint_addr: u8, - pipe_ref: u8, - buf: *mut u8, - capacity: usize, - inner: *mut TransferDataInner, - device: Arc, - interface: Option>, + pub(super) endpoint_addr: u8, + pub(super) buf: *mut u8, + pub(super) capacity: usize, + pub(super) request_len: usize, + pub(super) actual_len: usize, + pub(super) status: IOReturn, } impl Drop for TransferData { fn drop(&mut self) { - if !self.buf.is_null() { - unsafe { drop(Vec::from_raw_parts(self.buf, 0, self.capacity)) } - } - unsafe { drop(Box::from_raw(self.inner)) } + unsafe { drop(Vec::from_raw_parts(self.buf, 0, self.capacity)) } } } -/// Bring the data accessed on the transfer callback out-of-line -/// so that we can have a reference to it while the callback may -/// write to other fields concurrently. This could be included -/// in `TransferData`` with the proposed [`UnsafePinned`](https://github.com/rust-lang/rfcs/pull/3467) -pub struct TransferDataInner { - actual_len: usize, - callback_data: *mut c_void, - status: IOReturn, -} - impl TransferData { - pub(super) fn new( - device: Arc, - interface: Arc, - endpoint_addr: u8, - pipe_ref: u8, - ) -> TransferData { + pub(super) fn new(endpoint_addr: u8, capacity: usize) -> TransferData { + let request_len = match Direction::from_address(endpoint_addr) { + Direction::Out => 0, + Direction::In => capacity, + }; + + let mut v = ManuallyDrop::new(Vec::with_capacity(capacity)); + TransferData { endpoint_addr, - pipe_ref, - buf: null_mut(), - capacity: 0, - inner: Box::into_raw(Box::new(TransferDataInner { - actual_len: 0, - callback_data: null_mut(), - status: kIOReturnSuccess, - })), - device, - interface: Some(interface), + buf: v.as_mut_ptr(), + capacity: v.capacity(), + actual_len: 0, + request_len, + status: kIOReturnSuccess, } } - pub(super) fn new_control(device: Arc) -> TransferData { - TransferData { - endpoint_addr: 0, - pipe_ref: 0, - buf: null_mut(), - capacity: 0, - inner: Box::into_raw(Box::new(TransferDataInner { - actual_len: 0, - callback_data: null_mut(), - status: kIOReturnSuccess, - })), - device, - interface: None, - } + #[inline] + pub fn endpoint(&self) -> u8 { + self.endpoint_addr } - /// SAFETY: Requires that the transfer is not active - unsafe fn fill(&mut self, buf: Vec, callback_data: *mut c_void) { - let mut buf = ManuallyDrop::new(buf); - self.buf = buf.as_mut_ptr(); - self.capacity = buf.capacity(); - - let inner = &mut *self.inner; - inner.actual_len = 0; - inner.status = kIOReturnSuccess; - inner.callback_data = callback_data; + #[inline] + pub fn buffer(&self) -> &[MaybeUninit] { + unsafe { slice::from_raw_parts(self.buf.cast(), self.capacity) } } - /// SAFETY: requires that the transfer has completed and `length` bytes are initialized - unsafe fn take_buf(&mut self, length: usize) -> Vec { - assert!(!self.buf.is_null()); - let ptr = mem::replace(&mut self.buf, null_mut()); - let capacity = mem::replace(&mut self.capacity, 0); - assert!(length <= capacity); - Vec::from_raw_parts(ptr, length, capacity) + #[inline] + pub fn buffer_mut(&mut self) -> &mut [MaybeUninit] { + unsafe { slice::from_raw_parts_mut(self.buf.cast(), self.capacity) } } - /// SAFETY: requires that the transfer is not active, but is fully prepared (as it is when submitting the transfer fails) - unsafe fn check_submit_result(&mut self, res: IOReturn) { - if res != kIOReturnSuccess { - error!( - "Failed to submit transfer on endpoint {ep}: {res:x}", - ep = self.endpoint_addr - ); - let callback_data = { - let inner = &mut *self.inner; - inner.status = res; - inner.callback_data - }; - - // Complete the transfer in the place of the callback - notify_completion::(callback_data) - } + #[inline] + pub fn request_len(&self) -> usize { + self.request_len as usize } - /// SAFETY: requires that the transfer is in a completed state - unsafe fn take_status(&mut self) -> (Result<(), TransferError>, usize) { - let inner = unsafe { &*self.inner }; + #[inline] + pub unsafe fn set_request_len(&mut self, len: usize) { + assert!(len <= self.capacity); + self.request_len = len; + } - (status_to_transfer_result(inner.status), inner.actual_len) + #[inline] + pub fn actual_len(&self) -> usize { + self.actual_len as usize + } + + #[inline] + pub fn status(&self) -> Result<(), TransferError> { + status_to_transfer_result(self.status) + } + + /// Safety: Must be an IN transfer and must have completed to initialize the buffer + pub unsafe fn take_vec(&mut self) -> Vec { + let mut n = ManuallyDrop::new(Vec::new()); + let v = unsafe { Vec::from_raw_parts(self.buf, self.actual_len as usize, self.capacity) }; + self.capacity = n.capacity(); + self.buf = n.as_mut_ptr(); + self.actual_len = 0; + v } } unsafe impl Send for TransferData {} - -impl PlatformTransfer for TransferData { - fn cancel(&self) { - if let Some(intf) = self.interface.as_ref() { - let r = unsafe { call_iokit_function!(intf.interface.raw, AbortPipe(self.pipe_ref)) }; - info!( - "Cancelled all transfers on endpoint {ep:02x}. status={r:x}", - ep = self.endpoint_addr - ); - } else { - assert!(self.pipe_ref == 0); - let r = - unsafe { call_iokit_function!(self.device.device.raw, USBDeviceAbortPipeZero()) }; - info!("Cancelled all transfers on control pipe. status={r:x}"); - } - } -} - -impl PlatformSubmit> for TransferData { - unsafe fn submit(&mut self, data: Vec, callback_data: *mut std::ffi::c_void) { - assert!(self.endpoint_addr & 0x80 == 0); - let len = data.len(); - self.fill(data, callback_data); - - // SAFETY: we just properly filled the buffer and it is not already pending - let res = call_iokit_function!( - self.interface.as_ref().unwrap().interface.raw, - WritePipeAsync( - self.pipe_ref, - self.buf as *mut c_void, - u32::try_from(len).expect("request too large"), - transfer_callback, - self.inner as *mut c_void - ) - ); - info!( - "Submitted OUT transfer {inner:?} on endpoint {ep:02x}", - inner = self.inner, - ep = self.endpoint_addr - ); - self.check_submit_result(res); - } - - unsafe fn take_completed(&mut self) -> crate::transfer::Completion { - let (status, actual_len) = self.take_status(); - - // SAFETY: self is completed (precondition) and `actual_length` bytes were initialized. - let data = ResponseBuffer::from_vec(unsafe { self.take_buf(0) }, actual_len); - Completion { data, status } - } -} - -impl PlatformSubmit for TransferData { - unsafe fn submit(&mut self, data: RequestBuffer, callback_data: *mut std::ffi::c_void) { - assert!(self.endpoint_addr & 0x80 == 0x80); - - let (data, len) = data.into_vec(); - self.fill(data, callback_data); - - // SAFETY: we just properly filled the buffer and it is not already pending - let res = call_iokit_function!( - self.interface.as_ref().unwrap().interface.raw, - ReadPipeAsync( - self.pipe_ref, - self.buf as *mut c_void, - u32::try_from(len).expect("request too large"), - transfer_callback, - self.inner as *mut c_void - ) - ); - info!( - "Submitted IN transfer {inner:?} on endpoint {ep:02x}", - inner = self.inner, - ep = self.endpoint_addr - ); - - self.check_submit_result(res); - } - - unsafe fn take_completed(&mut self) -> crate::transfer::Completion> { - let (status, actual_len) = self.take_status(); - - // SAFETY: self is completed (precondition) and `actual_length` bytes were initialized. - let data = unsafe { self.take_buf(actual_len) }; - Completion { data, status } - } -} - -impl PlatformSubmit for TransferData { - unsafe fn submit(&mut self, data: ControlIn, callback_data: *mut std::ffi::c_void) { - assert!(self.pipe_ref == 0); - - let buf = Vec::with_capacity(data.length as usize); - self.fill(buf, callback_data); - - let mut req = IOUSBDevRequest { - bmRequestType: data.request_type(), - bRequest: data.request, - wValue: data.value, - wIndex: data.index, - wLength: data.length, - pData: self.buf as *mut c_void, - wLenDone: 0, - }; - - // SAFETY: we just properly filled the buffer and it is not already pending - let res = call_iokit_function!( - self.device.device.raw, - DeviceRequestAsync(&mut req, transfer_callback, self.inner as *mut c_void) - ); - info!( - "Submitted Control IN transfer {inner:?}", - inner = self.inner - ); - self.check_submit_result(res); - } - - unsafe fn take_completed(&mut self) -> crate::transfer::Completion> { - let (status, actual_len) = self.take_status(); - - // SAFETY: self is completed (precondition) and `actual_length` bytes were initialized. - let data = unsafe { self.take_buf(actual_len) }; - Completion { data, status } - } -} - -impl PlatformSubmit> for TransferData { - unsafe fn submit(&mut self, data: ControlOut<'_>, callback_data: *mut std::ffi::c_void) { - assert!(self.pipe_ref == 0); - - let buf = data.data.to_vec(); - let len = buf.len(); - self.fill(buf, callback_data); - - let mut req = IOUSBDevRequest { - bmRequestType: data.request_type(), - bRequest: data.request, - wValue: data.value, - wIndex: data.index, - wLength: u16::try_from(len).expect("request too long"), - pData: self.buf as *mut c_void, - wLenDone: 0, - }; - - // SAFETY: we just properly filled the buffer and it is not already pending - let res = call_iokit_function!( - self.device.device.raw, - DeviceRequestAsync(&mut req, transfer_callback, self.inner as *mut c_void) - ); - info!( - "Submitted Control OUT transfer {inner:?}", - inner = self.inner - ); - self.check_submit_result(res); - } - - unsafe fn take_completed(&mut self) -> crate::transfer::Completion { - let (status, actual_len) = self.take_status(); - - // SAFETY: self is completed (precondition) and `actual_length` bytes were initialized. - let data = ResponseBuffer::from_vec(unsafe { self.take_buf(0) }, actual_len); - Completion { data, status } - } -} +unsafe impl Sync for TransferData {} diff --git a/src/platform/windows_winusb/device.rs b/src/platform/windows_winusb/device.rs index 15e466e..475d3b9 100644 --- a/src/platform/windows_winusb/device.rs +++ b/src/platform/windows_winusb/device.rs @@ -1,5 +1,5 @@ use std::{ - collections::{btree_map::Entry, BTreeMap}, + collections::{btree_map::Entry, BTreeMap, VecDeque}, ffi::c_void, io::{self, ErrorKind}, mem::{size_of_val, transmute}, @@ -7,28 +7,38 @@ use std::{ io::{AsRawHandle, RawHandle}, prelude::OwnedHandle, }, - ptr, + ptr::{self, null_mut}, sync::{Arc, Mutex}, + task::{Context, Poll}, time::Duration, }; -use log::{debug, error, info, warn}; +use log::{debug, error, warn}; use windows_sys::Win32::{ Devices::Usb::{ WinUsb_ControlTransfer, WinUsb_Free, WinUsb_GetAssociatedInterface, WinUsb_Initialize, - WinUsb_ResetPipe, WinUsb_SetCurrentAlternateSetting, WinUsb_SetPipePolicy, - PIPE_TRANSFER_TIMEOUT, WINUSB_INTERFACE_HANDLE, WINUSB_SETUP_PACKET, + WinUsb_ReadPipe, WinUsb_ResetPipe, WinUsb_SetCurrentAlternateSetting, WinUsb_WritePipe, + WINUSB_INTERFACE_HANDLE, WINUSB_SETUP_PACKET, }, - Foundation::{GetLastError, FALSE, TRUE}, + Foundation::{GetLastError, ERROR_IO_PENDING, ERROR_NOT_FOUND, FALSE, HANDLE, TRUE}, + System::IO::{CancelIoEx, OVERLAPPED}, }; use crate::{ + bitset::EndpointBitSet, descriptors::{ - ConfigurationDescriptor, DeviceDescriptor, DESCRIPTOR_LEN_DEVICE, + ConfigurationDescriptor, DeviceDescriptor, EndpointDescriptor, DESCRIPTOR_LEN_DEVICE, DESCRIPTOR_TYPE_CONFIGURATION, }, + device::ClaimEndpointError, maybe_future::{blocking::Blocking, Ready}, - transfer::{Control, Direction, Recipient, TransferError, TransferHandle, TransferType}, + transfer::{ + internal::{ + notify_completion, take_completed_from_queue, Idle, Notify, Pending, TransferFuture, + }, + ControlIn, ControlOut, Direction, Recipient, + }, + util::write_copy_of_slice, DeviceInfo, Error, MaybeFuture, Speed, }; @@ -37,6 +47,7 @@ use super::{ find_usbccgp_child, get_driver_name, get_usbccgp_winusb_device_path, get_winusb_device_path, }, hub::HubPort, + transfer::TransferData, util::{create_file, raw_handle, WCStr}, DevInst, }; @@ -127,12 +138,18 @@ impl WindowsDevice { } pub(crate) fn get_descriptor( - &self, + self: Arc, desc_type: u8, desc_index: u8, language_id: u16, - ) -> Result, Error> { - HubPort::by_child_devinst(self.devinst)?.get_descriptor(desc_type, desc_index, language_id) + ) -> impl MaybeFuture, Error>> { + Blocking::new(move || { + HubPort::by_child_devinst(self.devinst)?.get_descriptor( + desc_type, + desc_index, + language_id, + ) + }) } pub(crate) fn reset(&self) -> impl MaybeFuture> { @@ -342,6 +359,7 @@ pub(crate) struct WindowsInterface { #[derive(Default)] struct InterfaceState { alt_setting: u8, + endpoints: EndpointBitSet, } unsafe impl Send for WindowsInterface {} @@ -384,123 +402,55 @@ impl Drop for WindowsInterface { } impl WindowsInterface { - pub(crate) fn make_transfer( + pub fn control_in( self: &Arc, - endpoint: u8, - ep_type: TransferType, - ) -> TransferHandle { - TransferHandle::new(super::TransferData::new(self.clone(), endpoint, ep_type)) - } - - /// SAFETY: `data` must be valid for `len` bytes to read or write, depending on `Direction` - unsafe fn control_blocking( - &self, - direction: Direction, - control: Control, - data: *mut u8, - len: usize, + data: ControlIn, timeout: Duration, - ) -> Result { - info!("Blocking control {direction:?}, {len} bytes"); - - if control.recipient == Recipient::Interface && control.index as u8 != self.interface_number - { + ) -> impl MaybeFuture, Error>> { + if data.recipient == Recipient::Interface && data.index as u8 != self.interface_number { warn!("WinUSB sends interface number instead of passed `index` when performing a control transfer with `Recipient::Interface`"); } - let timeout_ms = timeout.as_millis().min(u32::MAX as u128) as u32; - let r = WinUsb_SetPipePolicy( - self.winusb_handle, - 0, - PIPE_TRANSFER_TIMEOUT, - size_of_val(&timeout_ms) as u32, - &timeout_ms as *const u32 as *const c_void, - ); - - if r != TRUE { - error!( - "WinUsb_SetPipePolicy PIPE_TRANSFER_TIMEOUT failed: {}", - io::Error::last_os_error() - ); - } + let t = TransferData::new(0x80, data.length as usize); let pkt = WINUSB_SETUP_PACKET { - RequestType: control.request_type(direction), - Request: control.request, - Value: control.value, - Index: control.index, - Length: len.try_into().expect("request size too large"), + RequestType: data.request_type(), + Request: data.request, + Value: data.value, + Index: data.index, + Length: data.length, }; - let mut actual_len = 0; - - let r = WinUsb_ControlTransfer( - self.winusb_handle, - pkt, - data, - len.try_into().expect("request size too large"), - &mut actual_len, - ptr::null_mut(), - ); - - if r == TRUE { - Ok(actual_len as usize) - } else { - error!( - "WinUsb_ControlTransfer failed: {}", - io::Error::last_os_error() - ); - Err(super::transfer::map_error(GetLastError())) - } + TransferFuture::new(t, |t| self.submit_control(t, pkt)).map(|mut t| { + t.status()?; + Ok(unsafe { t.take_vec() }) + }) } - pub fn control_in_blocking( - &self, - control: Control, - data: &mut [u8], + pub fn control_out( + self: &Arc, + data: ControlOut, timeout: Duration, - ) -> Result { - unsafe { - self.control_blocking( - Direction::In, - control, - data.as_mut_ptr(), - data.len(), - timeout, - ) + ) -> impl MaybeFuture> { + if data.recipient == Recipient::Interface && data.index as u8 != self.interface_number { + warn!("WinUSB sends interface number instead of passed `index` when performing a control transfer with `Recipient::Interface`"); } - } - pub fn control_out_blocking( - &self, - control: Control, - data: &[u8], - timeout: Duration, - ) -> Result { - // When passed a pointer to read-only memory (e.g. a constant slice), - // WinUSB fails with "Invalid access to memory location. (os error 998)". - // I assume the kernel is checking the pointer for write access - // regardless of the transfer direction. Copy the data to the stack to ensure - // we give it a pointer to writable memory. - let mut buf = [0; 4096]; - let Some(buf) = buf.get_mut(..data.len()) else { - error!( - "Control transfer length {} exceeds limit of 4096", - data.len() - ); - return Err(TransferError::Unknown); + let mut t = TransferData::new(0, data.data.len()); + write_copy_of_slice(t.buffer_mut(), &data.data); + + let pkt = WINUSB_SETUP_PACKET { + RequestType: data.request_type(), + Request: data.request, + Value: data.value, + Index: data.index, + Length: data.data.len().try_into().expect("transfer too large"), }; - buf.copy_from_slice(data); - unsafe { - self.control_blocking( - Direction::Out, - control, - buf.as_mut_ptr(), - buf.len(), - timeout, - ) - } + TransferFuture::new(t, |t| self.submit_control(t, pkt)).map(|t| { + t.status()?; + Ok(()) + }) } pub fn set_alt_setting( @@ -509,6 +459,13 @@ impl WindowsInterface { ) -> impl MaybeFuture> { Blocking::new(move || unsafe { let mut state = self.state.lock().unwrap(); + if !state.endpoints.is_empty() { + // TODO: Use ErrorKind::ResourceBusy once compatible with MSRV + return Err(Error::new( + ErrorKind::Other, + "must drop endpoints before changing alt setting", + )); + } let r = WinUsb_SetCurrentAlternateSetting(self.winusb_handle, alt_setting.into()); if r == TRUE { debug!( @@ -527,14 +484,201 @@ impl WindowsInterface { self.state.lock().unwrap().alt_setting } - pub fn clear_halt( - self: Arc, - endpoint: u8, - ) -> impl MaybeFuture> { + pub fn endpoint( + self: &Arc, + descriptor: EndpointDescriptor, + ) -> Result { + let address = descriptor.address(); + let max_packet_size = descriptor.max_packet_size(); + + let mut state = self.state.lock().unwrap(); + + if state.endpoints.is_set(address) { + return Err(ClaimEndpointError::Busy); + } + state.endpoints.set(address); + + Ok(WindowsEndpoint { + inner: Arc::new(EndpointInner { + address, + interface: self.clone(), + notify: Notify::new(), + }), + max_packet_size, + pending: VecDeque::new(), + }) + } + + fn submit(&self, mut t: Idle) -> Pending { + let endpoint = t.endpoint; + let dir = Direction::from_address(endpoint); + let len = t.request_len; + let buf = t.buf; + t.overlapped.InternalHigh = 0; + + let t = t.pre_submit(); + let ptr = t.as_ptr(); + + debug!("Submit transfer {ptr:?} on endpoint {endpoint:02X} for {len} bytes {dir:?}"); + + let r = unsafe { + match dir { + Direction::Out => WinUsb_WritePipe( + self.winusb_handle, + endpoint, + buf, + len.try_into().expect("transfer size should fit in u32"), + null_mut(), + ptr as *mut OVERLAPPED, + ), + Direction::In => WinUsb_ReadPipe( + self.winusb_handle, + endpoint, + buf, + len.try_into().expect("transfer size should fit in u32"), + null_mut(), + ptr as *mut OVERLAPPED, + ), + } + }; + + self.post_submit(r, t) + } + + fn submit_control( + &self, + mut t: Idle, + pkt: WINUSB_SETUP_PACKET, + ) -> Pending { + let endpoint = t.endpoint; + let dir = Direction::from_address(endpoint); + let len = t.request_len; + let buf = t.buf; + t.overlapped.InternalHigh = 0; + + let t = t.pre_submit(); + let ptr = t.as_ptr(); + + debug!("Submit control {dir:?} transfer {ptr:?} for {len} bytes"); + + let r = unsafe { + WinUsb_ControlTransfer( + self.winusb_handle, + pkt, + buf, + len, + null_mut(), + ptr as *mut OVERLAPPED, + ) + }; + + self.post_submit(r, t) + } + + fn post_submit(&self, r: i32, t: Pending) -> Pending { + if r == TRUE { + error!("Transfer submit completed synchronously") + } + + let err = unsafe { GetLastError() }; + + if err != ERROR_IO_PENDING { + error!("submit failed: {}", io::Error::from_raw_os_error(err as _)); + + // Safety: Transfer was not submitted, so we still own it + // and must complete it in place of the event thread. + unsafe { + (&mut *t.as_ptr()).overlapped.Internal = err as _; + notify_completion::(t.as_ptr()); + } + } + + t + } + + fn cancel(&self, t: &mut Pending) { + debug!("Cancelling transfer {:?}", t.as_ptr()); + unsafe { + let r = CancelIoEx(self.handle as HANDLE, t.as_ptr() as *mut OVERLAPPED); + if r == 0 { + let err = GetLastError(); + if err != ERROR_NOT_FOUND { + error!( + "CancelIoEx failed: {}", + io::Error::from_raw_os_error(err as i32) + ); + } + } + } + } +} + +pub(crate) struct WindowsEndpoint { + inner: Arc, + + pub(crate) max_packet_size: usize, + + /// A queue of pending transfers, expected to complete in order + pending: VecDeque>, +} + +struct EndpointInner { + interface: Arc, + address: u8, + notify: Notify, +} + +impl WindowsEndpoint { + pub(crate) fn endpoint_address(&self) -> u8 { + self.inner.address + } + + pub(crate) fn pending(&self) -> usize { + self.pending.len() + } + + pub(crate) fn cancel_all(&mut self) { + // Cancel transfers in reverse order to ensure subsequent transfers + // can't complete out of order while we're going through them. + for transfer in self.pending.iter_mut().rev() { + self.inner.interface.cancel(transfer); + } + } + + pub(crate) fn make_transfer(&mut self, len: usize) -> Idle { + let t = Idle::new( + self.inner.clone(), + TransferData::new(self.inner.address, len), + ); + + t + } + + pub(crate) fn submit(&mut self, transfer: Idle) { + assert!( + transfer.notify_eq(&self.inner), + "transfer can only be submitted on the same endpoint" + ); + let transfer = self.inner.interface.submit(transfer); + self.pending.push_back(transfer); + } + + pub(crate) fn poll_next_complete(&mut self, cx: &mut Context) -> Poll> { + self.inner.notify.subscribe(cx); + if let Some(transfer) = take_completed_from_queue(&mut self.pending) { + Poll::Ready(transfer) + } else { + Poll::Pending + } + } + + pub(crate) fn clear_halt(&mut self) -> impl MaybeFuture> { + let inner = self.inner.clone(); Blocking::new(move || { + let endpoint = inner.address; debug!("Clear halt, endpoint {endpoint:02x}"); unsafe { - let r = WinUsb_ResetPipe(self.winusb_handle, endpoint); + let r = WinUsb_ResetPipe(inner.interface.winusb_handle, endpoint); if r == TRUE { Ok(()) } else { @@ -544,3 +688,22 @@ impl WindowsInterface { }) } } + +impl Drop for WindowsEndpoint { + fn drop(&mut self) { + self.cancel_all(); + } +} + +impl AsRef for EndpointInner { + fn as_ref(&self) -> &Notify { + &self.notify + } +} + +impl Drop for EndpointInner { + fn drop(&mut self) { + let mut state = self.interface.state.lock().unwrap(); + state.endpoints.clear(self.address); + } +} diff --git a/src/platform/windows_winusb/mod.rs b/src/platform/windows_winusb/mod.rs index d3ee002..aa8217c 100644 --- a/src/platform/windows_winusb/mod.rs +++ b/src/platform/windows_winusb/mod.rs @@ -5,10 +5,11 @@ mod events; mod device; pub(crate) use device::WindowsDevice as Device; +pub(crate) use device::WindowsEndpoint as Endpoint; pub(crate) use device::WindowsInterface as Interface; +pub(crate) type Transfer = Idle; mod transfer; -pub(crate) use transfer::TransferData; mod cfgmgr32; mod hub; @@ -18,3 +19,5 @@ pub(crate) use DevInst as DeviceId; mod hotplug; mod util; pub(crate) use hotplug::WindowsHotplugWatch as HotplugWatch; + +use crate::transfer::internal::Idle; diff --git a/src/platform/windows_winusb/transfer.rs b/src/platform/windows_winusb/transfer.rs index cf3f5f7..01fc941 100644 --- a/src/platform/windows_winusb/transfer.rs +++ b/src/platform/windows_winusb/transfer.rs @@ -1,347 +1,126 @@ use std::{ - ffi::c_void, - io, - mem::{self, ManuallyDrop}, - ptr::{addr_of_mut, null_mut}, - sync::Arc, + mem::{self, ManuallyDrop, MaybeUninit}, + slice, }; -use log::{debug, error, warn}; +use log::debug; use windows_sys::Win32::{ - Devices::Usb::{ - WinUsb_ControlTransfer, WinUsb_GetOverlappedResult, WinUsb_ReadPipe, WinUsb_WritePipe, - WINUSB_SETUP_PACKET, - }, Foundation::{ - GetLastError, ERROR_DEVICE_NOT_CONNECTED, ERROR_FILE_NOT_FOUND, ERROR_GEN_FAILURE, - ERROR_IO_PENDING, ERROR_NOT_FOUND, ERROR_NO_SUCH_DEVICE, ERROR_OPERATION_ABORTED, - ERROR_REQUEST_ABORTED, ERROR_SEM_TIMEOUT, ERROR_TIMEOUT, FALSE, HANDLE, TRUE, WIN32_ERROR, + ERROR_DEVICE_NOT_CONNECTED, ERROR_FILE_NOT_FOUND, ERROR_GEN_FAILURE, ERROR_NO_SUCH_DEVICE, + ERROR_OPERATION_ABORTED, ERROR_REQUEST_ABORTED, ERROR_SEM_TIMEOUT, ERROR_SUCCESS, + ERROR_TIMEOUT, WIN32_ERROR, }, - System::IO::{CancelIoEx, OVERLAPPED}, + System::IO::OVERLAPPED, }; -use crate::transfer::{ - notify_completion, Completion, ControlIn, ControlOut, PlatformSubmit, PlatformTransfer, - Recipient, RequestBuffer, ResponseBuffer, TransferError, TransferType, -}; +use crate::transfer::{internal::notify_completion, Direction, TransferError}; #[repr(C)] -pub(crate) struct EventNotify { - // first member of repr(C) struct; can cast pointer between types - overlapped: OVERLAPPED, - ptr: *mut c_void, -} - pub struct TransferData { - interface: Arc, - event: *mut EventNotify, - buf: *mut u8, - capacity: usize, - endpoint: u8, - ep_type: TransferType, - submit_error: Option, + // first member of repr(C) struct; can cast pointer between types + // overlapped.Internal contains the stauts + // overlapped.InternalHigh contains the number of bytes transferred + pub(crate) overlapped: OVERLAPPED, + pub(crate) buf: *mut u8, + pub(crate) capacity: usize, + pub(crate) request_len: u32, + pub(crate) endpoint: u8, } unsafe impl Send for TransferData {} +unsafe impl Sync for TransferData {} impl TransferData { - pub(crate) fn new( - interface: std::sync::Arc, - endpoint: u8, - ep_type: TransferType, - ) -> TransferData { - TransferData { - interface, - event: Box::into_raw(Box::new(unsafe { mem::zeroed() })), - buf: null_mut(), - capacity: 0, - endpoint, - ep_type, - submit_error: None, - } - } - - /// SAFETY: requires that the transfer has completed and `length` bytes are initialized - unsafe fn take_buf(&mut self, length: usize) -> Vec { - let v = Vec::from_raw_parts(self.buf, length, self.capacity); - self.buf = null_mut(); - self.capacity = 0; - v - } - - /// SAFETY: user_data must be the callback pointer passed to `submit` - unsafe fn post_submit(&mut self, r: i32, func: &str, user_data: *mut c_void) { - if r == TRUE { - error!("{func} completed synchronously") - } - - let err = GetLastError(); - - if err != ERROR_IO_PENDING { - self.submit_error = Some(err); - error!("{func} failed: {}", io::Error::from_raw_os_error(err as _)); - - // Safety: Transfer was not submitted, so we still own it - // and must complete it in place of the event thread. - notify_completion::(user_data); - } else { - self.submit_error = None; - } - } - - /// SAFETY: transfer must be completed - unsafe fn get_status(&mut self) -> (usize, Result<(), TransferError>) { - if let Some(err) = self.submit_error { - debug!( - "Transfer {:?} on endpoint {:02x} failed on submit: {}", - self.event, self.endpoint, err - ); - return (0, Err(map_error(err))); - } - - let mut actual_len = 0; - let r = WinUsb_GetOverlappedResult( - self.interface.winusb_handle, - self.event as *mut OVERLAPPED, - &mut actual_len, - FALSE, - ); - - let status = if r != 0 { - debug!( - "Transfer {:?} on endpoint {:02x} complete: {} bytes transferred", - self.event, self.endpoint, actual_len - ); - Ok(()) - } else { - let err = GetLastError(); - debug!( - "Transfer {:?} on endpoint {:02x} failed: {}, {} bytes transferred", - self.event, self.endpoint, err, actual_len - ); - Err(map_error(err)) + pub(crate) fn new(endpoint: u8, capacity: usize) -> TransferData { + let request_len = match Direction::from_address(endpoint) { + Direction::Out => 0, + Direction::In => capacity.try_into().expect("transfer size must fit in u32"), }; - (actual_len as usize, status) + let mut v = ManuallyDrop::new(Vec::with_capacity(capacity)); + + TransferData { + overlapped: unsafe { mem::zeroed() }, + buf: v.as_mut_ptr(), + capacity: v.capacity(), + request_len, + endpoint, + } + } + + #[inline] + pub fn endpoint(&self) -> u8 { + self.endpoint + } + + #[inline] + pub fn buffer(&self) -> &[MaybeUninit] { + unsafe { slice::from_raw_parts(self.buf.cast(), self.capacity) } + } + + #[inline] + pub fn buffer_mut(&mut self) -> &mut [MaybeUninit] { + unsafe { slice::from_raw_parts_mut(self.buf.cast(), self.capacity) } + } + + #[inline] + pub fn request_len(&self) -> usize { + self.request_len as usize + } + + #[inline] + pub unsafe fn set_request_len(&mut self, len: usize) { + assert!(len <= self.capacity); + self.request_len = len.try_into().expect("transfer size must fit in u32"); + } + + #[inline] + pub fn actual_len(&self) -> usize { + self.overlapped.InternalHigh + } + + pub fn status(&self) -> Result<(), TransferError> { + match self.overlapped.Internal as WIN32_ERROR { + ERROR_SUCCESS => Ok(()), + ERROR_GEN_FAILURE => Err(TransferError::Stall), + ERROR_REQUEST_ABORTED | ERROR_TIMEOUT | ERROR_SEM_TIMEOUT | ERROR_OPERATION_ABORTED => { + Err(TransferError::Cancelled) + } + ERROR_FILE_NOT_FOUND | ERROR_DEVICE_NOT_CONNECTED | ERROR_NO_SUCH_DEVICE => { + Err(TransferError::Disconnected) + } + _ => Err(TransferError::Unknown), + } + } + + /// Safety: Must be an IN transfer and must have completed to initialize the buffer + pub unsafe fn take_vec(&mut self) -> Vec { + let mut n = ManuallyDrop::new(Vec::new()); + let v = unsafe { Vec::from_raw_parts(self.buf, self.actual_len(), self.capacity) }; + self.capacity = n.capacity(); + self.buf = n.as_mut_ptr(); + self.overlapped.InternalHigh = 0; + v } } impl Drop for TransferData { fn drop(&mut self) { - if !self.buf.is_null() { - unsafe { drop(Vec::from_raw_parts(self.buf, 0, self.capacity)) } - } - unsafe { drop(Box::from_raw(self.event)) } - } -} - -impl PlatformTransfer for TransferData { - fn cancel(&self) { - debug!("Cancelling transfer {:?}", self.event); - unsafe { - let r = CancelIoEx( - self.interface.handle as HANDLE, - self.event as *mut OVERLAPPED, - ); - if r == 0 { - let err = GetLastError(); - if err != ERROR_NOT_FOUND { - error!( - "CancelIoEx failed: {}", - io::Error::from_raw_os_error(err as i32) - ); - } - } - } - } -} - -impl PlatformSubmit> for TransferData { - unsafe fn submit(&mut self, data: Vec, user_data: *mut c_void) { - addr_of_mut!((*self.event).ptr).write(user_data); - - let mut data = ManuallyDrop::new(data); - self.buf = data.as_mut_ptr(); - self.capacity = data.capacity(); - let len = data.len(); - - debug!( - "Submit transfer {:?} on endpoint {:02X} for {} bytes OUT", - self.event, self.endpoint, len - ); - - let r = WinUsb_WritePipe( - self.interface.winusb_handle, - self.endpoint, - self.buf, - len.try_into().expect("transfer size should fit in u32"), - null_mut(), - self.event as *mut OVERLAPPED, - ); - self.post_submit(r, "WinUsb_WritePipe", user_data); - } - - unsafe fn take_completed(&mut self) -> Completion { - let (actual_len, status) = self.get_status(); - let data = ResponseBuffer::from_vec(self.take_buf(0), actual_len); - Completion { data, status } - } -} - -impl PlatformSubmit for TransferData { - unsafe fn submit(&mut self, data: RequestBuffer, user_data: *mut c_void) { - addr_of_mut!((*self.event).ptr).write(user_data); - - let (buf, request_len) = data.into_vec(); - let mut buf = ManuallyDrop::new(buf); - self.buf = buf.as_mut_ptr(); - self.capacity = buf.capacity(); - - debug!( - "Submit transfer {:?} on endpoint {:02X} for {} bytes IN", - self.event, self.endpoint, request_len - ); - - let r = WinUsb_ReadPipe( - self.interface.winusb_handle, - self.endpoint, - self.buf, - request_len - .try_into() - .expect("transfer size should fit in u32"), - null_mut(), - self.event as *mut OVERLAPPED, - ); - self.post_submit(r, "WinUsb_ReadPipe", user_data); - } - - unsafe fn take_completed(&mut self) -> Completion> { - let (actual_len, status) = self.get_status(); - let data = self.take_buf(actual_len); - Completion { data, status } - } -} - -impl PlatformSubmit for TransferData { - unsafe fn submit(&mut self, data: ControlIn, user_data: *mut c_void) { - assert_eq!(self.endpoint, 0); - assert_eq!(self.ep_type, TransferType::Control); - - if data.recipient == Recipient::Interface - && data.index as u8 != self.interface.interface_number - { - warn!("WinUSB sends interface number instead of passed `index` when performing a control transfer with `Recipient::Interface`"); - } - - addr_of_mut!((*self.event).ptr).write(user_data); - - let mut buf = ManuallyDrop::new(Vec::with_capacity(data.length as usize)); - self.buf = buf.as_mut_ptr(); - self.capacity = buf.capacity(); - - debug!( - "Submit transfer {:?} on endpoint {:02X} for {} bytes ControlIN", - self.event, self.endpoint, data.length - ); - - let pkt = WINUSB_SETUP_PACKET { - RequestType: data.request_type(), - Request: data.request, - Value: data.value, - Index: data.index, - Length: data.length, - }; - - let r = WinUsb_ControlTransfer( - self.interface.winusb_handle, - pkt, - self.buf, - data.length as u32, - null_mut(), - self.event as *mut OVERLAPPED, - ); - - self.post_submit(r, "WinUsb_ControlTransfer", user_data); - } - - unsafe fn take_completed(&mut self) -> Completion> { - let (actual_len, status) = self.get_status(); - let data = self.take_buf(actual_len); - Completion { data, status } - } -} - -impl PlatformSubmit> for TransferData { - unsafe fn submit(&mut self, data: ControlOut, user_data: *mut c_void) { - assert_eq!(self.endpoint, 0); - assert_eq!(self.ep_type, TransferType::Control); - - if data.recipient == Recipient::Interface - && data.index as u8 != self.interface.interface_number - { - warn!("WinUSB sends interface number instead of passed `index` when performing a control transfer with `Recipient::Interface`"); - } - - addr_of_mut!((*self.event).ptr).write(user_data); - - let mut buf = ManuallyDrop::new(data.data.to_vec()); - self.buf = buf.as_mut_ptr(); - self.capacity = buf.capacity(); - let len: u16 = buf - .len() - .try_into() - .expect("transfer size should fit in u16"); - - debug!( - "Submit transfer {:?} on endpoint {:02X} for {} bytes ControlOUT", - self.event, self.endpoint, len - ); - - let pkt = WINUSB_SETUP_PACKET { - RequestType: data.request_type(), - Request: data.request, - Value: data.value, - Index: data.index, - Length: len as u16, - }; - - let r = WinUsb_ControlTransfer( - self.interface.winusb_handle, - pkt, - self.buf, - len as u32, - null_mut(), - self.event as *mut OVERLAPPED, - ); - - self.post_submit(r, "WinUsb_ControlTransfer", user_data); - } - - unsafe fn take_completed(&mut self) -> Completion { - let (actual_len, status) = self.get_status(); - let data = ResponseBuffer::from_vec(self.take_buf(0), actual_len); - Completion { data, status } + unsafe { drop(Vec::from_raw_parts(self.buf, 0, self.capacity)) } } } pub(super) fn handle_event(completion: *mut OVERLAPPED) { - let completion = completion as *mut EventNotify; - debug!("Handling completion for transfer {completion:?}"); - unsafe { - let p = addr_of_mut!((*completion).ptr).read(); - notify_completion::(p) - } -} + let t = completion as *mut TransferData; + { + let transfer = unsafe { &mut *t }; -pub(crate) fn map_error(err: WIN32_ERROR) -> TransferError { - match err { - ERROR_GEN_FAILURE => TransferError::Stall, - ERROR_REQUEST_ABORTED | ERROR_TIMEOUT | ERROR_SEM_TIMEOUT | ERROR_OPERATION_ABORTED => { - TransferError::Cancelled - } - ERROR_FILE_NOT_FOUND | ERROR_DEVICE_NOT_CONNECTED | ERROR_NO_SUCH_DEVICE => { - TransferError::Disconnected - } - _ => TransferError::Unknown, + debug!( + "Transfer {t:?} on endpoint {:02x} complete: status {}, {} bytes", + transfer.endpoint, + transfer.overlapped.Internal, + transfer.actual_len(), + ); } + unsafe { notify_completion::(t) } } diff --git a/src/transfer/buffer.rs b/src/transfer/buffer.rs deleted file mode 100644 index bce196a..0000000 --- a/src/transfer/buffer.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::fmt::Debug; -use std::mem::ManuallyDrop; - -use super::TransferRequest; - -/// A buffer for requesting an IN transfer. -/// -/// A `RequestBuffer` is passed when submitting an `IN` transfer to define the -/// requested length and provide a buffer to receive data into. The buffer is -/// returned in the [`Completion`][`crate::transfer::Completion`] as a `Vec` -/// with the data read from the endpoint. The `Vec`'s allocation can turned back -/// into a `RequestBuffer` to re-use it for another transfer. -/// -/// You can think of a `RequestBuffer` as a `Vec` with uninitialized contents. -pub struct RequestBuffer { - pub(crate) buf: *mut u8, - pub(crate) capacity: usize, - pub(crate) requested: usize, -} - -impl RequestBuffer { - /// Create a `RequestBuffer` of the specified size. - pub fn new(len: usize) -> RequestBuffer { - let mut v = ManuallyDrop::new(Vec::with_capacity(len)); - RequestBuffer { - buf: v.as_mut_ptr(), - capacity: v.capacity(), - requested: len, - } - } - - pub(crate) fn into_vec(self) -> (Vec, usize) { - let s = ManuallyDrop::new(self); - let v = unsafe { Vec::from_raw_parts(s.buf, 0, s.capacity) }; - (v, s.requested) - } - - /// Create a `RequestBuffer` by re-using the allocation of a `Vec`. - pub fn reuse(v: Vec, len: usize) -> RequestBuffer { - let mut v = ManuallyDrop::new(v); - v.clear(); - v.reserve_exact(len); - RequestBuffer { - buf: v.as_mut_ptr(), - capacity: v.capacity(), - requested: len, - } - } -} - -unsafe impl Send for RequestBuffer {} -unsafe impl Sync for RequestBuffer {} - -impl Drop for RequestBuffer { - fn drop(&mut self) { - unsafe { drop(Vec::from_raw_parts(self.buf, 0, self.capacity)) } - } -} - -impl Debug for RequestBuffer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RequestBuffer") - .field("requested", &self.requested) - .finish_non_exhaustive() - } -} - -impl TransferRequest for RequestBuffer { - type Response = Vec; -} - -/// Returned buffer and actual length for a completed OUT transfer. -/// -/// When an `OUT` transfer completes, a `ResponseBuffer` is returned in the -/// `Completion`. The [`actual_length`][`ResponseBuffer::actual_length`] tells -/// you how many bytes were successfully sent, which may be useful in the case -/// of a partially-completed transfer. -/// -/// The `ResponseBuffer` can be turned into an empty `Vec` to re-use the allocation -/// for another transfer, or dropped to free the memory. -pub struct ResponseBuffer { - pub(crate) buf: *mut u8, - pub(crate) capacity: usize, - pub(crate) transferred: usize, -} - -impl ResponseBuffer { - pub(crate) fn from_vec(v: Vec, transferred: usize) -> ResponseBuffer { - let mut v = ManuallyDrop::new(v); - ResponseBuffer { - buf: v.as_mut_ptr(), - capacity: v.capacity(), - transferred, - } - } - - /// Get the number of bytes successfully transferred. - pub fn actual_length(&self) -> usize { - self.transferred - } - - /// Extract the buffer as an empty `Vec` to re-use in another transfer. - pub fn reuse(self) -> Vec { - let s = ManuallyDrop::new(self); - unsafe { Vec::from_raw_parts(s.buf, 0, s.capacity) } - } -} - -impl Debug for ResponseBuffer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ResponseBuffer") - .field("transferred", &self.transferred) - .finish_non_exhaustive() - } -} - -unsafe impl Send for ResponseBuffer {} -unsafe impl Sync for ResponseBuffer {} - -impl Drop for ResponseBuffer { - fn drop(&mut self) { - unsafe { drop(Vec::from_raw_parts(self.buf, 0, self.capacity)) } - } -} - -impl TransferRequest for Vec { - type Response = ResponseBuffer; -} diff --git a/src/transfer/control.rs b/src/transfer/control.rs index feea377..805125c 100644 --- a/src/transfer/control.rs +++ b/src/transfer/control.rs @@ -1,5 +1,3 @@ -use super::{ResponseBuffer, TransferRequest}; - /// Transfer direction #[derive(Clone, Copy, PartialEq, Eq, Debug)] #[repr(u8)] @@ -52,38 +50,8 @@ pub enum Recipient { Other = 3, } -/// SETUP packet without direction or buffers -pub struct Control { - /// Request type used for the `bmRequestType` field sent in the SETUP packet. - #[doc(alias = "bmRequestType")] - pub control_type: ControlType, - - /// Recipient used for the `bmRequestType` field sent in the SETUP packet. - #[doc(alias = "bmRequestType")] - pub recipient: Recipient, - - /// `bRequest` field sent in the SETUP packet. - #[doc(alias = "bRequest")] - pub request: u8, - - /// `wValue` field sent in the SETUP packet. - #[doc(alias = "wValue")] - pub value: u16, - - /// `wIndex` field sent in the SETUP packet. - /// - /// For [`Recipient::Interface`] this is the interface number. For [`Recipient::Endpoint`] this is the endpoint number. - #[doc(alias = "wIndex")] - pub index: u16, -} - -impl Control { - pub(crate) fn request_type(&self, direction: Direction) -> u8 { - request_type(direction, self.control_type, self.recipient) - } -} - /// SETUP packet and associated data to make an **OUT** request on a control endpoint. +#[derive(Debug, Clone, Copy)] pub struct ControlOut<'a> { /// Request type used for the `bmRequestType` field sent in the SETUP packet. #[doc(alias = "bmRequestType")] @@ -114,16 +82,16 @@ pub struct ControlOut<'a> { impl<'a> ControlOut<'a> { #[allow(unused)] - pub(crate) fn setup_packet(&self) -> Result<[u8; SETUP_PACKET_SIZE], ()> { - Ok(pack_setup( + pub(crate) fn setup_packet(&self) -> [u8; SETUP_PACKET_SIZE] { + pack_setup( Direction::Out, self.control_type, self.recipient, self.request, self.value, self.index, - self.data.len().try_into().map_err(|_| ())?, - )) + self.data.len().try_into().expect("length must fit in u16"), + ) } #[allow(unused)] @@ -132,11 +100,8 @@ impl<'a> ControlOut<'a> { } } -impl TransferRequest for ControlOut<'_> { - type Response = ResponseBuffer; -} - /// SETUP packet to make an **IN** request on a control endpoint. +#[derive(Debug, Clone, Copy)] pub struct ControlIn { /// Request type used for the `bmRequestType` field sent in the SETUP packet. #[doc(alias = "bmRequestType")] @@ -210,10 +175,6 @@ fn pack_setup( ] } -impl TransferRequest for ControlIn { - type Response = Vec; -} - pub(crate) fn request_type( direction: Direction, control_type: ControlType, diff --git a/src/transfer/internal.rs b/src/transfer/internal.rs index 44cfe1d..2dbb164 100644 --- a/src/transfer/internal.rs +++ b/src/transfer/internal.rs @@ -1,173 +1,188 @@ use std::{ - cell::UnsafeCell, - ffi::c_void, - ptr::NonNull, + collections::VecDeque, + future::Future, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, + pin::Pin, + ptr::{addr_of_mut, NonNull}, sync::{ atomic::{AtomicU8, Ordering}, - Arc, + Arc, Mutex, }, - task::{Context, Poll}, + task::{Context, Poll, Waker}, + thread::{self, Thread}, }; -use atomic_waker::AtomicWaker; +use crate::MaybeFuture; -use super::Completion; - -pub trait PlatformTransfer: Send { - /// Request cancellation of a transfer that may or may not currently be - /// pending. - fn cancel(&self); +pub struct Notify { + state: Mutex, } -pub trait TransferRequest { - type Response; +pub enum NotifyState { + None, + Waker(Waker), + Thread(Thread), } -pub trait PlatformSubmit: PlatformTransfer { - /// Fill the transfer with the data from `data` and submit it to the kernel. - /// Arrange for `notify_completion(transfer)` to be called once the transfer - /// has completed. - /// - /// SAFETY(caller): transfer is in an idle state - unsafe fn submit(&mut self, data: D, transfer: *mut c_void); - - /// SAFETY(caller): `transfer` is in a completed state - unsafe fn take_completed(&mut self) -> Completion; +impl AsRef for Notify { + fn as_ref(&self) -> &Notify { + self + } } -struct TransferInner { +impl Notify { + pub fn new() -> Self { + Self { + state: Mutex::new(NotifyState::None), + } + } + + pub fn subscribe(&self, cx: &mut Context) { + *self.state.lock().unwrap() = NotifyState::Waker(cx.waker().clone()); + } + + pub fn wait(&self) { + *self.state.lock().unwrap() = NotifyState::Thread(thread::current()); + thread::park(); + } + + pub fn notify(&self) { + match &mut *self.state.lock().unwrap() { + NotifyState::None => {} + NotifyState::Waker(waker) => waker.wake_by_ref(), + NotifyState::Thread(thread) => thread.unpark(), + } + } +} + +#[repr(C)] +struct TransferInner

{ /// Platform-specific data. - /// - /// In an `UnsafeCell` because we provide `&mut` when the - /// state guarantees us exclusive access - platform_data: UnsafeCell

, + platform_data: P, /// One of the `STATE_*` constants below, used to synchronize /// the state. state: AtomicU8, - /// Waker that is notified when transfer completes. - waker: Arc, + /// Object notified when transfer completes. + notify: Arc + Send + Sync>, } -/// Handle to a transfer. -/// -/// Cancels the transfer and arranges for memory to be freed -/// when dropped. -pub(crate) struct TransferHandle { - ptr: NonNull>, -} - -unsafe impl Send for TransferHandle

{} -unsafe impl Sync for TransferHandle

{} - -/// The transfer has not been submitted. The buffer is not valid. +/// Either the transfer has not yet been submitted, or it has been completed. +/// The inner data may be accessed mutably. const STATE_IDLE: u8 = 0; /// The transfer has been or is about to be submitted to the kernel and -/// completion has not yet been handled. The buffer points to valid memory but -/// cannot necessarily be accessed by userspace. There is a future or queue -/// waiting for it completion. +/// completion has not yet been handled. The buffer cannot necessarily be +/// accessed by userspace. There is a future or queue waiting for its completion. const STATE_PENDING: u8 = 1; /// Like PENDING, but there is no one waiting for completion. The completion /// handler will drop the buffer and transfer. const STATE_ABANDONED: u8 = 2; -/// The transfer completion has been handled on the event loop thread. The -/// buffer is valid and may be accessed by the `TransferHandle`. -const STATE_COMPLETED: u8 = 3; +/// Handle to a transfer that is known to be idle. +pub(crate) struct Idle

(Box>); -impl TransferHandle

{ +impl

Idle

{ /// Create a new transfer and get a handle. - pub(crate) fn new(inner: P) -> TransferHandle

{ - let b = Box::new(TransferInner { - platform_data: UnsafeCell::new(inner), + pub(crate) fn new(notify: Arc + Send + Sync>, inner: P) -> Idle

{ + Idle(Box::new(TransferInner { + platform_data: inner, state: AtomicU8::new(STATE_IDLE), - waker: Arc::new(AtomicWaker::new()), - }); - - TransferHandle { - ptr: Box::leak(b).into(), - } + notify, + })) } - fn inner(&self) -> &TransferInner

{ - // SAFETY: while `TransferHandle` is alive, its `TransferInner` is alive - // (it may be shared by `notify_completion` on the event thread, so can't be &mut) - unsafe { self.ptr.as_ref() } - } - - fn platform_data(&self) -> &P { - // SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data` - // is via this `TransferHandle`. - unsafe { &*self.inner().platform_data.get() } - } - - pub(crate) fn submit(&mut self, data: D) - where - D: TransferRequest, - P: PlatformSubmit, - { - let inner = self.inner(); - + /// Mark the transfer as pending. The caller must submit the transfer to the kernel + /// and arrange for `notify_completion` to be called on the returned value. + pub(crate) fn pre_submit(self) -> Pending

{ // It's the syscall that submits the transfer that actually performs the // release ordering. - let prev = self.inner().state.swap(STATE_PENDING, Ordering::Relaxed); + let prev = self.0.state.swap(STATE_PENDING, Ordering::Relaxed); assert_eq!(prev, STATE_IDLE, "Transfer should be idle when submitted"); - - // SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data` - // is via this `TransferHandle`. Verified that it is idle. - unsafe { - let p = &mut *inner.platform_data.get(); - p.submit(data, self.ptr.as_ptr() as *mut c_void); + Pending { + ptr: unsafe { NonNull::new_unchecked(Box::into_raw(self.0)) }, } } - pub(crate) fn cancel(&mut self) { - self.platform_data().cancel(); + pub(crate) fn notify_eq(&self, other: &Arc) -> bool { + Arc::as_ptr(&self.0.notify) as *const () == Arc::as_ptr(other) as *const () + } +} + +impl

Deref for Idle

{ + type Target = P; + fn deref(&self) -> &Self::Target { + &self.0.platform_data + } +} + +impl

DerefMut for Idle

{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0.platform_data + } +} + +/// Handle to a transfer that may be pending. +pub(crate) struct Pending

{ + ptr: NonNull>, +} + +unsafe impl Send for Pending

{} +unsafe impl Sync for Pending

{} + +impl

Pending

{ + pub fn as_ptr(&self) -> *mut P { + // first member of repr(C) struct + self.ptr.as_ptr().cast() } - fn poll_completion_generic(&mut self, cx: &Context) -> Poll<&mut P> { - let inner = self.inner(); - inner.waker.register(cx.waker()); - match inner.state.load(Ordering::Acquire) { - STATE_PENDING => Poll::Pending, - STATE_COMPLETED => { - // Relaxed because this doesn't synchronize with anything, - // just marks that we no longer need to drop the buffer - inner.state.store(STATE_IDLE, Ordering::Relaxed); + fn state(&self) -> &AtomicU8 { + // Get state without dereferencing as `TransferInner`, because + // its `platform_data` may be mutably aliased. + unsafe { &*(addr_of_mut!((*self.ptr.as_ptr()).state)) } + } - // SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data` - // is via this `TransferHandle`. - Poll::Ready(unsafe { &mut *inner.platform_data.get() }) - } + pub fn is_complete(&self) -> bool { + match self.state().load(Ordering::Acquire) { + STATE_PENDING => false, + STATE_IDLE => true, s => panic!("Polling transfer in unexpected state {s}"), } } - pub fn poll_completion( - &mut self, - cx: &Context, - ) -> Poll> - where - D: TransferRequest, - P: PlatformSubmit, - { - // SAFETY: `poll_completion_generic` checks that it is completed - self.poll_completion_generic(cx) - .map(|u| unsafe { u.take_completed() }) + /// SAFETY: is_complete must have returned `true` + pub unsafe fn into_idle(self) -> Idle

{ + debug_assert!(self.is_complete()); + let transfer = ManuallyDrop::new(self); + Idle(Box::from_raw(transfer.ptr.as_ptr())) } } -impl Drop for TransferHandle

{ +pub fn take_completed_from_queue

(queue: &mut VecDeque>) -> Option> { + if queue.front().expect("no transfer pending").is_complete() { + Some(unsafe { queue.pop_front().unwrap().into_idle() }) + } else { + None + } +} + +pub fn take_completed_from_option

(option: &mut Option>) -> Option> { + // TODO: use Option::take_if once supported by MSRV + if option.as_mut().map_or(false, |next| next.is_complete()) { + option.take().map(|t| unsafe { t.into_idle() }) + } else { + None + } +} + +impl

Drop for Pending

{ fn drop(&mut self) { - match self.inner().state.swap(STATE_ABANDONED, Ordering::Acquire) { - STATE_PENDING => { - self.cancel(); - /* handler responsible for dropping */ - } - STATE_IDLE | STATE_COMPLETED => { + match self.state().swap(STATE_ABANDONED, Ordering::Acquire) { + STATE_PENDING => { /* handler responsible for dropping */ } + STATE_IDLE => { // SAFETY: state means there is no concurrent access unsafe { drop(Box::from_raw(self.ptr.as_ptr())) } } @@ -180,12 +195,12 @@ impl Drop for TransferHandle

{ /// /// SAFETY: `transfer` must be a pointer previously passed to `submit`, and /// the caller / kernel must no longer dereference it or its buffer. -pub(crate) unsafe fn notify_completion(transfer: *mut c_void) { +pub(crate) unsafe fn notify_completion

(transfer: *mut P) { unsafe { let transfer = transfer as *mut TransferInner

; - let waker = (*transfer).waker.clone(); - match (*transfer).state.swap(STATE_COMPLETED, Ordering::Release) { - STATE_PENDING => waker.wake(), + let notify = (*transfer).notify.clone(); + match (*transfer).state.swap(STATE_IDLE, Ordering::Release) { + STATE_PENDING => (*notify).as_ref().notify(), STATE_ABANDONED => { drop(Box::from_raw(transfer)); } @@ -193,3 +208,42 @@ pub(crate) unsafe fn notify_completion(transfer: *mut c_voi } } } + +pub(crate) struct TransferFuture { + transfer: Option>, + notify: Arc, +} + +impl TransferFuture { + pub(crate) fn new(transfer: D, submit: impl FnOnce(Idle) -> Pending) -> Self { + let notify = Arc::new(Notify::new()); + let transfer = submit(Idle::new(notify.clone(), transfer)); + Self { + transfer: Some(transfer), + notify, + } + } +} + +impl Future for TransferFuture { + type Output = Idle; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.notify.subscribe(cx); + take_completed_from_option(&mut self.transfer).map_or(Poll::Pending, Poll::Ready) + } +} + +impl MaybeFuture for TransferFuture +where + D: Send, +{ + fn wait(mut self) -> Self::Output { + loop { + if let Some(transfer) = take_completed_from_option(&mut self.transfer) { + return transfer; + } + self.notify.wait(); + } + } +} diff --git a/src/transfer/mod.rs b/src/transfer/mod.rs index 14e100a..1d6e059 100644 --- a/src/transfer/mod.rs +++ b/src/transfer/mod.rs @@ -3,48 +3,16 @@ //! Use the methods on an [`Interface`][`super::Interface`] to make individual //! transfers or obtain a [`Queue`] to manage multiple transfers. -use std::{ - fmt::Display, - future::Future, - io, - marker::PhantomData, - task::{Context, Poll}, -}; - -use crate::platform; - -mod queue; -pub use queue::Queue; - -mod buffer; -pub use buffer::{RequestBuffer, ResponseBuffer}; +use std::{fmt::Display, io}; mod control; #[allow(unused)] -pub(crate) use control::SETUP_PACKET_SIZE; -pub use control::{Control, ControlIn, ControlOut, ControlType, Direction, Recipient}; +pub(crate) use control::{request_type, SETUP_PACKET_SIZE}; +pub use control::{ControlIn, ControlOut, ControlType, Direction, Recipient}; -mod internal; -pub(crate) use internal::{ - notify_completion, PlatformSubmit, PlatformTransfer, TransferHandle, TransferRequest, -}; +pub(crate) mod internal; -/// Endpoint type. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -#[allow(dead_code)] -pub enum TransferType { - /// Control endpoint. - Control = 0, - - /// Isochronous endpoint. - Isochronous = 1, - - /// Bulk endpoint. - Bulk = 2, - - /// Interrupt endpoint. - Interrupt = 3, -} +use crate::descriptors::TransferType; /// Transfer error. #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -97,82 +65,53 @@ impl From for io::Error { } } -/// Status and data returned on transfer completion. -/// -/// A transfer can return partial data even in the case of failure or -/// cancellation, thus this is a struct containing both `data` and `status` -/// rather than a `Result`. Use [`into_result`][`Completion::into_result`] to -/// ignore a partial transfer and get a `Result`. -#[derive(Debug, Clone)] -#[must_use] -pub struct Completion { - /// Returned data or buffer to re-use. - pub data: T, - - /// Indicates successful completion or error. - pub status: Result<(), TransferError>, +mod private { + pub trait Sealed {} } -impl Completion { - /// Ignore any partial completion, turning `self` into a `Result` containing - /// either the completed buffer for a successful transfer or a - /// `TransferError`. - pub fn into_result(self) -> Result { - self.status.map(|()| self.data) - } +/// Type-level endpoint direction +pub trait EndpointDirection: private::Sealed + Send + Sync { + /// Runtime direction value + const DIR: Direction; } -impl TryFrom>> for Vec { - type Error = TransferError; - - fn try_from(c: Completion>) -> Result { - c.into_result() - } +/// Type-level endpoint direction: device-to-host +pub enum In {} +impl private::Sealed for In {} +impl EndpointDirection for In { + const DIR: Direction = Direction::In; } -impl TryFrom> for ResponseBuffer { - type Error = TransferError; - - fn try_from(c: Completion) -> Result { - c.into_result() - } +/// Type-level endpoint direction: host-to-device +pub enum Out {} +impl private::Sealed for Out {} +impl EndpointDirection for Out { + const DIR: Direction = Direction::Out; } -/// [`Future`] used to await the completion of a transfer. -/// -/// Use the methods on [`Interface`][super::Interface] to -/// submit an individual transfer and obtain a `TransferFuture`. -/// -/// The transfer is cancelled on drop. The buffer and -/// any partially-completed data are destroyed. This means -/// that `TransferFuture` is not [cancel-safe] and cannot be used -/// in `select!{}`, When racing a `TransferFuture` with a timeout -/// you cannot tell whether data may have been partially transferred on timeout. -/// Use the [`Queue`] interface if these matter for your application. -/// -/// [cancel-safe]: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety -pub struct TransferFuture { - transfer: TransferHandle, - ty: PhantomData, +/// Type-level endpoint direction +pub trait EndpointType: private::Sealed + Send + Sync { + /// Runtime direction value + const TYPE: TransferType; } -impl TransferFuture { - pub(crate) fn new(transfer: TransferHandle) -> TransferFuture { - TransferFuture { - transfer, - ty: PhantomData, - } - } -} +/// EndpointType for Bulk and interrupt endpoints. +pub trait BulkOrInterrupt: EndpointType {} -impl Future for TransferFuture -where - platform::TransferData: PlatformSubmit, - D::Response: Unpin, -{ - type Output = Completion; - - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().transfer.poll_completion::(cx) - } +/// Type-level endpoint type: Bulk +pub enum Bulk {} +impl private::Sealed for Bulk {} +impl EndpointType for Bulk { + const TYPE: TransferType = TransferType::Bulk; } +impl BulkOrInterrupt for Bulk {} + +/// Type-level endpoint type: Interrupt +pub enum Interrupt {} +impl private::Sealed for Interrupt {} +impl EndpointType for Interrupt { + const TYPE: TransferType = TransferType::Interrupt; +} +impl BulkOrInterrupt for Interrupt {} + +pub use crate::device::{Completion, Request}; diff --git a/src/transfer/queue.rs b/src/transfer/queue.rs deleted file mode 100644 index 8c0f22e..0000000 --- a/src/transfer/queue.rs +++ /dev/null @@ -1,237 +0,0 @@ -use std::{ - collections::VecDeque, - future::{poll_fn, Future}, - marker::PhantomData, - sync::Arc, - task::{Context, Poll}, -}; - -use crate::{platform, Error, MaybeFuture}; - -use super::{Completion, PlatformSubmit, TransferHandle, TransferRequest, TransferType}; - -/// Manages a stream of transfers on an endpoint. -/// -/// A `Queue` optimizes a common pattern when streaming data to or from a USB -/// endpoint: To maximize throughput and minimize latency, the host controller -/// needs to attempt a transfer in every possible frame. That requires always -/// having a transfer request pending with the kernel by submitting multiple -/// transfer requests and re-submitting them as they complete. -/// -/// Use the methods on [`Interface`][`crate::Interface`] to obtain a `Queue`. -/// -/// When the `Queue` is dropped, all pending transfers are cancelled. -/// -/// ### Why use a `Queue` instead of submitting multiple transfers individually with the methods on [`Interface`][`crate::Interface`]? -/// -/// * Individual transfers give you individual `Future`s, which you then have -/// to keep track of and poll using something like `FuturesUnordered`. -/// * A `Queue` provides better cancellation semantics than `Future`'s -/// cancel-on-drop. -/// * After dropping a [`TransferFuture`][super::TransferFuture], you lose -/// the ability to get the status of the cancelled transfer and see if it -/// may have been partially or fully completed. -/// * When cancelling multiple transfers, it's important to do so in reverse -/// order so that subsequent pending transfers can't end up executing. -/// When managing a collection of `TransferFuture`s it's tricky to -/// guarantee drop order, while `Queue` always cancels its contained -/// transfers in reverse order. -/// * The `TransferFuture` methods on `Interface` are not [cancel-safe], -/// meaning they cannot be used in `select!{}` or similar patterns, -/// because dropping the Future has side effects and can lose data. The -/// Future returned from [`Queue::next_complete`] is cancel-safe because -/// it merely waits for completion, while the `Queue` owns the pending -/// transfers. -/// * A queue caches the internal transfer data structures of the last -/// completed transfer, meaning that if you re-use the data buffer there is -/// no memory allocation involved in continued streaming. -/// -/// [cancel-safe]: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety -/// ### Example (read from an endpoint) -/// -/// ```no_run -/// use futures_lite::future::block_on; -/// use nusb::transfer::RequestBuffer; -/// # use nusb::MaybeFuture; -/// # let di = nusb::list_devices().wait().unwrap().next().unwrap(); -/// # let device = di.open().wait().unwrap(); -/// # let interface = device.claim_interface(0).wait().unwrap(); -/// # fn handle_data(_: &[u8]) {} -/// let mut queue = interface.bulk_in_queue(0x81); -/// -/// let n_transfers = 8; -/// let transfer_size = 256; -/// -/// while queue.pending() < n_transfers { -/// queue.submit(RequestBuffer::new(transfer_size)); -/// } -/// -/// loop { -/// let completion = block_on(queue.next_complete()); -/// handle_data(&completion.data); // your function -/// -/// if completion.status.is_err() { -/// break; -/// } -/// -/// queue.submit(RequestBuffer::reuse(completion.data, transfer_size)) -/// } -/// ``` -/// -/// ### Example (write to an endpoint) -/// ```no_run -/// use std::mem; -/// use futures_lite::future::block_on; -/// # use nusb::MaybeFuture; -/// # let di = nusb::list_devices().wait().unwrap().next().unwrap(); -/// # let device = di.open().wait().unwrap(); -/// # let interface = device.claim_interface(0).wait().unwrap(); -/// # fn fill_data(_: &mut Vec) {} -/// # fn data_confirmed_sent(_: usize) {} -/// let mut queue = interface.bulk_out_queue(0x02); -/// -/// let n_transfers = 8; -/// -/// let mut next_buf = Vec::new(); -/// -/// loop { -/// while queue.pending() < n_transfers { -/// let mut buf = mem::replace(&mut next_buf, Vec::new()); -/// fill_data(&mut buf); // your function -/// queue.submit(buf); -/// } -/// -/// let completion = block_on(queue.next_complete()); -/// data_confirmed_sent(completion.data.actual_length()); // your function -/// next_buf = completion.data.reuse(); - -/// if completion.status.is_err() { -/// break; -/// } -/// } -/// ``` -pub struct Queue { - interface: Arc, - endpoint: u8, - endpoint_type: TransferType, - - /// A queue of pending transfers, expected to complete in order - pending: VecDeque>, - - /// An idle transfer that recently completed for re-use. - cached: Option>, - - bufs: PhantomData, -} - -impl Queue -where - R: TransferRequest + Send + Sync, - platform::TransferData: PlatformSubmit, -{ - pub(crate) fn new( - interface: Arc, - endpoint: u8, - endpoint_type: TransferType, - ) -> Queue { - Queue { - interface, - endpoint, - endpoint_type, - pending: VecDeque::new(), - cached: None, - bufs: PhantomData, - } - } - - /// Submit a new transfer on the endpoint. - /// - /// For an `IN` endpoint, pass a [`RequestBuffer`][`super::RequestBuffer`].\ - /// For an `OUT` endpoint, pass a [`Vec`]. - pub fn submit(&mut self, data: R) { - let mut transfer = self.cached.take().unwrap_or_else(|| { - self.interface - .make_transfer(self.endpoint, self.endpoint_type) - }); - transfer.submit(data); - self.pending.push_back(transfer); - } - - /// Return a `Future` that waits for the next pending transfer to complete, and yields its - /// buffer and status. - /// - /// For an `IN` endpoint, the completion contains a [`Vec`].\ - /// For an `OUT` endpoint, the completion contains a [`ResponseBuffer`][`super::ResponseBuffer`]. - /// - /// This future is cancel-safe: it can be cancelled and re-created without - /// side effects, enabling its use in `select!{}` or similar. - /// - /// Panics if there are no transfers pending. - pub fn next_complete<'a>( - &'a mut self, - ) -> impl Future> + Unpin + Send + Sync + 'a { - poll_fn(|cx| self.poll_next(cx)) - } - - /// Get the next pending transfer if one has completed, or register the - /// current task for wakeup when the next transfer completes. - /// - /// For an `IN` endpoint, the completion contains a [`Vec`].\ - /// For an `OUT` endpoint, the completion contains a - /// [`ResponseBuffer`][`super::ResponseBuffer`]. - /// - /// Panics if there are no transfers pending. - pub fn poll_next(&mut self, cx: &mut Context) -> Poll> { - let res = self - .pending - .front_mut() - .expect("queue should have pending transfers when calling next_complete") - .poll_completion::(cx); - if res.is_ready() { - self.cached = self.pending.pop_front(); - } - res - } - - /// Get the number of transfers that have been submitted with `submit` that - /// have not yet been returned from `next_complete`. - pub fn pending(&self) -> usize { - self.pending.len() - } - - /// Request cancellation of all pending transfers. - /// - /// The transfers will still be returned from subsequent calls to - /// `next_complete` so you can tell which were completed, - /// partially-completed, or cancelled. - pub fn cancel_all(&mut self) { - // Cancel transfers in reverse order to ensure subsequent transfers - // can't complete out of order while we're going through them. - for transfer in self.pending.iter_mut().rev() { - transfer.cancel(); - } - } - - /// Clear the endpoint's halt / stall condition. - /// - /// Sends a `CLEAR_FEATURE` `ENDPOINT_HALT` control transfer to tell the - /// device to reset the endpoint's data toggle and clear the halt / stall - /// condition, and resets the host-side data toggle. - /// - /// Use this after receiving - /// [`TransferError::Stall`][crate::transfer::TransferError::Stall] to clear - /// the error and resume use of the endpoint. - /// - /// This should not be called when transfers are pending on the endpoint. - pub fn clear_halt(&mut self) -> impl MaybeFuture> { - self.interface.clone().clear_halt(self.endpoint) - } -} - -impl Drop for Queue { - fn drop(&mut self) { - // Cancel transfers in reverse order to ensure subsequent transfers - // can't complete out of order while we're going through them. - self.pending.drain(..).rev().for_each(drop) - } -} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..e6638b0 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,18 @@ +use std::mem::MaybeUninit; + +/// Copies the elements from `src` to `dest`, +/// returning a mutable reference to the now initialized contents of `dest`. +/// +/// Port of the `[MaybeUninit]` method from std, which is not stable yet. +pub fn write_copy_of_slice<'a, T>(dest: &'a mut [MaybeUninit], src: &[T]) -> &'a mut [T] +where + T: Copy, +{ + // SAFETY: &[T] and &[MaybeUninit] have the same layout + let uninit_src: &[MaybeUninit] = unsafe { std::mem::transmute(src) }; + + dest.copy_from_slice(uninit_src); + + // SAFETY: Valid elements have just been copied into `self` so it is initialized + unsafe { &mut *(dest as *mut [MaybeUninit] as *mut [T]) } +}