feat: concurrent ISO pipelining via nusb update and &self handlers

Update nusb to c1380673 which allows multiple IsoEndpoint instances per
address, enabling concurrent URB submission from separate threads.

Change UsbInterfaceHandler trait methods from &mut self to &self and
replace Arc<Mutex<Box<dyn Handler>>> with Arc<dyn Handler>. This
removes the serialization bottleneck where the handler mutex was held
for the entire USB transfer duration, causing ISO audio to play at
~67% speed.

Handlers needing interior mutability (HID, CDC) now use Mutex on
individual fields. Passthrough handlers already used Arc<Mutex<>>
internally and need no changes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Davíð Steinn Geirsson 2026-03-22 15:10:28 +00:00
parent 9250c062aa
commit 4c368c02b5
15 changed files with 96 additions and 127 deletions

2
Cargo.lock generated
View file

@ -470,7 +470,7 @@ dependencies = [
[[package]]
name = "nusb"
version = "0.2.3"
source = "git+https://git.dsg.is/dsg/nusb.git?rev=997a76cd#997a76cd3c5c528ea08ad005d85e47bc9a7b91b5"
source = "git+https://git.dsg.is/dsg/nusb.git?rev=c1380673#c1380673db9c692b8468104d7f3e81144a1ee12a"
dependencies = [
"core-foundation",
"core-foundation-sys",

View file

@ -16,4 +16,4 @@ tokio-vsock = "0.7"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "time", "net"] }
log = "0.4"
env_logger = "0.11"
nusb = { git = "https://git.dsg.is/dsg/nusb.git", rev = "997a76cd" }
nusb = { git = "https://git.dsg.is/dsg/nusb.git", rev = "c1380673" }

View file

@ -121,11 +121,11 @@ fn build_usb_device(dev: nusb::Device, dev_info: nusb::DeviceInfo) -> Result<Usb
}
};
let handler: Arc<Mutex<Box<dyn UsbInterfaceHandler + Send>>> = Arc::new(Mutex::new(
Box::new(NusbUsbHostInterfaceHandler::new(Arc::new(Mutex::new(
let handler: Arc<dyn UsbInterfaceHandler> = Arc::new(
NusbUsbHostInterfaceHandler::new(Arc::new(Mutex::new(
claimed.clone(),
)))),
));
))),
);
let alt_settings: Vec<UsbInterface> = claimed
.descriptors()

View file

