Merge pull request #153 from kevinmehall/io-traits
Tweaks for EndpointRead / EndpointWrite
This commit is contained in:
commit
ccb40bd01d
6 changed files with 319 additions and 44 deletions
10
Cargo.toml
10
Cargo.toml
|
|
@ -19,6 +19,7 @@ slab = "0.4.9"
|
|||
[dev-dependencies]
|
||||
env_logger = "0.11"
|
||||
futures-lite = "2.0"
|
||||
tokio = { version = "1", features = ["rt", "macros", "io-util", "rt-multi-thread"] }
|
||||
|
||||
[target.'cfg(any(target_os="linux", target_os="android"))'.dependencies]
|
||||
rustix = { version = "1.0.1", features = ["fs", "event", "net", "time", "mm"] }
|
||||
|
|
@ -49,3 +50,12 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(fuzzing)'] }
|
|||
|
||||
[package.metadata.docs.rs]
|
||||
all-features = true
|
||||
|
||||
[[example]]
|
||||
name = "bulk_io_smol"
|
||||
required-features = ["smol"]
|
||||
|
||||
[[example]]
|
||||
name = "bulk_io_tokio"
|
||||
required-features = ["tokio"]
|
||||
|
||||
|
|
|
|||
|
|
@ -31,7 +31,8 @@ fn main() {
|
|||
let mut reader = main_interface
|
||||
.endpoint::<Bulk, In>(0x83)
|
||||
.unwrap()
|
||||
.reader(128);
|
||||
.reader(128)
|
||||
.with_num_transfers(8);
|
||||
|
||||
writer.write_all(&[1; 16]).unwrap();
|
||||
writer.write_all(&[2; 256]).unwrap();
|
||||
|
|
@ -47,6 +48,21 @@ fn main() {
|
|||
|
||||
dbg!(reader.fill_buf().unwrap().len());
|
||||
|
||||
let mut buf = [0; 1000];
|
||||
for len in 0..1000 {
|
||||
reader.read_exact(&mut buf[..len]).unwrap();
|
||||
writer.write_all(&buf[..len]).unwrap();
|
||||
}
|
||||
|
||||
reader.cancel_all();
|
||||
loop {
|
||||
let n = reader.read(&mut buf).unwrap();
|
||||
dbg!(n);
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let echo_interface = device.claim_interface(1).wait().unwrap();
|
||||
echo_interface.set_alt_setting(1).wait().unwrap();
|
||||
|
||||
|
|
|
|||
99
examples/bulk_io_smol.rs
Normal file
99
examples/bulk_io_smol.rs
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use nusb::{
|
||||
transfer::{Bulk, In, Out},
|
||||
MaybeFuture,
|
||||
};
|
||||
|
||||
use futures_lite::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
fn main() {
|
||||
env_logger::init();
|
||||
let di = nusb::list_devices()
|
||||
.wait()
|
||||
.unwrap()
|
||||
.find(|d| d.vendor_id() == 0x59e3 && d.product_id() == 0x00aa)
|
||||
.expect("device should be connected");
|
||||
|
||||
println!("Device info: {di:?}");
|
||||
|
||||
futures_lite::future::block_on(async {
|
||||
let device = di.open().await.unwrap();
|
||||
|
||||
let main_interface = device.claim_interface(0).await.unwrap();
|
||||
|
||||
let mut writer = main_interface
|
||||
.endpoint::<Bulk, Out>(0x03)
|
||||
.unwrap()
|
||||
.writer(128)
|
||||
.with_num_transfers(8);
|
||||
|
||||
let mut reader = main_interface
|
||||
.endpoint::<Bulk, In>(0x83)
|
||||
.unwrap()
|
||||
.reader(128)
|
||||
.with_num_transfers(8);
|
||||
|
||||
writer.write_all(&[1; 16]).await.unwrap();
|
||||
writer.write_all(&[2; 256]).await.unwrap();
|
||||
writer.flush().await.unwrap();
|
||||
writer.write_all(&[3; 64]).await.unwrap();
|
||||
writer.flush_end_async().await.unwrap();
|
||||
|
||||
let mut buf = [0; 16];
|
||||
reader.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
let mut buf = [0; 64];
|
||||
reader.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
dbg!(reader.fill_buf().await.unwrap().len());
|
||||
|
||||
let mut buf = [0; 1000];
|
||||
for len in 0..1000 {
|
||||
reader.read_exact(&mut buf[..len]).await.unwrap();
|
||||
writer.write_all(&buf[..len]).await.unwrap();
|
||||
}
|
||||
|
||||
reader.cancel_all();
|
||||
loop {
|
||||
let n = reader.read(&mut buf).await.unwrap();
|
||||
dbg!(n);
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let echo_interface = device.claim_interface(1).await.unwrap();
|
||||
echo_interface.set_alt_setting(1).await.unwrap();
|
||||
|
||||
let mut writer = echo_interface
|
||||
.endpoint::<Bulk, Out>(0x01)
|
||||
.unwrap()
|
||||
.writer(64)
|
||||
.with_num_transfers(1);
|
||||
let mut reader = echo_interface
|
||||
.endpoint::<Bulk, In>(0x81)
|
||||
.unwrap()
|
||||
.reader(64)
|
||||
.with_num_transfers(8)
|
||||
.with_read_timeout(Duration::from_millis(100));
|
||||
|
||||
let mut pkt_reader = reader.until_short_packet();
|
||||
|
||||
writer.write_all(&[1; 16]).await.unwrap();
|
||||
writer.flush_end_async().await.unwrap();
|
||||
|
||||
writer.write_all(&[2; 128]).await.unwrap();
|
||||
writer.flush_end_async().await.unwrap();
|
||||
|
||||
let mut v = Vec::new();
|
||||
pkt_reader.read_to_end(&mut v).await.unwrap();
|
||||
assert_eq!(&v[..], &[1; 16]);
|
||||
pkt_reader.consume_end().unwrap();
|
||||
|
||||
let mut v = Vec::new();
|
||||
pkt_reader.read_to_end(&mut v).await.unwrap();
|
||||
assert_eq!(&v[..], &[2; 128]);
|
||||
pkt_reader.consume_end().unwrap();
|
||||
})
|
||||
}
|
||||
95
examples/bulk_io_tokio.rs
Normal file
95
examples/bulk_io_tokio.rs
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use nusb::transfer::{Bulk, In, Out};
|
||||
|
||||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
env_logger::init();
|
||||
let di = nusb::list_devices()
|
||||
.await
|
||||
.unwrap()
|
||||
.find(|d| d.vendor_id() == 0x59e3 && d.product_id() == 0x00aa)
|
||||
.expect("device should be connected");
|
||||
|
||||
println!("Device info: {di:?}");
|
||||
|
||||
let device = di.open().await.unwrap();
|
||||
|
||||
let main_interface = device.claim_interface(0).await.unwrap();
|
||||
|
||||
let mut writer = main_interface
|
||||
.endpoint::<Bulk, Out>(0x03)
|
||||
.unwrap()
|
||||
.writer(128)
|
||||
.with_num_transfers(8);
|
||||
|
||||
let mut reader = main_interface
|
||||
.endpoint::<Bulk, In>(0x83)
|
||||
.unwrap()
|
||||
.reader(128)
|
||||
.with_num_transfers(8);
|
||||
|
||||
writer.write_all(&[1; 16]).await.unwrap();
|
||||
writer.write_all(&[2; 256]).await.unwrap();
|
||||
writer.flush().await.unwrap();
|
||||
writer.write_all(&[3; 64]).await.unwrap();
|
||||
writer.flush_end_async().await.unwrap();
|
||||
|
||||
let mut buf = [0; 16];
|
||||
reader.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
let mut buf = [0; 64];
|
||||
reader.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
dbg!(reader.fill_buf().await.unwrap().len());
|
||||
|
||||
let mut buf = [0; 1000];
|
||||
for len in 0..1000 {
|
||||
reader.read_exact(&mut buf[..len]).await.unwrap();
|
||||
writer.write_all(&buf[..len]).await.unwrap();
|
||||
}
|
||||
|
||||
reader.cancel_all();
|
||||
loop {
|
||||
let n = reader.read(&mut buf).await.unwrap();
|
||||
dbg!(n);
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let echo_interface = device.claim_interface(1).await.unwrap();
|
||||
echo_interface.set_alt_setting(1).await.unwrap();
|
||||
|
||||
let mut writer = echo_interface
|
||||
.endpoint::<Bulk, Out>(0x01)
|
||||
.unwrap()
|
||||
.writer(64)
|
||||
.with_num_transfers(1);
|
||||
let mut reader = echo_interface
|
||||
.endpoint::<Bulk, In>(0x81)
|
||||
.unwrap()
|
||||
.reader(64)
|
||||
.with_num_transfers(8)
|
||||
.with_read_timeout(Duration::from_millis(100));
|
||||
|
||||
let mut pkt_reader = reader.until_short_packet();
|
||||
|
||||
writer.write_all(&[1; 16]).await.unwrap();
|
||||
writer.flush_end_async().await.unwrap();
|
||||
|
||||
writer.write_all(&[2; 128]).await.unwrap();
|
||||
writer.flush_end_async().await.unwrap();
|
||||
|
||||
let mut v = Vec::new();
|
||||
pkt_reader.read_to_end(&mut v).await.unwrap();
|
||||
assert_eq!(&v[..], &[1; 16]);
|
||||
pkt_reader.consume_end().unwrap();
|
||||
|
||||
let mut v = Vec::new();
|
||||
pkt_reader.read_to_end(&mut v).await.unwrap();
|
||||
assert_eq!(&v[..], &[2; 128]);
|
||||
pkt_reader.consume_end().unwrap();
|
||||
}
|
||||
138
src/io/read.rs
138
src/io/read.rs
|
|
@ -1,4 +1,5 @@
|
|||
use std::{
|
||||
error::Error,
|
||||
io::{BufRead, Read},
|
||||
time::Duration,
|
||||
};
|
||||
|
|
@ -88,33 +89,41 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
|
|||
Self {
|
||||
endpoint,
|
||||
reading: None,
|
||||
num_transfers: 0,
|
||||
num_transfers: 1,
|
||||
transfer_size,
|
||||
read_timeout: Duration::MAX,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the number of transfers to maintain pending at all times.
|
||||
/// Set the number of concurrent transfers.
|
||||
///
|
||||
/// A value of 0 means that transfers will only be submitted when calling
|
||||
/// `read()` or `fill_buf()` and the buffer is empty. To maximize throughput,
|
||||
/// a value of 2 or more is recommended for applications that stream data
|
||||
/// continuously.
|
||||
/// A value of 1 (default) means that transfers will only be submitted when
|
||||
/// calling `read()` or `fill_buf()` and the buffer is empty. To maximize
|
||||
/// throughput, a value of 2 or more is recommended for applications that
|
||||
/// stream data continuously so that the host controller can continue to
|
||||
/// receive data while the application processes the data from a completed
|
||||
/// transfer.
|
||||
///
|
||||
/// A value of 0 means no further transfers will be submitted. Existing
|
||||
/// transfers will complete normally, and subsequent calls to `read()` and
|
||||
/// `fill_buf()` will return zero bytes (EOF).
|
||||
///
|
||||
/// This submits more transfers when increasing the number, but does not
|
||||
/// cancel transfers when decreasing it.
|
||||
/// [cancel transfers](Self::cancel_all) when decreasing it.
|
||||
pub fn set_num_transfers(&mut self, num_transfers: usize) {
|
||||
self.num_transfers = num_transfers;
|
||||
|
||||
while self.endpoint.pending() < num_transfers {
|
||||
// Leave the last transfer to be submitted by `read` such that
|
||||
// a value of `1` only has transfers pending within `read` calls.
|
||||
while self.endpoint.pending() < num_transfers.saturating_sub(1) {
|
||||
let buf = self.endpoint.allocate(self.transfer_size);
|
||||
self.endpoint.submit(buf);
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the number of transfers to maintain pending at all times.
|
||||
/// Set the number of concurrent transfers.
|
||||
///
|
||||
/// See [Self::set_num_transfers] -- this is for method chaining with `EndpointRead::new()`.
|
||||
/// See [Self::set_num_transfers] (this version is for method chaining).
|
||||
pub fn with_num_transfers(mut self, num_transfers: usize) -> Self {
|
||||
self.set_num_transfers(num_transfers);
|
||||
self
|
||||
|
|
@ -141,8 +150,14 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
|
|||
|
||||
/// Cancel all pending transfers.
|
||||
///
|
||||
/// They will be re-submitted on the next read.
|
||||
/// This sets [`num_transfers`](Self::set_num_transfers) to 0, so no further
|
||||
/// transfers will be submitted. Any data buffered before the transfers are cancelled
|
||||
/// can be read, and then the read methods will return 0 bytes (EOF).
|
||||
///
|
||||
/// Call [`num_transfers`](Self::set_num_transfers) with a non-zero value
|
||||
/// to resume receiving data.
|
||||
pub fn cancel_all(&mut self) {
|
||||
self.num_transfers = 0;
|
||||
self.endpoint.cancel_all();
|
||||
}
|
||||
|
||||
|
|
@ -183,15 +198,19 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
|
|||
}
|
||||
}
|
||||
|
||||
fn start_read(&mut self) {
|
||||
let t = usize::max(1, self.num_transfers);
|
||||
if self.endpoint.pending() < t {
|
||||
fn start_read(&mut self) -> bool {
|
||||
if self.endpoint.pending() < self.num_transfers {
|
||||
// Re-use the last completed buffer if available
|
||||
self.resubmit();
|
||||
while self.endpoint.pending() < t {
|
||||
while self.endpoint.pending() < self.num_transfers {
|
||||
// Allocate more buffers for any remaining transfers
|
||||
let buf = self.endpoint.allocate(self.transfer_size);
|
||||
self.endpoint.submit(buf);
|
||||
}
|
||||
}
|
||||
|
||||
// If num_transfers is 0 and all transfers are complete
|
||||
self.endpoint.pending() > 0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
|
@ -208,38 +227,46 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
|
|||
}
|
||||
}
|
||||
|
||||
fn wait(&mut self) -> Result<(), std::io::Error> {
|
||||
self.start_read();
|
||||
let c = self.endpoint.wait_next_complete(self.read_timeout);
|
||||
let c = c.ok_or(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"timeout waiting for read",
|
||||
))?;
|
||||
self.reading = Some(ReadBuffer {
|
||||
pos: 0,
|
||||
buf: c.buffer,
|
||||
status: c.status,
|
||||
});
|
||||
Ok(())
|
||||
fn wait(&mut self) -> Result<bool, std::io::Error> {
|
||||
if self.start_read() {
|
||||
let c = self.endpoint.wait_next_complete(self.read_timeout);
|
||||
let c = c.ok_or(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"timeout waiting for read",
|
||||
))?;
|
||||
self.reading = Some(ReadBuffer {
|
||||
pos: 0,
|
||||
buf: c.buffer,
|
||||
status: c.status,
|
||||
});
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "tokio", feature = "smol"))]
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.start_read();
|
||||
let c = ready!(self.endpoint.poll_next_complete(cx));
|
||||
self.reading = Some(ReadBuffer {
|
||||
pos: 0,
|
||||
buf: c.buffer,
|
||||
status: c.status,
|
||||
});
|
||||
Poll::Ready(())
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
|
||||
if self.start_read() {
|
||||
let c = ready!(self.endpoint.poll_next_complete(cx));
|
||||
self.reading = Some(ReadBuffer {
|
||||
pos: 0,
|
||||
buf: c.buffer,
|
||||
status: c.status,
|
||||
});
|
||||
Poll::Ready(true)
|
||||
} else {
|
||||
Poll::Ready(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "tokio", feature = "smol"))]
|
||||
#[inline]
|
||||
fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<Result<&[u8], std::io::Error>> {
|
||||
while !self.has_data() {
|
||||
ready!(self.poll(cx));
|
||||
if !ready!(self.poll(cx)) {
|
||||
return Poll::Ready(Ok(&[]));
|
||||
}
|
||||
}
|
||||
Poll::Ready(self.remaining())
|
||||
}
|
||||
|
|
@ -251,7 +278,12 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
|
|||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<&[u8], std::io::Error>> {
|
||||
while !self.has_data_or_short_end() {
|
||||
ready!(self.poll(cx));
|
||||
if !ready!(self.poll(cx)) {
|
||||
return Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"ended without short packet",
|
||||
)));
|
||||
}
|
||||
}
|
||||
Poll::Ready(self.remaining())
|
||||
}
|
||||
|
|
@ -271,7 +303,9 @@ impl<EpType: BulkOrInterrupt> BufRead for EndpointRead<EpType> {
|
|||
#[inline]
|
||||
fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> {
|
||||
while !self.has_data() {
|
||||
self.wait()?;
|
||||
if !self.wait()? {
|
||||
return Ok(&[]);
|
||||
}
|
||||
}
|
||||
self.remaining()
|
||||
}
|
||||
|
|
@ -351,6 +385,19 @@ pub struct EndpointReadUntilShortPacket<'a, EpType: BulkOrInterrupt> {
|
|||
reader: &'a mut EndpointRead<EpType>,
|
||||
}
|
||||
|
||||
/// Error returned by [`EndpointReadUntilShortPacket::consume_end()`]
|
||||
/// when the reader is not at the end of a short packet.
|
||||
#[derive(Debug)]
|
||||
pub struct ExpectedShortPacket;
|
||||
|
||||
impl std::fmt::Display for ExpectedShortPacket {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "expected short packet")
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for ExpectedShortPacket {}
|
||||
|
||||
impl<EpType: BulkOrInterrupt> EndpointReadUntilShortPacket<'_, EpType> {
|
||||
/// Check if the underlying endpoint has reached the end of a short packet.
|
||||
///
|
||||
|
|
@ -369,12 +416,12 @@ impl<EpType: BulkOrInterrupt> EndpointReadUntilShortPacket<'_, EpType> {
|
|||
/// to read the next message.
|
||||
///
|
||||
/// Returns an error and does nothing if the reader [is not at the end of a short packet](Self::is_end).
|
||||
pub fn consume_end(&mut self) -> Result<(), ()> {
|
||||
pub fn consume_end(&mut self) -> Result<(), ExpectedShortPacket> {
|
||||
if self.is_end() {
|
||||
self.reader.reading.as_mut().unwrap().clear_short_packet();
|
||||
Ok(())
|
||||
} else {
|
||||
Err(())
|
||||
Err(ExpectedShortPacket)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -393,7 +440,12 @@ impl<EpType: BulkOrInterrupt> BufRead for EndpointReadUntilShortPacket<'_, EpTyp
|
|||
#[inline]
|
||||
fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> {
|
||||
while !self.reader.has_data_or_short_end() {
|
||||
self.reader.wait()?;
|
||||
if !self.reader.wait()? {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"ended without short packet",
|
||||
));
|
||||
}
|
||||
}
|
||||
self.reader.remaining()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,7 +49,10 @@ impl<EpType: BulkOrInterrupt> EndpointWrite<EpType> {
|
|||
/// If more than `num_transfers` transfers are pending, calls to `write`
|
||||
/// will block or async methods will return `Pending` until a transfer
|
||||
/// completes.
|
||||
///
|
||||
/// Panics if `num_transfers` is zero.
|
||||
pub fn set_num_transfers(&mut self, num_transfers: usize) {
|
||||
assert!(num_transfers > 0, "num_transfers must be greater than zero");
|
||||
self.num_transfers = num_transfers;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue