Implement Queue API

This commit is contained in:
Kevin Mehall 2023-10-01 14:45:05 -06:00
parent 1cd8c2c935
commit 21cc94ebd6
3 changed files with 82 additions and 24 deletions

View file

@ -13,8 +13,13 @@ fn main() {
let device = di.open().unwrap();
let interface = device.claim_interface(0).unwrap();
let mut queue = interface.bulk_queue(0x81);
loop {
let result = block_on(interface.bulk_transfer(0x81, Vec::with_capacity(256)));
while queue.pending() < 8 {
queue.submit(Vec::with_capacity(256));
}
let result = block_on(queue.next_complete());
println!("{result:?}");
if result.status != TransferStatus::Complete {
break;

View file

@ -1,4 +1,8 @@
use std::{collections::VecDeque, sync::Arc, time::Duration};
use std::{
collections::VecDeque,
future::{poll_fn, Future},
sync::Arc,
};
use crate::{
control::{ControlIn, ControlOut},
@ -17,7 +21,7 @@ pub struct Device {
impl Device {
pub(crate) fn open(d: &DeviceInfo) -> Result<Device, std::io::Error> {
let backend = crate::platform::Device::from_device_info(d)?;
let backend = platform::Device::from_device_info(d)?;
Ok(Device { backend })
}
@ -36,7 +40,7 @@ impl Device {
}
pub struct Interface {
backend: Arc<crate::platform::Interface>,
backend: Arc<platform::Interface>,
}
impl Interface {
@ -62,19 +66,48 @@ impl Interface {
TransferFuture::new(t)
}
pub fn bulk_queue(&self, endpoint: u8) -> Queue {
Queue::new(self.backend.clone(), endpoint, EndpointType::Bulk)
}
pub fn interrupt_transfer(&self, endpoint: u8, buf: Vec<u8>) -> TransferFuture<Vec<u8>> {
let mut t = TransferHandle::new(self.backend.clone(), endpoint, EndpointType::Interrupt);
t.submit(buf);
TransferFuture::new(t)
}
pub fn interrupt_queue(&self, endpoint: u8) -> Queue {
Queue::new(self.backend.clone(), endpoint, EndpointType::Interrupt)
}
}
struct Queue {
pub struct Queue {
interface: Arc<platform::Interface>,
endpoint: u8,
endpoint_type: EndpointType,
/// A queue of pending transfers, expected to complete in order
pending: VecDeque<TransferHandle<platform::Interface>>,
/// An idle transfer that recently completed for re-use. Limiting
cached: Option<TransferHandle<platform::Interface>>,
}
impl Queue {
fn new(
interface: Arc<platform::Interface>,
endpoint: u8,
endpoint_type: EndpointType,
) -> Queue {
Queue {
interface,
endpoint,
endpoint_type,
pending: VecDeque::new(),
cached: None,
}
}
/// Submit a new transfer on the endpoint.
///
/// For an IN endpoint, the transfer size is set by the *capacity* of
@ -84,8 +117,12 @@ impl Queue {
///
/// For an OUT endpoint, the contents of the buffer are written to
/// the endpoint.
pub fn submit(&mut self, buf: Buffer) -> Result<(), TransferError> {
todo!()
pub fn submit(&mut self, data: Buffer) {
let mut transfer = self.cached.take().unwrap_or_else(|| {
TransferHandle::new(self.interface.clone(), self.endpoint, self.endpoint_type)
});
transfer.submit(data);
self.pending.push_back(transfer);
}
/// Block waiting for the next pending transfer to complete, and return
@ -96,27 +133,43 @@ impl Queue {
///
/// For an OUT endpoint, the buffer is unmodified, but can be
/// reused for another transfer.
pub fn complete(&mut self, timeout: Option<Duration>) -> Option<Completion<Vec<u8>>> {
todo!()
///
/// Panics if there are no transfers pending.
pub fn next_complete<'a>(&'a mut self) -> impl Future<Output = Completion<Vec<u8>>> + 'a {
poll_fn(|cx| {
let res = self
.pending
.front_mut()
.expect("queue should have pending transfers when calling next_complete")
.poll_completion::<Vec<u8>>(cx);
if res.is_ready() {
self.cached = self.pending.pop_front();
}
res
})
}
/// Get the number of transfers that have been submitted with
/// `submit` that have not yet been returned from `complete`.
pub fn pending_transfers(&self) -> usize {
todo!()
pub fn pending(&self) -> usize {
self.pending.len()
}
/// Get the number of transfers that have completed and are
/// ready to be returned from `complete` without blocking.
pub fn ready_transfers(&self) -> usize {
todo!()
}
/// Cancel all pending transfers on the endpoint pipe.
/// TODO: maybe this should be on the `Device` or an object separable from the `Pipe`
/// so it can be called from another thread, and cause a blocking `complete` call to
//// immediately return.
fn cancel_all(&mut self) -> Result<(), TransferError> {
todo!()
/// Cancel all pending transfers. They will still be returned from `complete` so you can tell
/// which were completed, partially-completed, or cancelled.
pub fn cancel_all(&mut self) {
// Cancel transfers in reverse order to ensure subsequent transfers can't complete
// out of order while we're going through them.
for transfer in self.pending.iter_mut().rev() {
transfer.cancel();
}
}
}
impl Drop for Queue {
fn drop(&mut self) {
// Cancel transfers in reverse order to ensure subsequent transfers can't complete
// out of order while we're going through them.
self.pending.drain(..).rev().for_each(drop)
}
}

View file

@ -60,7 +60,7 @@ struct TransferInner<P: Platform> {
/// Platform-specific data.
///
/// In an `UnsafeCell` because we provide `&mut` when the
/// state gurantees us exclusive access
/// state guarantees us exclusive access
platform_data: UnsafeCell<P::TransferData>,
/// One of the `STATE_*` constants below, used to synchronize