Take waker and drop EndpointInner reference before waking transfer

This fixes a race condition for users that drop the Endpoint immediately after a transfer
completes, and then might immediately try and re-open the endpoint. The event thread
still held an Arc reference after marking the transfer complete, and so the endpoint
was still considered in use and re-opening occasionally failed.

It also means the `Notify` lock is not held when calling the waker function, which fixes
a deadlock if it were to call back into endpoint methods.
This commit is contained in:
Kevin Mehall 2025-08-08 17:45:43 -06:00
parent 573126df43
commit bcfb2b8e89

View file

@ -26,6 +26,20 @@ pub enum NotifyState {
Thread(Thread),
}
impl NotifyState {
fn take(&mut self) -> Self {
std::mem::replace(self, NotifyState::None)
}
fn notify(self) {
match self {
NotifyState::None => {}
NotifyState::Waker(waker) => waker.wake(),
NotifyState::Thread(thread) => thread.unpark(),
}
}
}
impl AsRef<Notify> for Notify {
fn as_ref(&self) -> &Notify {
self
@ -69,12 +83,8 @@ impl Notify {
}
}
pub fn notify(&self) {
match &mut *self.state.lock().unwrap() {
NotifyState::None => {}
NotifyState::Waker(waker) => waker.wake_by_ref(),
NotifyState::Thread(thread) => thread.unpark(),
}
fn take_notify_state(&self) -> NotifyState {
self.state.lock().unwrap().take()
}
}
@ -222,9 +232,9 @@ impl<P> Drop for Pending<P> {
pub(crate) unsafe fn notify_completion<P>(transfer: *mut P) {
unsafe {
let transfer = transfer as *mut TransferInner<P>;
let notify = (*transfer).notify.clone();
let wake = (*transfer).notify.deref().as_ref().take_notify_state();
match (*transfer).state.swap(STATE_IDLE, Ordering::AcqRel) {
STATE_PENDING => (*notify).as_ref().notify(),
STATE_PENDING => wake.notify(),
STATE_ABANDONED => {
drop(Box::from_raw(transfer));
}