diff --git a/src/backend.rs b/src/backend.rs index 7d14144..14c4fe4 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -3,9 +3,25 @@ // // SPDX-License-Identifier: Apache-2.0 +//! Traits for vhost user backend servers to implement virtio data plain services. +//! +//! Define two traits for vhost user backend servers to implement virtio data plane services. +//! The only difference between the two traits is mutability. The [VhostUserBackend] trait is +//! designed with interior mutability, so the implementor may choose the suitable way to protect +//! itself from concurrent accesses. The [VhostUserBackendMut] is designed without interior +//! mutability, and an implementation of: +//! ```ignore +//! impl VhostUserBackend for RwLock { } +//! ``` +//! is provided for convenience. +//! +//! [VhostUserBackend]: trait.VhostUserBackend.html +//! [VhostUserBackendMut]: trait.VhostUserBackendMut.html + use std::io; +use std::ops::Deref; use std::result; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use vhost::vhost_user::message::VhostUserProtocolFeatures; use vhost::vhost_user::SlaveFsCacheReq; @@ -14,37 +30,82 @@ use vmm_sys_util::eventfd::EventFd; use super::Vring; -/// This trait must be implemented by the caller in order to provide backend -/// specific implementation. +/// Trait with interior mutability for vhost user backend servers to implement concrete services. +/// +/// To support multi-threading and asynchronous IO, we enforce `the Send + Sync + 'static`. +/// So there's no plan for support of "Rc" and "RefCell". pub trait VhostUserBackend: Send + Sync + 'static { - /// Number of queues. + /// Get number of queues supported. fn num_queues(&self) -> usize; - /// Depth of each queue. + /// Get maximum queue size supported. fn max_queue_size(&self) -> usize; - /// Available virtio features. + /// Get available virtio features. fn features(&self) -> u64; - /// Acked virtio features. - fn acked_features(&mut self, _features: u64) {} + /// Set acknowledged virtio features. + fn acked_features(&self, _features: u64) {} - /// Virtio protocol features. + /// Get available vhost protocol features. fn protocol_features(&self) -> VhostUserProtocolFeatures; - /// Tell the backend if EVENT_IDX has been negotiated. - fn set_event_idx(&mut self, enabled: bool); + /// Enable or disable the virtio EVENT_IDX feature + fn set_event_idx(&self, enabled: bool); + + /// Get virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn get_config(&self, _offset: u32, _size: u32) -> Vec { + Vec::new() + } + + /// Set virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn set_config(&self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> { + Ok(()) + } /// Update guest memory regions. fn update_memory( - &mut self, + &self, atomic_mem: GuestMemoryAtomic, ) -> result::Result<(), io::Error>; - /// This function gets called if the backend registered some additional - /// listeners onto specific file descriptors. The library can handle - /// virtqueues on its own, but does not know what to do with events - /// happening on custom listeners. + /// Set handler for communicating with the master by the slave communication channel. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + /// + /// TODO: this interface is designed only for vhost-user-fs, it should be refined. + fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {} + + /// Get the map to map queue index to worker thread index. + /// + /// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0, + /// the following two queues will be handled by worker thread 1, and the last four queues will + /// be handled by worker thread 2. + fn queues_per_thread(&self) -> Vec { + vec![0xffff_ffff] + } + + /// Provide an optional exit EventFd for the specified worker thread. + /// + /// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO + /// events by using epoll with the specified `token`. When the returned EventFd is written to, + /// the worker thread will exit. + fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> { + None + } + + /// Handle IO events for backend registered file descriptors. + /// + /// This function gets called if the backend registered some additional listeners onto specific + /// file descriptors. The library can handle virtqueues on its own, but does not know what to + /// do with events happening on custom listeners. fn handle_event( &self, device_event: u16, @@ -52,35 +113,422 @@ pub trait VhostUserBackend: Send + Sync + 'static { vrings: &[Arc>], thread_id: usize, ) -> result::Result; +} + +/// Trait without interior mutability for vhost user backend servers to implement concrete services. +pub trait VhostUserBackendMut: Send + Sync + 'static { + /// Get number of queues supported. + fn num_queues(&self) -> usize; + + /// Get maximum queue size supported. + fn max_queue_size(&self) -> usize; + + /// Get available virtio features. + fn features(&self) -> u64; + + /// Set acknowledged virtio features. + fn acked_features(&mut self, _features: u64) {} + + /// Get available vhost protocol features. + fn protocol_features(&self) -> VhostUserProtocolFeatures; + + /// Enable or disable the virtio EVENT_IDX feature + fn set_event_idx(&mut self, enabled: bool); /// Get virtio device configuration. - /// A default implementation is provided as we cannot expect all backends - /// to implement this function. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. fn get_config(&self, _offset: u32, _size: u32) -> Vec { Vec::new() } /// Set virtio device configuration. - /// A default implementation is provided as we cannot expect all backends - /// to implement this function. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> { Ok(()) } - /// Provide an exit EventFd - /// When this EventFd is written to the worker thread will exit. An optional id may - /// also be provided, if it not provided then the exit event will be first event id - /// after the last queue - fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, Option)> { - None - } + /// Update guest memory regions. + fn update_memory( + &mut self, + atomic_mem: GuestMemoryAtomic, + ) -> result::Result<(), io::Error>; - /// Set slave fd. - /// A default implementation is provided as we cannot expect all backends - /// to implement this function. + /// Set handler for communicating with the master by the slave communication channel. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + /// + /// TODO: this interface is designed only for vhost-user-fs, it should be refined. fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} + /// Get the map to map queue index to worker thread index. + /// + /// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0, + /// the following two queues will be handled by worker thread 1, and the last four queues will + /// be handled by worker thread 2. fn queues_per_thread(&self) -> Vec { vec![0xffff_ffff] } + + /// Provide an optional exit EventFd for the specified worker thread. + /// + /// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO + /// events by using epoll with the specified `token`. When the returned EventFd is written to, + /// the worker thread will exit. + fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> { + None + } + + /// Handle IO events for backend registered file descriptors. + /// + /// This function gets called if the backend registered some additional listeners onto specific + /// file descriptors. The library can handle virtqueues on its own, but does not know what to + /// do with events happening on custom listeners. + fn handle_event( + &mut self, + device_event: u16, + evset: epoll::Events, + vrings: &[Arc>], + thread_id: usize, + ) -> result::Result; +} + +impl VhostUserBackend for Arc { + fn num_queues(&self) -> usize { + self.deref().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.deref().max_queue_size() + } + + fn features(&self) -> u64 { + self.deref().features() + } + + fn acked_features(&self, features: u64) { + self.deref().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.deref().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.deref().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.deref().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> { + self.deref().set_config(offset, buf) + } + + fn update_memory( + &self, + atomic_mem: GuestMemoryAtomic, + ) -> Result<(), io::Error> { + self.deref().update_memory(atomic_mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.deref().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.deref().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> { + self.deref().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: epoll::Events, + vrings: &[Arc>], + thread_id: usize, + ) -> Result { + self.deref() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +impl VhostUserBackend for Mutex { + fn num_queues(&self) -> usize { + self.lock().unwrap().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.lock().unwrap().max_queue_size() + } + + fn features(&self) -> u64 { + self.lock().unwrap().features() + } + + fn acked_features(&self, features: u64) { + self.lock().unwrap().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.lock().unwrap().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.lock().unwrap().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.lock().unwrap().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> { + self.lock().unwrap().set_config(offset, buf) + } + + fn update_memory( + &self, + atomic_mem: GuestMemoryAtomic, + ) -> Result<(), io::Error> { + self.lock().unwrap().update_memory(atomic_mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.lock().unwrap().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.lock().unwrap().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> { + self.lock().unwrap().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: epoll::Events, + vrings: &[Arc>], + thread_id: usize, + ) -> Result { + self.lock() + .unwrap() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +impl VhostUserBackend for RwLock { + fn num_queues(&self) -> usize { + self.read().unwrap().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.read().unwrap().max_queue_size() + } + + fn features(&self) -> u64 { + self.read().unwrap().features() + } + + fn acked_features(&self, features: u64) { + self.write().unwrap().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.read().unwrap().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.write().unwrap().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.read().unwrap().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> { + self.write().unwrap().set_config(offset, buf) + } + + fn update_memory( + &self, + atomic_mem: GuestMemoryAtomic, + ) -> Result<(), io::Error> { + self.write().unwrap().update_memory(atomic_mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.write().unwrap().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.read().unwrap().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> { + self.read().unwrap().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: epoll::Events, + vrings: &[Arc>], + thread_id: usize, + ) -> Result { + self.write() + .unwrap() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use epoll::Events; + use std::io::Error; + use std::sync::Mutex; + + struct MockVhostBackend { + events: u64, + event_idx: bool, + acked_features: u64, + } + + impl MockVhostBackend { + fn new() -> Self { + MockVhostBackend { + events: 0, + event_idx: false, + acked_features: 0, + } + } + } + + impl VhostUserBackendMut for MockVhostBackend { + fn num_queues(&self) -> usize { + 2 + } + + fn max_queue_size(&self) -> usize { + 256 + } + + fn features(&self) -> u64 { + 0xffff_ffff_ffff_ffff + } + + fn acked_features(&mut self, features: u64) { + self.acked_features = features; + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::all() + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + assert_eq!(offset, 0x200); + assert_eq!(size, 8); + + vec![0xa5u8; 8] + } + + fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<(), Error> { + assert_eq!(offset, 0x200); + assert_eq!(buf.len(), 8); + assert_eq!(buf, &[0xa5u8; 8]); + + Ok(()) + } + + fn update_memory( + &mut self, + _atomic_mem: GuestMemoryAtomic, + ) -> Result<(), Error> { + Ok(()) + } + + fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} + + fn queues_per_thread(&self) -> Vec { + vec![1, 1] + } + + fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> { + let event_fd = EventFd::new(0).unwrap(); + + Some((event_fd, 0x100)) + } + + fn handle_event( + &mut self, + _device_event: u16, + _evset: Events, + _vrings: &[Arc>], + _thread_id: usize, + ) -> Result { + self.events += 1; + + Ok(false) + } + } + + #[test] + fn test_new_mock_backend_mutex() { + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + + assert_eq!(backend.num_queues(), 2); + assert_eq!(backend.max_queue_size(), 256); + assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff); + assert_eq!( + backend.protocol_features(), + VhostUserProtocolFeatures::all() + ); + assert_eq!(backend.queues_per_thread(), [1, 1]); + + assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]); + backend.set_config(0x200, &vec![0xa5; 8]).unwrap(); + + backend.acked_features(0xffff); + assert_eq!(backend.lock().unwrap().acked_features, 0xffff); + + backend.set_event_idx(true); + assert_eq!(backend.lock().unwrap().event_idx, true); + } + + #[test] + fn test_new_mock_backend_rwlock() { + let backend = Arc::new(RwLock::new(MockVhostBackend::new())); + + assert_eq!(backend.num_queues(), 2); + assert_eq!(backend.max_queue_size(), 256); + assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff); + assert_eq!( + backend.protocol_features(), + VhostUserProtocolFeatures::all() + ); + assert_eq!(backend.queues_per_thread(), [1, 1]); + + assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]); + backend.set_config(0x200, &vec![0xa5; 8]).unwrap(); + + backend.acked_features(0xffff); + assert_eq!(backend.read().unwrap().acked_features, 0xffff); + + backend.set_event_idx(true); + assert_eq!(backend.read().unwrap().event_idx, true); + } } diff --git a/src/lib.rs b/src/lib.rs index f7276f0..81d23d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,8 +34,8 @@ use vm_memory::{ }; use vmm_sys_util::eventfd::EventFd; -mod backend; -pub use backend::VhostUserBackend; +pub mod backend; +pub use backend::{VhostUserBackend, VhostUserBackendMut}; const MAX_MEM_SLOTS: u64 = 32; @@ -73,14 +73,14 @@ pub struct VhostUserDaemon { main_thread: Option>>, } -impl VhostUserDaemon { +impl VhostUserDaemon { /// Create the daemon instance, providing the backend implementation of /// VhostUserBackend. /// Under the hood, this will start a dedicated thread responsible for /// listening onto registered event. Those events can be vring events or /// custom events from the backend, but they get to be registered later /// during the sequence. - pub fn new(name: String, backend: Arc>) -> Result { + pub fn new(name: String, backend: S) -> Result { let handler = Arc::new(Mutex::new( VhostUserHandler::new(backend).map_err(Error::NewVhostUserHandler)?, )); @@ -194,7 +194,7 @@ pub enum VringEpollHandlerError { type VringEpollHandlerResult = std::result::Result; struct VringEpollHandler { - backend: Arc>, + backend: S, vrings: Vec>>, exit_event_id: Option, thread_id: usize, @@ -225,8 +225,6 @@ impl VringEpollHandler { } self.backend - .read() - .unwrap() .handle_event(device_event, evset, &self.vrings, self.thread_id) .map_err(VringEpollHandlerError::HandleEventBackendHandling) } @@ -373,7 +371,7 @@ impl error::Error for VhostUserHandlerError {} type VhostUserHandlerResult = std::result::Result; struct VhostUserHandler { - backend: Arc>, + backend: S, workers: Vec>, owned: bool, features_acked: bool, @@ -388,11 +386,11 @@ struct VhostUserHandler { worker_threads: Vec>>, } -impl VhostUserHandler { - fn new(backend: Arc>) -> VhostUserHandlerResult { - let num_queues = backend.read().unwrap().num_queues(); - let max_queue_size = backend.read().unwrap().max_queue_size(); - let queues_per_thread = backend.read().unwrap().queues_per_thread(); +impl VhostUserHandler { + fn new(backend: S) -> VhostUserHandlerResult { + let num_queues = backend.num_queues(); + let max_queue_size = backend.max_queue_size(); + let queues_per_thread = backend.queues_per_thread(); let atomic_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); @@ -416,21 +414,19 @@ impl VhostUserHandler { let vring_worker = Arc::new(VringWorker { epoll_file }); let worker = vring_worker.clone(); - let exit_event_id = if let Some((exit_event_fd, exit_event_id)) = - backend.read().unwrap().exit_event(thread_id) - { - let exit_event_id = exit_event_id.unwrap_or(num_queues as u16); - worker - .register_listener( - exit_event_fd.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(exit_event_id), - ) - .map_err(VhostUserHandlerError::RegisterExitEvent)?; - Some(exit_event_id) - } else { - None - }; + let exit_event_id = + if let Some((exit_event_fd, exit_event_id)) = backend.exit_event(thread_id) { + worker + .register_listener( + exit_event_fd.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(exit_event_id), + ) + .map_err(VhostUserHandlerError::RegisterExitEvent)?; + Some(exit_event_id) + } else { + None + }; let mut thread_vrings: Vec>> = Vec::new(); for (index, vring) in vrings.iter().enumerate() { @@ -487,7 +483,7 @@ impl VhostUserHandler { } } -impl VhostUserSlaveReqHandlerMut for VhostUserHandler { +impl VhostUserSlaveReqHandlerMut for VhostUserHandler { fn set_owner(&mut self) -> VhostUserResult<()> { if self.owned { return Err(VhostUserError::InvalidOperation); @@ -505,11 +501,11 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { } fn get_features(&mut self) -> VhostUserResult { - Ok(self.backend.read().unwrap().features()) + Ok(self.backend.features()) } fn set_features(&mut self, features: u64) -> VhostUserResult<()> { - if (features & !self.backend.read().unwrap().features()) != 0 { + if (features & !self.backend.features()) != 0 { return Err(VhostUserError::InvalidParam); } @@ -529,16 +525,13 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { vring.write().unwrap().enabled = vring_enabled; } - self.backend - .write() - .unwrap() - .acked_features(self.acked_features); + self.backend.acked_features(self.acked_features); Ok(()) } fn get_protocol_features(&mut self) -> VhostUserResult { - Ok(self.backend.read().unwrap().protocol_features()) + Ok(self.backend.protocol_features()) } fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> { @@ -581,8 +574,6 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { self.atomic_mem.lock().unwrap().replace(mem); self.backend - .write() - .unwrap() .update_memory(self.atomic_mem.clone()) .map_err(|e| { VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) @@ -657,7 +648,7 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { .unwrap() .mut_queue() .set_event_idx(event_idx); - self.backend.write().unwrap().set_event_idx(event_idx); + self.backend.set_event_idx(event_idx); Ok(()) } @@ -783,7 +774,7 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { size: u32, _flags: VhostUserConfigFlags, ) -> VhostUserResult> { - Ok(self.backend.read().unwrap().get_config(offset, size)) + Ok(self.backend.get_config(offset, size)) } fn set_config( @@ -793,8 +784,6 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { _flags: VhostUserConfigFlags, ) -> VhostUserResult<()> { self.backend - .write() - .unwrap() .set_config(offset, buf) .map_err(VhostUserError::ReqHandlerError) } @@ -804,7 +793,7 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { vu_req.set_reply_ack_flag(true); } - self.backend.write().unwrap().set_slave_req_fd(vu_req); + self.backend.set_slave_req_fd(vu_req); } fn get_max_mem_slots(&mut self) -> VhostUserResult { @@ -838,8 +827,6 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { self.atomic_mem.lock().unwrap().replace(mem); self.backend - .write() - .unwrap() .update_memory(self.atomic_mem.clone()) .map_err(|e| { VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) @@ -866,8 +853,6 @@ impl VhostUserSlaveReqHandlerMut for VhostUserHandler { self.atomic_mem.lock().unwrap().replace(mem); self.backend - .write() - .unwrap() .update_memory(self.atomic_mem.clone()) .map_err(|e| { VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))