diff --git a/examples/bulk.rs b/examples/bulk.rs index 3a72011..a40e157 100644 --- a/examples/bulk.rs +++ b/examples/bulk.rs @@ -13,8 +13,13 @@ fn main() { let device = di.open().unwrap(); let interface = device.claim_interface(0).unwrap(); + let mut queue = interface.bulk_queue(0x81); + loop { - let result = block_on(interface.bulk_transfer(0x81, Vec::with_capacity(256))); + while queue.pending() < 8 { + queue.submit(Vec::with_capacity(256)); + } + let result = block_on(queue.next_complete()); println!("{result:?}"); if result.status != TransferStatus::Complete { break; diff --git a/src/device.rs b/src/device.rs index 549ad0d..e812c0e 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1,4 +1,8 @@ -use std::{collections::VecDeque, sync::Arc, time::Duration}; +use std::{ + collections::VecDeque, + future::{poll_fn, Future}, + sync::Arc, +}; use crate::{ control::{ControlIn, ControlOut}, @@ -17,7 +21,7 @@ pub struct Device { impl Device { pub(crate) fn open(d: &DeviceInfo) -> Result { - let backend = crate::platform::Device::from_device_info(d)?; + let backend = platform::Device::from_device_info(d)?; Ok(Device { backend }) } @@ -36,7 +40,7 @@ impl Device { } pub struct Interface { - backend: Arc, + backend: Arc, } impl Interface { @@ -62,19 +66,48 @@ impl Interface { TransferFuture::new(t) } + pub fn bulk_queue(&self, endpoint: u8) -> Queue { + Queue::new(self.backend.clone(), endpoint, EndpointType::Bulk) + } + pub fn interrupt_transfer(&self, endpoint: u8, buf: Vec) -> TransferFuture> { let mut t = TransferHandle::new(self.backend.clone(), endpoint, EndpointType::Interrupt); t.submit(buf); TransferFuture::new(t) } + + pub fn interrupt_queue(&self, endpoint: u8) -> Queue { + Queue::new(self.backend.clone(), endpoint, EndpointType::Interrupt) + } } -struct Queue { +pub struct Queue { + interface: Arc, + endpoint: u8, + endpoint_type: EndpointType, + + /// A queue of pending transfers, expected to complete in order pending: VecDeque>, + + /// An idle transfer that recently completed for re-use. Limiting cached: Option>, } impl Queue { + fn new( + interface: Arc, + endpoint: u8, + endpoint_type: EndpointType, + ) -> Queue { + Queue { + interface, + endpoint, + endpoint_type, + pending: VecDeque::new(), + cached: None, + } + } + /// Submit a new transfer on the endpoint. /// /// For an IN endpoint, the transfer size is set by the *capacity* of @@ -84,8 +117,12 @@ impl Queue { /// /// For an OUT endpoint, the contents of the buffer are written to /// the endpoint. - pub fn submit(&mut self, buf: Buffer) -> Result<(), TransferError> { - todo!() + pub fn submit(&mut self, data: Buffer) { + let mut transfer = self.cached.take().unwrap_or_else(|| { + TransferHandle::new(self.interface.clone(), self.endpoint, self.endpoint_type) + }); + transfer.submit(data); + self.pending.push_back(transfer); } /// Block waiting for the next pending transfer to complete, and return @@ -96,27 +133,43 @@ impl Queue { /// /// For an OUT endpoint, the buffer is unmodified, but can be /// reused for another transfer. - pub fn complete(&mut self, timeout: Option) -> Option>> { - todo!() + /// + /// Panics if there are no transfers pending. + pub fn next_complete<'a>(&'a mut self) -> impl Future>> + 'a { + poll_fn(|cx| { + let res = self + .pending + .front_mut() + .expect("queue should have pending transfers when calling next_complete") + .poll_completion::>(cx); + if res.is_ready() { + self.cached = self.pending.pop_front(); + } + res + }) } /// Get the number of transfers that have been submitted with /// `submit` that have not yet been returned from `complete`. - pub fn pending_transfers(&self) -> usize { - todo!() + pub fn pending(&self) -> usize { + self.pending.len() } - /// Get the number of transfers that have completed and are - /// ready to be returned from `complete` without blocking. - pub fn ready_transfers(&self) -> usize { - todo!() - } - - /// Cancel all pending transfers on the endpoint pipe. - /// TODO: maybe this should be on the `Device` or an object separable from the `Pipe` - /// so it can be called from another thread, and cause a blocking `complete` call to - //// immediately return. - fn cancel_all(&mut self) -> Result<(), TransferError> { - todo!() + /// Cancel all pending transfers. They will still be returned from `complete` so you can tell + /// which were completed, partially-completed, or cancelled. + pub fn cancel_all(&mut self) { + // Cancel transfers in reverse order to ensure subsequent transfers can't complete + // out of order while we're going through them. + for transfer in self.pending.iter_mut().rev() { + transfer.cancel(); + } + } +} + +impl Drop for Queue { + fn drop(&mut self) { + // Cancel transfers in reverse order to ensure subsequent transfers can't complete + // out of order while we're going through them. + self.pending.drain(..).rev().for_each(drop) } } diff --git a/src/transfer_internal.rs b/src/transfer_internal.rs index e81c378..8cdd09f 100644 --- a/src/transfer_internal.rs +++ b/src/transfer_internal.rs @@ -60,7 +60,7 @@ struct TransferInner { /// Platform-specific data. /// /// In an `UnsafeCell` because we provide `&mut` when the - /// state gurantees us exclusive access + /// state guarantees us exclusive access platform_data: UnsafeCell, /// One of the `STATE_*` constants below, used to synchronize