diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 2511b63..c624534 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -1,5 +1,13 @@ version: 2 updates: +- package-ecosystem: cargo + directory: "/" + schedule: + interval: weekly + open-pull-requests-limit: 3 + allow: + - dependency-type: direct + - dependency-type: indirect - package-ecosystem: gitsubmodule + directory: "/" + schedule: diff --git a/Cargo.toml b/Cargo.toml index e378a2d..14c18c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,4 +2,5 @@ members = [ "crates/vhost", + "crates/vhost-user-backend", ] diff --git a/coverage_config_x86_64.json b/coverage_config_x86_64.json index f6bd3c3..ea9c6dd 100644 --- a/coverage_config_x86_64.json +++ b/coverage_config_x86_64.json @@ -1,5 +1,5 @@ { - "coverage_score": 80.5, + "coverage_score": 83.3, "exclude_path": "vhost/src/vhost_kern/", "crate_features": "vhost/vhost-user-master,vhost/vhost-user-slave" } diff --git a/crates/vhost-user-backend/CHANGELOG.md b/crates/vhost-user-backend/CHANGELOG.md new file mode 100644 index 0000000..24c203d --- /dev/null +++ b/crates/vhost-user-backend/CHANGELOG.md @@ -0,0 +1,66 @@ +# Changelog +## [Unreleased] + +### Added + +### Changed + +### Fixed + +### Deprecated + +## v0.7.0 + +### Changed + +- Started using caret dependencies +- Updated dependency nix 0.24 -> 0.25 +- Updated dependency log 0.4.6 -> 0.4.17 +- Updated dependency vhost 0.4 -> 0.5 +- Updated dependency virtio-queue 0.5.0 -> 0.6 +- Updated dependency vm-memory 0.7 -> 0.9 + +## v0.6.0 + +### Changed + +- Moved to rust-vmm/virtio-queue v0.5.0 + +### Fixed + +- Fixed vring initialization logic + +## v0.5.1 + +### Changed +- Moved to rust-vmm/vmm-sys-util 0.10.0 + +## v0.5.0 + +### Changed + +- Moved to rust-vmm/virtio-queue v0.4.0 + +## v0.4.0 + +### Changed + +- Moved to rust-vmm/virtio-queue v0.3.0 +- Relaxed rust-vmm/vm-memory dependency to require ">=0.7" + +## v0.3.0 + +### Changed + +- Moved to rust-vmm/vhost v0.4.0 + +## v0.2.0 + 
+### Added + +- Ability to run the daemon as a client +- VringEpollHandler implements AsRawFd + +## v0.1.0 + +First release diff --git a/crates/vhost-user-backend/Cargo.toml b/crates/vhost-user-backend/Cargo.toml new file mode 100644 index 0000000..7d9fee2 --- /dev/null +++ b/crates/vhost-user-backend/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "vhost-user-backend" +version = "0.7.0" +authors = ["The Cloud Hypervisor Authors"] +keywords = ["vhost-user", "virtio"] +description = "A framework to build vhost-user backend service daemon" +edition = "2018" +license = "Apache-2.0" + +[dependencies] +libc = "0.2.39" +log = "0.4.17" +vhost = { version = "0.5", features = ["vhost-user-slave"] } +virtio-bindings = "0.1" +virtio-queue = "0.6" +vm-memory = { version = "0.9", features = ["backend-mmap", "backend-atomic"] } +vmm-sys-util = "0.10" + +[dev-dependencies] +nix = "0.25" +vhost = { version = "0.5", features = ["vhost-user-master", "vhost-user-slave"] } +vm-memory = { version = "0.9", features = ["backend-mmap", "backend-atomic", "backend-bitmap"] } +tempfile = "3.2.0" diff --git a/crates/vhost-user-backend/README.md b/crates/vhost-user-backend/README.md new file mode 100644 index 0000000..c9b7c92 --- /dev/null +++ b/crates/vhost-user-backend/README.md @@ -0,0 +1,105 @@ +# vhost-user-backend + +## Design + +The `vhost-user-backend` crate provides a framework to implement `vhost-user` backend services, +which includes following external public APIs: +- A daemon control object (`VhostUserDaemon`) to start and stop the service daemon. +- A vhost-user backend trait (`VhostUserBackendMut`) to handle vhost-user control messages and virtio + messages. +- A vring access trait (`VringT`) to access virtio queues, and three implementations of the trait: + `VringState`, `VringMutex` and `VringRwLock`. + +## Usage +The `vhost-user-backend` crate provides a framework to implement vhost-user backend services. 
The main interface provided by `vhost-user-backend` library is the `struct VhostUserDaemon`: +```rust +pub struct VhostUserDaemon +where + S: VhostUserBackend, + V: VringT> + Clone + Send + Sync + 'static, + B: Bitmap + 'static, +{ + pub fn new(name: String, backend: S, atomic_mem: GuestMemoryAtomic>) -> Result; + pub fn start(&mut self, listener: Listener) -> Result<()>; + pub fn wait(&mut self) -> Result<()>; + pub fn get_epoll_handlers(&self) -> Vec>>; +} +``` + +### Create a `VhostUserDaemon` Instance +The `VhostUserDaemon::new()` method creates an instance of the `VhostUserDaemon` object. The client needs to +pass in a `VhostUserBackend` object, which will be used to configure the `VhostUserDaemon` +instance, handle control messages from the vhost-user master and handle virtio requests from +virtio queues. A group of working threads will be created to handle virtio requests from configured +virtio queues. + +### Start the `VhostUserDaemon` +The `VhostUserDaemon::start()` method waits for an incoming connection from the vhost-user masters +on the `listener`. Once a connection is ready, a main thread will be created to handle vhost-user +messages from the vhost-user master. + +### Stop the `VhostUserDaemon` +The `VhostUserDaemon::wait()` method waits for the main thread to exit. An exit event must be sent +to the main thread by writing to the `exit_event` EventFd before waiting for it to exit. + +### Threading Model +The main thread and virtio queue working threads will concurrently access the underlying virtio +queues, so all virtio queues must be protected by locks in the multi-threading model. But the main thread only accesses virtio +queues for configuration, so the client could adopt locking policies to optimize for the virtio queue +working threads. 
+ +## Example +Example code to handle virtio messages from a virtio queue: +```rust +impl VhostUserBackendMut for VhostUserService { + fn process_queue(&mut self, vring: &VringMutex) -> Result { + let mut used_any = false; + let mem = match &self.mem { + Some(m) => m.memory(), + None => return Err(Error::NoMemoryConfigured), + }; + + let mut vring_state = vring.get_mut(); + + while let Some(avail_desc) = vring_state + .get_queue_mut() + .iter() + .map_err(|_| Error::IterateQueue)? + .next() + { + // Process the request... + + if self.event_idx { + if vring_state.add_used(head_index, 0).is_err() { + warn!("Couldn't return used descriptors to the ring"); + } + + match vring_state.needs_notification() { + Err(_) => { + warn!("Couldn't check if queue needs to be notified"); + vring_state.signal_used_queue().unwrap(); + } + Ok(needs_notification) => { + if needs_notification { + vring_state.signal_used_queue().unwrap(); + } + } + } + } else { + if vring_state.add_used(head_index, 0).is_err() { + warn!("Couldn't return used descriptors to the ring"); + } + vring_state.signal_used_queue().unwrap(); + } + } + + Ok(used_any) + } +} +``` + +## License + +This project is licensed under + +- [Apache License](http://www.apache.org/licenses/LICENSE-2.0), Version 2.0 diff --git a/crates/vhost-user-backend/src/backend.rs b/crates/vhost-user-backend/src/backend.rs new file mode 100644 index 0000000..23c6fa5 --- /dev/null +++ b/crates/vhost-user-backend/src/backend.rs @@ -0,0 +1,555 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2019-2021 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Traits for vhost user backend servers to implement virtio data plane services. +//! +//! Define two traits for vhost user backend servers to implement virtio data plane services. +//! The only difference between the two traits is mutability. The [VhostUserBackend] trait is +//! 
designed with interior mutability, so the implementor may choose the suitable way to protect +//! itself from concurrent accesses. The [VhostUserBackendMut] is designed without interior +//! mutability, and an implementation of: +//! ```ignore +//! impl VhostUserBackend for RwLock { } +//! ``` +//! is provided for convenience. +//! +//! [VhostUserBackend]: trait.VhostUserBackend.html +//! [VhostUserBackendMut]: trait.VhostUserBackendMut.html + +use std::io::Result; +use std::ops::Deref; +use std::sync::{Arc, Mutex, RwLock}; + +use vhost::vhost_user::message::VhostUserProtocolFeatures; +use vhost::vhost_user::SlaveFsCacheReq; +use vm_memory::bitmap::Bitmap; +use vmm_sys_util::epoll::EventSet; +use vmm_sys_util::eventfd::EventFd; + +use super::vring::VringT; +use super::GM; + +/// Trait with interior mutability for vhost user backend servers to implement concrete services. +/// +/// To support multi-threading and asynchronous IO, we enforce `Send + Sync` bound. +pub trait VhostUserBackend: Send + Sync +where + V: VringT>, + B: Bitmap + 'static, +{ + /// Get number of queues supported. + fn num_queues(&self) -> usize; + + /// Get maximum queue size supported. + fn max_queue_size(&self) -> usize; + + /// Get available virtio features. + fn features(&self) -> u64; + + /// Set acknowledged virtio features. + fn acked_features(&self, _features: u64) {} + + /// Get available vhost protocol features. + fn protocol_features(&self) -> VhostUserProtocolFeatures; + + /// Enable or disable the virtio EVENT_IDX feature + fn set_event_idx(&self, enabled: bool); + + /// Get virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn get_config(&self, _offset: u32, _size: u32) -> Vec { + Vec::new() + } + + /// Set virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. 
+ fn set_config(&self, _offset: u32, _buf: &[u8]) -> Result<()> { + Ok(()) + } + + /// Update guest memory regions. + fn update_memory(&self, mem: GM) -> Result<()>; + + /// Set handler for communicating with the master by the slave communication channel. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + /// + /// TODO: this interface is designed only for vhost-user-fs, it should be refined. + fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {} + + /// Get the map to map queue index to worker thread index. + /// + /// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0, + /// the following two queues will be handled by worker thread 1, and the last four queues will + /// be handled by worker thread 2. + fn queues_per_thread(&self) -> Vec { + vec![0xffff_ffff] + } + + /// Provide an optional exit EventFd for the specified worker thread. + /// + /// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO + /// events by using epoll with the specified `token`. When the returned EventFd is written to, + /// the worker thread will exit. + fn exit_event(&self, _thread_index: usize) -> Option { + None + } + + /// Handle IO events for backend registered file descriptors. + /// + /// This function gets called if the backend registered some additional listeners onto specific + /// file descriptors. The library can handle virtqueues on its own, but does not know what to + /// do with events happening on custom listeners. + fn handle_event( + &self, + device_event: u16, + evset: EventSet, + vrings: &[V], + thread_id: usize, + ) -> Result; +} + +/// Trait without interior mutability for vhost user backend servers to implement concrete services. +pub trait VhostUserBackendMut: Send + Sync +where + V: VringT>, + B: Bitmap + 'static, +{ + /// Get number of queues supported. 
+ fn num_queues(&self) -> usize; + + /// Get maximum queue size supported. + fn max_queue_size(&self) -> usize; + + /// Get available virtio features. + fn features(&self) -> u64; + + /// Set acknowledged virtio features. + fn acked_features(&mut self, _features: u64) {} + + /// Get available vhost protocol features. + fn protocol_features(&self) -> VhostUserProtocolFeatures; + + /// Enable or disable the virtio EVENT_IDX feature + fn set_event_idx(&mut self, enabled: bool); + + /// Get virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn get_config(&self, _offset: u32, _size: u32) -> Vec { + Vec::new() + } + + /// Set virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> Result<()> { + Ok(()) + } + + /// Update guest memory regions. + fn update_memory(&mut self, mem: GM) -> Result<()>; + + /// Set handler for communicating with the master by the slave communication channel. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + /// + /// TODO: this interface is designed only for vhost-user-fs, it should be refined. + fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} + + /// Get the map to map queue index to worker thread index. + /// + /// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0, + /// the following two queues will be handled by worker thread 1, and the last four queues will + /// be handled by worker thread 2. + fn queues_per_thread(&self) -> Vec { + vec![0xffff_ffff] + } + + /// Provide an optional exit EventFd for the specified worker thread. 
+ /// + /// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO + /// events by using epoll with the specified `token`. When the returned EventFd is written to, + /// the worker thread will exit. + fn exit_event(&self, _thread_index: usize) -> Option { + None + } + + /// Handle IO events for backend registered file descriptors. + /// + /// This function gets called if the backend registered some additional listeners onto specific + /// file descriptors. The library can handle virtqueues on its own, but does not know what to + /// do with events happening on custom listeners. + fn handle_event( + &mut self, + device_event: u16, + evset: EventSet, + vrings: &[V], + thread_id: usize, + ) -> Result; +} + +impl, V, B> VhostUserBackend for Arc +where + V: VringT>, + B: Bitmap + 'static, +{ + fn num_queues(&self) -> usize { + self.deref().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.deref().max_queue_size() + } + + fn features(&self) -> u64 { + self.deref().features() + } + + fn acked_features(&self, features: u64) { + self.deref().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.deref().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.deref().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.deref().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> { + self.deref().set_config(offset, buf) + } + + fn update_memory(&self, mem: GM) -> Result<()> { + self.deref().update_memory(mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.deref().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.deref().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option { + self.deref().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: EventSet, + vrings: &[V], + 
thread_id: usize, + ) -> Result { + self.deref() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +impl, V, B> VhostUserBackend for Mutex +where + V: VringT>, + B: Bitmap + 'static, +{ + fn num_queues(&self) -> usize { + self.lock().unwrap().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.lock().unwrap().max_queue_size() + } + + fn features(&self) -> u64 { + self.lock().unwrap().features() + } + + fn acked_features(&self, features: u64) { + self.lock().unwrap().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.lock().unwrap().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.lock().unwrap().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.lock().unwrap().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> { + self.lock().unwrap().set_config(offset, buf) + } + + fn update_memory(&self, mem: GM) -> Result<()> { + self.lock().unwrap().update_memory(mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.lock().unwrap().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.lock().unwrap().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option { + self.lock().unwrap().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: EventSet, + vrings: &[V], + thread_id: usize, + ) -> Result { + self.lock() + .unwrap() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +impl, V, B> VhostUserBackend for RwLock +where + V: VringT>, + B: Bitmap + 'static, +{ + fn num_queues(&self) -> usize { + self.read().unwrap().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.read().unwrap().max_queue_size() + } + + fn features(&self) -> u64 { + self.read().unwrap().features() + } + + fn acked_features(&self, features: u64) { + self.write().unwrap().acked_features(features) + } + + 
fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.read().unwrap().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.write().unwrap().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.read().unwrap().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> { + self.write().unwrap().set_config(offset, buf) + } + + fn update_memory(&self, mem: GM) -> Result<()> { + self.write().unwrap().update_memory(mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.write().unwrap().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.read().unwrap().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option { + self.read().unwrap().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: EventSet, + vrings: &[V], + thread_id: usize, + ) -> Result { + self.write() + .unwrap() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use crate::VringRwLock; + use std::sync::Mutex; + use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; + + pub struct MockVhostBackend { + events: u64, + event_idx: bool, + acked_features: u64, + } + + impl MockVhostBackend { + pub fn new() -> Self { + MockVhostBackend { + events: 0, + event_idx: false, + acked_features: 0, + } + } + } + + impl VhostUserBackendMut for MockVhostBackend { + fn num_queues(&self) -> usize { + 2 + } + + fn max_queue_size(&self) -> usize { + 256 + } + + fn features(&self) -> u64 { + 0xffff_ffff_ffff_ffff + } + + fn acked_features(&mut self, features: u64) { + self.acked_features = features; + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::all() + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + 
assert_eq!(offset, 0x200); + assert_eq!(size, 8); + + vec![0xa5u8; 8] + } + + fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<()> { + assert_eq!(offset, 0x200); + assert_eq!(buf.len(), 8); + assert_eq!(buf, &[0xa5u8; 8]); + + Ok(()) + } + + fn update_memory(&mut self, _atomic_mem: GuestMemoryAtomic) -> Result<()> { + Ok(()) + } + + fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} + + fn queues_per_thread(&self) -> Vec { + vec![1, 1] + } + + fn exit_event(&self, _thread_index: usize) -> Option { + let event_fd = EventFd::new(0).unwrap(); + + Some(event_fd) + } + + fn handle_event( + &mut self, + _device_event: u16, + _evset: EventSet, + _vrings: &[VringRwLock], + _thread_id: usize, + ) -> Result { + self.events += 1; + + Ok(false) + } + } + + #[test] + fn test_new_mock_backend_mutex() { + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + + assert_eq!(backend.num_queues(), 2); + assert_eq!(backend.max_queue_size(), 256); + assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff); + assert_eq!( + backend.protocol_features(), + VhostUserProtocolFeatures::all() + ); + assert_eq!(backend.queues_per_thread(), [1, 1]); + + assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]); + backend.set_config(0x200, &[0xa5; 8]).unwrap(); + + backend.acked_features(0xffff); + assert_eq!(backend.lock().unwrap().acked_features, 0xffff); + + backend.set_event_idx(true); + assert!(backend.lock().unwrap().event_idx); + + let _ = backend.exit_event(0).unwrap(); + + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + backend.update_memory(mem).unwrap(); + } + + #[test] + fn test_new_mock_backend_rwlock() { + let backend = Arc::new(RwLock::new(MockVhostBackend::new())); + + assert_eq!(backend.num_queues(), 2); + assert_eq!(backend.max_queue_size(), 256); + assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff); + assert_eq!( + backend.protocol_features(), + 
VhostUserProtocolFeatures::all() + ); + assert_eq!(backend.queues_per_thread(), [1, 1]); + + assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]); + backend.set_config(0x200, &[0xa5; 8]).unwrap(); + + backend.acked_features(0xffff); + assert_eq!(backend.read().unwrap().acked_features, 0xffff); + + backend.set_event_idx(true); + assert!(backend.read().unwrap().event_idx); + + let _ = backend.exit_event(0).unwrap(); + + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + backend.update_memory(mem.clone()).unwrap(); + + let vring = VringRwLock::new(mem, 0x1000).unwrap(); + backend + .handle_event(0x1, EventSet::IN, &[vring], 0) + .unwrap(); + } +} diff --git a/crates/vhost-user-backend/src/event_loop.rs b/crates/vhost-user-backend/src/event_loop.rs new file mode 100644 index 0000000..db19781 --- /dev/null +++ b/crates/vhost-user-backend/src/event_loop.rs @@ -0,0 +1,281 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2019-2021 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::fmt::{Display, Formatter}; +use std::io::{self, Result}; +use std::marker::PhantomData; +use std::os::unix::io::{AsRawFd, RawFd}; + +use vm_memory::bitmap::Bitmap; +use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; +use vmm_sys_util::eventfd::EventFd; + +use super::backend::VhostUserBackend; +use super::vring::VringT; +use super::GM; + +/// Errors related to vring epoll event handling. +#[derive(Debug)] +pub enum VringEpollError { + /// Failed to create epoll file descriptor. + EpollCreateFd(io::Error), + /// Failed while waiting for events. + EpollWait(io::Error), + /// Could not register exit event + RegisterExitEvent(io::Error), + /// Failed to read the event from kick EventFd. + HandleEventReadKick(io::Error), + /// Failed to handle the event from the backend. 
+ HandleEventBackendHandling(io::Error), +} + +impl Display for VringEpollError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + VringEpollError::EpollCreateFd(e) => write!(f, "cannot create epoll fd: {}", e), + VringEpollError::EpollWait(e) => write!(f, "failed to wait for epoll event: {}", e), + VringEpollError::RegisterExitEvent(e) => write!(f, "cannot register exit event: {}", e), + VringEpollError::HandleEventReadKick(e) => { + write!(f, "cannot read vring kick event: {}", e) + } + VringEpollError::HandleEventBackendHandling(e) => { + write!(f, "failed to handle epoll event: {}", e) + } + } + } +} + +impl std::error::Error for VringEpollError {} + +/// Result of vring epoll operations. +pub type VringEpollResult = std::result::Result; + +/// Epoll event handler to manage and process epoll events for registered file descriptor. +/// +/// The `VringEpollHandler` structure provides interfaces to: +/// - add file descriptors to be monitored by the epoll fd +/// - remove registered file descriptors from the epoll fd +/// - run the event loop to handle pending events on the epoll fd +pub struct VringEpollHandler { + epoll: Epoll, + backend: S, + vrings: Vec, + thread_id: usize, + exit_event_fd: Option, + phantom: PhantomData, +} + +impl VringEpollHandler { + /// Send `exit event` to break the event loop. + pub fn send_exit_event(&self) { + if let Some(eventfd) = self.exit_event_fd.as_ref() { + let _ = eventfd.write(1); + } + } +} + +impl VringEpollHandler +where + S: VhostUserBackend, + V: VringT>, + B: Bitmap + 'static, +{ + /// Create a `VringEpollHandler` instance. 
+ pub(crate) fn new(backend: S, vrings: Vec, thread_id: usize) -> VringEpollResult { + let epoll = Epoll::new().map_err(VringEpollError::EpollCreateFd)?; + + let handler = match backend.exit_event(thread_id) { + Some(exit_event_fd) => { + let id = backend.num_queues(); + epoll + .ctl( + ControlOperation::Add, + exit_event_fd.as_raw_fd(), + EpollEvent::new(EventSet::IN, id as u64), + ) + .map_err(VringEpollError::RegisterExitEvent)?; + + VringEpollHandler { + epoll, + backend, + vrings, + thread_id, + exit_event_fd: Some(exit_event_fd), + phantom: PhantomData, + } + } + None => VringEpollHandler { + epoll, + backend, + vrings, + thread_id, + exit_event_fd: None, + phantom: PhantomData, + }, + }; + + Ok(handler) + } + + /// Register an event into the epoll fd. + /// + /// When this event is later triggered, the backend implementation of `handle_event` will be + /// called. + pub fn register_listener(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> { + // `data` range [0...num_queues] is reserved for queues and exit event. + if data <= self.backend.num_queues() as u64 { + Err(io::Error::from_raw_os_error(libc::EINVAL)) + } else { + self.register_event(fd, ev_type, data) + } + } + + /// Unregister an event from the epoll fd. + /// + /// If the event is triggered after this function has been called, the event will be silently + /// dropped. + pub fn unregister_listener(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> { + // `data` range [0...num_queues] is reserved for queues and exit event. 
+ if data <= self.backend.num_queues() as u64 { + Err(io::Error::from_raw_os_error(libc::EINVAL)) + } else { + self.unregister_event(fd, ev_type, data) + } + } + + pub(crate) fn register_event(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> { + self.epoll + .ctl(ControlOperation::Add, fd, EpollEvent::new(ev_type, data)) + } + + pub(crate) fn unregister_event(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> { + self.epoll + .ctl(ControlOperation::Delete, fd, EpollEvent::new(ev_type, data)) + } + + /// Run the event poll loop to handle all pending events on registered fds. + /// + /// The event loop will be terminated once an event is received from the `exit event fd` + /// associated with the backend. + pub(crate) fn run(&self) -> VringEpollResult<()> { + const EPOLL_EVENTS_LEN: usize = 100; + let mut events = vec![EpollEvent::new(EventSet::empty(), 0); EPOLL_EVENTS_LEN]; + + 'epoll: loop { + let num_events = match self.epoll.wait(-1, &mut events[..]) { + Ok(res) => res, + Err(e) => { + if e.kind() == io::ErrorKind::Interrupted { + // It's well defined from the epoll_wait() syscall + // documentation that the epoll loop can be interrupted + // before any of the requested events occurred or the + // timeout expired. In both those cases, epoll_wait() + // returns an error of type EINTR, but this should not + // be considered as a regular error. Instead it is more + // appropriate to retry, by calling into epoll_wait(). + continue; + } + return Err(VringEpollError::EpollWait(e)); + } + }; + + for event in events.iter().take(num_events) { + let evset = match EventSet::from_bits(event.events) { + Some(evset) => evset, + None => { + let evbits = event.events; + println!("epoll: ignoring unknown event set: 0x{:x}", evbits); + continue; + } + }; + + let ev_type = event.data() as u16; + + // handle_event() returns true if an event is received from the exit event fd. + if self.handle_event(ev_type, evset)? 
{ + break 'epoll; + } + } + } + + Ok(()) + } + + fn handle_event(&self, device_event: u16, evset: EventSet) -> VringEpollResult { + if self.exit_event_fd.is_some() && device_event as usize == self.backend.num_queues() { + return Ok(true); + } + + if (device_event as usize) < self.vrings.len() { + let vring = &self.vrings[device_event as usize]; + let enabled = vring + .read_kick() + .map_err(VringEpollError::HandleEventReadKick)?; + + // If the vring is not enabled, it should not be processed. + if !enabled { + return Ok(false); + } + } + + self.backend + .handle_event(device_event, evset, &self.vrings, self.thread_id) + .map_err(VringEpollError::HandleEventBackendHandling) + } +} + +impl AsRawFd for VringEpollHandler { + fn as_raw_fd(&self) -> RawFd { + self.epoll.as_raw_fd() + } +} + +#[cfg(test)] +mod tests { + use super::super::backend::tests::MockVhostBackend; + use super::super::vring::VringRwLock; + use super::*; + use std::sync::{Arc, Mutex}; + use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; + use vmm_sys_util::eventfd::EventFd; + + #[test] + fn test_vring_epoll_handler() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + let vring = VringRwLock::new(mem, 0x1000).unwrap(); + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + + let handler = VringEpollHandler::new(backend, vec![vring], 0x1).unwrap(); + + let eventfd = EventFd::new(0).unwrap(); + handler + .register_listener(eventfd.as_raw_fd(), EventSet::IN, 3) + .unwrap(); + // Register an already registered fd. + handler + .register_listener(eventfd.as_raw_fd(), EventSet::IN, 3) + .unwrap_err(); + // Register an invalid data. + handler + .register_listener(eventfd.as_raw_fd(), EventSet::IN, 1) + .unwrap_err(); + + handler + .unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3) + .unwrap(); + // unregister an already unregistered fd. 
+ handler + .unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3) + .unwrap_err(); + // unregister an invalid data. + handler + .unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 1) + .unwrap_err(); + // Check we retrieve the correct file descriptor + assert_eq!(handler.as_raw_fd(), handler.epoll.as_raw_fd()); + } +} diff --git a/crates/vhost-user-backend/src/handler.rs b/crates/vhost-user-backend/src/handler.rs new file mode 100644 index 0000000..41f8107 --- /dev/null +++ b/crates/vhost-user-backend/src/handler.rs @@ -0,0 +1,601 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2019-2021 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::error; +use std::fs::File; +use std::io; +use std::os::unix::io::AsRawFd; +use std::sync::Arc; +use std::thread; + +use vhost::vhost_user::message::{ + VhostUserConfigFlags, VhostUserMemoryRegion, VhostUserProtocolFeatures, + VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags, + VhostUserVringState, +}; +use vhost::vhost_user::{ + Error as VhostUserError, Result as VhostUserResult, SlaveFsCacheReq, + VhostUserSlaveReqHandlerMut, +}; +use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; +use virtio_queue::{Error as VirtQueError, QueueT}; +use vm_memory::bitmap::Bitmap; +use vm_memory::mmap::NewBitmap; +use vm_memory::{ + FileOffset, GuestAddress, GuestAddressSpace, GuestMemoryMmap, GuestRegionMmap, MmapRegion, +}; +use vmm_sys_util::epoll::EventSet; + +use super::backend::VhostUserBackend; +use super::event_loop::VringEpollHandler; +use super::event_loop::{VringEpollError, VringEpollResult}; +use super::vring::VringT; +use super::GM; + +const MAX_MEM_SLOTS: u64 = 32; + +#[derive(Debug)] +/// Errors related to vhost-user handler. +pub enum VhostUserHandlerError { + /// Failed to create a `Vring`. + CreateVring(VirtQueError), + /// Failed to create vring worker. 
+ CreateEpollHandler(VringEpollError), + /// Failed to spawn vring worker. + SpawnVringWorker(io::Error), + /// Could not find the mapping from memory regions. + MissingMemoryMapping, +} + +impl std::fmt::Display for VhostUserHandlerError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + VhostUserHandlerError::CreateVring(e) => { + write!(f, "failed to create vring: {}", e) + } + VhostUserHandlerError::CreateEpollHandler(e) => { + write!(f, "failed to create vring epoll handler: {}", e) + } + VhostUserHandlerError::SpawnVringWorker(e) => { + write!(f, "failed spawning the vring worker: {}", e) + } + VhostUserHandlerError::MissingMemoryMapping => write!(f, "Missing memory mapping"), + } + } +} + +impl error::Error for VhostUserHandlerError {} + +/// Result of vhost-user handler operations. +pub type VhostUserHandlerResult = std::result::Result; + +struct AddrMapping { + vmm_addr: u64, + size: u64, + gpa_base: u64, +} + +pub struct VhostUserHandler { + backend: S, + handlers: Vec>>, + owned: bool, + features_acked: bool, + acked_features: u64, + acked_protocol_features: u64, + num_queues: usize, + max_queue_size: usize, + queues_per_thread: Vec, + mappings: Vec, + atomic_mem: GM, + vrings: Vec, + worker_threads: Vec>>, +} + +// Ensure VhostUserHandler: Clone + Send + Sync + 'static. 
/// Build the handler: create one vring per queue and spawn one epoll
/// worker thread per entry of `queues_per_thread`, each thread serving
/// the queues whose bit is set in its mask.
pub(crate) fn new(backend: S, atomic_mem: GM) -> VhostUserHandlerResult {
    let num_queues = backend.num_queues();
    let max_queue_size = backend.max_queue_size();
    let queues_per_thread = backend.queues_per_thread();

    // One vring per queue, all sharing the same atomic memory object.
    let mut vrings = Vec::new();
    for _ in 0..num_queues {
        let vring = V::new(atomic_mem.clone(), max_queue_size as u16)
            .map_err(VhostUserHandlerError::CreateVring)?;
        vrings.push(vring);
    }

    let mut handlers = Vec::new();
    let mut worker_threads = Vec::new();
    for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
        // Collect the vrings this thread is responsible for: bit `index`
        // set in the mask means queue `index` belongs to this thread.
        let mut thread_vrings = Vec::new();
        for (index, vring) in vrings.iter().enumerate() {
            if (queues_mask >> index) & 1u64 == 1u64 {
                thread_vrings.push(vring.clone());
            }
        }

        let handler = Arc::new(
            VringEpollHandler::new(backend.clone(), thread_vrings, thread_id)
                .map_err(VhostUserHandlerError::CreateEpollHandler)?,
        );
        let handler2 = handler.clone();
        // The worker runs the epoll loop until told to exit; see
        // send_exit_event() and the Drop impl.
        let worker_thread = thread::Builder::new()
            .name("vring_worker".to_string())
            .spawn(move || handler2.run())
            .map_err(VhostUserHandlerError::SpawnVringWorker)?;

        handlers.push(handler);
        worker_threads.push(worker_thread);
    }

    Ok(VhostUserHandler {
        backend,
        handlers,
        // Protocol state: nothing negotiated yet.
        owned: false,
        features_acked: false,
        acked_features: 0,
        acked_protocol_features: 0,
        num_queues,
        max_queue_size,
        queues_per_thread,
        // Filled in later by set_mem_table()/add_mem_region().
        mappings: Vec::new(),
        atomic_mem,
        vrings,
        worker_threads,
    })
}
/// Finish setting up a vring once its kick eventfd is available: register
/// the kick fd with the epoll handler of the worker thread that owns this
/// queue, then mark the queue ready.
fn initialize_vring(&self, vring: &V, index: u8) -> VhostUserResult<()> {
    // Callers must only invoke this once a kick fd has been installed.
    assert!(vring.get_ref().get_kick().is_some());

    if let Some(fd) = vring.get_ref().get_kick() {
        for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
            let shifted_queues_mask = queues_mask >> index;
            if shifted_queues_mask & 1u64 == 1u64 {
                // The epoll user data is this queue's position among the
                // queues handled by this thread: popcount(mask) minus
                // popcount of the bits at or above `index`, i.e. the
                // number of this thread's queues below `index`.
                let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
                self.handlers[thread_index]
                    .register_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
                    .map_err(VhostUserError::ReqHandlerError)?;
                break;
            }
        }
    }

    self.vrings[index as usize].set_queue_ready(true);

    Ok(())
}
+ + // If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated, + // the ring is initialized in an enabled state. + // If VHOST_USER_F_PROTOCOL_FEATURES has been negotiated, + // the ring is initialized in a disabled state. Client must not + // pass data to/from the backend until ring is enabled by + // VHOST_USER_SET_VRING_ENABLE with parameter 1, or after it has + // been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0. + let vring_enabled = + self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0; + for vring in self.vrings.iter_mut() { + vring.set_enabled(vring_enabled); + } + + self.backend.acked_features(self.acked_features); + + Ok(()) + } + + fn set_mem_table( + &mut self, + ctx: &[VhostUserMemoryRegion], + files: Vec, + ) -> VhostUserResult<()> { + // We need to create tuple of ranges from the list of VhostUserMemoryRegion + // that we get from the caller. + let mut regions: Vec<(GuestAddress, usize, Option)> = Vec::new(); + let mut mappings: Vec = Vec::new(); + + for (region, file) in ctx.iter().zip(files) { + let g_addr = GuestAddress(region.guest_phys_addr); + let len = region.memory_size as usize; + let f_off = FileOffset::new(file, region.mmap_offset); + + regions.push((g_addr, len, Some(f_off))); + mappings.push(AddrMapping { + vmm_addr: region.user_addr, + size: region.memory_size, + gpa_base: region.guest_phys_addr, + }); + } + + let mem = GuestMemoryMmap::from_ranges_with_files(regions).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + + // Updating the inner GuestMemory object here will cause all our vrings to + // see the new one the next time they call to `atomic_mem.memory()`. 
+ self.atomic_mem.lock().unwrap().replace(mem); + + self.backend + .update_memory(self.atomic_mem.clone()) + .map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + self.mappings = mappings; + + Ok(()) + } + + fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> { + if index as usize >= self.num_queues || num == 0 || num as usize > self.max_queue_size { + return Err(VhostUserError::InvalidParam); + } + self.vrings[index as usize].set_queue_size(num as u16); + Ok(()) + } + + fn set_vring_addr( + &mut self, + index: u32, + _flags: VhostUserVringAddrFlags, + descriptor: u64, + used: u64, + available: u64, + _log: u64, + ) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + if !self.mappings.is_empty() { + let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + let used_ring = self.vmm_va_to_gpa(used).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + self.vrings[index as usize] + .set_queue_info(desc_table, avail_ring, used_ring) + .map_err(|_| VhostUserError::InvalidParam)?; + Ok(()) + } else { + Err(VhostUserError::InvalidParam) + } + } + + fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> { + let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; + + self.vrings[index as usize].set_queue_next_avail(base as u16); + self.vrings[index as usize].set_queue_event_idx(event_idx); + self.backend.set_event_idx(event_idx); + + Ok(()) + } + + fn get_vring_base(&mut self, index: u32) -> VhostUserResult { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + // Quote from vhost-user 
specification: + // Client must start ring upon receiving a kick (that is, detecting + // that file descriptor is readable) on the descriptor specified by + // VHOST_USER_SET_VRING_KICK, and stop ring upon receiving + // VHOST_USER_GET_VRING_BASE. + self.vrings[index as usize].set_queue_ready(false); + if let Some(fd) = self.vrings[index as usize].get_ref().get_kick() { + for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() { + let shifted_queues_mask = queues_mask >> index; + if shifted_queues_mask & 1u64 == 1u64 { + let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones(); + self.handlers[thread_index] + .unregister_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx)) + .map_err(VhostUserError::ReqHandlerError)?; + break; + } + } + } + + self.vrings[index as usize].set_kick(None); + self.vrings[index as usize].set_call(None); + + // Strictly speaking, we should do this upon receiving the first kick, + // but it's actually easier to just do it here so we're ready in case + // the vring gets re-initialized by the guest. + self.vrings[index as usize] + .get_mut() + .get_queue_mut() + .reset(); + + let next_avail = self.vrings[index as usize].queue_next_avail(); + + Ok(VhostUserVringState::new(index, u32::from(next_avail))) + } + + fn set_vring_kick(&mut self, index: u8, file: Option) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + // SAFETY: EventFd requires that it has sole ownership of its fd. So + // does File, so this is safe. + // Ideally, we'd have a generic way to refer to a uniquely-owned fd, + // such as that proposed by Rust RFC #3128. 
+ self.vrings[index as usize].set_kick(file); + + if self.vring_needs_init(&self.vrings[index as usize]) { + self.initialize_vring(&self.vrings[index as usize], index)?; + } + + Ok(()) + } + + fn set_vring_call(&mut self, index: u8, file: Option) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + self.vrings[index as usize].set_call(file); + + if self.vring_needs_init(&self.vrings[index as usize]) { + self.initialize_vring(&self.vrings[index as usize], index)?; + } + + Ok(()) + } + + fn set_vring_err(&mut self, index: u8, file: Option) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + self.vrings[index as usize].set_err(file); + + Ok(()) + } + + fn get_protocol_features(&mut self) -> VhostUserResult { + Ok(self.backend.protocol_features()) + } + + fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> { + // Note: slave that reported VHOST_USER_F_PROTOCOL_FEATURES must + // support this message even before VHOST_USER_SET_FEATURES was + // called. + self.acked_protocol_features = features; + Ok(()) + } + + fn get_queue_num(&mut self) -> VhostUserResult { + Ok(self.num_queues as u64) + } + + fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> { + // This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES + // has been negotiated. + if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 { + return Err(VhostUserError::InvalidOperation( + "protocol features not set", + )); + } else if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + // Slave must not pass data to/from the backend until ring is + // enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1, + // or after it has been disabled by VHOST_USER_SET_VRING_ENABLE + // with parameter 0. 
/// Install the channel used to send requests back to the master.
///
/// If the master acked the REPLY_ACK protocol feature, enable it on the
/// channel before handing it over to the backend.
fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) {
    if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 {
        vu_req.set_reply_ack_flag(true);
    }

    self.backend.set_slave_req_fd(vu_req);
}
/// Handle VHOST_USER_ADD_MEM_REG: mmap the region backed by `file`,
/// insert it into the guest memory map, notify the backend of the new
/// memory, and record the VMM-address mapping for later VA->GPA lookups.
fn add_mem_region(
    &mut self,
    region: &VhostUserSingleMemoryRegion,
    file: File,
) -> VhostUserResult<()> {
    // Map the file-backed region into our address space.
    let mmap_region = MmapRegion::from_file(
        FileOffset::new(file, region.mmap_offset),
        region.memory_size as usize,
    )
    .map_err(|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)))?;
    let guest_region = Arc::new(
        GuestRegionMmap::new(mmap_region, GuestAddress(region.guest_phys_addr)).map_err(
            |e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)),
        )?,
    );

    // insert_region() returns a new memory object including the region.
    let mem = self
        .atomic_mem
        .memory()
        .insert_region(guest_region)
        .map_err(|e| {
            VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
        })?;

    // Publish the new map; vrings pick it up on their next memory() call.
    self.atomic_mem.lock().unwrap().replace(mem);

    // Let the backend refresh whatever it derives from guest memory.
    self.backend
        .update_memory(self.atomic_mem.clone())
        .map_err(|e| {
            VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
        })?;

    // Track the VMM virtual address range for vmm_va_to_gpa().
    self.mappings.push(AddrMapping {
        vmm_addr: region.user_addr,
        size: region.memory_size,
        gpa_base: region.guest_phys_addr,
    });

    Ok(())
}
impl Drop for VhostUserHandler {
    /// Tear down the handler: ask every epoll worker thread to exit, then
    /// join them so no worker outlives the handler's vrings and backend.
    fn drop(&mut self) {
        // Signal all working threads to exit.
        self.send_exit_event();

        // join() only fails if a worker panicked; log it rather than
        // propagating a panic out of drop().
        for thread in self.worker_threads.drain(..) {
            if let Err(e) = thread.join() {
                error!("Error in vring worker: {:?}", e);
            }
        }
    }
}
impl Display for Error {
    /// Render daemon-level errors, appending the source error where one exists.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        match self {
            Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e),
            Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e),
            Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e),
            Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e),
            // The join-error payload is opaque, so it is not printed.
            Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"),
            Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e),
        }
    }
}
/// Run a dedicated thread handling all requests coming through the socket.
/// This runs in an infinite loop that should be terminating once the other
/// end of the socket (the VMM) hangs up.
///
/// This function is the common code for starting a new daemon, no matter if
/// it acts as a client or a server.
fn start_daemon(
    &mut self,
    mut handler: SlaveReqHandler>>,
) -> Result<()> {
    // The loop only exits through `?` when handle_request() fails (e.g.
    // the peer closed the socket); wait() later treats SocketBroken as a
    // normal shutdown.
    let handle = thread::Builder::new()
        .name(self.name.clone())
        .spawn(move || loop {
            handler.handle_request().map_err(Error::HandleRequest)?;
        })
        .map_err(Error::StartDaemon)?;

    // Keep the handle so wait() can join the thread.
    self.main_thread = Some(handle);

    Ok(())
}
/// Wait for the thread handling the vhost-user socket connection to terminate.
///
/// A `SocketBroken` failure (the VMM hung up) is treated as a clean
/// shutdown. The join handle is `take()`n, so a second call returns
/// `Ok(())` without doing anything.
pub fn wait(&mut self) -> Result<()> {
    if let Some(handle) = self.main_thread.take() {
        match handle.join().map_err(Error::WaitDaemon)? {
            Ok(()) => Ok(()),
            // Thread exited because the peer disconnected: not an error.
            Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
            Err(e) => Err(e),
        }
    } else {
        // No thread running (never started, or already waited on).
        Ok(())
    }
}
+ self.handler.lock().unwrap().get_epoll_handlers() + } +} + +#[cfg(test)] +mod tests { + use super::backend::tests::MockVhostBackend; + use super::*; + use std::os::unix::net::{UnixListener, UnixStream}; + use std::sync::Barrier; + use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; + + #[test] + fn test_new_daemon() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); + + let handlers = daemon.get_epoll_handlers(); + assert_eq!(handlers.len(), 2); + + let barrier = Arc::new(Barrier::new(2)); + let tmpdir = tempfile::tempdir().unwrap(); + let mut path = tmpdir.path().to_path_buf(); + path.push("socket"); + + let barrier2 = barrier.clone(); + let path1 = path.clone(); + let thread = thread::spawn(move || { + barrier2.wait(); + let socket = UnixStream::connect(&path1).unwrap(); + barrier2.wait(); + drop(socket) + }); + + let listener = Listener::new(&path, false).unwrap(); + barrier.wait(); + daemon.start(listener).unwrap(); + barrier.wait(); + // Above process generates a `HandleRequest(PartialMessage)` error. 
+ daemon.wait().unwrap_err(); + daemon.wait().unwrap(); + thread.join().unwrap(); + } + + #[test] + fn test_new_daemon_client() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); + + let handlers = daemon.get_epoll_handlers(); + assert_eq!(handlers.len(), 2); + + let barrier = Arc::new(Barrier::new(2)); + let tmpdir = tempfile::tempdir().unwrap(); + let mut path = tmpdir.path().to_path_buf(); + path.push("socket"); + + let barrier2 = barrier.clone(); + let path1 = path.clone(); + let thread = thread::spawn(move || { + let listener = UnixListener::bind(&path1).unwrap(); + barrier2.wait(); + let (stream, _) = listener.accept().unwrap(); + barrier2.wait(); + drop(stream) + }); + + barrier.wait(); + daemon + .start_client(path.as_path().to_str().unwrap()) + .unwrap(); + barrier.wait(); + // Above process generates a `HandleRequest(PartialMessage)` error. + daemon.wait().unwrap_err(); + daemon.wait().unwrap(); + thread.join().unwrap(); + } +} diff --git a/crates/vhost-user-backend/src/vring.rs b/crates/vhost-user-backend/src/vring.rs new file mode 100644 index 0000000..0783676 --- /dev/null +++ b/crates/vhost-user-backend/src/vring.rs @@ -0,0 +1,547 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2021 Alibaba Cloud Computing. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Struct to maintain state information and manipulate vhost-user queues. 
+ +use std::fs::File; +use std::io; +use std::ops::{Deref, DerefMut}; +use std::os::unix::io::{FromRawFd, IntoRawFd}; +use std::result::Result; +use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use virtio_queue::{Error as VirtQueError, Queue, QueueT}; +use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; +use vmm_sys_util::eventfd::EventFd; + +/// Trait for objects returned by `VringT::get_ref()`. +pub trait VringStateGuard<'a, M: GuestAddressSpace> { + /// Type for guard returned by `VringT::get_ref()`. + type G: Deref>; +} + +/// Trait for objects returned by `VringT::get_mut()`. +pub trait VringStateMutGuard<'a, M: GuestAddressSpace> { + /// Type for guard returned by `VringT::get_mut()`. + type G: DerefMut>; +} + +pub trait VringT: + for<'a> VringStateGuard<'a, M> + for<'a> VringStateMutGuard<'a, M> +{ + /// Create a new instance of Vring. + fn new(mem: M, max_queue_size: u16) -> Result + where + Self: Sized; + + /// Get an immutable reference to the kick event fd. + fn get_ref(&self) -> >::G; + + /// Get a mutable reference to the kick event fd. + fn get_mut(&self) -> >::G; + + /// Add an used descriptor into the used queue. + fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError>; + + /// Notify the vhost-user master that used descriptors have been put into the used queue. + fn signal_used_queue(&self) -> io::Result<()>; + + /// Enable event notification for queue. + fn enable_notification(&self) -> Result; + + /// Disable event notification for queue. + fn disable_notification(&self) -> Result<(), VirtQueError>; + + /// Check whether a notification to the guest is needed. + fn needs_notification(&self) -> Result; + + /// Set vring enabled state. + fn set_enabled(&self, enabled: bool); + + /// Set queue addresses for descriptor table, available ring and used ring. 
/// Create a new instance of Vring.
///
/// The vring starts disabled with no kick/call/err eventfds installed;
/// those are provided later through `set_kick`/`set_call`/`set_err`.
fn new(mem: M, max_queue_size: u16) -> Result {
    Ok(VringState {
        // Queue construction can fail; propagate the error to the caller.
        queue: Queue::new(max_queue_size)?,
        kick: None,
        call: None,
        err: None,
        enabled: false,
        mem,
    })
}
+ pub fn signal_used_queue(&self) -> io::Result<()> { + if let Some(call) = self.call.as_ref() { + call.write(1) + } else { + Ok(()) + } + } + + /// Enable event notification for queue. + pub fn enable_notification(&mut self) -> Result { + self.queue.enable_notification(self.mem.memory().deref()) + } + + /// Disable event notification for queue. + pub fn disable_notification(&mut self) -> Result<(), VirtQueError> { + self.queue.disable_notification(self.mem.memory().deref()) + } + + /// Check whether a notification to the guest is needed. + pub fn needs_notification(&mut self) -> Result { + self.queue.needs_notification(self.mem.memory().deref()) + } + + /// Set vring enabled state. + pub fn set_enabled(&mut self, enabled: bool) { + self.enabled = enabled; + } + + /// Set queue addresses for descriptor table, available ring and used ring. + pub fn set_queue_info( + &mut self, + desc_table: u64, + avail_ring: u64, + used_ring: u64, + ) -> Result<(), VirtQueError> { + self.queue + .try_set_desc_table_address(GuestAddress(desc_table))?; + self.queue + .try_set_avail_ring_address(GuestAddress(avail_ring))?; + self.queue + .try_set_used_ring_address(GuestAddress(used_ring)) + } + + /// Get queue next avail head. + fn queue_next_avail(&self) -> u16 { + self.queue.next_avail() + } + + /// Set queue next avail head. + fn set_queue_next_avail(&mut self, base: u16) { + self.queue.set_next_avail(base); + } + + /// Set configured queue size. + fn set_queue_size(&mut self, num: u16) { + self.queue.set_size(num); + } + + /// Enable/disable queue event index feature. + fn set_queue_event_idx(&mut self, enabled: bool) { + self.queue.set_event_idx(enabled); + } + + /// Set queue enabled state. + fn set_queue_ready(&mut self, ready: bool) { + self.queue.set_ready(ready); + } + + /// Get the `EventFd` for kick. + pub fn get_kick(&self) -> &Option { + &self.kick + } + + /// Set `EventFd` for kick. 
/// Read event from the kick `EventFd`.
///
/// Consumes the pending kick notification (when a kick fd is installed)
/// and returns whether the vring is currently enabled, so the caller can
/// decide whether the queue should actually be processed.
fn read_kick(&self) -> io::Result {
    if let Some(kick) = &self.kick {
        kick.read()?;
    }

    Ok(self.enabled)
}
fn lock(&self) -> MutexGuard<VringState<M>> {
{ + self.lock().set_call(file) + } + + fn set_err(&self, file: Option) { + self.lock().set_err(file) + } +} + +/// A `VringState` object protected by RwLock for multi-threading context. +#[derive(Clone)] +pub struct VringRwLock> { + state: Arc>>, +} + +impl VringRwLock { + /// Get a mutable guard to the underlying raw `VringState` object. + fn write_lock(&self) -> RwLockWriteGuard> { + self.state.write().unwrap() + } +} + +impl<'a, M: 'a + GuestAddressSpace> VringStateGuard<'a, M> for VringRwLock { + type G = RwLockReadGuard<'a, VringState>; +} + +impl<'a, M: 'a + GuestAddressSpace> VringStateMutGuard<'a, M> for VringRwLock { + type G = RwLockWriteGuard<'a, VringState>; +} + +impl VringT for VringRwLock { + fn new(mem: M, max_queue_size: u16) -> Result { + Ok(VringRwLock { + state: Arc::new(RwLock::new(VringState::new(mem, max_queue_size)?)), + }) + } + + fn get_ref(&self) -> >::G { + self.state.read().unwrap() + } + + fn get_mut(&self) -> >::G { + self.write_lock() + } + + fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError> { + self.write_lock().add_used(desc_index, len) + } + + fn signal_used_queue(&self) -> io::Result<()> { + self.get_ref().signal_used_queue() + } + + fn enable_notification(&self) -> Result { + self.write_lock().enable_notification() + } + + fn disable_notification(&self) -> Result<(), VirtQueError> { + self.write_lock().disable_notification() + } + + fn needs_notification(&self) -> Result { + self.write_lock().needs_notification() + } + + fn set_enabled(&self, enabled: bool) { + self.write_lock().set_enabled(enabled) + } + + fn set_queue_info( + &self, + desc_table: u64, + avail_ring: u64, + used_ring: u64, + ) -> Result<(), VirtQueError> { + self.write_lock() + .set_queue_info(desc_table, avail_ring, used_ring) + } + + fn queue_next_avail(&self) -> u16 { + self.get_ref().queue_next_avail() + } + + fn set_queue_next_avail(&self, base: u16) { + self.write_lock().set_queue_next_avail(base) + } + + fn set_queue_size(&self, 
num: u16) { + self.write_lock().set_queue_size(num); + } + + fn set_queue_event_idx(&self, enabled: bool) { + self.write_lock().set_queue_event_idx(enabled); + } + + fn set_queue_ready(&self, ready: bool) { + self.write_lock().set_queue_ready(ready); + } + + fn set_kick(&self, file: Option) { + self.write_lock().set_kick(file); + } + + fn read_kick(&self) -> io::Result { + self.get_ref().read_kick() + } + + fn set_call(&self, file: Option) { + self.write_lock().set_call(file) + } + + fn set_err(&self, file: Option) { + self.write_lock().set_err(file) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::os::unix::io::AsRawFd; + use vm_memory::bitmap::AtomicBitmap; + use vmm_sys_util::eventfd::EventFd; + + #[test] + fn test_new_vring() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::::from_ranges(&[(GuestAddress(0x100000), 0x10000)]) + .unwrap(), + ); + let vring = VringMutex::new(mem, 0x1000).unwrap(); + + assert!(vring.get_ref().get_kick().is_none()); + assert!(!vring.get_mut().enabled); + assert!(!vring.lock().queue.ready()); + assert!(!vring.lock().queue.event_idx_enabled()); + + vring.set_enabled(true); + assert!(vring.get_ref().enabled); + + vring.set_queue_info(0x100100, 0x100200, 0x100300).unwrap(); + assert_eq!(vring.lock().get_queue().desc_table(), 0x100100); + assert_eq!(vring.lock().get_queue().avail_ring(), 0x100200); + assert_eq!(vring.lock().get_queue().used_ring(), 0x100300); + + assert_eq!(vring.queue_next_avail(), 0); + vring.set_queue_next_avail(0x20); + assert_eq!(vring.queue_next_avail(), 0x20); + + vring.set_queue_size(0x200); + assert_eq!(vring.lock().queue.size(), 0x200); + + vring.set_queue_event_idx(true); + assert!(vring.lock().queue.event_idx_enabled()); + + vring.set_queue_ready(true); + assert!(vring.lock().queue.ready()); + } + + #[test] + fn test_vring_set_fd() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + let vring = 
VringMutex::new(mem, 0x1000).unwrap(); + + vring.set_enabled(true); + assert!(vring.get_ref().enabled); + + let eventfd = EventFd::new(0).unwrap(); + let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + assert!(vring.get_mut().kick.is_none()); + assert!(vring.read_kick().unwrap()); + vring.set_kick(Some(file)); + eventfd.write(1).unwrap(); + assert!(vring.read_kick().unwrap()); + assert!(vring.get_ref().kick.is_some()); + vring.set_kick(None); + assert!(vring.get_ref().kick.is_none()); + std::mem::forget(eventfd); + + let eventfd = EventFd::new(0).unwrap(); + let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + assert!(vring.get_ref().call.is_none()); + vring.set_call(Some(file)); + assert!(vring.get_ref().call.is_some()); + vring.set_call(None); + assert!(vring.get_ref().call.is_none()); + std::mem::forget(eventfd); + + let eventfd = EventFd::new(0).unwrap(); + let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + assert!(vring.get_ref().err.is_none()); + vring.set_err(Some(file)); + assert!(vring.get_ref().err.is_some()); + vring.set_err(None); + assert!(vring.get_ref().err.is_none()); + std::mem::forget(eventfd); + } +} diff --git a/crates/vhost-user-backend/tests/vhost-user-server.rs b/crates/vhost-user-backend/tests/vhost-user-server.rs new file mode 100644 index 0000000..3065647 --- /dev/null +++ b/crates/vhost-user-backend/tests/vhost-user-server.rs @@ -0,0 +1,297 @@ +use std::ffi::CString; +use std::fs::File; +use std::io::Result; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::net::UnixStream; +use std::path::Path; +use std::sync::{Arc, Barrier, Mutex}; +use std::thread; + +use vhost::vhost_user::message::{ + VhostUserConfigFlags, VhostUserHeaderFlag, VhostUserInflight, VhostUserProtocolFeatures, +}; +use vhost::vhost_user::{Listener, Master, SlaveFsCacheReq, VhostUserMaster}; +use vhost::{VhostBackend, VhostUserMemoryRegionInfo, VringConfigData}; +use vhost_user_backend::{VhostUserBackendMut, 
VhostUserDaemon, VringRwLock}; +use vm_memory::{ + FileOffset, GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryMmap, +}; +use vmm_sys_util::epoll::EventSet; +use vmm_sys_util::eventfd::EventFd; + +struct MockVhostBackend { + events: u64, + event_idx: bool, + acked_features: u64, +} + +impl MockVhostBackend { + fn new() -> Self { + MockVhostBackend { + events: 0, + event_idx: false, + acked_features: 0, + } + } +} + +impl VhostUserBackendMut for MockVhostBackend { + fn num_queues(&self) -> usize { + 2 + } + + fn max_queue_size(&self) -> usize { + 256 + } + + fn features(&self) -> u64 { + 0xffff_ffff_ffff_ffff + } + + fn acked_features(&mut self, features: u64) { + self.acked_features = features; + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::all() + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + assert_eq!(offset, 0x200); + assert_eq!(size, 8); + + vec![0xa5u8; 8] + } + + fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<()> { + assert_eq!(offset, 0x200); + assert_eq!(buf, &[0xa5u8; 8]); + + Ok(()) + } + + fn update_memory(&mut self, atomic_mem: GuestMemoryAtomic) -> Result<()> { + let mem = atomic_mem.memory(); + let region = mem.find_region(GuestAddress(0x100000)).unwrap(); + assert_eq!(region.size(), 0x100000); + Ok(()) + } + + fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} + + fn queues_per_thread(&self) -> Vec { + vec![1, 1] + } + + fn exit_event(&self, _thread_index: usize) -> Option { + let event_fd = EventFd::new(0).unwrap(); + + Some(event_fd) + } + + fn handle_event( + &mut self, + _device_event: u16, + _evset: EventSet, + _vrings: &[VringRwLock], + _thread_id: usize, + ) -> Result { + self.events += 1; + + Ok(false) + } +} + +fn setup_master(path: &Path, barrier: Arc) -> Master { + barrier.wait(); + let mut master = Master::connect(path, 1).unwrap(); + 
master.set_hdr_flags(VhostUserHeaderFlag::NEED_REPLY); + // Wait before issue service requests. + barrier.wait(); + + let features = master.get_features().unwrap(); + let proto = master.get_protocol_features().unwrap(); + master.set_features(features).unwrap(); + master.set_protocol_features(proto).unwrap(); + assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK)); + + master +} + +fn vhost_user_client(path: &Path, barrier: Arc) { + barrier.wait(); + let mut master = Master::connect(path, 1).unwrap(); + master.set_hdr_flags(VhostUserHeaderFlag::NEED_REPLY); + // Wait before issue service requests. + barrier.wait(); + + let features = master.get_features().unwrap(); + let proto = master.get_protocol_features().unwrap(); + master.set_features(features).unwrap(); + master.set_protocol_features(proto).unwrap(); + assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK)); + + let queue_num = master.get_queue_num().unwrap(); + assert_eq!(queue_num, 2); + + master.set_owner().unwrap(); + //master.set_owner().unwrap_err(); + master.reset_owner().unwrap(); + master.reset_owner().unwrap(); + master.set_owner().unwrap(); + + master.set_features(features).unwrap(); + master.set_protocol_features(proto).unwrap(); + assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK)); + + let memfd = nix::sys::memfd::memfd_create( + &CString::new("test").unwrap(), + nix::sys::memfd::MemFdCreateFlag::empty(), + ) + .unwrap(); + let file = unsafe { File::from_raw_fd(memfd) }; + file.set_len(0x100000).unwrap(); + let file_offset = FileOffset::new(file, 0); + let mem = GuestMemoryMmap::<()>::from_ranges_with_files(&[( + GuestAddress(0x100000), + 0x100000, + Some(file_offset), + )]) + .unwrap(); + let addr = mem.get_host_address(GuestAddress(0x100000)).unwrap() as u64; + let reg = mem.find_region(GuestAddress(0x100000)).unwrap(); + let fd = reg.file_offset().unwrap(); + let regions = [VhostUserMemoryRegionInfo { + guest_phys_addr: 0x100000, + memory_size: 0x100000, + 
userspace_addr: addr, + mmap_offset: 0, + mmap_handle: fd.file().as_raw_fd(), + }]; + master.set_mem_table(®ions).unwrap(); + + master.set_vring_num(0, 256).unwrap(); + + let config = VringConfigData { + queue_max_size: 256, + queue_size: 256, + flags: 0, + desc_table_addr: addr, + used_ring_addr: addr + 0x10000, + avail_ring_addr: addr + 0x20000, + log_addr: None, + }; + master.set_vring_addr(0, &config).unwrap(); + + let eventfd = EventFd::new(0).unwrap(); + master.set_vring_kick(0, &eventfd).unwrap(); + master.set_vring_call(0, &eventfd).unwrap(); + master.set_vring_err(0, &eventfd).unwrap(); + master.set_vring_enable(0, true).unwrap(); + + let buf = [0u8; 8]; + let (_cfg, data) = master + .get_config(0x200, 8, VhostUserConfigFlags::empty(), &buf) + .unwrap(); + assert_eq!(&data, &[0xa5u8; 8]); + master + .set_config(0x200, VhostUserConfigFlags::empty(), &data) + .unwrap(); + + let (tx, _rx) = UnixStream::pair().unwrap(); + master.set_slave_request_fd(&tx).unwrap(); + + let state = master.get_vring_base(0).unwrap(); + master.set_vring_base(0, state as u16).unwrap(); + + assert_eq!(master.get_max_mem_slots().unwrap(), 32); + let region = VhostUserMemoryRegionInfo { + guest_phys_addr: 0x800000, + memory_size: 0x100000, + userspace_addr: addr, + mmap_offset: 0, + mmap_handle: fd.file().as_raw_fd(), + }; + master.add_mem_region(®ion).unwrap(); + master.remove_mem_region(®ion).unwrap(); +} + +fn vhost_user_server(cb: fn(&Path, Arc)) { + let mem = GuestMemoryAtomic::new(GuestMemoryMmap::<()>::new()); + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let tmpdir = tempfile::tempdir().unwrap(); + let mut path = tmpdir.path().to_path_buf(); + path.push("socket"); + + let barrier2 = barrier.clone(); + let path1 = path.clone(); + let thread = thread::spawn(move || cb(&path1, barrier2)); + + let listener = Listener::new(&path, 
false).unwrap(); + barrier.wait(); + daemon.start(listener).unwrap(); + barrier.wait(); + + // handle service requests from clients. + thread.join().unwrap(); +} + +#[test] +fn test_vhost_user_server() { + vhost_user_server(vhost_user_client); +} + +fn vhost_user_enable(path: &Path, barrier: Arc) { + let master = setup_master(path, barrier); + master.set_owner().unwrap(); + master.set_owner().unwrap_err(); +} + +#[test] +fn test_vhost_user_enable() { + vhost_user_server(vhost_user_enable); +} + +fn vhost_user_set_inflight(path: &Path, barrier: Arc) { + let mut master = setup_master(path, barrier); + let eventfd = EventFd::new(0).unwrap(); + // No implementation for inflight_fd yet. + let inflight = VhostUserInflight { + mmap_size: 0x100000, + mmap_offset: 0, + num_queues: 1, + queue_size: 256, + }; + master + .set_inflight_fd(&inflight, eventfd.as_raw_fd()) + .unwrap_err(); +} + +#[test] +fn test_vhost_user_set_inflight() { + vhost_user_server(vhost_user_set_inflight); +} + +fn vhost_user_get_inflight(path: &Path, barrier: Arc) { + let mut master = setup_master(path, barrier); + // No implementation for inflight_fd yet. + let inflight = VhostUserInflight { + mmap_size: 0x100000, + mmap_offset: 0, + num_queues: 1, + queue_size: 256, + }; + assert!(master.get_inflight_fd(&inflight).is_err()); +} + +#[test] +fn test_vhost_user_get_inflight() { + vhost_user_server(vhost_user_get_inflight); +}