Replace Eventfd with EventNotifier/EventConsumer

Eventfd is Linux-specific. To support more platforms, we replace it with
the EventNotifier/EventConsumer abstractions.
EventSender and EventReceiver are wrappers that encapsulate eventfd functionality
Use pipefd to replace eventfd in the test.

Signed-off-by: Wenyu Huang <huangwenyuu@outlook.com>
This commit is contained in:
Wenyu Huang 2025-07-06 13:22:37 +00:00 committed by Stefano Garzarella
parent e869735f3f
commit 18bf2a5613
6 changed files with 70 additions and 59 deletions

View file

@ -4,6 +4,8 @@
### Added
### Changed
- [[#308](https://github.com/rust-vmm/vhost/pull/308)] Replace Eventfd with EventNotifier/EventConsumer.
### Deprecated
### Fixed

View file

@ -30,7 +30,7 @@ use vhost::vhost_user::message::{
use vhost::vhost_user::Backend;
use vm_memory::bitmap::Bitmap;
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;
use vmm_sys_util::event::{EventConsumer, EventNotifier};
use vhost::vhost_user::GpuBackend;
@ -132,7 +132,8 @@ pub trait VhostUserBackend: Send + Sync {
///
/// The returned `EventFd` will be monitored for IO events. When the
/// returned EventFd is written to, the worker thread will exit.
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
// TODO: Refine this API to return only EventNotifier.
fn exit_event(&self, _thread_index: usize) -> Option<(EventConsumer, EventNotifier)> {
None
}
@ -275,7 +276,8 @@ pub trait VhostUserBackendMut: Send + Sync {
/// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO
/// events by using epoll with the specified `token`. When the returned EventFd is written to,
/// the worker thread will exit.
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
// TODO: Refine this API to return only EventNotifier.
fn exit_event(&self, _thread_index: usize) -> Option<(EventConsumer, EventNotifier)> {
None
}
@ -382,7 +384,7 @@ impl<T: VhostUserBackend> VhostUserBackend for Arc<T> {
self.deref().queues_per_thread()
}
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
fn exit_event(&self, thread_index: usize) -> Option<(EventConsumer, EventNotifier)> {
self.deref().exit_event(thread_index)
}
@ -471,7 +473,7 @@ impl<T: VhostUserBackendMut> VhostUserBackend for Mutex<T> {
self.lock().unwrap().queues_per_thread()
}
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
fn exit_event(&self, thread_index: usize) -> Option<(EventConsumer, EventNotifier)> {
self.lock().unwrap().exit_event(thread_index)
}
@ -563,7 +565,7 @@ impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> {
self.read().unwrap().queues_per_thread()
}
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
fn exit_event(&self, thread_index: usize) -> Option<(EventConsumer, EventNotifier)> {
self.read().unwrap().exit_event(thread_index)
}
@ -599,16 +601,16 @@ impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> {
pub mod tests {
use super::*;
use crate::VringRwLock;
use libc::EFD_NONBLOCK;
use std::sync::Mutex;
use uuid::Uuid;
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::event::{new_event_consumer_and_notifier, EventFlag};
pub struct MockVhostBackend {
events: u64,
event_idx: bool,
acked_features: u64,
exit_event_fds: Vec<EventFd>,
exit_event_fds: Vec<(EventConsumer, EventNotifier)>,
}
impl MockVhostBackend {
@ -624,7 +626,10 @@ pub mod tests {
// order to allow tests maximum flexibility in checking whether
// signals arrived or not.
backend.exit_event_fds = (0..backend.queues_per_thread().len())
.map(|_| EventFd::new(EFD_NONBLOCK).unwrap())
.map(|_| {
new_event_consumer_and_notifier(EventFlag::NONBLOCK)
.expect("Failed to new EventNotifier and EventConsumer")
})
.collect();
backend
@ -695,13 +700,13 @@ pub mod tests {
vec![1, 1]
}
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
Some(
self.exit_event_fds
.get(thread_index)?
.try_clone()
.expect("Could not clone exit eventfd"),
)
fn exit_event(&self, thread_index: usize) -> Option<(EventConsumer, EventNotifier)> {
self.exit_event_fds.get(thread_index).map(|(s, r)| {
(
s.try_clone().expect("Failed to clone EventConsumer"),
r.try_clone().expect("Failed to clone EventNotifier"),
)
})
}
fn handle_event(

View file

@ -6,10 +6,11 @@
use std::fmt::{Display, Formatter};
use std::io::{self, Result};
use std::marker::PhantomData;
use std::os::fd::IntoRawFd;
use std::os::unix::io::{AsRawFd, RawFd};
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use vmm_sys_util::eventfd::EventFd;
use vmm_sys_util::event::EventNotifier;
use super::backend::VhostUserBackend;
use super::vring::VringT;
@ -61,7 +62,7 @@ pub struct VringEpollHandler<T: VhostUserBackend> {
backend: T,
vrings: Vec<T::Vring>,
thread_id: usize,
exit_event_fd: Option<EventFd>,
exit_event_fd: Option<EventNotifier>,
phantom: PhantomData<T::Bitmap>,
}
@ -69,7 +70,7 @@ impl<T: VhostUserBackend> VringEpollHandler<T> {
/// Send `exit event` to break the event loop.
pub fn send_exit_event(&self) {
if let Some(eventfd) = self.exit_event_fd.as_ref() {
let _ = eventfd.write(1);
let _ = eventfd.notify();
}
}
}
@ -87,16 +88,19 @@ where
let epoll = Epoll::new().map_err(VringEpollError::EpollCreateFd)?;
let exit_event_fd = backend.exit_event(thread_id);
if let Some(exit_event_fd) = &exit_event_fd {
let exit_event_fd = if let Some((consumer, notifier)) = exit_event_fd {
let id = backend.num_queues();
epoll
.ctl(
ControlOperation::Add,
exit_event_fd.as_raw_fd(),
consumer.into_raw_fd(),
EpollEvent::new(EventSet::IN, id as u64),
)
.map_err(VringEpollError::RegisterExitEvent)?;
}
Some(notifier)
} else {
None
};
Ok(VringEpollHandler {
epoll,
@ -230,7 +234,7 @@ mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::eventfd::EventFd;
use vmm_sys_util::event::{new_event_consumer_and_notifier, EventFlag};
#[test]
fn test_vring_epoll_handler() {
@ -242,29 +246,29 @@ mod tests {
let handler = VringEpollHandler::new(backend, vec![vring], 0x1).unwrap();
let eventfd = EventFd::new(0).unwrap();
let (consumer, _notifier) = new_event_consumer_and_notifier(EventFlag::empty()).unwrap();
handler
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.register_listener(consumer.as_raw_fd(), EventSet::IN, 3)
.unwrap();
// Register an already registered fd.
handler
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.register_listener(consumer.as_raw_fd(), EventSet::IN, 3)
.unwrap_err();
// Register an invalid data.
handler
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 1)
.register_listener(consumer.as_raw_fd(), EventSet::IN, 1)
.unwrap_err();
handler
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.unregister_listener(consumer.as_raw_fd(), EventSet::IN, 3)
.unwrap();
// unregister an already unregistered fd.
handler
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.unregister_listener(consumer.as_raw_fd(), EventSet::IN, 3)
.unwrap_err();
// unregister an invalid data.
handler
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 1)
.unregister_listener(consumer.as_raw_fd(), EventSet::IN, 1)
.unwrap_err();
// Check we retrieve the correct file descriptor
assert_eq!(handler.as_raw_fd(), handler.epoll.as_raw_fd());

View file

@ -350,7 +350,7 @@ mod tests {
let fd = backend.exit_event(thread_id).unwrap();
// Reading from exit fd should fail since nothing was written yet
assert_eq!(
fd.read().unwrap_err().raw_os_error().unwrap(),
fd.0.consume().unwrap_err().raw_os_error().unwrap(),
EAGAIN,
"exit event should not have been raised yet!"
);
@ -365,7 +365,7 @@ mod tests {
let backend = backend.lock().unwrap();
for thread_id in 0..backend.queues_per_thread().len() {
let fd = backend.exit_event(thread_id).unwrap();
assert!(fd.read().is_ok(), "No exit event was raised!");
assert!(fd.0.consume().is_ok(), "No exit event was raised!");
}
}
}

View file

@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuar
use virtio_queue::{Error as VirtQueError, Queue, QueueT};
use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::eventfd::EventFd;
use vmm_sys_util::event::{EventConsumer, EventNotifier};
/// Trait for objects returned by `VringT::get_ref()`.
pub trait VringStateGuard<'a, M: GuestAddressSpace> {
@ -109,9 +109,9 @@ pub trait VringT<M: GuestAddressSpace>:
/// object for single-threaded context.
pub struct VringState<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
queue: Queue,
kick: Option<EventFd>,
call: Option<EventFd>,
err: Option<EventFd>,
kick: Option<EventConsumer>,
call: Option<EventNotifier>,
err: Option<EventConsumer>,
enabled: bool,
mem: M,
}
@ -148,7 +148,7 @@ impl<M: GuestAddressSpace> VringState<M> {
/// Notify the vhost-user frontend that used descriptors have been put into the used queue.
pub fn signal_used_queue(&self) -> io::Result<()> {
if let Some(call) = self.call.as_ref() {
call.write(1)
call.notify()
} else {
Ok(())
}
@ -227,7 +227,7 @@ impl<M: GuestAddressSpace> VringState<M> {
}
/// Get the `EventFd` for kick.
pub fn get_kick(&self) -> &Option<EventFd> {
pub fn get_kick(&self) -> &Option<EventConsumer> {
&self.kick
}
@ -237,13 +237,13 @@ impl<M: GuestAddressSpace> VringState<M> {
// EventFd requires that it has sole ownership of its fd. So does File, so this is safe.
// Ideally, we'd have a generic way to refer to a uniquely-owned fd, such as that proposed
// by Rust RFC #3128.
self.kick = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
self.kick = file.map(|f| unsafe { EventConsumer::from_raw_fd(f.into_raw_fd()) });
}
/// Read event from the kick `EventFd`.
fn read_kick(&self) -> io::Result<bool> {
if let Some(kick) = &self.kick {
kick.read()?;
kick.consume()?;
}
Ok(self.enabled)
@ -252,18 +252,18 @@ impl<M: GuestAddressSpace> VringState<M> {
/// Set `EventFd` for call.
fn set_call(&mut self, file: Option<File>) {
// SAFETY: see comment in set_kick()
self.call = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
self.call = file.map(|f| unsafe { EventNotifier::from_raw_fd(f.into_raw_fd()) });
}
/// Get the `EventFd` for call.
pub fn get_call(&self) -> &Option<EventFd> {
pub fn get_call(&self) -> &Option<EventNotifier> {
&self.call
}
/// Set `EventFd` for err.
fn set_err(&mut self, file: Option<File>) {
// SAFETY: see comment in set_kick()
self.err = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
self.err = file.map(|f| unsafe { EventConsumer::from_raw_fd(f.into_raw_fd()) });
}
}
@ -500,9 +500,8 @@ impl<M: 'static + GuestAddressSpace> VringT<M> for VringRwLock<M> {
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::io::AsRawFd;
use vm_memory::bitmap::AtomicBitmap;
use vmm_sys_util::eventfd::EventFd;
use vmm_sys_util::event::{new_event_consumer_and_notifier, EventFlag};
#[test]
fn test_new_vring() {
@ -549,37 +548,34 @@ mod tests {
vring.set_enabled(true);
assert!(vring.get_ref().enabled);
let eventfd = EventFd::new(0).unwrap();
let (consumer, notifier) = new_event_consumer_and_notifier(EventFlag::empty()).unwrap();
// SAFETY: Safe because we panic before if eventfd is not valid.
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
let file = unsafe { File::from_raw_fd(consumer.into_raw_fd()) };
assert!(vring.get_mut().kick.is_none());
assert!(vring.read_kick().unwrap());
vring.set_kick(Some(file));
eventfd.write(1).unwrap();
notifier.notify().unwrap();
assert!(vring.read_kick().unwrap());
assert!(vring.get_ref().kick.is_some());
vring.set_kick(None);
assert!(vring.get_ref().kick.is_none());
std::mem::forget(eventfd);
let eventfd = EventFd::new(0).unwrap();
let (_consumer, notifier) = new_event_consumer_and_notifier(EventFlag::empty()).unwrap();
// SAFETY: Safe because we panic before if eventfd is not valid.
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
let file = unsafe { File::from_raw_fd(notifier.into_raw_fd()) };
assert!(vring.get_ref().call.is_none());
vring.set_call(Some(file));
assert!(vring.get_ref().call.is_some());
vring.set_call(None);
assert!(vring.get_ref().call.is_none());
std::mem::forget(eventfd);
let eventfd = EventFd::new(0).unwrap();
let (consumer, _notifier) = new_event_consumer_and_notifier(EventFlag::empty()).unwrap();
// SAFETY: Safe because we panic before if eventfd is not valid.
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
let file = unsafe { File::from_raw_fd(consumer.into_raw_fd()) };
assert!(vring.get_ref().err.is_none());
vring.set_err(Some(file));
assert!(vring.get_ref().err.is_some());
vring.set_err(None);
assert!(vring.get_ref().err.is_none());
std::mem::forget(eventfd);
}
}

View file

@ -18,6 +18,9 @@ use vm_memory::{
FileOffset, GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryMmap,
};
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::event::{
new_event_consumer_and_notifier, EventConsumer, EventFlag, EventNotifier,
};
use vmm_sys_util::eventfd::EventFd;
struct MockVhostBackend {
@ -105,10 +108,11 @@ impl VhostUserBackendMut for MockVhostBackend {
vec![1, 1]
}
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
let event_fd = EventFd::new(0).unwrap();
Some(event_fd)
fn exit_event(&self, _thread_index: usize) -> Option<(EventConsumer, EventNotifier)> {
Some(
new_event_consumer_and_notifier(EventFlag::empty())
.expect("Failed to create EventConsumer and EventNotifier"),
)
}
fn handle_event(