@ -1,5 +1,5 @@
use log::info;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use usbip_rs::{
@ -13,7 +13,7 @@ use crate::transport;
async fn do_test_hid_session<S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static>(
mut stream: S,
device: UsbDevice,
handler: Arc<Mutex<Box<dyn UsbInterfaceHandler + Send>>>,
handler: Arc<UsbHidKeyboardHandler>,
) -> std::io::Result<()> {
// Send device info (simplified handshake)
let handshake = UsbIpResponse::op_rep_import_success(&device).to_bytes()?;
@ -28,11 +28,9 @@ async fn do_test_hid_session<S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let mut h = handler_clone.lock().unwrap();
if let Some(hid) = h.as_any().downcast_mut::<UsbHidKeyboardHandler>() {
match usbip_rs::hid::UsbHidKeyboardReport::from_ascii(b'1') {
Ok(report) => {
hid.pending_key_events.push_back(report);
handler_clone.pending_key_events.lock().unwrap().push_back(report);
info!("Simulated key event '1'");
}
Err(e) => {
@ -40,7 +38,6 @@ async fn do_test_hid_session<S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
}
}
}
}
});
// Handle URBs
@ -49,9 +46,7 @@ async fn do_test_hid_session<S: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
pub async fn run(addr: transport::TransportAddr) -> std::io::Result<()> {
// Create simulated HID keyboard
let handler = Arc::new(Mutex::new(
Box::new(UsbHidKeyboardHandler::new_keyboard()) as Box<dyn UsbInterfaceHandler + Send>
));
let handler = Arc::new(UsbHidKeyboardHandler::new_keyboard());
let device = UsbDevice::new(0)?.with_interface(
ClassCode::HID as u8,
@ -64,7 +59,7 @@ pub async fn run(addr: transport::TransportAddr) -> std::io::Result<()> {
max_packet_size: 0x08, // 8 bytes
interval: 10,
}],
handler.clone(),
handler.clone() as Arc<dyn UsbInterfaceHandler>,
)?;
info!(

View file

@ -31,7 +31,7 @@
src = self;
cargoHash = "sha256-xEyWh+bP7F+a0KpH24hVliC1JQMRIW0wfmUB3rFNvyk=";
cargoHash = "sha256-ol61Q+2IYDZ4ZvJiLYJdfqaN23thuZ1QHDOSvNXHhvY=";
inherit nativeBuildInputs buildInputs;

View file

@ -14,7 +14,7 @@ num-traits = "0.2.15"
num-derive = "0.4.2"
rusb = "0.9.3"
serde = { version = "1.0", features = ["derive"], optional = true }
nusb = { git = "https://git.dsg.is/dsg/nusb.git", rev = "997a76cd" }
nusb = { git = "https://git.dsg.is/dsg/nusb.git", rev = "c1380673" }
tokio-util = { version = "0.7", features = ["rt"] }
[dev-dependencies]

View file

@ -1,13 +1,12 @@
use log::*;
use std::net::*;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
#[tokio::main]
async fn main() {
env_logger::init();
let handler = Arc::new(Mutex::new(Box::new(usbip_rs::cdc::UsbCdcAcmHandler::new())
as Box<dyn usbip_rs::UsbInterfaceHandler + Send>));
let handler = Arc::new(usbip_rs::cdc::UsbCdcAcmHandler::new());
let server = Arc::new(usbip_rs::UsbIpServer::new_simulated(vec![
usbip_rs::UsbDevice::new(0)
.expect("create device")
@ -17,7 +16,7 @@ async fn main() {
0x00,
Some("Test CDC ACM"),
usbip_rs::cdc::UsbCdcAcmHandler::endpoints(),
handler.clone(),
handler.clone() as Arc<dyn usbip_rs::UsbInterfaceHandler>,
)
.expect("add interface"),
]));
@ -27,13 +26,7 @@ async fn main() {
loop {
// sleep 1s
tokio::time::sleep(Duration::new(1, 0)).await;
let mut handler = handler.lock().unwrap();
if let Some(acm) = handler
.as_any()
.downcast_mut::<usbip_rs::cdc::UsbCdcAcmHandler>()
{
acm.tx_buffer.push(b'a');
handler.tx_buffer.lock().unwrap().push(b'a');
info!("Simulate a char input");
}
}
}

View file

@ -1,15 +1,12 @@
use log::*;
use std::net::*;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
#[tokio::main]
async fn main() {
env_logger::init();
let handler = Arc::new(Mutex::new(
Box::new(usbip_rs::hid::UsbHidKeyboardHandler::new_keyboard())
as Box<dyn usbip_rs::UsbInterfaceHandler + Send>,
));
let handler = Arc::new(usbip_rs::hid::UsbHidKeyboardHandler::new_keyboard());
let server = Arc::new(usbip_rs::UsbIpServer::new_simulated(vec![
usbip_rs::UsbDevice::new(0)
.expect("create device")
@ -24,7 +21,7 @@ async fn main() {
max_packet_size: 0x08, // 8 bytes
interval: 10,
}],
handler.clone(),
handler.clone() as Arc<dyn usbip_rs::UsbInterfaceHandler>,
)
.expect("add interface"),
]));
@ -34,14 +31,9 @@ async fn main() {
loop {
// sleep 1s
tokio::time::sleep(Duration::new(1, 0)).await;
let mut handler = handler.lock().unwrap();
if let Some(hid) = handler
.as_any()
.downcast_mut::<usbip_rs::hid::UsbHidKeyboardHandler>()
{
match usbip_rs::hid::UsbHidKeyboardReport::from_ascii(b'1') {
Ok(report) => {
hid.pending_key_events.push_back(report);
handler.pending_key_events.lock().unwrap().push_back(report);
info!("Simulate a key event");
}
Err(e) => {
@ -49,5 +41,4 @@ async fn main() {
}
}
}
}
}

View file

@ -2,9 +2,9 @@
use super::*;
/// A handler of a CDC ACM(Abstract Control Model)
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct UsbCdcAcmHandler {
pub tx_buffer: Vec<u8>,
pub tx_buffer: Mutex<Vec<u8>>,
}
impl Default for UsbCdcAcmHandler {
@ -18,7 +18,7 @@ pub const CDC_ACM_SUBCLASS: u8 = 0x02;
impl UsbCdcAcmHandler {
pub fn new() -> Self {
Self { tx_buffer: vec![] }
Self { tx_buffer: Mutex::new(vec![]) }
}
pub fn endpoints() -> Vec<UsbEndpoint> {
@ -50,7 +50,7 @@ impl UsbCdcAcmHandler {
impl UsbInterfaceHandler for UsbCdcAcmHandler {
fn handle_urb(
&mut self,
&self,
_interface: &UsbInterface,
request: UrbRequest,
) -> Result<UrbResponse> {
@ -74,15 +74,13 @@ impl UsbInterfaceHandler for UsbCdcAcmHandler {
return Ok(UrbResponse::default());
} else {
// bulk in
// Handle max packet size - return data in chunks of max_packet_size
let max_packet_size = ep.max_packet_size as usize;
let resp = if self.tx_buffer.len() > max_packet_size {
// Return only the first chunk (max_packet_size bytes)
self.tx_buffer.drain(..max_packet_size).collect::<Vec<_>>()
let mut tx_buffer = self.tx_buffer.lock().unwrap();
let resp = if tx_buffer.len() > max_packet_size {
tx_buffer.drain(..max_packet_size).collect::<Vec<_>>()
} else {
// Return all data if it fits in one packet
let resp = self.tx_buffer.clone();
self.tx_buffer.clear();
let resp = tx_buffer.clone();
tx_buffer.clear();
resp
};
return Ok(UrbResponse { data: resp, ..Default::default() });
@ -106,7 +104,7 @@ impl UsbInterfaceHandler for UsbCdcAcmHandler {
]
}
fn as_any(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}
}

View file

@ -188,13 +188,13 @@ impl UsbDevice {
interface_protocol: u8,
name: Option<&str>,
endpoints: Vec<UsbEndpoint>,
handler: Arc<Mutex<Box<dyn UsbInterfaceHandler + Send>>>,
handler: Arc<dyn UsbInterfaceHandler>,
) -> std::io::Result<Self> {
let string_interface = match name {
Some(name) => self.new_string(name)?,
None => 0,
};
let class_specific_descriptor = handler.lock().unwrap().get_class_specific_descriptor();
let class_specific_descriptor = handler.get_class_specific_descriptor();
self.interface_states.push(InterfaceState::new(UsbInterface {
interface_class,
interface_subclass,
@ -621,8 +621,7 @@ impl UsbDevice {
}
};
let inner = state.inner.read().await;
let mut handler = inner.active.handler.lock().unwrap();
handler.handle_urb(&inner.active, UrbRequest {
inner.active.handler.handle_urb(&inner.active, UrbRequest {
ep,
transfer_buffer_length,
setup: setup_packet,
@ -682,7 +681,7 @@ impl UsbDevice {
let mut inner = state.inner.write().await;
if (alt as usize) < inner.alt_settings.len() {
// Notify the handler so it can update the physical device
inner.active.handler.lock().unwrap().set_alt_setting(alt)?;
inner.active.handler.set_alt_setting(alt)?;
inner.active = inner.alt_settings[alt as usize].clone();
inner.current_alt = alt;
info!("SET_INTERFACE: intf={intf_index} alt={alt}");
@ -718,8 +717,7 @@ impl UsbDevice {
}
};
let inner = state.inner.read().await;
let mut handler = inner.active.handler.lock().unwrap();
handler.handle_urb(&inner.active, UrbRequest {
inner.active.handler.handle_urb(&inner.active, UrbRequest {
ep,
transfer_buffer_length,
setup: setup_packet,
@ -755,8 +753,7 @@ impl UsbDevice {
Some(idx) => {
let state = &self.interface_states[idx];
let inner = state.inner.read().await;
let mut handler = inner.active.handler.lock().unwrap();
handler.handle_urb(
inner.active.handler.handle_urb(
&inner.active,
request,
)
@ -789,7 +786,7 @@ pub trait UsbDeviceHandler: std::fmt::Debug {
fn handle_urb(&mut self, request: UrbRequest) -> Result<UrbResponse>;
/// Helper to downcast to actual struct
fn as_any(&mut self) -> &mut dyn Any;
fn as_any(&self) -> &dyn Any;
}
#[cfg(test)]

View file

@ -13,12 +13,11 @@ enum UsbHidKeyboardHandlerState {
}
/// A handler of a HID keyboard
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug)]
pub struct UsbHidKeyboardHandler {
pub report_descriptor: Vec<u8>,
pub pending_key_events: VecDeque<UsbHidKeyboardReport>,
state: UsbHidKeyboardHandlerState,
pub pending_key_events: Mutex<VecDeque<UsbHidKeyboardReport>>,
state: Mutex<UsbHidKeyboardHandlerState>,
}
/// A report of a HID keyboard
@ -59,8 +58,8 @@ impl UsbHidKeyboardReport {
impl UsbHidKeyboardHandler {
pub fn new_keyboard() -> Self {
Self {
pending_key_events: VecDeque::new(),
state: UsbHidKeyboardHandlerState::Idle,
pending_key_events: Mutex::new(VecDeque::new()),
state: Mutex::new(UsbHidKeyboardHandlerState::Idle),
report_descriptor: vec![
0x05, 0x01, // Usage Page (Generic Desktop)
0x09, 0x06, // Usage (Keyboard)
@ -95,7 +94,7 @@ impl UsbHidKeyboardHandler {
impl UsbInterfaceHandler for UsbHidKeyboardHandler {
fn handle_urb(
&mut self,
&self,
_interface: &UsbInterface,
request: UrbRequest,
) -> Result<UrbResponse> {
@ -134,20 +133,21 @@ impl UsbInterfaceHandler for UsbHidKeyboardHandler {
// interrupt transfer
if let Direction::In = ep.direction() {
// interrupt in
match self.state {
let mut state = self.state.lock().unwrap();
match *state {
UsbHidKeyboardHandlerState::Idle => {
if let Some(report) = self.pending_key_events.pop_front() {
if let Some(report) = self.pending_key_events.lock().unwrap().pop_front() {
let mut resp = vec![report.modifier, 0];
resp.extend_from_slice(&report.keys);
info!("HID key down");
self.state = UsbHidKeyboardHandlerState::KeyDown;
*state = UsbHidKeyboardHandlerState::KeyDown;
return Ok(UrbResponse { data: resp, ..Default::default() });
}
}
UsbHidKeyboardHandlerState::KeyDown => {
let resp = vec![0; 6];
info!("HID key up");
self.state = UsbHidKeyboardHandlerState::Idle;
*state = UsbHidKeyboardHandlerState::Idle;
return Ok(UrbResponse { data: resp, ..Default::default() });
}
}
@ -170,7 +170,7 @@ impl UsbInterfaceHandler for UsbHidKeyboardHandler {
]
}
fn as_any(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}
}

View file

@ -31,7 +31,7 @@ impl RusbUsbHostInterfaceHandler {
impl UsbInterfaceHandler for RusbUsbHostInterfaceHandler {
fn handle_urb(
&mut self,
&self,
_interface: &UsbInterface,
request: UrbRequest,
) -> Result<UrbResponse> {
@ -123,7 +123,7 @@ impl UsbInterfaceHandler for RusbUsbHostInterfaceHandler {
vec![]
}
fn as_any(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}
}
@ -186,7 +186,7 @@ impl UsbDeviceHandler for RusbUsbHostDeviceHandler {
Ok(UrbResponse::default())
}
fn as_any(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}
}
@ -212,7 +212,7 @@ impl NusbUsbHostInterfaceHandler {
}
impl UsbInterfaceHandler for NusbUsbHostInterfaceHandler {
fn set_alt_setting(&mut self, alt: u8) -> Result<()> {
fn set_alt_setting(&self, alt: u8) -> Result<()> {
let handle = self.handle.lock().unwrap();
handle.set_alt_setting(alt).wait().map_err(|e| {
std::io::Error::other(format!("Failed to set alt setting {alt}: {e}"))
@ -220,7 +220,7 @@ impl UsbInterfaceHandler for NusbUsbHostInterfaceHandler {
}
fn handle_urb(
&mut self,
&self,
_interface: &UsbInterface,
request: UrbRequest,
) -> Result<UrbResponse> {
@ -425,7 +425,6 @@ impl UsbInterfaceHandler for NusbUsbHostInterfaceHandler {
let mut endpoint = handle
.iso_endpoint::<nusb::transfer::In>(ep.address, num_packets)
.map_err(|e| std::io::Error::other(format!("Failed to open ISO IN endpoint: {e}")))?;
let buf = endpoint.allocate(total_len);
drop(handle);
endpoint.submit(buf, packet_size);
@ -468,8 +467,6 @@ impl UsbInterfaceHandler for NusbUsbHostInterfaceHandler {
.map_err(|e| std::io::Error::other(format!("Failed to open ISO OUT endpoint: {e}")))?;
let mut buf = endpoint.allocate(total_len);
// allocate() returns a zero-length buffer with capacity;
// fill it from request data, zero-padding if needed
let copy_len = request.data.len().min(total_len);
if copy_len > 0 {
buf.extend_from_slice(&request.data[..copy_len]);
@ -488,7 +485,6 @@ impl UsbInterfaceHandler for NusbUsbHostInterfaceHandler {
}
let mut response_descriptors = Vec::new();
for (i, pkt) in completion.packets.iter().enumerate() {
response_descriptors.push(IsoPacketDescriptor {
offset: request.iso_packet_descriptors.get(i).map(|d| d.offset).unwrap_or(0),
@ -514,7 +510,7 @@ impl UsbInterfaceHandler for NusbUsbHostInterfaceHandler {
vec![]
}
fn as_any(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}
}
@ -640,7 +636,7 @@ impl UsbDeviceHandler for NusbUsbHostDeviceHandler {
Ok(UrbResponse::default())
}
fn as_any(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}
}

View file

@ -13,7 +13,7 @@ pub struct UsbInterface {
pub class_specific_descriptor: Vec<u8>,
#[cfg_attr(feature = "serde", serde(skip))]
pub handler: Arc<Mutex<Box<dyn UsbInterfaceHandler + Send>>>,
pub handler: Arc<dyn UsbInterfaceHandler>,
}
/// Inner mutable state for an interface, protected by RwLock.
@ -103,24 +103,29 @@ pub struct UrbResponse {
pub error_count: u32,
}
/// A handler of a custom usb interface
pub trait UsbInterfaceHandler: std::fmt::Debug {
/// A handler of a custom usb interface.
///
/// Methods take `&self` (not `&mut self`) so that multiple URBs can be
/// processed concurrently on the same interface — critical for isochronous
/// pipelining. Implementations that need interior mutability should use
/// `Mutex`, `AtomicU8`, etc.
pub trait UsbInterfaceHandler: std::fmt::Debug + Send + Sync {
/// Return the class specific descriptor which is inserted between interface descriptor and endpoint descriptor
fn get_class_specific_descriptor(&self) -> Vec<u8>;
/// Handle a URB(USB Request Block) targeting at this interface
fn handle_urb(
&mut self,
&self,
interface: &UsbInterface,
request: UrbRequest,
) -> Result<UrbResponse>;
/// Notify the handler that the alternate setting has changed.
/// For host passthrough handlers, this updates the underlying USB interface.
fn set_alt_setting(&mut self, _alt: u8) -> Result<()> {
fn set_alt_setting(&self, _alt: u8) -> Result<()> {
Ok(())
}
/// Helper to downcast to actual struct
fn as_any(&mut self) -> &mut dyn Any;
fn as_any(&self) -> &dyn Any;
}

View file

@ -93,10 +93,9 @@ impl UsbIpServer {
});
}
let handler = Arc::new(Mutex::new(Box::new(NusbUsbHostInterfaceHandler::new(
Arc::new(Mutex::new(intf.clone())),
))
as Box<dyn UsbInterfaceHandler + Send>));
let handler: Arc<dyn UsbInterfaceHandler> = Arc::new(
NusbUsbHostInterfaceHandler::new(Arc::new(Mutex::new(intf.clone()))),
);
interface_states.push(InterfaceState::new(UsbInterface {
interface_class: alt_setting.class(),
interface_subclass: alt_setting.subclass(),
@ -239,10 +238,9 @@ impl UsbIpServer {
});
}
let handler = Arc::new(Mutex::new(Box::new(RusbUsbHostInterfaceHandler::new(
handle.clone(),
))
as Box<dyn UsbInterfaceHandler + Send>));
let handler: Arc<dyn UsbInterfaceHandler> = Arc::new(
RusbUsbHostInterfaceHandler::new(handle.clone()),
);
interface_states.push(InterfaceState::new(UsbInterface {
interface_class: intf_desc.class_code(),
interface_subclass: intf_desc.sub_class_code(),
@ -816,9 +814,7 @@ mod tests {
0x00,
Some("Test CDC ACM"),
cdc::UsbCdcAcmHandler::endpoints(),
Arc::new(Mutex::new(
Box::new(cdc::UsbCdcAcmHandler::new()) as Box<dyn UsbInterfaceHandler + Send>
)),
Arc::new(cdc::UsbCdcAcmHandler::new()) as Arc<dyn UsbInterfaceHandler>,
)
.unwrap()])
}
@ -1200,10 +1196,9 @@ mod tests {
}
fn new_iso_test_device() -> UsbDevice {
let handler = Arc::new(Mutex::new(
Box::new(util::tests::IsoLoopbackHandler::new())
as Box<dyn UsbInterfaceHandler + Send>,
));
let handler: Arc<dyn UsbInterfaceHandler> = Arc::new(
util::tests::IsoLoopbackHandler::new(),
);
UsbDevice::new(0)
.unwrap()
.with_interface(

View file

@ -99,12 +99,12 @@ pub(crate) mod tests {
/// ISO OUT: accepts and discards data, reports success per packet.
#[derive(Debug)]
pub(crate) struct IsoLoopbackHandler {
counter: u8,
counter: std::sync::atomic::AtomicU8,
}
impl IsoLoopbackHandler {
pub fn new() -> Self {
Self { counter: 0 }
Self { counter: std::sync::atomic::AtomicU8::new(0) }
}
}
@ -114,7 +114,7 @@ pub(crate) mod tests {
}
fn handle_urb(
&mut self,
&self,
_interface: &UsbInterface,
request: UrbRequest,
) -> std::io::Result<UrbResponse> {
@ -127,8 +127,7 @@ pub(crate) mod tests {
let mut descriptors = Vec::new();
for desc in &request.iso_packet_descriptors {
let fill_byte = self.counter;
self.counter = self.counter.wrapping_add(1);
let fill_byte = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let actual_len = desc.length;
let packet_data = vec![fill_byte; actual_len as usize];
data.extend_from_slice(&packet_data);
@ -178,7 +177,7 @@ pub(crate) mod tests {
Ok(UrbResponse::default())
}
fn as_any(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}
}