diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 4b3fbce..a1cf5e5 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -447,6 +447,7 @@ pub async fn handle_urb_loop>(64); @@ -463,8 +464,17 @@ pub async fn handle_urb_loop(()) }); - // Track in-flight URBs for CMD_UNLINK cancellation - let in_flight: Arc>> = + // Track in-flight URBs for CMD_UNLINK cancellation. + // When CMD_UNLINK arrives and the entry exists, the UNLINK handler stores + // its response header here instead of sending RET_UNLINK immediately. + // The spawn_blocking task then sends RET_UNLINK *after* RET_SUBMIT (if + // any), guaranteeing the wire ordering the kernel requires. + struct InFlightUrb { + cancel: CancellationToken, + /// Populated by the UNLINK handler; drained by spawn_blocking. + unlink_header: Option, + } + let in_flight: Arc>> = Arc::new(std::sync::Mutex::new(HashMap::new())); // Compute expected devid for validation: the Linux kernel performs this @@ -575,15 +585,34 @@ pub async fn handle_urb_loop { if tx.blocking_send(bytes).is_err() { @@ -703,6 +736,25 @@ pub async fn handle_urb_loop warn!("Failed to serialize response for seqnum={seqnum}: {e}"), } + + // Now remove from in_flight. Any RET_UNLINK we send after + // this point is guaranteed to follow RET_SUBMIT in the + // channel (same sender thread → FIFO). If an UNLINK stored + // its header between our cancel-check and this remove, we + // pick it up here and send the deferred RET_UNLINK. + let late_unlink = in_flight_ref.lock().unwrap_or_else(|e| e.into_inner()) + .remove(&seqnum) + .and_then(|e| e.unlink_header); + if let Some(hdr) = late_unlink { + let unlink_res = UsbIpResponse::UsbIpRetUnlink { + header: hdr, + status: (-104i32) as u32, // ECONNRESET + }; + match unlink_res.to_bytes() { + Ok(bytes) => { let _ = tx.blocking_send(bytes); } + Err(e) => warn!("Failed to serialize late UNLINK response: {e}"), + } + } }); } UsbIpCommand::UsbIpCmdUnlink { @@ -735,34 +787,38 @@ pub async fn handle_urb_loop { - if response_tx.send(bytes).await.is_err() { - break Err(std::io::Error::other("Response channel closed")); - } + } else { + false + } + }; + + if !found { + // URB already completed and removed from in_flight. + // Its RET_SUBMIT is already in the channel, so our + // RET_UNLINK will follow it — safe to send now. + trace!("UNLINK: seqnum={unlink_seqnum} not found in-flight (already completed)"); + let res = UsbIpResponse::UsbIpRetUnlink { + header: header.clone(), + status: (-2i32) as u32, // ENOENT + }; + match res.to_bytes() { + Ok(bytes) => { + if response_tx.send(bytes).await.is_err() { + break Err(std::io::Error::other("Response channel closed")); + } + } + Err(e) => warn!("Failed to serialize UNLINK response: {e}"), } - Err(e) => warn!("Failed to serialize UNLINK response: {e}"), } } _ => {