Refactor transfers, add control transfers

This commit is contained in:
Kevin Mehall 2023-10-01 11:34:42 -06:00
parent 85551a9779
commit 4dc0c511f6
11 changed files with 616 additions and 234 deletions

35
examples/control.rs Normal file
View file

@ -0,0 +1,35 @@
use futures_lite::future::block_on;
use nusb::{ControlIn, ControlOut, ControlType, Recipient};
fn main() {
env_logger::init();
let di = nusb::list_devices()
.unwrap()
.find(|d| d.vendor_id() == 0x59e3 && d.product_id() == 0x0a23)
.expect("device should be connected");
println!("Device info: {di:?}");
let device = di.open().unwrap();
let interface = device.claim_interface(0).unwrap();
let result = block_on(interface.control_transfer_out(ControlOut {
control_type: ControlType::Vendor,
recipient: Recipient::Device,
request: 0x81,
value: 0x9999,
index: 0x9999,
data: &[1, 2, 3, 4],
}));
println!("{result:?}");
let result = block_on(interface.control_transfer_in(ControlIn {
control_type: ControlType::Vendor,
recipient: Recipient::Device,
request: 0x81,
value: 0x9999,
index: 0x9999,
length: 256,
}));
println!("{result:?}");
}

119
src/control.rs Normal file
View file

