fix: guarantee RET_SUBMIT before RET_UNLINK wire ordering

The previous TOCTOU fix prevented sending both responses from two
threads, but a channel-ordering problem remained: the spawn_blocking
task and the UNLINK handler both sent to the same mpsc channel from
different threads, so RET_UNLINK could arrive at the kernel before
RET_SUBMIT.  The kernel (vhci_rx.c) then gives back the URB via the
unlink path, and the subsequent RET_SUBMIT triggers "cannot find a
urb" → VDEV_EVENT_ERROR_TCP → device disconnect.

Fix by making the spawn_blocking task the sole sender of RET_UNLINK
when the URB is still in-flight.  The UNLINK handler now stores its
response header in the InFlightUrb entry instead of sending
immediately.  The spawn_blocking task drains this after sending
RET_SUBMIT (or instead of it, if cancelled), guaranteeing same-thread
FIFO ordering through the channel.  The UNLINK handler only sends
RET_UNLINK directly in the "not found" case, where the entry has
already been removed — meaning any RET_SUBMIT for that URB is already
in the channel, so the required ordering still holds.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Davíð Steinn Geirsson 2026-03-25 12:53:49 +00:00
parent 80a9f35e39
commit ed4fa758f1

View file

@@ -447,6 +447,7 @@ pub async fn handle_urb_loop<T: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
use tokio::io::{split, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::usbip_protocol::UsbIpHeaderBasic;
let (mut reader, writer) = split(socket);
let (response_tx, mut response_rx) = mpsc::channel::<Vec<u8>>(64);
@@ -463,8 +464,17 @@ pub async fn handle_urb_loop<T: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
Ok::<_, std::io::Error>(())
});
// Track in-flight URBs for CMD_UNLINK cancellation
let in_flight: Arc<std::sync::Mutex<HashMap<u32, CancellationToken>>> =
// Track in-flight URBs for CMD_UNLINK cancellation.
// When CMD_UNLINK arrives and the entry exists, the UNLINK handler stores
// its response header here instead of sending RET_UNLINK immediately.
// The spawn_blocking task then sends RET_UNLINK *after* RET_SUBMIT (if
// any), guaranteeing the wire ordering the kernel requires.
struct InFlightUrb {
cancel: CancellationToken,
/// Populated by the UNLINK handler; drained by spawn_blocking.
unlink_header: Option<UsbIpHeaderBasic>,
}
let in_flight: Arc<std::sync::Mutex<HashMap<u32, InFlightUrb>>> =
Arc::new(std::sync::Mutex::new(HashMap::new()));
// Compute expected devid for validation: the Linux kernel performs this
@@ -575,15 +585,34 @@ pub async fn handle_urb_loop<T: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
let tx = response_tx.clone();
let device = device.clone();
let cancel = CancellationToken::new();
in_flight.lock().unwrap_or_else(|e| e.into_inner()).insert(seqnum, cancel.clone());
in_flight.lock().unwrap_or_else(|e| e.into_inner()).insert(seqnum, InFlightUrb {
cancel: cancel.clone(),
unlink_header: None,
});
let in_flight_ref = in_flight.clone();
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Handle::current();
if cancel.is_cancelled() {
in_flight_ref.lock().unwrap_or_else(|e| e.into_inner()).remove(&seqnum);
return;
// Fast path: if already cancelled before we start, send
// the deferred RET_UNLINK and skip handle_urb entirely.
{
let mut guard = in_flight_ref.lock().unwrap_or_else(|e| e.into_inner());
let is_cancelled = guard.get(&seqnum).map_or(false, |e| e.cancel.is_cancelled());
if is_cancelled {
let entry = guard.remove(&seqnum);
drop(guard);
if let Some(hdr) = entry.and_then(|e| e.unlink_header) {
let res = UsbIpResponse::UsbIpRetUnlink {
header: hdr,
status: (-104i32) as u32, // ECONNRESET
};
if let Ok(bytes) = res.to_bytes() {
let _ = tx.blocking_send(bytes);
}
}
return;
}
}
let res = match device.find_ep(real_ep as u8) {
@@ -675,26 +704,30 @@ pub async fn handle_urb_loop<T: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
}
};
// Remove from in-flight BEFORE checking cancellation to close
// a TOCTOU race with the UNLINK handler. The UNLINK handler
// only sends RET_UNLINK(status=0) when it finds the entry in
// in_flight; by removing first we guarantee either:
// (a) UNLINK already found + cancelled us → is_cancelled()
// returns true here, we suppress our RET_SUBMIT (kernel
// will use the RET_UNLINK to give back the URB), or
// (b) UNLINK won't find us → it sends RET_UNLINK(-ENOENT),
// and our RET_SUBMIT completes the URB normally (the
// kernel handles both correctly).
// Without this ordering, the UNLINK handler can cancel + send
// RET_UNLINK(status=0) between our is_cancelled() check and
// the remove, causing both RET_UNLINK and RET_SUBMIT to be
// sent — a fatal protocol violation that crashes vhci_hcd.
in_flight_ref.lock().unwrap_or_else(|e| e.into_inner()).remove(&seqnum);
// Check cancellation under the in_flight lock. If cancelled,
// the UNLINK handler stored its header in the entry — we send
// the deferred RET_UNLINK instead of RET_SUBMIT.
let cancelled = {
let guard = in_flight_ref.lock().unwrap_or_else(|e| e.into_inner());
guard.get(&seqnum).map_or(false, |e| e.cancel.is_cancelled())
};
if cancel.is_cancelled() {
if cancelled {
let entry = in_flight_ref.lock().unwrap_or_else(|e| e.into_inner())
.remove(&seqnum);
if let Some(hdr) = entry.and_then(|e| e.unlink_header) {
let unlink_res = UsbIpResponse::UsbIpRetUnlink {
header: hdr,
status: (-104i32) as u32, // ECONNRESET
};
if let Ok(bytes) = unlink_res.to_bytes() {
let _ = tx.blocking_send(bytes);
}
}
return;
}
// Not cancelled: send RET_SUBMIT first.
match res.to_bytes() {
Ok(bytes) => {
if tx.blocking_send(bytes).is_err() {
@@ -703,6 +736,25 @@ pub async fn handle_urb_loop<T: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
}
Err(e) => warn!("Failed to serialize response for seqnum={seqnum}: {e}"),
}
// Now remove from in_flight. Any RET_UNLINK we send after
// this point is guaranteed to follow RET_SUBMIT in the
// channel (same sender thread → FIFO). If an UNLINK stored
// its header between our cancel-check and this remove, we
// pick it up here and send the deferred RET_UNLINK.
let late_unlink = in_flight_ref.lock().unwrap_or_else(|e| e.into_inner())
.remove(&seqnum)
.and_then(|e| e.unlink_header);
if let Some(hdr) = late_unlink {
let unlink_res = UsbIpResponse::UsbIpRetUnlink {
header: hdr,
status: (-104i32) as u32, // ECONNRESET
};
match unlink_res.to_bytes() {
Ok(bytes) => { let _ = tx.blocking_send(bytes); }
Err(e) => warn!("Failed to serialize late UNLINK response: {e}"),
}
}
});
}
UsbIpCommand::UsbIpCmdUnlink {
@@ -735,34 +787,38 @@ pub async fn handle_urb_loop<T: AsyncReadExt + AsyncWriteExt + Unpin + Send + 's
header.command = USBIP_RET_UNLINK.into();
let cancelled = in_flight
.lock()
.unwrap()
.get(&unlink_seqnum)
.map(|token| {
token.cancel();
// Try to cancel the in-flight URB. If found, store our
// response header so the spawn_blocking task can send
// RET_UNLINK *after* any RET_SUBMIT — the kernel requires
// RET_SUBMIT to arrive first on the wire.
let found = {
let mut guard = in_flight.lock().unwrap();
if let Some(entry) = guard.get_mut(&unlink_seqnum) {
entry.cancel.cancel();
entry.unlink_header = Some(header.clone());
true
})
.unwrap_or(false);
let status = if cancelled {
0u32
} else {
trace!("UNLINK: seqnum={unlink_seqnum} not found in-flight (already completed)");
(-2i32) as u32
};
let res = UsbIpResponse::UsbIpRetUnlink {
header: header.clone(),
status,
};
match res.to_bytes() {
Ok(bytes) => {
if response_tx.send(bytes).await.is_err() {
break Err(std::io::Error::other("Response channel closed"));
}
} else {
false
}
};
if !found {
// URB already completed and removed from in_flight.
// Its RET_SUBMIT is already in the channel, so our
// RET_UNLINK will follow it — safe to send now.
trace!("UNLINK: seqnum={unlink_seqnum} not found in-flight (already completed)");
let res = UsbIpResponse::UsbIpRetUnlink {
header: header.clone(),
status: (-2i32) as u32, // ENOENT
};
match res.to_bytes() {
Ok(bytes) => {
if response_tx.send(bytes).await.is_err() {
break Err(std::io::Error::other("Response channel closed"));
}
}
Err(e) => warn!("Failed to serialize UNLINK response: {e}"),
}
Err(e) => warn!("Failed to serialize UNLINK response: {e}"),
}
}
_ => {