EndpointRead: improve semantics for num_transfers

This commit is contained in:
Kevin Mehall 2025-07-14 08:31:41 -06:00
parent 0fd3db6089
commit 89d55ab6a2

View file

@ -88,33 +88,41 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
Self { Self {
endpoint, endpoint,
reading: None, reading: None,
num_transfers: 0, num_transfers: 1,
transfer_size, transfer_size,
read_timeout: Duration::MAX, read_timeout: Duration::MAX,
} }
} }
/// Set the number of transfers to maintain pending at all times. /// Set the number of concurrent transfers.
/// ///
/// A value of 0 means that transfers will only be submitted when calling /// A value of 1 (default) means that transfers will only be submitted when
/// `read()` or `fill_buf()` and the buffer is empty. To maximize throughput, /// calling `read()` or `fill_buf()` and the buffer is empty. To maximize
/// a value of 2 or more is recommended for applications that stream data /// throughput, a value of 2 or more is recommended for applications that
/// continuously. /// stream data continuously so that the host controller can continue to
/// receive data while the application processes the data from a completed
/// transfer.
///
/// A value of 0 means no further transfers will be submitted. Existing
/// transfers will complete normally, and subsequent calls to `read()` and
/// `fill_buf()` will return zero bytes (EOF).
/// ///
/// This submits more transfers when increasing the number, but does not /// This submits more transfers when increasing the number, but does not
/// cancel transfers when decreasing it. /// [cancel transfers](Self::cancel_all) when decreasing it.
pub fn set_num_transfers(&mut self, num_transfers: usize) { pub fn set_num_transfers(&mut self, num_transfers: usize) {
self.num_transfers = num_transfers; self.num_transfers = num_transfers;
while self.endpoint.pending() < num_transfers { // Leave the last transfer to be submitted by `read` such that
// a value of `1` only has transfers pending within `read` calls.
while self.endpoint.pending() < num_transfers.saturating_sub(1) {
let buf = self.endpoint.allocate(self.transfer_size); let buf = self.endpoint.allocate(self.transfer_size);
self.endpoint.submit(buf); self.endpoint.submit(buf);
} }
} }
/// Set the number of transfers to maintain pending at all times. /// Set the number of concurrent transfers.
/// ///
/// See [Self::set_num_transfers] -- this is for method chaining with `EndpointRead::new()`. /// See [Self::set_num_transfers] (this version is for method chaining).
pub fn with_num_transfers(mut self, num_transfers: usize) -> Self { pub fn with_num_transfers(mut self, num_transfers: usize) -> Self {
self.set_num_transfers(num_transfers); self.set_num_transfers(num_transfers);
self self
@ -141,8 +149,14 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
/// Cancel all pending transfers. /// Cancel all pending transfers.
/// ///
/// They will be re-submitted on the next read. /// This sets [`num_transfers`](Self::set_num_transfers) to 0, so no further
/// transfers will be submitted. Any data buffered before the transfers are cancelled
/// can be read, and then the read methods will return 0 bytes (EOF).
///
/// Call [`num_transfers`](Self::set_num_transfers) with a non-zero value
/// to resume receiving data.
pub fn cancel_all(&mut self) { pub fn cancel_all(&mut self) {
self.num_transfers = 0;
self.endpoint.cancel_all(); self.endpoint.cancel_all();
} }
@ -183,15 +197,19 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
} }
} }
fn start_read(&mut self) { fn start_read(&mut self) -> bool {
let t = usize::max(1, self.num_transfers); if self.endpoint.pending() < self.num_transfers {
if self.endpoint.pending() < t { // Re-use the last completed buffer if available
self.resubmit(); self.resubmit();
while self.endpoint.pending() < t { while self.endpoint.pending() < self.num_transfers {
// Allocate more buffers for any remaining transfers
let buf = self.endpoint.allocate(self.transfer_size); let buf = self.endpoint.allocate(self.transfer_size);
self.endpoint.submit(buf); self.endpoint.submit(buf);
} }
} }
// If num_transfers is 0 and all transfers are complete
self.endpoint.pending() > 0
} }
#[inline] #[inline]
@ -208,38 +226,46 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
} }
} }
fn wait(&mut self) -> Result<(), std::io::Error> { fn wait(&mut self) -> Result<bool, std::io::Error> {
self.start_read(); if self.start_read() {
let c = self.endpoint.wait_next_complete(self.read_timeout); let c = self.endpoint.wait_next_complete(self.read_timeout);
let c = c.ok_or(std::io::Error::new( let c = c.ok_or(std::io::Error::new(
std::io::ErrorKind::TimedOut, std::io::ErrorKind::TimedOut,
"timeout waiting for read", "timeout waiting for read",
))?; ))?;
self.reading = Some(ReadBuffer { self.reading = Some(ReadBuffer {
pos: 0, pos: 0,
buf: c.buffer, buf: c.buffer,
status: c.status, status: c.status,
}); });
Ok(()) Ok(true)
} else {
Ok(false)
}
} }
#[cfg(any(feature = "tokio", feature = "smol"))] #[cfg(any(feature = "tokio", feature = "smol"))]
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { fn poll(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
self.start_read(); if self.start_read() {
let c = ready!(self.endpoint.poll_next_complete(cx)); let c = ready!(self.endpoint.poll_next_complete(cx));
self.reading = Some(ReadBuffer { self.reading = Some(ReadBuffer {
pos: 0, pos: 0,
buf: c.buffer, buf: c.buffer,
status: c.status, status: c.status,
}); });
Poll::Ready(()) Poll::Ready(true)
} else {
Poll::Ready(false)
}
} }
#[cfg(any(feature = "tokio", feature = "smol"))] #[cfg(any(feature = "tokio", feature = "smol"))]
#[inline] #[inline]
fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<Result<&[u8], std::io::Error>> { fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<Result<&[u8], std::io::Error>> {
while !self.has_data() { while !self.has_data() {
ready!(self.poll(cx)); if !ready!(self.poll(cx)) {
return Poll::Ready(Ok(&[]));
}
} }
Poll::Ready(self.remaining()) Poll::Ready(self.remaining())
} }
@ -251,7 +277,12 @@ impl<EpType: BulkOrInterrupt> EndpointRead<EpType> {
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<&[u8], std::io::Error>> { ) -> Poll<Result<&[u8], std::io::Error>> {
while !self.has_data_or_short_end() { while !self.has_data_or_short_end() {
ready!(self.poll(cx)); if !ready!(self.poll(cx)) {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"ended without short packet",
)));
}
} }
Poll::Ready(self.remaining()) Poll::Ready(self.remaining())
} }
@ -271,7 +302,9 @@ impl<EpType: BulkOrInterrupt> BufRead for EndpointRead<EpType> {
#[inline] #[inline]
fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> { fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> {
while !self.has_data() { while !self.has_data() {
self.wait()?; if !self.wait()? {
return Ok(&[]);
}
} }
self.remaining() self.remaining()
} }
@ -393,7 +426,12 @@ impl<EpType: BulkOrInterrupt> BufRead for EndpointReadUntilShortPacket<'_, EpTyp
#[inline] #[inline]
fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> { fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> {
while !self.reader.has_data_or_short_end() { while !self.reader.has_data_or_short_end() {
self.reader.wait()?; if !self.reader.wait()? {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"ended without short packet",
));
}
} }
self.reader.remaining() self.reader.remaining()
} }