@ -0,0 +1,119 @@
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum Direction {
/// Host to device
Out = 0,
/// Device to host
In = 1,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum ControlType {
Standard = 0,
Class = 1,
Vendor = 2,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
pub enum Recipient {
Device = 0,
Interface = 1,
Endpoint = 2,
Other = 3,
}
pub struct ControlOut<'a> {
#[doc(alias = "bmRequestType")]
pub control_type: ControlType,
#[doc(alias = "bmRequestType")]
pub recipient: Recipient,
#[doc(alias = "bRequest")]
pub request: u8,
#[doc(alias = "windex")]
pub value: u16,
#[doc(alias = "wIndex")]
pub index: u16,
#[doc(alias = "wLength")]
pub data: &'a [u8],
}
impl<'a> ControlOut<'a> {
pub(crate) fn setup_packet(&self) -> Result<[u8; SETUP_PACKET_SIZE], ()> {
Ok(pack_setup(
Direction::Out,
self.control_type,
self.recipient,
self.request,
self.value,
self.index,
self.data.len().try_into().map_err(|_| ())?,
))
}
}
pub struct ControlIn {
#[doc(alias = "bmRequestType")]
pub control_type: ControlType,
#[doc(alias = "bmRequestType")]
pub recipient: Recipient,
#[doc(alias = "bRequest")]
pub request: u8,
#[doc(alias = "windex")]
pub value: u16,
#[doc(alias = "wIndex")]
pub index: u16,
#[doc(alias = "wLength")]
pub length: u16,
}
impl ControlIn {
pub(crate) fn setup_packet(&self) -> [u8; SETUP_PACKET_SIZE] {
pack_setup(
Direction::In,
self.control_type,
self.recipient,
self.request,
self.value,
self.index,
self.length,
)
}
}
pub(crate) const SETUP_PACKET_SIZE: usize = 8;
fn pack_setup(
direction: Direction,
control_type: ControlType,
recipient: Recipient,
request: u8,
value: u16,
index: u16,
length: u16,
) -> [u8; SETUP_PACKET_SIZE] {
let bmrequesttype = ((direction as u8) << 7) | ((control_type as u8) << 5) | (recipient as u8);
[
bmrequesttype,
request,
(value & 0xFF) as u8,
(value >> 8) as u8,
(index & 0xFF) as u8,
(index >> 8) as u8,
(length & 0xFF) as u8,
(length >> 8) as u8,
]
}

View file

@ -1,6 +1,11 @@
use std::{collections::VecDeque, sync::Arc, time::Duration};
use crate::{transfer::EndpointType, Completion, DeviceInfo, Error, Transfer};
use crate::{
control::{ControlIn, ControlOut},
platform,
transfer_internal::TransferHandle,
Completion, DeviceInfo, EndpointType, Error, TransferFuture,
};
type TransferError = Error;
type Buffer = Vec<u8>;
@ -39,22 +44,34 @@ impl Interface {
todo!()
}
pub fn bulk_transfer(&self, endpoint: u8, buf: Vec<u8>) -> Transfer {
let mut t = Transfer::new(self.backend.clone(), endpoint, EndpointType::Bulk);
t.submit(buf);
t
pub fn control_transfer_in(&self, data: ControlIn) -> TransferFuture<ControlIn> {
let mut t = TransferHandle::new(self.backend.clone(), 0, EndpointType::Control);
t.submit::<ControlIn>(data);
TransferFuture::new(t)
}
pub fn interrupt_transfer(&self, endpoint: u8, buf: Vec<u8>) -> Transfer {
let mut t = Transfer::new(self.backend.clone(), endpoint, EndpointType::Interrupt);
pub fn control_transfer_out(&self, data: ControlOut) -> TransferFuture<ControlOut> {
let mut t = TransferHandle::new(self.backend.clone(), 0, EndpointType::Control);
t.submit::<ControlOut>(data);
TransferFuture::new(t)
}
pub fn bulk_transfer(&self, endpoint: u8, buf: Vec<u8>) -> TransferFuture<Vec<u8>> {
let mut t = TransferHandle::new(self.backend.clone(), endpoint, EndpointType::Bulk);
t.submit(buf);
t
TransferFuture::new(t)
}
pub fn interrupt_transfer(&self, endpoint: u8, buf: Vec<u8>) -> TransferFuture<Vec<u8>> {
let mut t = TransferHandle::new(self.backend.clone(), endpoint, EndpointType::Interrupt);
t.submit(buf);
TransferFuture::new(t)
}
}
struct Queue {
pending: VecDeque<Transfer>,
cached: Option<Transfer>,
pending: VecDeque<TransferHandle<platform::Interface>>,
cached: Option<TransferHandle<platform::Interface>>,
}
impl Queue {
@ -79,7 +96,7 @@ impl Queue {
///
/// For an OUT endpoint, the buffer is unmodified, but can be
/// reused for another transfer.
pub fn complete(&mut self, timeout: Option<Duration>) -> Option<Completion> {
pub fn complete(&mut self, timeout: Option<Duration>) -> Option<Completion<Vec<u8>>> {
todo!()
}

View file

@ -1,16 +1,20 @@
use std::{fmt::Display, io, str::FromStr};
use std::io;
pub mod platform;
use device::Device;
pub use platform::list_devices;
mod control;
pub use control::{ControlIn, ControlOut, ControlType, Direction, Recipient};
mod enumeration;
pub use enumeration::{DeviceInfo, Speed, UnknownValue};
mod device;
use device::Device;
mod transfer;
pub use transfer::{Completion, Transfer, TransferStatus};
pub use transfer::{Completion, EndpointType, TransferFuture, TransferStatus};
mod transfer_internal;
pub type Error = io::Error;

View file

@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc};
use std::{ffi::c_void, path::PathBuf, sync::Arc};
use log::{debug, error};
use rustix::{
@ -7,11 +7,11 @@ use rustix::{
io::Errno,
};
use super::{events, usbfs};
use crate::{
platform::linux_usbfs::transfer::{Transfer, TransferInner},
DeviceInfo, Error,
use super::{
events,
usbfs::{self, Urb},
};
use crate::{transfer_internal, DeviceInfo, Error};
pub(crate) struct LinuxDevice {
fd: OwnedFd,
@ -53,14 +53,17 @@ impl LinuxDevice {
debug!("Handling events for device {}", self.events_id);
match usbfs::reap_urb_ndelay(&self.fd) {
Ok(urb_ptr) => {
{
let user_data = {
let urb = unsafe { &*urb_ptr };
debug!(
"URB {:?} for ep {:x} completed, status={} actual_length={}",
urb_ptr, urb.endpoint, urb.status, urb.actual_length
);
}
unsafe { Transfer::notify_completion(urb_ptr as *mut TransferInner) }
urb.usercontext
};
// SAFETY: pointer came from submit via kernel an we're now done with it
unsafe { transfer_internal::notify_completion::<LinuxInterface>(user_data) }
}
Err(Errno::AGAIN) => {}
Err(Errno::NODEV) => {
@ -117,8 +120,7 @@ pub(crate) struct LinuxInterface {
}
impl LinuxInterface {
pub(crate) unsafe fn submit_transfer(&self, transfer: *mut TransferInner) {
let urb = transfer as *mut usbfs::Urb;
pub(crate) unsafe fn submit_urb(&self, urb: *mut Urb) {
let ep = unsafe { (&mut *urb).endpoint };
if let Err(e) = usbfs::submit_urb(&self.device.fd, urb) {
// SAFETY: Transfer was not submitted. We still own the transfer
@ -130,16 +132,15 @@ impl LinuxInterface {
u.actual_length = 0;
u.status = e.raw_os_error();
}
Transfer::notify_completion(transfer)
transfer_internal::notify_completion::<LinuxInterface>(urb as *mut c_void)
}
} else {
debug!("Submitted URB {urb:?} on ep {ep:x}");
}
}
pub(crate) unsafe fn cancel_transfer(&self, transfer: *mut TransferInner) {
pub(crate) unsafe fn cancel_urb(&self, urb: *mut Urb) {
unsafe {
let urb = transfer as *mut usbfs::Urb;
if let Err(e) = usbfs::discard_urb(&self.device.fd, urb) {
debug!("Failed to cancel URB {urb:?}: {e}");
}

View file

@ -81,7 +81,3 @@ pub fn probe_device(path: SysfsPath) -> Result<DeviceInfo, Error> {
path: path,
})
}
/// Returns the path of a device in usbfs
fn usb_devfs_path(busnum: u8, devnum: u8) -> PathBuf {
PathBuf::from(format!("/dev/bus/usb/{busnum:03}/{devnum:03}"))
}

View file

@ -1,8 +1,5 @@
use std::path::PathBuf;
mod transfer;
mod usbfs;
pub use transfer::Transfer;
mod enumeration;
mod events;

View file

@ -1,109 +1,77 @@
use std::{
cell::UnsafeCell,
future::Future,
ffi::c_void,
mem::{self, ManuallyDrop},
ptr::{null_mut, NonNull},
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
task::{Context, Poll},
ptr::null_mut,
};
use atomic_waker::AtomicWaker;
use rustix::io::Errno;
use crate::{transfer::EndpointType, Completion, TransferStatus};
use super::{
usbfs::{
Urb, USBDEVFS_URB_TYPE_BULK, USBDEVFS_URB_TYPE_CONTROL, USBDEVFS_URB_TYPE_INTERRUPT,
USBDEVFS_URB_TYPE_ISO,
},
Interface,
use crate::{
control::{ControlIn, ControlOut, SETUP_PACKET_SIZE},
transfer_internal, Completion, EndpointType, TransferStatus,
};
use super::usbfs::{
Urb, USBDEVFS_URB_TYPE_BULK, USBDEVFS_URB_TYPE_CONTROL, USBDEVFS_URB_TYPE_INTERRUPT,
USBDEVFS_URB_TYPE_ISO,
};
/// Linux-specific transfer state.
///
/// This logically contains a `Vec` with urb.buffer and capacity.
/// It also owns the `urb` allocation itself, which is stored out-of-line
/// to avoid violating noalias when submitting the transfer while holding
/// `&mut TransferData`.
#[repr(C)]
pub(crate) struct TransferInner {
urb: UnsafeCell<Urb>,
state: AtomicU8,
waker: AtomicWaker,
interface: Arc<Interface>,
pub(crate) struct TransferData {
urb: *mut Urb,
capacity: usize,
}
impl TransferInner {
/// Transfer ownership of `buf` into the transfer's `urb`.
/// SAFETY: requires that there is no concurrent access to `urb`
unsafe fn put_buffer(&self, buf: Vec<u8>) {
unsafe {
let mut buf = ManuallyDrop::new(buf);
let urb = &mut *self.urb.get();
urb.buffer = buf.as_mut_ptr();
assert!(buf.len() < i32::MAX as usize, "Buffer too large");
urb.actual_length = buf.len() as i32;
assert!(buf.capacity() < i32::MAX as usize, "Buffer too large");
urb.buffer_length = buf.capacity() as i32;
}
unsafe impl Send for TransferData {}
impl TransferData {
fn urb_mut(&mut self) -> &mut Urb {
// SAFETY: if we have `&mut`, the transfer is not pending
unsafe { &mut *self.urb }
}
/// Transfer ownership of transfer's `urb` buffer back to a `Vec`.
/// SAFETY: requires that the the buffer is present and there is no concurrent
/// access to `urb`. Invalidates the buffer.
unsafe fn take_buffer(&self) -> Vec<u8> {
unsafe {
let urb = &mut *self.urb.get();
Vec::from_raw_parts(
mem::replace(&mut urb.buffer, null_mut()),
urb.actual_length as usize,
urb.buffer_length as usize,
)
}
fn fill(&mut self, v: Vec<u8>, len: usize, user_data: *mut c_void) {
let mut v = ManuallyDrop::new(v);
let urb = self.urb_mut();
urb.buffer = v.as_mut_ptr();
urb.buffer_length = len.try_into().expect("buffer size should fit in i32");
urb.usercontext = user_data;
urb.actual_length = 0;
self.capacity = v.capacity();
}
/// Get the transfer status
/// SAFETY: requires that there is no concurrent access to `urb`
unsafe fn status(&self) -> TransferStatus {
let status = unsafe { (&*self.urb.get()).status };
/// SAFETY: requires that the transfer has completed and `length` bytes are initialized
unsafe fn take_buf(&mut self, length: usize) -> Vec<u8> {
let urb = self.urb_mut();
assert!(!urb.buffer.is_null());
let ptr = mem::replace(&mut urb.buffer, null_mut());
let capacity = mem::replace(&mut self.capacity, 0);
assert!(length <= capacity);
Vec::from_raw_parts(ptr, length, capacity)
}
}
if status == 0 {
return TransferStatus::Complete;
}
// It's sometimes positive, sometimes negative, but rustix panics if negative.
match Errno::from_raw_os_error(status.abs()) {
Errno::NODEV | Errno::SHUTDOWN => TransferStatus::Disconnected,
Errno::PIPE => TransferStatus::Stall,
Errno::NOENT | Errno::CONNRESET => TransferStatus::Cancelled,
Errno::PROTO | Errno::ILSEQ | Errno::OVERFLOW | Errno::COMM | Errno::TIME => {
TransferStatus::Fault
impl Drop for TransferData {
fn drop(&mut self) {
unsafe {
if !self.urb_mut().buffer.is_null() {
drop(Vec::from_raw_parts(self.urb_mut().buffer, 0, self.capacity));
}
_ => TransferStatus::UnknownError,
drop(Box::from_raw(self.urb));
}
}
}
pub struct Transfer {
ptr: NonNull<TransferInner>,
}
impl transfer_internal::Platform for super::Interface {
type TransferData = TransferData;
/// The transfer has not been submitted. The buffer is not valid.
const STATE_IDLE: u8 = 0;
/// The transfer has been submitted to the kernel and completion has not yet
/// been handled. The buffer points to valid memory but cannot be accessed by
/// userspace. There is a future or queue waiting for it completion.
const STATE_PENDING: u8 = 1;
/// Like PENDING, but there is no one waiting for completion. The completion
/// handler will drop the buffer and transfer.
const STATE_ABANDONED: u8 = 3;
/// The transfer completion has been handled. The buffer is valid and may
/// be accessed.
const STATE_COMPLETED: u8 = 3;
impl Transfer {
pub(crate) fn new(interface: Arc<Interface>, endpoint: u8, ep_type: EndpointType) -> Transfer {
fn make_transfer_data(&self, endpoint: u8, ep_type: crate::EndpointType) -> TransferData {
let ep_type = match ep_type {
EndpointType::Control => USBDEVFS_URB_TYPE_CONTROL,
EndpointType::Interrupt => USBDEVFS_URB_TYPE_INTERRUPT,
@ -111,8 +79,8 @@ impl Transfer {
EndpointType::Isochronous => USBDEVFS_URB_TYPE_ISO,
};
let b = Box::new(TransferInner {
urb: UnsafeCell::new(Urb {
TransferData {
urb: Box::into_raw(Box::new(Urb {
ep_type,
endpoint,
status: 0,
@ -125,118 +93,102 @@ impl Transfer {
error_count: 0,
signr: 0,
usercontext: null_mut(),
}),
state: AtomicU8::new(STATE_IDLE),
waker: AtomicWaker::new(),
interface,
});
Transfer {
ptr: Box::leak(b).into(),
})),
capacity: 0,
}
}
fn inner(&self) -> &TransferInner {
// Safety: while Transfer is alive, its TransferInner is alive
unsafe { self.ptr.as_ref() }
fn cancel(&self, data: &TransferData) {
unsafe {
self.cancel_urb(data.urb);
}
}
}
impl transfer_internal::PlatformSubmit<Vec<u8>> for super::Interface {
unsafe fn submit(&self, data: Vec<u8>, transfer: &mut TransferData, user_data: *mut c_void) {
let ep = transfer.urb_mut().endpoint;
let len = if ep & 0x80 == 0 {
data.len()
} else {
data.capacity()
};
transfer.fill(data, len, user_data);
// SAFETY: we just properly filled the buffer and it is not already pending
unsafe { self.submit_urb(transfer.urb) }
}
/// Prepare the transfer for submission by filling the buffer fields
/// and setting the state to PENDING. Returns a `*mut TransferInner`
/// that must later be passed to `complete`.
///
/// Panics if the transfer has already been submitted.
pub(crate) fn submit(&mut self, data: Vec<u8>) {
let inner = self.inner();
assert_eq!(
inner.state.load(Ordering::Acquire),
STATE_IDLE,
"Transfer should be idle when submitted"
unsafe fn take_completed(transfer: &mut TransferData) -> Completion<Vec<u8>> {
let status = urb_status(transfer.urb_mut());
let len = transfer.urb_mut().actual_length as usize;
// SAFETY: transfer is completed (precondition) and `actual_length` bytes were initialized.
let data = unsafe { transfer.take_buf(len) };
Completion { data, status }
}
}
impl transfer_internal::PlatformSubmit<ControlIn> for super::Interface {
unsafe fn submit(&self, data: ControlIn, transfer: &mut TransferData, user_data: *mut c_void) {
let buf_len = SETUP_PACKET_SIZE + data.length as usize;
let mut buf = Vec::with_capacity(buf_len);
buf.extend_from_slice(&data.setup_packet());
transfer.fill(buf, buf_len, user_data);
// SAFETY: we just properly filled the buffer and it is not already pending
unsafe { self.submit_urb(transfer.urb) }
}
unsafe fn take_completed(transfer: &mut TransferData) -> Completion<Vec<u8>> {
let status = urb_status(transfer.urb_mut());
let len = transfer.urb_mut().actual_length as usize;
// SAFETY: transfer is completed (precondition) and `actual_length`
// bytes were initialized with setup buf in front
let mut data = unsafe { transfer.take_buf(SETUP_PACKET_SIZE + len) };
data.splice(0..SETUP_PACKET_SIZE, []);
Completion { data, status }
}
}
impl transfer_internal::PlatformSubmit<ControlOut<'_>> for super::Interface {
unsafe fn submit(&self, data: ControlOut, transfer: &mut TransferData, user_data: *mut c_void) {
let buf_len = SETUP_PACKET_SIZE + data.data.len();
let mut buf = Vec::with_capacity(buf_len);
buf.extend_from_slice(
&data
.setup_packet()
.expect("data length should fit in setup packet's u16"),
);
unsafe {
// SAFETY: invariants guaranteed by being in state IDLE
inner.put_buffer(data);
}
inner.state.store(STATE_PENDING, Ordering::Release);
unsafe {
inner.interface.submit_transfer(self.ptr.as_ptr());
}
buf.extend_from_slice(data.data);
transfer.fill(buf, buf_len, user_data);
// SAFETY: we just properly filled the buffer and it is not already pending
unsafe { self.submit_urb(transfer.urb) }
}
pub(crate) fn cancel(&mut self) {
let inner = self.inner();
unsafe {
inner.interface.cancel_transfer(self.ptr.as_ptr());
}
}
pub fn poll_completion(&self, cx: &Context) -> Poll<Completion> {
let inner = self.inner();
inner.waker.register(cx.waker());
match inner.state.load(Ordering::Acquire) {
STATE_PENDING => Poll::Pending,
STATE_COMPLETED => {
// SAFETY: state means we have exclusive access
// and the buffer is valid.
inner.state.store(STATE_IDLE, Ordering::Relaxed);
unsafe {
let data = inner.take_buffer();
let status = inner.status();
Poll::Ready(Completion { data, status })
}
}
s => panic!("Polling transfer in unexpected state {s}"),
}
}
pub(crate) unsafe fn notify_completion(transfer: *mut TransferInner) {
unsafe {
let waker = (*transfer).waker.take();
match (*transfer).state.swap(STATE_COMPLETED, Ordering::Release) {
STATE_PENDING => {
if let Some(waker) = waker {
waker.wake()
}
}
STATE_ABANDONED => {
let b = Box::from_raw(transfer);
drop(b.take_buffer());
drop(b);
}
s => panic!("Completing transfer in unexpected state {s}"),
}
}
unsafe fn take_completed(transfer: &mut TransferData) -> Completion<usize> {
let status = urb_status(transfer.urb_mut());
let len = transfer.urb_mut().actual_length as usize;
drop(transfer.take_buf(0));
Completion { data: len, status }
}
}
impl Drop for Transfer {
fn drop(&mut self) {
match self.inner().state.swap(STATE_ABANDONED, Ordering::Acquire) {
STATE_PENDING => {
self.cancel();
/* handler responsible for dropping */
}
STATE_IDLE => {
// SAFETY: state means there is no concurrent access
unsafe { drop(Box::from_raw(self.ptr.as_ptr())) }
}
STATE_COMPLETED => {
// SAFETY: state means buffer is valid and there is no concurrent access
unsafe {
let b = Box::from_raw(self.ptr.as_ptr());
drop(b.take_buffer());
drop(b);
}
}
s => panic!("Dropping transfer in unexpected state {s}"),
fn urb_status(urb: &Urb) -> TransferStatus {
if urb.status == 0 {
return TransferStatus::Complete;
}
// It's sometimes positive, sometimes negative, but rustix panics if negative.
match Errno::from_raw_os_error(urb.status.abs()) {
Errno::NODEV | Errno::SHUTDOWN => TransferStatus::Disconnected,
Errno::PIPE => TransferStatus::Stall,
Errno::NOENT | Errno::CONNRESET => TransferStatus::Cancelled,
Errno::PROTO | Errno::ILSEQ | Errno::OVERFLOW | Errno::COMM | Errno::TIME => {
TransferStatus::Fault
}
}
}
impl Future for Transfer {
type Output = Completion;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().poll_completion(cx)
_ => TransferStatus::UnknownError,
}
}

View file

@ -1,13 +1,13 @@
//! Wrappers for the [usbfs] character device ioctls, translated from the
//! [C structures and ioctl definitions][uapi].
//! [usbfs]: https://www.kernel.org/doc/html/latest/driver-api/usb/usb.html#the-usb-character-device-nodes
//! [uapi]: https://github.com/torvalds/linux/blob/master/tools/include/uapi/linux/usbdevice_fs.h
use std::{
ffi::{c_int, c_uchar, c_uint, c_void},
marker::PhantomData,
};
/// Wrappers for the [usbfs] character device ioctls, translated from the
/// [C structures and ioctl definitions][uapi].
///
/// [usbfs]: https://www.kernel.org/doc/html/latest/driver-api/usb/usb.html#the-usb-character-device-nodes
/// [uapi]: https://github.com/torvalds/linux/blob/master/tools/include/uapi/linux/usbdevice_fs.h
use rustix::{
fd::AsFd,
io,

View file

@ -1,11 +1,19 @@
pub use crate::platform::Transfer;
use crate::{
platform,
transfer_internal::{PlatformSubmit, TransferHandle, TransferRequest},
};
use std::{
future::Future,
marker::PhantomData,
task::{Context, Poll},
};
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum EndpointType {
Control,
Interrupt,
Bulk,
Isochronous,
Control = 0,
Isochronous = 1,
Bulk = 2,
Interrupt = 3,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@ -19,7 +27,33 @@ pub enum TransferStatus {
}
#[derive(Debug, Clone)]
pub struct Completion {
pub data: Vec<u8>,
pub struct Completion<T> {
pub data: T,
pub status: TransferStatus,
}
pub struct TransferFuture<D: TransferRequest> {
transfer: TransferHandle<platform::Interface>,
ty: PhantomData<D::Response>,
}
impl<D: TransferRequest> TransferFuture<D> {
pub(crate) fn new(transfer: TransferHandle<platform::Interface>) -> TransferFuture<D> {
TransferFuture {
transfer,
ty: PhantomData,
}
}
}
impl<D: TransferRequest> Future for TransferFuture<D>
where
platform::Interface: PlatformSubmit<D>,
D::Response: Unpin,
{
type Output = Completion<D::Response>;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().transfer.poll_completion::<D>(cx)
}
}

227
src/transfer_internal.rs Normal file
View file

@ -0,0 +1,227 @@
use std::{
cell::UnsafeCell,
ffi::c_void,
ptr::NonNull,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
task::{Context, Poll},
};
use atomic_waker::AtomicWaker;
use crate::{
control::{ControlIn, ControlOut},
Completion, EndpointType,
};
pub(crate) trait Platform {
/// Platform-specific per-transfer data.
type TransferData: Send;
/// Get a `TransferData`.
fn make_transfer_data(&self, endpoint: u8, ep_type: EndpointType) -> Self::TransferData;
/// Request cancellation of a transfer that may or may not currently be
/// pending.
fn cancel(&self, transfer: &Self::TransferData);
}
pub trait TransferRequest {
type Response;
}
pub(crate) trait PlatformSubmit<D: TransferRequest>: Platform {
/// Fill the transfer with the data from `data` and submit it to the kernel.
/// Arrange for `notify_completion(transfer)` to be called once the transfer
/// has completed.
///
/// SAFETY(caller): transfer is in an idle state
unsafe fn submit(&self, data: D, platform: &mut Self::TransferData, transfer: *mut c_void);
/// SAFETY(caller): `transfer` is in a completed state
unsafe fn take_completed(transfer: &mut Self::TransferData) -> Completion<D::Response>;
}
impl TransferRequest for Vec<u8> {
type Response = Vec<u8>;
}
impl TransferRequest for ControlIn {
type Response = Vec<u8>;
}
impl TransferRequest for ControlOut<'_> {
type Response = usize;
}
struct TransferInner<P: Platform> {
/// Platform-specific data.
///
/// In an `UnsafeCell` because we provide `&mut` when the
/// state gurantees us exclusive access
platform_data: UnsafeCell<P::TransferData>,
/// One of the `STATE_*` constants below, used to synchronize
/// the state.
state: AtomicU8,
/// Waker that is notified when transfer completes.
waker: AtomicWaker,
/// Platform
interface: Arc<P>,
}
/// Handle to a transfer.
///
/// Cancels the transfer and arranges for memory to be freed
/// when dropped.
pub(crate) struct TransferHandle<P: Platform> {
ptr: NonNull<TransferInner<P>>,
}
unsafe impl<P: Platform> Send for TransferHandle<P> {}
unsafe impl<P: Platform> Sync for TransferHandle<P> {}
/// The transfer has not been submitted. The buffer is not valid.
const STATE_IDLE: u8 = 0;
/// The transfer has been or is about to be submitted to the kernel and
/// completion has not yet been handled. The buffer points to valid memory but
/// cannot necessarily be accessed by userspace. There is a future or queue
/// waiting for it completion.
const STATE_PENDING: u8 = 1;
/// Like PENDING, but there is no one waiting for completion. The completion
/// handler will drop the buffer and transfer.
const STATE_ABANDONED: u8 = 3;
/// The transfer completion has been handled on the event loop thread. The
/// buffer is valid and may be accessed by the `TransferHandle`.
const STATE_COMPLETED: u8 = 3;
impl<P: Platform> TransferHandle<P> {
/// Create a new transfer and get a handle.
pub(crate) fn new(interface: Arc<P>, endpoint: u8, ep_type: EndpointType) -> TransferHandle<P> {
let b = Box::new(TransferInner {
platform_data: UnsafeCell::new(interface.make_transfer_data(endpoint, ep_type)),
state: AtomicU8::new(STATE_IDLE),
waker: AtomicWaker::new(),
interface,
});
TransferHandle {
ptr: Box::leak(b).into(),
}
}
fn inner(&self) -> &TransferInner<P> {
// SAFETY: while `TransferHandle` is alive, its `TransferInner` is alive
// (it may be shared by `notify_completion` on the event thread, so can't be &mut)
unsafe { self.ptr.as_ref() }
}
fn platform_data(&self) -> &P::TransferData {
// SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data`
// is via this `TransferHandle`.
unsafe { &*self.inner().platform_data.get() }
}
pub(crate) fn submit<D>(&mut self, data: D)
where
D: TransferRequest,
P: PlatformSubmit<D>,
{
let inner = self.inner();
// It's the syscall that submits the transfer that actually performs the
// release ordering.
let prev = self.inner().state.swap(STATE_PENDING, Ordering::Relaxed);
assert_eq!(prev, STATE_IDLE, "Transfer should be idle when submitted");
// SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data`
// is via this `TransferHandle`. Verified that it is idle.
unsafe {
inner.interface.submit(
data,
&mut *inner.platform_data.get(),
self.ptr.as_ptr() as *mut c_void,
);
}
}
pub(crate) fn cancel(&mut self) {
self.inner().interface.cancel(self.platform_data());
}
fn poll_completion_generic(&mut self, cx: &Context) -> Poll<&mut P::TransferData> {
let inner = self.inner();
inner.waker.register(cx.waker());
match inner.state.load(Ordering::Acquire) {
STATE_PENDING => Poll::Pending,
STATE_COMPLETED => {
// Relaxed because this doesn't synchronize with anything,
// just marks that we no longer need to drop the buffer
inner.state.store(STATE_IDLE, Ordering::Relaxed);
// SAFETY: while `TransferHandle` is alive, the only mutable access to `platform_data`
// is via this `TransferHandle`.
Poll::Ready(unsafe { &mut *inner.platform_data.get() })
}
s => panic!("Polling transfer in unexpected state {s}"),
}
}
pub fn poll_completion<D: TransferRequest>(
&mut self,
cx: &Context,
) -> Poll<Completion<D::Response>>
where
D: TransferRequest,
P: PlatformSubmit<D>,
{
// SAFETY: `poll_completion_generic` checks that it is completed
self.poll_completion_generic(cx)
.map(|u| unsafe { P::take_completed(u) })
}
}
impl<P: Platform> Drop for TransferHandle<P> {
fn drop(&mut self) {
match self.inner().state.swap(STATE_ABANDONED, Ordering::Acquire) {
STATE_PENDING => {
self.cancel();
/* handler responsible for dropping */
}
STATE_IDLE | STATE_COMPLETED => {
// SAFETY: state means there is no concurrent access
unsafe { drop(Box::from_raw(self.ptr.as_ptr())) }
}
s => panic!("Dropping transfer in unexpected state {s}"),
}
}
}
/// Notify that a transfer has completed.
///
/// SAFETY: `transfer` must be a pointer previously passed to `submit`, and
/// the caller / kernel must no longer dereference it or its buffer.
pub(crate) unsafe fn notify_completion<P: Platform>(transfer: *mut c_void) {
unsafe {
let transfer = transfer as *mut TransferInner<P>;
let waker = (*transfer).waker.take();
match (*transfer).state.swap(STATE_COMPLETED, Ordering::Release) {
STATE_PENDING => {
if let Some(waker) = waker {
waker.wake()
}
}
STATE_ABANDONED => {
drop(Box::from_raw(transfer));
}
s => panic!("Completing transfer in unexpected state {s}"),
}
}
}