From 89d55ab6a2635fb2477333fe69894aa7105459a9 Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Mon, 14 Jul 2025 08:31:41 -0600 Subject: [PATCH] EndpointRead: improve semantics for num_transfers --- src/io/read.rs | 120 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 41 deletions(-) diff --git a/src/io/read.rs b/src/io/read.rs index 0645148..4b69b0f 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -88,33 +88,41 @@ impl EndpointRead { Self { endpoint, reading: None, - num_transfers: 0, + num_transfers: 1, transfer_size, read_timeout: Duration::MAX, } } - /// Set the number of transfers to maintain pending at all times. + /// Set the number of concurrent transfers. /// - /// A value of 0 means that transfers will only be submitted when calling - /// `read()` or `fill_buf()` and the buffer is empty. To maximize throughput, - /// a value of 2 or more is recommended for applications that stream data - /// continuously. + /// A value of 1 (default) means that transfers will only be submitted when + /// calling `read()` or `fill_buf()` and the buffer is empty. To maximize + /// throughput, a value of 2 or more is recommended for applications that + /// stream data continuously so that the host controller can continue to + /// receive data while the application processes the data from a completed + /// transfer. + /// + /// A value of 0 means no further transfers will be submitted. Existing + /// transfers will complete normally, and subsequent calls to `read()` and + /// `fill_buf()` will return zero bytes (EOF). /// /// This submits more transfers when increasing the number, but does not - /// cancel transfers when decreasing it. + /// [cancel transfers](Self::cancel_all) when decreasing it. pub fn set_num_transfers(&mut self, num_transfers: usize) { self.num_transfers = num_transfers; - while self.endpoint.pending() < num_transfers { + // Leave the last transfer to be submitted by `read` such that + // a value of `1` only has transfers pending within `read` calls. + while self.endpoint.pending() < num_transfers.saturating_sub(1) { let buf = self.endpoint.allocate(self.transfer_size); self.endpoint.submit(buf); } } - /// Set the number of transfers to maintain pending at all times. + /// Set the number of concurrent transfers. /// - /// See [Self::set_num_transfers] -- this is for method chaining with `EndpointRead::new()`. + /// See [Self::set_num_transfers] (this version is for method chaining). pub fn with_num_transfers(mut self, num_transfers: usize) -> Self { self.set_num_transfers(num_transfers); self @@ -141,8 +149,14 @@ impl EndpointRead { /// Cancel all pending transfers. /// - /// They will be re-submitted on the next read. + /// This sets [`num_transfers`](Self::set_num_transfers) to 0, so no further + /// transfers will be submitted. Any data buffered before the transfers are cancelled + /// can be read, and then the read methods will return 0 bytes (EOF). + /// + /// Call [`num_transfers`](Self::set_num_transfers) with a non-zero value + /// to resume receiving data. pub fn cancel_all(&mut self) { + self.num_transfers = 0; self.endpoint.cancel_all(); } @@ -183,15 +197,19 @@ impl EndpointRead { } } - fn start_read(&mut self) { - let t = usize::max(1, self.num_transfers); - if self.endpoint.pending() < t { + fn start_read(&mut self) -> bool { + if self.endpoint.pending() < self.num_transfers { + // Re-use the last completed buffer if available self.resubmit(); - while self.endpoint.pending() < t { + while self.endpoint.pending() < self.num_transfers { + // Allocate more buffers for any remaining transfers let buf = self.endpoint.allocate(self.transfer_size); self.endpoint.submit(buf); } } + + // If num_transfers is 0 and all transfers are complete + self.endpoint.pending() > 0 } #[inline] @@ -208,38 +226,46 @@ impl EndpointRead { } } - fn wait(&mut self) -> Result<(), std::io::Error> { - self.start_read(); - let c = self.endpoint.wait_next_complete(self.read_timeout); - let c = c.ok_or(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "timeout waiting for read", - ))?; - self.reading = Some(ReadBuffer { - pos: 0, - buf: c.buffer, - status: c.status, - }); - Ok(()) + fn wait(&mut self) -> Result { + if self.start_read() { + let c = self.endpoint.wait_next_complete(self.read_timeout); + let c = c.ok_or(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "timeout waiting for read", + ))?; + self.reading = Some(ReadBuffer { + pos: 0, + buf: c.buffer, + status: c.status, + }); + Ok(true) + } else { + Ok(false) + } } #[cfg(any(feature = "tokio", feature = "smol"))] - fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { - self.start_read(); - let c = ready!(self.endpoint.poll_next_complete(cx)); - self.reading = Some(ReadBuffer { - pos: 0, - buf: c.buffer, - status: c.status, - }); - Poll::Ready(()) + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + if self.start_read() { + let c = ready!(self.endpoint.poll_next_complete(cx)); + self.reading = Some(ReadBuffer { + pos: 0, + buf: c.buffer, + status: c.status, + }); + Poll::Ready(true) + } else { + Poll::Ready(false) + } } #[cfg(any(feature = "tokio", feature = "smol"))] #[inline] fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll> { while !self.has_data() { - ready!(self.poll(cx)); + if !ready!(self.poll(cx)) { + return Poll::Ready(Ok(&[])); + } } Poll::Ready(self.remaining()) } @@ -251,7 +277,12 @@ impl EndpointRead { cx: &mut Context<'_>, ) -> Poll> { while !self.has_data_or_short_end() { - ready!(self.poll(cx)); + if !ready!(self.poll(cx)) { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "ended without short packet", + ))); + } } Poll::Ready(self.remaining()) } @@ -271,7 +302,9 @@ impl BufRead for EndpointRead { #[inline] fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> { while !self.has_data() { - self.wait()?; + if !self.wait()? { + return Ok(&[]); + } } self.remaining() } @@ -393,7 +426,12 @@ impl BufRead for EndpointReadUntilShortPacket<'_, EpTyp #[inline] fn fill_buf(&mut self) -> Result<&[u8], std::io::Error> { while !self.reader.has_data_or_short_end() { - self.reader.wait()?; + if !self.reader.wait()? { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "ended without short packet", + )); + } } self.reader.remaining() }