diff --git a/Cargo.toml b/Cargo.toml index b920917..ffe55c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,8 @@ name = "vhost-user-backend" version = "0.1.0" authors = ["The Cloud Hypervisor Authors"] +keywords = ["vhost-user", "virtio"] +description = "A framework to build vhost-user backend service daemon" edition = "2018" license = "Apache-2.0" @@ -9,8 +11,11 @@ license = "Apache-2.0" epoll = ">=4.0.1" libc = ">=0.2.39" log = ">=0.4.6" -vhost = { git = "https://github.com/rust-vmm/vhost", features = ["vhost-user-slave"] } -virtio-bindings = "0.1.0" -vm-memory = {version = ">=0.2.0", features = ["backend-mmap", "backend-atomic"]} +vhost = { version = "0.1", features = ["vhost-user-slave"] } +virtio-bindings = "0.1" virtio-queue = { git = "https://github.com/rust-vmm/vm-virtio" } -vmm-sys-util = ">=0.3.1" +vm-memory = {version = "0.6", features = ["backend-mmap", "backend-atomic"]} +vmm-sys-util = "0.8" + +[dev-dependencies] +vm-memory = {version = "0.6", features = ["backend-mmap", "backend-atomic", "backend-bitmap"]} diff --git a/coverage_config_x86_64.json b/coverage_config_x86_64.json index a2e5cd1..0036d98 100644 --- a/coverage_config_x86_64.json +++ b/coverage_config_x86_64.json @@ -1,5 +1,5 @@ { - "coverage_score": 0, + "coverage_score": 77.4, "exclude_path": "", "crate_features": "" } diff --git a/src/backend.rs b/src/backend.rs new file mode 100644 index 0000000..d15629e --- /dev/null +++ b/src/backend.rs @@ -0,0 +1,520 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2019-2021 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Traits for vhost user backend servers to implement virtio data plain services. +//! +//! Define two traits for vhost user backend servers to implement virtio data plane services. +//! The only difference between the two traits is mutability. The [VhostUserBackend] trait is +//! designed with interior mutability, so the implementor may choose the suitable way to protect +//! itself from concurrent accesses. The [VhostUserBackendMut] is designed without interior +//! mutability, and an implementation of: +//! ```ignore +//! impl VhostUserBackend for RwLock { } +//! ``` +//! is provided for convenience. +//! +//! [VhostUserBackend]: trait.VhostUserBackend.html +//! [VhostUserBackendMut]: trait.VhostUserBackendMut.html + +use std::io; +use std::ops::Deref; +use std::result; +use std::sync::{Arc, Mutex, RwLock}; + +use vhost::vhost_user::message::VhostUserProtocolFeatures; +use vhost::vhost_user::SlaveFsCacheReq; +use vm_memory::bitmap::Bitmap; +use vmm_sys_util::eventfd::EventFd; + +use super::{Vring, GM}; + +/// Trait with interior mutability for vhost user backend servers to implement concrete services. +/// +/// To support multi-threading and asynchronous IO, we enforce `the Send + Sync + 'static`. +/// So there's no plan for support of "Rc" and "RefCell". +pub trait VhostUserBackend: Send + Sync + 'static { + /// Get number of queues supported. + fn num_queues(&self) -> usize; + + /// Get maximum queue size supported. + fn max_queue_size(&self) -> usize; + + /// Get available virtio features. + fn features(&self) -> u64; + + /// Set acknowledged virtio features. + fn acked_features(&self, _features: u64) {} + + /// Get available vhost protocol features. + fn protocol_features(&self) -> VhostUserProtocolFeatures; + + /// Enable or disable the virtio EVENT_IDX feature + fn set_event_idx(&self, enabled: bool); + + /// Get virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn get_config(&self, _offset: u32, _size: u32) -> Vec { + Vec::new() + } + + /// Set virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn set_config(&self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> { + Ok(()) + } + + /// Update guest memory regions. + fn update_memory(&self, mem: GM) -> result::Result<(), io::Error>; + + /// Set handler for communicating with the master by the slave communication channel. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + /// + /// TODO: this interface is designed only for vhost-user-fs, it should be refined. + fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {} + + /// Get the map to map queue index to worker thread index. + /// + /// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0, + /// the following two queues will be handled by worker thread 1, and the last four queues will + /// be handled by worker thread 2. + fn queues_per_thread(&self) -> Vec { + vec![0xffff_ffff] + } + + /// Provide an optional exit EventFd for the specified worker thread. + /// + /// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO + /// events by using epoll with the specified `token`. When the returned EventFd is written to, + /// the worker thread will exit. + fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> { + None + } + + /// Handle IO events for backend registered file descriptors. + /// + /// This function gets called if the backend registered some additional listeners onto specific + /// file descriptors. The library can handle virtqueues on its own, but does not know what to + /// do with events happening on custom listeners. + fn handle_event( + &self, + device_event: u16, + evset: epoll::Events, + vrings: &[Vring>], + thread_id: usize, + ) -> result::Result; +} + +/// Trait without interior mutability for vhost user backend servers to implement concrete services. +pub trait VhostUserBackendMut: Send + Sync + 'static { + /// Get number of queues supported. + fn num_queues(&self) -> usize; + + /// Get maximum queue size supported. + fn max_queue_size(&self) -> usize; + + /// Get available virtio features. + fn features(&self) -> u64; + + /// Set acknowledged virtio features. + fn acked_features(&mut self, _features: u64) {} + + /// Get available vhost protocol features. + fn protocol_features(&self) -> VhostUserProtocolFeatures; + + /// Enable or disable the virtio EVENT_IDX feature + fn set_event_idx(&mut self, enabled: bool); + + /// Get virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn get_config(&self, _offset: u32, _size: u32) -> Vec { + Vec::new() + } + + /// Set virtio device configuration. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> { + Ok(()) + } + + /// Update guest memory regions. + fn update_memory(&mut self, mem: GM) -> result::Result<(), io::Error>; + + /// Set handler for communicating with the master by the slave communication channel. + /// + /// A default implementation is provided as we cannot expect all backends to implement this + /// function. + /// + /// TODO: this interface is designed only for vhost-user-fs, it should be refined. + fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} + + /// Get the map to map queue index to worker thread index. + /// + /// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0, + /// the following two queues will be handled by worker thread 1, and the last four queues will + /// be handled by worker thread 2. + fn queues_per_thread(&self) -> Vec { + vec![0xffff_ffff] + } + + /// Provide an optional exit EventFd for the specified worker thread. + /// + /// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO + /// events by using epoll with the specified `token`. When the returned EventFd is written to, + /// the worker thread will exit. + fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> { + None + } + + /// Handle IO events for backend registered file descriptors. + /// + /// This function gets called if the backend registered some additional listeners onto specific + /// file descriptors. The library can handle virtqueues on its own, but does not know what to + /// do with events happening on custom listeners. + fn handle_event( + &mut self, + device_event: u16, + evset: epoll::Events, + vrings: &[Vring>], + thread_id: usize, + ) -> result::Result; +} + +impl, B: Bitmap + 'static> VhostUserBackend for Arc { + fn num_queues(&self) -> usize { + self.deref().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.deref().max_queue_size() + } + + fn features(&self) -> u64 { + self.deref().features() + } + + fn acked_features(&self, features: u64) { + self.deref().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.deref().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.deref().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.deref().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> { + self.deref().set_config(offset, buf) + } + + fn update_memory(&self, mem: GM) -> Result<(), io::Error> { + self.deref().update_memory(mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.deref().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.deref().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> { + self.deref().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: epoll::Events, + vrings: &[Vring>], + thread_id: usize, + ) -> Result { + self.deref() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +impl, B: Bitmap + 'static> VhostUserBackend for Mutex { + fn num_queues(&self) -> usize { + self.lock().unwrap().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.lock().unwrap().max_queue_size() + } + + fn features(&self) -> u64 { + self.lock().unwrap().features() + } + + fn acked_features(&self, features: u64) { + self.lock().unwrap().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.lock().unwrap().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.lock().unwrap().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.lock().unwrap().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> { + self.lock().unwrap().set_config(offset, buf) + } + + fn update_memory(&self, mem: GM) -> Result<(), io::Error> { + self.lock().unwrap().update_memory(mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.lock().unwrap().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.lock().unwrap().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> { + self.lock().unwrap().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: epoll::Events, + vrings: &[Vring>], + thread_id: usize, + ) -> Result { + self.lock() + .unwrap() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +impl, B: Bitmap + 'static> VhostUserBackend for RwLock { + fn num_queues(&self) -> usize { + self.read().unwrap().num_queues() + } + + fn max_queue_size(&self) -> usize { + self.read().unwrap().max_queue_size() + } + + fn features(&self) -> u64 { + self.read().unwrap().features() + } + + fn acked_features(&self, features: u64) { + self.write().unwrap().acked_features(features) + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + self.read().unwrap().protocol_features() + } + + fn set_event_idx(&self, enabled: bool) { + self.write().unwrap().set_event_idx(enabled) + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + self.read().unwrap().get_config(offset, size) + } + + fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> { + self.write().unwrap().set_config(offset, buf) + } + + fn update_memory(&self, mem: GM) -> Result<(), io::Error> { + self.write().unwrap().update_memory(mem) + } + + fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) { + self.write().unwrap().set_slave_req_fd(vu_req) + } + + fn queues_per_thread(&self) -> Vec { + self.read().unwrap().queues_per_thread() + } + + fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> { + self.read().unwrap().exit_event(thread_index) + } + + fn handle_event( + &self, + device_event: u16, + evset: epoll::Events, + vrings: &[Vring>], + thread_id: usize, + ) -> Result { + self.write() + .unwrap() + .handle_event(device_event, evset, vrings, thread_id) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use epoll::Events; + use std::io::Error; + use std::sync::Mutex; + use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap}; + + pub struct MockVhostBackend { + events: u64, + event_idx: bool, + acked_features: u64, + } + + impl MockVhostBackend { + pub fn new() -> Self { + MockVhostBackend { + events: 0, + event_idx: false, + acked_features: 0, + } + } + } + + impl VhostUserBackendMut<()> for MockVhostBackend { + fn num_queues(&self) -> usize { + 2 + } + + fn max_queue_size(&self) -> usize { + 256 + } + + fn features(&self) -> u64 { + 0xffff_ffff_ffff_ffff + } + + fn acked_features(&mut self, features: u64) { + self.acked_features = features; + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::all() + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn get_config(&self, offset: u32, size: u32) -> Vec { + assert_eq!(offset, 0x200); + assert_eq!(size, 8); + + vec![0xa5u8; 8] + } + + fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<(), Error> { + assert_eq!(offset, 0x200); + assert_eq!(buf.len(), 8); + assert_eq!(buf, &[0xa5u8; 8]); + + Ok(()) + } + + fn update_memory( + &mut self, + _atomic_mem: GuestMemoryAtomic, + ) -> Result<(), Error> { + Ok(()) + } + + fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} + + fn queues_per_thread(&self) -> Vec { + vec![1, 1] + } + + fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> { + let event_fd = EventFd::new(0).unwrap(); + + Some((event_fd, 0x100)) + } + + fn handle_event( + &mut self, + _device_event: u16, + _evset: Events, + _vrings: &[Vring], + _thread_id: usize, + ) -> Result { + self.events += 1; + + Ok(false) + } + } + + #[test] + fn test_new_mock_backend_mutex() { + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + + assert_eq!(backend.num_queues(), 2); + assert_eq!(backend.max_queue_size(), 256); + assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff); + assert_eq!( + backend.protocol_features(), + VhostUserProtocolFeatures::all() + ); + assert_eq!(backend.queues_per_thread(), [1, 1]); + + assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]); + backend.set_config(0x200, &[0xa5; 8]).unwrap(); + + backend.acked_features(0xffff); + assert_eq!(backend.lock().unwrap().acked_features, 0xffff); + + backend.set_event_idx(true); + assert_eq!(backend.lock().unwrap().event_idx, true); + } + + #[test] + fn test_new_mock_backend_rwlock() { + let backend = Arc::new(RwLock::new(MockVhostBackend::new())); + + assert_eq!(backend.num_queues(), 2); + assert_eq!(backend.max_queue_size(), 256); + assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff); + assert_eq!( + backend.protocol_features(), + VhostUserProtocolFeatures::all() + ); + assert_eq!(backend.queues_per_thread(), [1, 1]); + + assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]); + backend.set_config(0x200, &[0xa5; 8]).unwrap(); + + backend.acked_features(0xffff); + assert_eq!(backend.read().unwrap().acked_features, 0xffff); + + backend.set_event_idx(true); + assert_eq!(backend.read().unwrap().event_idx, true); + } +} diff --git a/src/event_loop.rs b/src/event_loop.rs new file mode 100644 index 0000000..eafe33b --- /dev/null +++ b/src/event_loop.rs @@ -0,0 +1,260 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2019-2021 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::fmt::{Display, Formatter}; +use std::fs::File; +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::result; + +use vm_memory::bitmap::Bitmap; +use vmm_sys_util::eventfd::EventFd; + +use super::{VhostUserBackend, Vring, GM}; + +/// Errors related to vring epoll event handling. +#[derive(Debug)] +pub enum VringEpollError { + /// Failed to create epoll file descriptor. + EpollCreateFd(io::Error), + /// Failed while waiting for events. + EpollWait(io::Error), + /// Could not register exit event + RegisterExitEvent(io::Error), + /// Failed to read the event from kick EventFd. + HandleEventReadKick(io::Error), + /// Failed to handle the event from the backend. + HandleEventBackendHandling(io::Error), +} + +impl Display for VringEpollError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + VringEpollError::EpollCreateFd(e) => write!(f, "cannot create epoll fd: {}", e), + VringEpollError::EpollWait(e) => write!(f, "failed to wait for epoll event: {}", e), + VringEpollError::RegisterExitEvent(e) => write!(f, "cannot register exit event: {}", e), + VringEpollError::HandleEventReadKick(e) => { + write!(f, "cannot read vring kick event: {}", e) + } + VringEpollError::HandleEventBackendHandling(e) => { + write!(f, "failed to handle epoll event: {}", e) + } + } + } +} + +impl std::error::Error for VringEpollError {} + +/// Result of vring epoll operations. +pub type VringEpollResult = std::result::Result; + +/// Epoll event handler to manage and process epoll events for registered file descriptor. +/// +/// The `VringEpollHandler` structure provides interfaces to: +/// - add file descriptors to be monitored by the epoll fd +/// - remove registered file descriptors from the epoll fd +/// - run the event loop to handle pending events on the epoll fd +pub struct VringEpollHandler, B: Bitmap + 'static> { + epoll_file: File, + backend: S, + vrings: Vec>>, + thread_id: usize, + exit_event_fd: Option, + exit_event_id: Option, +} + +impl, B: Bitmap + 'static> VringEpollHandler { + /// Create a `VringEpollHandler` instance. + pub(crate) fn new( + backend: S, + vrings: Vec>>, + thread_id: usize, + ) -> VringEpollResult { + let epoll_fd = epoll::create(true).map_err(VringEpollError::EpollCreateFd)?; + let epoll_file = unsafe { File::from_raw_fd(epoll_fd) }; + + let handler = match backend.exit_event(thread_id) { + Some((exit_event_fd, exit_event_id)) => { + epoll::ctl( + epoll_file.as_raw_fd(), + epoll::ControlOptions::EPOLL_CTL_ADD, + exit_event_fd.as_raw_fd(), + epoll::Event::new(epoll::Events::EPOLLIN, u64::from(exit_event_id)), + ) + .map_err(VringEpollError::RegisterExitEvent)?; + + VringEpollHandler { + epoll_file, + backend, + vrings, + thread_id, + exit_event_fd: Some(exit_event_fd), + exit_event_id: Some(exit_event_id), + } + } + None => VringEpollHandler { + epoll_file, + backend, + vrings, + thread_id, + exit_event_fd: None, + exit_event_id: None, + }, + }; + + Ok(handler) + } + + /// Send `exit event` to break the event loop. + pub fn send_exit_event(&self) { + if let Some(eventfd) = self.exit_event_fd.as_ref() { + let _ = eventfd.write(1); + } + } + + /// Register an event into the epoll fd. + /// + /// When this event is later triggered, the backend implementation of `handle_event` will be + /// called. + pub fn register_listener( + &self, + fd: RawFd, + ev_type: epoll::Events, + data: u64, + ) -> result::Result<(), io::Error> { + epoll::ctl( + self.epoll_file.as_raw_fd(), + epoll::ControlOptions::EPOLL_CTL_ADD, + fd, + epoll::Event::new(ev_type, data), + ) + } + + /// Unregister an event from the epoll fd. + /// + /// If the event is triggered after this function has been called, the event will be silently + /// dropped. + pub fn unregister_listener( + &self, + fd: RawFd, + ev_type: epoll::Events, + data: u64, + ) -> result::Result<(), io::Error> { + epoll::ctl( + self.epoll_file.as_raw_fd(), + epoll::ControlOptions::EPOLL_CTL_DEL, + fd, + epoll::Event::new(ev_type, data), + ) + } + + /// Run the event poll loop to handle all pending events on registered fds. + /// + /// The event loop will be terminated once an event is received from the `exit event fd` + /// associated with the backend. + pub(crate) fn run(&self) -> VringEpollResult<()> { + const EPOLL_EVENTS_LEN: usize = 100; + let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN]; + + 'epoll: loop { + let num_events = match epoll::wait(self.epoll_file.as_raw_fd(), -1, &mut events[..]) { + Ok(res) => res, + Err(e) => { + if e.kind() == io::ErrorKind::Interrupted { + // It's well defined from the epoll_wait() syscall + // documentation that the epoll loop can be interrupted + // before any of the requested events occurred or the + // timeout expired. In both those cases, epoll_wait() + // returns an error of type EINTR, but this should not + // be considered as a regular error. Instead it is more + // appropriate to retry, by calling into epoll_wait(). + continue; + } + return Err(VringEpollError::EpollWait(e)); + } + }; + + for event in events.iter().take(num_events) { + let evset = match epoll::Events::from_bits(event.events) { + Some(evset) => evset, + None => { + let evbits = event.events; + println!("epoll: ignoring unknown event set: 0x{:x}", evbits); + continue; + } + }; + + let ev_type = event.data as u16; + + // handle_event() returns true if an event is received from the exit event fd. + if self.handle_event(ev_type, evset)? { + break 'epoll; + } + } + } + + Ok(()) + } + + fn handle_event(&self, device_event: u16, evset: epoll::Events) -> VringEpollResult { + if self.exit_event_id == Some(device_event) { + return Ok(true); + } + + if (device_event as usize) < self.vrings.len() { + let vring = &self.vrings[device_event as usize]; + let enabled = vring + .read_kick() + .map_err(VringEpollError::HandleEventReadKick)?; + + // If the vring is not enabled, it should not be processed. + if !enabled { + return Ok(false); + } + } + + self.backend + .handle_event(device_event, evset, &self.vrings, self.thread_id) + .map_err(VringEpollError::HandleEventBackendHandling) + } +} + +#[cfg(test)] +mod tests { + use super::super::backend::tests::MockVhostBackend; + use super::*; + use std::sync::{Arc, Mutex}; + use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; + use vmm_sys_util::eventfd::EventFd; + + #[test] + fn test_vring_epoll_handler() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + let vring = Vring::new(mem, 0x1000); + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + + let handler = VringEpollHandler::new(backend, vec![vring], 0x1).unwrap(); + assert!(handler.exit_event_id.is_some()); + + let eventfd = EventFd::new(0).unwrap(); + handler + .register_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1) + .unwrap(); + // Register an already registered fd. + handler + .register_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1) + .unwrap_err(); + + handler + .unregister_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1) + .unwrap(); + // unregister an already unregistered fd. + handler + .unregister_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1) + .unwrap_err(); + } +} diff --git a/src/handler.rs b/src/handler.rs new file mode 100644 index 0000000..2bdb58d --- /dev/null +++ b/src/handler.rs @@ -0,0 +1,544 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2019-2021 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::error; +use std::fs::File; +use std::io; +use std::os::unix::io::AsRawFd; +use std::sync::Arc; +use std::thread; + +use vhost::vhost_user::message::{ + VhostUserConfigFlags, VhostUserMemoryRegion, VhostUserProtocolFeatures, + VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags, + VhostUserVringState, +}; +use vhost::vhost_user::{Error as VhostUserError, Result as VhostUserResult, SlaveFsCacheReq}; +use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; +use vm_memory::bitmap::Bitmap; +use vm_memory::mmap::NewBitmap; +use vm_memory::{FileOffset, GuestAddress, GuestMemoryMmap, GuestRegionMmap}; + +use super::event_loop::{VringEpollError, VringEpollResult}; +use super::*; + +const MAX_MEM_SLOTS: u64 = 32; + +#[derive(Debug)] +/// Errors related to vhost-user handler. +pub enum VhostUserHandlerError { + /// Failed to create vring worker. + CreateEpollHandler(VringEpollError), + /// Failed to spawn vring worker. + SpawnVringWorker(io::Error), + /// Could not find the mapping from memory regions. + MissingMemoryMapping, +} + +impl std::fmt::Display for VhostUserHandlerError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + VhostUserHandlerError::CreateEpollHandler(e) => { + write!(f, "failed to create vring epoll handler: {}", e) + } + VhostUserHandlerError::SpawnVringWorker(e) => { + write!(f, "failed spawning the vring worker: {}", e) + } + VhostUserHandlerError::MissingMemoryMapping => write!(f, "Missing memory mapping"), + } + } +} + +impl error::Error for VhostUserHandlerError {} + +/// Result of vhost-user handler operations. +pub type VhostUserHandlerResult = std::result::Result; + +struct AddrMapping { + vmm_addr: u64, + size: u64, + gpa_base: u64, +} + +pub struct VhostUserHandler, B: Bitmap + 'static> { + backend: S, + handlers: Vec>>, + owned: bool, + features_acked: bool, + acked_features: u64, + acked_protocol_features: u64, + num_queues: usize, + max_queue_size: usize, + queues_per_thread: Vec, + mappings: Vec, + atomic_mem: GM, + vrings: Vec>>, + worker_threads: Vec>>, +} + +impl + Clone, B: Bitmap + Clone + Send + Sync> VhostUserHandler { + pub(crate) fn new(backend: S, atomic_mem: GM) -> VhostUserHandlerResult { + let num_queues = backend.num_queues(); + let max_queue_size = backend.max_queue_size(); + let queues_per_thread = backend.queues_per_thread(); + + let mut vrings = Vec::new(); + for _ in 0..num_queues { + let vring = Vring::new(atomic_mem.clone(), max_queue_size as u16); + vrings.push(vring); + } + + let mut handlers = Vec::new(); + let mut worker_threads = Vec::new(); + for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() { + let mut thread_vrings = Vec::new(); + for (index, vring) in vrings.iter().enumerate() { + if (queues_mask >> index) & 1u64 == 1u64 { + thread_vrings.push(vring.clone()); + } + } + + let handler = Arc::new( + VringEpollHandler::new(backend.clone(), thread_vrings, thread_id) + .map_err(VhostUserHandlerError::CreateEpollHandler)?, + ); + let handler2 = handler.clone(); + let worker_thread = thread::Builder::new() + .name("vring_worker".to_string()) + .spawn(move || handler2.run()) + .map_err(VhostUserHandlerError::SpawnVringWorker)?; + + handlers.push(handler); + worker_threads.push(worker_thread); + } + + Ok(VhostUserHandler { + backend, + handlers, + owned: false, + features_acked: false, + acked_features: 0, + acked_protocol_features: 0, + num_queues, + max_queue_size, + queues_per_thread, + mappings: Vec::new(), + atomic_mem, + vrings, + worker_threads, + }) + } +} + +impl, B: Bitmap> VhostUserHandler { + pub(crate) fn get_epoll_handlers(&self) -> Vec>> { + self.handlers.clone() + } + + pub(crate) fn send_exit_event(&self) { + for handler in self.handlers.iter() { + handler.send_exit_event(); + } + } + + fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult { + for mapping in self.mappings.iter() { + if vmm_va >= mapping.vmm_addr && vmm_va < mapping.vmm_addr + mapping.size { + return Ok(vmm_va - mapping.vmm_addr + mapping.gpa_base); + } + } + + Err(VhostUserHandlerError::MissingMemoryMapping) + } +} + +impl, B: NewBitmap + Clone> VhostUserSlaveReqHandlerMut + for VhostUserHandler +{ + fn set_owner(&mut self) -> VhostUserResult<()> { + if self.owned { + return Err(VhostUserError::InvalidOperation); + } + self.owned = true; + Ok(()) + } + + fn reset_owner(&mut self) -> VhostUserResult<()> { + self.owned = false; + self.features_acked = false; + self.acked_features = 0; + self.acked_protocol_features = 0; + Ok(()) + } + + fn get_features(&mut self) -> VhostUserResult { + Ok(self.backend.features()) + } + + fn set_features(&mut self, features: u64) -> VhostUserResult<()> { + if (features & !self.backend.features()) != 0 { + return Err(VhostUserError::InvalidParam); + } + + self.acked_features = features; + self.features_acked = true; + + // If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated, + // the ring is initialized in an enabled state. + // If VHOST_USER_F_PROTOCOL_FEATURES has been negotiated, + // the ring is initialized in a disabled state. Client must not + // pass data to/from the backend until ring is enabled by + // VHOST_USER_SET_VRING_ENABLE with parameter 1, or after it has + // been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0. + let vring_enabled = + self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0; + for vring in self.vrings.iter() { + vring.set_enabled(vring_enabled); + } + + self.backend.acked_features(self.acked_features); + + Ok(()) + } + + fn get_protocol_features(&mut self) -> VhostUserResult { + Ok(self.backend.protocol_features()) + } + + fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> { + // Note: slave that reported VHOST_USER_F_PROTOCOL_FEATURES must + // support this message even before VHOST_USER_SET_FEATURES was + // called. + self.acked_protocol_features = features; + Ok(()) + } + + fn set_mem_table( + &mut self, + ctx: &[VhostUserMemoryRegion], + files: Vec, + ) -> VhostUserResult<()> { + // We need to create tuple of ranges from the list of VhostUserMemoryRegion + // that we get from the caller. + let mut regions: Vec<(GuestAddress, usize, Option)> = Vec::new(); + let mut mappings: Vec = Vec::new(); + + for (region, file) in ctx.iter().zip(files) { + let g_addr = GuestAddress(region.guest_phys_addr); + let len = region.memory_size as usize; + let f_off = FileOffset::new(file, region.mmap_offset); + + regions.push((g_addr, len, Some(f_off))); + mappings.push(AddrMapping { + vmm_addr: region.user_addr, + size: region.memory_size, + gpa_base: region.guest_phys_addr, + }); + } + + let mem = GuestMemoryMmap::from_ranges_with_files(regions).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + + // Updating the inner GuestMemory object here will cause all our vrings to + // see the new one the next time they call to `atomic_mem.memory()`. + self.atomic_mem.lock().unwrap().replace(mem); + + self.backend + .update_memory(self.atomic_mem.clone()) + .map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + self.mappings = mappings; + + Ok(()) + } + + fn get_queue_num(&mut self) -> VhostUserResult { + Ok(self.num_queues as u64) + } + + fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> { + if index as usize >= self.num_queues || num == 0 || num as usize > self.max_queue_size { + return Err(VhostUserError::InvalidParam); + } + self.vrings[index as usize].set_queue_size(num as u16); + Ok(()) + } + + fn set_vring_addr( + &mut self, + index: u32, + _flags: VhostUserVringAddrFlags, + descriptor: u64, + used: u64, + available: u64, + _log: u64, + ) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + if !self.mappings.is_empty() { + let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + let used_ring = self.vmm_va_to_gpa(used).map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + self.vrings[index as usize].set_queue_info(desc_table, avail_ring, used_ring); + Ok(()) + } else { + Err(VhostUserError::InvalidParam) + } + } + + fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> { + let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; + + self.vrings[index as usize].set_queue_next_avail(base as u16); + self.vrings[index as usize].set_queue_event_idx(event_idx); + self.backend.set_event_idx(event_idx); + + Ok(()) + } + + fn get_vring_base(&mut self, index: u32) -> VhostUserResult { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + // Quote from vhost-user specification: + // Client must start ring upon receiving a kick (that is, detecting + // that file descriptor is readable) on the descriptor specified by + // VHOST_USER_SET_VRING_KICK, and stop ring upon receiving + // VHOST_USER_GET_VRING_BASE. + self.vrings[index as usize].set_queue_ready(false); + if let Some(fd) = self.vrings[index as usize].get_ref().get_kick() { + for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() { + let shifted_queues_mask = queues_mask >> index; + if shifted_queues_mask & 1u64 == 1u64 { + let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones(); + self.handlers[thread_index] + .unregister_listener( + fd.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(evt_idx), + ) + .map_err(VhostUserError::ReqHandlerError)?; + break; + } + } + } + + let next_avail = self.vrings[index as usize].queue_next_avail(); + + Ok(VhostUserVringState::new(index, u32::from(next_avail))) + } + + fn set_vring_kick(&mut self, index: u8, file: Option) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + // SAFETY: EventFd requires that it has sole ownership of its fd. So + // does File, so this is safe. + // Ideally, we'd have a generic way to refer to a uniquely-owned fd, + // such as that proposed by Rust RFC #3128. + self.vrings[index as usize].set_kick(file); + + // Quote from vhost-user specification: + // Client must start ring upon receiving a kick (that is, detecting + // that file descriptor is readable) on the descriptor specified by + // VHOST_USER_SET_VRING_KICK, and stop ring upon receiving + // VHOST_USER_GET_VRING_BASE. + self.vrings[index as usize].set_queue_ready(true); + if let Some(fd) = self.vrings[index as usize].get_ref().get_kick() { + for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() { + let shifted_queues_mask = queues_mask >> index; + if shifted_queues_mask & 1u64 == 1u64 { + let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones(); + self.handlers[thread_index] + .register_listener( + fd.as_raw_fd(), + epoll::Events::EPOLLIN, + u64::from(evt_idx), + ) + .map_err(VhostUserError::ReqHandlerError)?; + break; + } + } + } + + Ok(()) + } + + fn set_vring_call(&mut self, index: u8, file: Option) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + self.vrings[index as usize].set_call(file); + + Ok(()) + } + + fn set_vring_err(&mut self, index: u8, file: Option) -> VhostUserResult<()> { + if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + self.vrings[index as usize].set_err(file); + + Ok(()) + } + + fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> { + // This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES + // has been negotiated. + if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 { + return Err(VhostUserError::InvalidOperation); + } else if index as usize >= self.num_queues { + return Err(VhostUserError::InvalidParam); + } + + // Slave must not pass data to/from the backend until ring is + // enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1, + // or after it has been disabled by VHOST_USER_SET_VRING_ENABLE + // with parameter 0. + self.vrings[index as usize].set_enabled(enable); + + Ok(()) + } + + fn get_config( + &mut self, + offset: u32, + size: u32, + _flags: VhostUserConfigFlags, + ) -> VhostUserResult> { + Ok(self.backend.get_config(offset, size)) + } + + fn set_config( + &mut self, + offset: u32, + buf: &[u8], + _flags: VhostUserConfigFlags, + ) -> VhostUserResult<()> { + self.backend + .set_config(offset, buf) + .map_err(VhostUserError::ReqHandlerError) + } + + fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) { + if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 { + vu_req.set_reply_ack_flag(true); + } + + self.backend.set_slave_req_fd(vu_req); + } + + fn get_max_mem_slots(&mut self) -> VhostUserResult { + Ok(MAX_MEM_SLOTS) + } + + fn add_mem_region( + &mut self, + region: &VhostUserSingleMemoryRegion, + file: File, + ) -> VhostUserResult<()> { + let mmap_region = MmapRegion::from_file( + FileOffset::new(file, region.mmap_offset), + region.memory_size as usize, + ) + .map_err(|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)))?; + let guest_region = Arc::new( + GuestRegionMmap::new(mmap_region, GuestAddress(region.guest_phys_addr)).map_err( + |e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)), + )?, + ); + + let mem = self + .atomic_mem + .memory() + .insert_region(guest_region) + .map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + + self.atomic_mem.lock().unwrap().replace(mem); + + self.backend + .update_memory(self.atomic_mem.clone()) + .map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + + self.mappings.push(AddrMapping { + vmm_addr: region.user_addr, + size: region.memory_size, + gpa_base: region.guest_phys_addr, + }); + + Ok(()) + } + + fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> { + let (mem, _) = self + .atomic_mem + .memory() + .remove_region(GuestAddress(region.guest_phys_addr), region.memory_size) + .map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + + self.atomic_mem.lock().unwrap().replace(mem); + + self.backend + .update_memory(self.atomic_mem.clone()) + .map_err(|e| { + VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) + })?; + + self.mappings + .retain(|mapping| mapping.gpa_base != region.guest_phys_addr); + + Ok(()) + } + + fn get_inflight_fd( + &mut self, + _inflight: &vhost::vhost_user::message::VhostUserInflight, + ) -> VhostUserResult<(vhost::vhost_user::message::VhostUserInflight, File)> { + // Assume the backend hasn't negotiated the inflight feature; it + // wouldn't be correct for the backend to do so, as we don't (yet) + // provide a way for it to handle such requests. + Err(VhostUserError::InvalidOperation) + } + + fn set_inflight_fd( + &mut self, + _inflight: &vhost::vhost_user::message::VhostUserInflight, + _file: File, + ) -> VhostUserResult<()> { + Err(VhostUserError::InvalidOperation) + } +} + +impl, B: Bitmap> Drop for VhostUserHandler { + fn drop(&mut self) { + // Signal all working threads to exit. + self.send_exit_event(); + + for thread in self.worker_threads.drain(..) { + if let Err(e) = thread.join() { + error!("Error in vring worker: {:?}", e); + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 3564b06..1af2241 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,39 +1,42 @@ // Copyright 2019 Intel Corporation. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 +// Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved. // -// Copyright 2019 Alibaba Cloud Computing. All rights reserved. // SPDX-License-Identifier: Apache-2.0 +//! A simple framework to run a vhost-user backend service. + #[macro_use] extern crate log; -use std::error; -use std::fs::File; +use std::fmt::{Display, Formatter}; use std::io; -use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; -use std::os::unix::prelude::IntoRawFd; use std::result; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use std::thread; -use vhost::vhost_user::message::{ - VhostUserConfigFlags, VhostUserMemoryRegion, VhostUserProtocolFeatures, - VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags, - VhostUserVringState, -}; -use vhost::vhost_user::{ - Error as VhostUserError, Listener, Result as VhostUserResult, SlaveFsCacheReq, SlaveListener, - VhostUserSlaveReqHandlerMut, -}; -use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; -use vm_memory::guest_memory::FileOffset; -use vm_memory::{ - GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap, GuestRegionMmap, - MmapRegion, -}; -use virtio_queue::Queue; -use vmm_sys_util::eventfd::EventFd; -const MAX_MEM_SLOTS: u64 = 32; +use vhost::vhost_user::{ + Error as VhostUserError, Listener, SlaveListener, VhostUserSlaveReqHandlerMut, +}; +use vm_memory::bitmap::Bitmap; +use vm_memory::mmap::NewBitmap; +use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap, MmapRegion}; + +use self::handler::VhostUserHandler; + +mod backend; +pub use self::backend::{VhostUserBackend, VhostUserBackendMut}; + +mod event_loop; +pub use self::event_loop::VringEpollHandler; + +mod handler; +pub use self::handler::VhostUserHandlerError; + +mod vring; +pub use self::vring::{Vring, VringState}; + +/// An alias for `GuestMemoryAtomic>` to simplify code. +type GM = GuestMemoryAtomic>; #[derive(Debug)] /// Errors related to vhost-user daemon. @@ -50,106 +53,47 @@ pub enum Error { WaitDaemon(std::boxed::Box), /// Failed handling a vhost-user request. HandleRequest(VhostUserError), - /// Failed to process queue. - ProcessQueue(VringEpollHandlerError), - /// Failed to register listener. - RegisterListener(io::Error), - /// Failed to unregister listener. - UnregisterListener(io::Error), +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e), + Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e), + Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e), + Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e), + Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"), + Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e), + } + } } /// Result of vhost-user daemon operations. pub type Result = result::Result; -/// This trait must be implemented by the caller in order to provide backend -/// specific implementation. -pub trait VhostUserBackend: Send + Sync + 'static { - /// Number of queues. - fn num_queues(&self) -> usize; - - /// Depth of each queue. - fn max_queue_size(&self) -> usize; - - /// Available virtio features. - fn features(&self) -> u64; - - /// Acked virtio features. - fn acked_features(&mut self, _features: u64) {} - - /// Virtio protocol features. - fn protocol_features(&self) -> VhostUserProtocolFeatures; - - /// Tell the backend if EVENT_IDX has been negotiated. - fn set_event_idx(&mut self, enabled: bool); - - /// Update guest memory regions. - fn update_memory( - &mut self, - atomic_mem: GuestMemoryAtomic, - ) -> result::Result<(), io::Error>; - - /// This function gets called if the backend registered some additional - /// listeners onto specific file descriptors. The library can handle - /// virtqueues on its own, but does not know what to do with events - /// happening on custom listeners. - fn handle_event( - &self, - device_event: u16, - evset: epoll::Events, - vrings: &[Arc>], - thread_id: usize, - ) -> result::Result; - - /// Get virtio device configuration. - /// A default implementation is provided as we cannot expect all backends - /// to implement this function. - fn get_config(&self, _offset: u32, _size: u32) -> Vec { - Vec::new() - } - - /// Set virtio device configuration. - /// A default implementation is provided as we cannot expect all backends - /// to implement this function. - fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> { - Ok(()) - } - - /// Provide an exit EventFd - /// When this EventFd is written to the worker thread will exit. An optional id may - /// also be provided, if it not provided then the exit event will be first event id - /// after the last queue - fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, Option)> { - None - } - - /// Set slave fd. - /// A default implementation is provided as we cannot expect all backends - /// to implement this function. - fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {} - - fn queues_per_thread(&self) -> Vec { - vec![0xffff_ffff] - } -} - -/// This structure is the public API the backend is allowed to interact with -/// in order to run a fully functional vhost-user daemon. -pub struct VhostUserDaemon { +/// Implement a simple framework to run a vhost-user service daemon. +/// +/// This structure is the public API the backend is allowed to interact with in order to run +/// a fully functional vhost-user daemon. +pub struct VhostUserDaemon, B: Bitmap + 'static = ()> { name: String, - handler: Arc>>, + handler: Arc>>, main_thread: Option>>, } -impl VhostUserDaemon { - /// Create the daemon instance, providing the backend implementation of - /// VhostUserBackend. - /// Under the hood, this will start a dedicated thread responsible for - /// listening onto registered event. Those events can be vring events or - /// custom events from the backend, but they get to be registered later - /// during the sequence. - pub fn new(name: String, backend: Arc>) -> Result { +impl + Clone, B: NewBitmap + Clone + Send + Sync> VhostUserDaemon { + /// Create the daemon instance, providing the backend implementation of `VhostUserBackend`. + /// + /// Under the hood, this will start a dedicated thread responsible for listening onto + /// registered event. Those events can be vring events or custom events from the backend, + /// but they get to be registered later during the sequence. + pub fn new( + name: String, + backend: S, + atomic_mem: GuestMemoryAtomic>, + ) -> Result { let handler = Arc::new(Mutex::new( - VhostUserHandler::new(backend).map_err(Error::NewVhostUserHandler)?, + VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?, )); Ok(VhostUserDaemon { @@ -159,10 +103,11 @@ impl VhostUserDaemon { }) } - /// Connect to the vhost-user socket and run a dedicated thread handling - /// all requests coming through this socket. This runs in an infinite loop - /// that should be terminating once the other end of the socket (the VMM) - /// disconnects. + /// Connect to the vhost-user socket and run a dedicated thread handling all requests coming + /// through this socket. + /// + /// This runs in an infinite loop that should be terminating once the other end of the socket + /// (the VMM) disconnects. pub fn start(&mut self, listener: Listener) -> Result<()> { let mut slave_listener = SlaveListener::new(listener, self.handler.clone()) .map_err(Error::CreateSlaveListener)?; @@ -184,8 +129,7 @@ impl VhostUserDaemon { Ok(()) } - /// Wait for the thread handling the vhost-user socket connection to - /// terminate. + /// Wait for the thread handling the vhost-user socket connection to terminate. pub fn wait(&mut self) -> Result<()> { if let Some(handle) = self.main_thread.take() { match handle.join().map_err(Error::WaitDaemon)? { @@ -198,775 +142,30 @@ impl VhostUserDaemon { } } - /// Retrieve the vring worker. This is necessary to perform further - /// actions like registering and unregistering some extra event file - /// descriptors. - pub fn get_vring_workers(&self) -> Vec> { - self.handler.lock().unwrap().get_vring_workers() + /// Retrieve the vring epoll handler. + /// + /// This is necessary to perform further actions like registering and unregistering some extra + /// event file descriptors. + pub fn get_epoll_handlers(&self) -> Vec>> { + self.handler.lock().unwrap().get_epoll_handlers() } } -struct AddrMapping { - vmm_addr: u64, - size: u64, - gpa_base: u64, -} - -pub struct Vring { - queue: Queue>, - kick: Option, - call: Option, - err: Option, - enabled: bool, -} - -impl Vring { - fn new(atomic_mem: GuestMemoryAtomic, max_queue_size: u16) -> Self { - Vring { - queue: Queue::new(atomic_mem, max_queue_size), - kick: None, - call: None, - err: None, - enabled: false, - } - } - - pub fn mut_queue(&mut self) -> &mut Queue> { - &mut self.queue - } - - pub fn signal_used_queue(&mut self) -> result::Result<(), io::Error> { - if let Some(call) = self.call.as_ref() { - call.write(1) - } else { - Ok(()) - } - } -} - -#[derive(Debug)] -/// Errors related to vring epoll handler. -pub enum VringEpollHandlerError { - /// Failed to process the queue from the backend. - ProcessQueueBackendProcessing(io::Error), - /// Failed to signal used queue. - SignalUsedQueue(io::Error), - /// Failed to read the event from kick EventFd. - HandleEventReadKick(io::Error), - /// Failed to handle the event from the backend. - HandleEventBackendHandling(io::Error), -} - -/// Result of vring epoll handler operations. -type VringEpollHandlerResult = std::result::Result; - -struct VringEpollHandler { - backend: Arc>, - vrings: Vec>>, - exit_event_id: Option, - thread_id: usize, -} - -impl VringEpollHandler { - fn handle_event( - &self, - device_event: u16, - evset: epoll::Events, - ) -> VringEpollHandlerResult { - if self.exit_event_id == Some(device_event) { - return Ok(true); - } - - let num_queues = self.vrings.len(); - if (device_event as usize) < num_queues { - if let Some(kick) = &self.vrings[device_event as usize].read().unwrap().kick { - kick.read() - .map_err(VringEpollHandlerError::HandleEventReadKick)?; - } - - // If the vring is not enabled, it should not be processed. - // The event is only read to be discarded. - if !self.vrings[device_event as usize].read().unwrap().enabled { - return Ok(false); - } - } - - self.backend - .read() - .unwrap() - .handle_event(device_event, evset, &self.vrings, self.thread_id) - .map_err(VringEpollHandlerError::HandleEventBackendHandling) - } -} - -#[derive(Debug)] -/// Errors related to vring worker. -enum VringWorkerError { - /// Failed while waiting for events. - EpollWait(io::Error), - /// Failed to handle the event. - HandleEvent(VringEpollHandlerError), -} - -/// Result of vring worker operations. -type VringWorkerResult = std::result::Result; - -pub struct VringWorker { - epoll_file: File, -} - -impl AsRawFd for VringWorker { - fn as_raw_fd(&self) -> RawFd { - self.epoll_file.as_raw_fd() - } -} - -impl VringWorker { - fn run(&self, handler: VringEpollHandler) -> VringWorkerResult<()> { - const EPOLL_EVENTS_LEN: usize = 100; - let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN]; - - 'epoll: loop { - let num_events = match epoll::wait(self.epoll_file.as_raw_fd(), -1, &mut events[..]) { - Ok(res) => res, - Err(e) => { - if e.kind() == io::ErrorKind::Interrupted { - // It's well defined from the epoll_wait() syscall - // documentation that the epoll loop can be interrupted - // before any of the requested events occurred or the - // timeout expired. In both those cases, epoll_wait() - // returns an error of type EINTR, but this should not - // be considered as a regular error. Instead it is more - // appropriate to retry, by calling into epoll_wait(). - continue; - } - return Err(VringWorkerError::EpollWait(e)); - } - }; - - for event in events.iter().take(num_events) { - let evset = match epoll::Events::from_bits(event.events) { - Some(evset) => evset, - None => { - let evbits = event.events; - println!("epoll: ignoring unknown event set: 0x{:x}", evbits); - continue; - } - }; - - let ev_type = event.data as u16; - - if handler - .handle_event(ev_type, evset) - .map_err(VringWorkerError::HandleEvent)? - { - break 'epoll; - } - } - } - - Ok(()) - } - - /// Register a custom event only meaningful to the caller. When this event - /// is later triggered, and because only the caller knows what to do about - /// it, the backend implementation of `handle_event` will be called. - /// This lets entire control to the caller about what needs to be done for - /// this special event, without forcing it to run its own dedicated epoll - /// loop for it. - pub fn register_listener( - &self, - fd: RawFd, - ev_type: epoll::Events, - data: u64, - ) -> result::Result<(), io::Error> { - epoll::ctl( - self.epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_ADD, - fd, - epoll::Event::new(ev_type, data), - ) - } - - /// Unregister a custom event. If the custom event is triggered after this - /// function has been called, nothing will happen as it will be removed - /// from the list of file descriptors the epoll loop is listening to. - pub fn unregister_listener( - &self, - fd: RawFd, - ev_type: epoll::Events, - data: u64, - ) -> result::Result<(), io::Error> { - epoll::ctl( - self.epoll_file.as_raw_fd(), - epoll::ControlOptions::EPOLL_CTL_DEL, - fd, - epoll::Event::new(ev_type, data), - ) - } -} - -#[derive(Debug)] -/// Errors related to vhost-user handler. -pub enum VhostUserHandlerError { - /// Failed to create epoll file descriptor. - EpollCreateFd(io::Error), - /// Failed to spawn vring worker. - SpawnVringWorker(io::Error), - /// Could not find the mapping from memory regions. - MissingMemoryMapping, - /// Could not register exit event - RegisterExitEvent(io::Error), -} - -impl std::fmt::Display for VhostUserHandlerError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - VhostUserHandlerError::EpollCreateFd(e) => write!(f, "failed creating epoll fd: {}", e), - VhostUserHandlerError::SpawnVringWorker(e) => { - write!(f, "failed spawning the vring worker: {}", e) - } - VhostUserHandlerError::MissingMemoryMapping => write!(f, "Missing memory mapping"), - VhostUserHandlerError::RegisterExitEvent(e) => { - write!(f, "Failed to register exit event: {}", e) - } - } - } -} - -impl error::Error for VhostUserHandlerError {} - -/// Result of vhost-user handler operations. -type VhostUserHandlerResult = std::result::Result; - -struct VhostUserHandler { - backend: Arc>, - workers: Vec>, - owned: bool, - features_acked: bool, - acked_features: u64, - acked_protocol_features: u64, - num_queues: usize, - max_queue_size: usize, - queues_per_thread: Vec, - mappings: Vec, - atomic_mem: GuestMemoryAtomic, - vrings: Vec>>, - worker_threads: Vec>>, -} - -impl VhostUserHandler { - fn new(backend: Arc>) -> VhostUserHandlerResult { - let num_queues = backend.read().unwrap().num_queues(); - let max_queue_size = backend.read().unwrap().max_queue_size(); - let queues_per_thread = backend.read().unwrap().queues_per_thread(); - - let atomic_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); - - let mut vrings: Vec>> = Vec::new(); - for _ in 0..num_queues { - let vring = Arc::new(RwLock::new(Vring::new( - atomic_mem.clone(), - max_queue_size as u16, - ))); - vrings.push(vring); - } - - let mut workers = Vec::new(); - let mut worker_threads = Vec::new(); - for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() { - // Create the epoll file descriptor - let epoll_fd = epoll::create(true).map_err(VhostUserHandlerError::EpollCreateFd)?; - // Use 'File' to enforce closing on 'epoll_fd' - let epoll_file = unsafe { File::from_raw_fd(epoll_fd) }; - - let vring_worker = Arc::new(VringWorker { epoll_file }); - let worker = vring_worker.clone(); - - let exit_event_id = if let Some((exit_event_fd, exit_event_id)) = - backend.read().unwrap().exit_event(thread_id) - { - let exit_event_id = exit_event_id.unwrap_or(num_queues as u16); - worker - .register_listener( - exit_event_fd.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(exit_event_id), - ) - .map_err(VhostUserHandlerError::RegisterExitEvent)?; - Some(exit_event_id) - } else { - None - }; - - let mut thread_vrings: Vec>> = Vec::new(); - for (index, vring) in vrings.iter().enumerate() { - if (queues_mask >> index) & 1u64 == 1u64 { - thread_vrings.push(vring.clone()); - } - } - - let vring_handler = VringEpollHandler { - backend: backend.clone(), - vrings: thread_vrings, - exit_event_id, - thread_id, - }; - - let worker_thread = thread::Builder::new() - .name("vring_worker".to_string()) - .spawn(move || vring_worker.run(vring_handler)) - .map_err(VhostUserHandlerError::SpawnVringWorker)?; - - workers.push(worker); - worker_threads.push(worker_thread); - } - - Ok(VhostUserHandler { - backend, - workers, - owned: false, - features_acked: false, - acked_features: 0, - acked_protocol_features: 0, - num_queues, - max_queue_size, - queues_per_thread, - mappings: Vec::new(), - atomic_mem, - vrings, - worker_threads, - }) - } - - fn get_vring_workers(&self) -> Vec> { - self.workers.clone() - } - - fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult { - for mapping in self.mappings.iter() { - if vmm_va >= mapping.vmm_addr && vmm_va < mapping.vmm_addr + mapping.size { - return Ok(vmm_va - mapping.vmm_addr + mapping.gpa_base); - } - } - - Err(VhostUserHandlerError::MissingMemoryMapping) - } -} - -impl VhostUserSlaveReqHandlerMut for VhostUserHandler { - fn set_owner(&mut self) -> VhostUserResult<()> { - if self.owned { - return Err(VhostUserError::InvalidOperation); - } - self.owned = true; - Ok(()) - } - - fn reset_owner(&mut self) -> VhostUserResult<()> { - self.owned = false; - self.features_acked = false; - self.acked_features = 0; - self.acked_protocol_features = 0; - Ok(()) - } - - fn get_features(&mut self) -> VhostUserResult { - Ok(self.backend.read().unwrap().features()) - } - - fn set_features(&mut self, features: u64) -> VhostUserResult<()> { - if (features & !self.backend.read().unwrap().features()) != 0 { - return Err(VhostUserError::InvalidParam); - } - - self.acked_features = features; - self.features_acked = true; - - // If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated, - // the ring is initialized in an enabled state. - // If VHOST_USER_F_PROTOCOL_FEATURES has been negotiated, - // the ring is initialized in a disabled state. Client must not - // pass data to/from the backend until ring is enabled by - // VHOST_USER_SET_VRING_ENABLE with parameter 1, or after it has - // been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0. - let vring_enabled = - self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0; - for vring in self.vrings.iter_mut() { - vring.write().unwrap().enabled = vring_enabled; - } - - self.backend - .write() - .unwrap() - .acked_features(self.acked_features); - - Ok(()) - } - - fn get_protocol_features(&mut self) -> VhostUserResult { - Ok(self.backend.read().unwrap().protocol_features()) - } - - fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> { - // Note: slave that reported VHOST_USER_F_PROTOCOL_FEATURES must - // support this message even before VHOST_USER_SET_FEATURES was - // called. - self.acked_protocol_features = features; - Ok(()) - } - - fn set_mem_table( - &mut self, - ctx: &[VhostUserMemoryRegion], - files: Vec, - ) -> VhostUserResult<()> { - // We need to create tuple of ranges from the list of VhostUserMemoryRegion - // that we get from the caller. - let mut regions: Vec<(GuestAddress, usize, Option)> = Vec::new(); - let mut mappings: Vec = Vec::new(); - - for (region, file) in ctx.iter().zip(files) { - let g_addr = GuestAddress(region.guest_phys_addr); - let len = region.memory_size as usize; - let f_off = FileOffset::new(file, region.mmap_offset); - - regions.push((g_addr, len, Some(f_off))); - mappings.push(AddrMapping { - vmm_addr: region.user_addr, - size: region.memory_size, - gpa_base: region.guest_phys_addr, - }); - } - - let mem = GuestMemoryMmap::from_ranges_with_files(regions).map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - - // Updating the inner GuestMemory object here will cause all our vrings to - // see the new one the next time they call to `atomic_mem.memory()`. - self.atomic_mem.lock().unwrap().replace(mem); - - self.backend - .write() - .unwrap() - .update_memory(self.atomic_mem.clone()) - .map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - self.mappings = mappings; - - Ok(()) - } - - fn get_queue_num(&mut self) -> VhostUserResult { - Ok(self.num_queues as u64) - } - - fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> { - if index as usize >= self.num_queues || num == 0 || num as usize > self.max_queue_size { - return Err(VhostUserError::InvalidParam); - } - self.vrings[index as usize].write().unwrap().queue.size = num as u16; - Ok(()) - } - - fn set_vring_addr( - &mut self, - index: u32, - _flags: VhostUserVringAddrFlags, - descriptor: u64, - used: u64, - available: u64, - _log: u64, - ) -> VhostUserResult<()> { - if index as usize >= self.num_queues { - return Err(VhostUserError::InvalidParam); - } - - if !self.mappings.is_empty() { - let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - let used_ring = self.vmm_va_to_gpa(used).map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - self.vrings[index as usize] - .write() - .unwrap() - .queue - .desc_table = GuestAddress(desc_table); - self.vrings[index as usize] - .write() - .unwrap() - .queue - .avail_ring = GuestAddress(avail_ring); - self.vrings[index as usize].write().unwrap().queue.used_ring = GuestAddress(used_ring); - Ok(()) - } else { - Err(VhostUserError::InvalidParam) - } - } - - fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> { - self.vrings[index as usize] - .write() - .unwrap() - .queue - .set_next_avail(base as u16); - - let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - self.vrings[index as usize] - .write() - .unwrap() - .mut_queue() - .set_event_idx(event_idx); - self.backend.write().unwrap().set_event_idx(event_idx); - Ok(()) - } - - fn get_vring_base(&mut self, index: u32) -> VhostUserResult { - if index as usize >= self.num_queues { - return Err(VhostUserError::InvalidParam); - } - // Quote from vhost-user specification: - // Client must start ring upon receiving a kick (that is, detecting - // that file descriptor is readable) on the descriptor specified by - // VHOST_USER_SET_VRING_KICK, and stop ring upon receiving - // VHOST_USER_GET_VRING_BASE. - self.vrings[index as usize].write().unwrap().queue.ready = false; - if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() { - for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() { - let shifted_queues_mask = queues_mask >> index; - if shifted_queues_mask & 1u64 == 1u64 { - let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones(); - self.workers[thread_index] - .unregister_listener( - fd.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(evt_idx), - ) - .map_err(VhostUserError::ReqHandlerError)?; - break; - } - } - } - - let next_avail = self.vrings[index as usize] - .read() - .unwrap() - .queue - .next_avail(); - - Ok(VhostUserVringState::new(index, u32::from(next_avail))) - } - - fn set_vring_kick(&mut self, index: u8, file: Option) -> VhostUserResult<()> { - if index as usize >= self.num_queues { - return Err(VhostUserError::InvalidParam); - } - - // SAFETY: EventFd requires that it has sole ownership of its fd. So - // does File, so this is safe. - // Ideally, we'd have a generic way to refer to a uniquely-owned fd, - // such as that proposed by Rust RFC #3128. - self.vrings[index as usize].write().unwrap().kick = - file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) }); - - // Quote from vhost-user specification: - // Client must start ring upon receiving a kick (that is, detecting - // that file descriptor is readable) on the descriptor specified by - // VHOST_USER_SET_VRING_KICK, and stop ring upon receiving - // VHOST_USER_GET_VRING_BASE. - self.vrings[index as usize].write().unwrap().queue.ready = true; - if let Some(fd) = self.vrings[index as usize].read().unwrap().kick.as_ref() { - for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() { - let shifted_queues_mask = queues_mask >> index; - if shifted_queues_mask & 1u64 == 1u64 { - let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones(); - self.workers[thread_index] - .register_listener( - fd.as_raw_fd(), - epoll::Events::EPOLLIN, - u64::from(evt_idx), - ) - .map_err(VhostUserError::ReqHandlerError)?; - break; - } - } - } - - Ok(()) - } - - fn set_vring_call(&mut self, index: u8, file: Option) -> VhostUserResult<()> { - if index as usize >= self.num_queues { - return Err(VhostUserError::InvalidParam); - } - - // SAFETY: see comment in set_vring_kick() - self.vrings[index as usize].write().unwrap().call = - file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) }); - - Ok(()) - } - - fn set_vring_err(&mut self, index: u8, file: Option) -> VhostUserResult<()> { - if index as usize >= self.num_queues { - return Err(VhostUserError::InvalidParam); - } - - // SAFETY: see comment in set_vring_kick() - self.vrings[index as usize].write().unwrap().err = - file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) }); - - Ok(()) - } - - fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> { - // This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES - // has been negotiated. - if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 { - return Err(VhostUserError::InvalidOperation); - } else if index as usize >= self.num_queues { - return Err(VhostUserError::InvalidParam); - } - - // Slave must not pass data to/from the backend until ring is - // enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1, - // or after it has been disabled by VHOST_USER_SET_VRING_ENABLE - // with parameter 0. - self.vrings[index as usize].write().unwrap().enabled = enable; - - Ok(()) - } - - fn get_config( - &mut self, - offset: u32, - size: u32, - _flags: VhostUserConfigFlags, - ) -> VhostUserResult> { - Ok(self.backend.read().unwrap().get_config(offset, size)) - } - - fn set_config( - &mut self, - offset: u32, - buf: &[u8], - _flags: VhostUserConfigFlags, - ) -> VhostUserResult<()> { - self.backend - .write() - .unwrap() - .set_config(offset, buf) - .map_err(VhostUserError::ReqHandlerError) - } - - fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) { - if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 { - vu_req.set_reply_ack_flag(true); - } - - self.backend.write().unwrap().set_slave_req_fd(vu_req); - } - - fn get_max_mem_slots(&mut self) -> VhostUserResult { - Ok(MAX_MEM_SLOTS) - } - - fn add_mem_region( - &mut self, - region: &VhostUserSingleMemoryRegion, - file: File, - ) -> VhostUserResult<()> { - let mmap_region = MmapRegion::from_file( - FileOffset::new(file, region.mmap_offset), - region.memory_size as usize, - ) - .map_err(|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)))?; - let guest_region = Arc::new( - GuestRegionMmap::new(mmap_region, GuestAddress(region.guest_phys_addr)).map_err( - |e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)), - )?, +#[cfg(test)] +mod tests { + use super::backend::tests::MockVhostBackend; + use super::*; + use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; + + #[test] + fn test_new_daemon() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), ); + let backend = Arc::new(Mutex::new(MockVhostBackend::new())); + let daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap(); - let mem = self - .atomic_mem - .memory() - .insert_region(guest_region) - .map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - - self.atomic_mem.lock().unwrap().replace(mem); - - self.backend - .write() - .unwrap() - .update_memory(self.atomic_mem.clone()) - .map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - - self.mappings.push(AddrMapping { - vmm_addr: region.user_addr, - size: region.memory_size, - gpa_base: region.guest_phys_addr, - }); - - Ok(()) - } - - fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> { - let (mem, _) = self - .atomic_mem - .memory() - .remove_region(GuestAddress(region.guest_phys_addr), region.memory_size) - .map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - - self.atomic_mem.lock().unwrap().replace(mem); - - self.backend - .write() - .unwrap() - .update_memory(self.atomic_mem.clone()) - .map_err(|e| { - VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)) - })?; - - self.mappings - .retain(|mapping| mapping.gpa_base != region.guest_phys_addr); - - Ok(()) - } - - fn get_inflight_fd( - &mut self, - _inflight: &vhost::vhost_user::message::VhostUserInflight, - ) -> VhostUserResult<(vhost::vhost_user::message::VhostUserInflight, File)> { - // Assume the backend hasn't negotiated the inflight feature; it - // wouldn't be correct for the backend to do so, as we don't (yet) - // provide a way for it to handle such requests. - Err(VhostUserError::InvalidOperation) - } - - fn set_inflight_fd(&mut self, _inflight: &vhost::vhost_user::message::VhostUserInflight, _file: File) -> VhostUserResult<()> { - Err(VhostUserError::InvalidOperation) - } -} - -impl Drop for VhostUserHandler { - fn drop(&mut self) { - for thread in self.worker_threads.drain(..) { - if let Err(e) = thread.join() { - error!("Error in vring worker: {:?}", e); - } - } + assert_eq!(daemon.get_epoll_handlers().len(), 2); + //daemon.start(Listener::new()).unwrap(); } } diff --git a/src/vring.rs b/src/vring.rs new file mode 100644 index 0000000..31bf4a7 --- /dev/null +++ b/src/vring.rs @@ -0,0 +1,253 @@ +// Copyright 2019 Intel Corporation. All Rights Reserved. +// Copyright 2021 Alibaba Cloud Computing. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Struct to maintain state information and manipulate vhost-user queues. + +use std::fs::File; +use std::io; +use std::os::unix::io::{FromRawFd, IntoRawFd}; +use std::result::Result; +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use virtio_queue::{Error as VirtQueError, Queue}; +use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; +use vmm_sys_util::eventfd::EventFd; + +/// Struct to maintain raw state information for a vhost-user queue. +pub struct VringState { + queue: Queue, + kick: Option, + call: Option, + err: Option, + enabled: bool, +} + +impl VringState { + fn new(mem: M, max_queue_size: u16) -> Self { + VringState { + queue: Queue::new(mem, max_queue_size), + kick: None, + call: None, + err: None, + enabled: false, + } + } + + /// Get a mutable reference to the underlying raw `Queue` object. + pub fn get_queue_mut(&mut self) -> &mut Queue { + &mut self.queue + } + + /// Get a immutable reference to the kick event fd. + pub fn get_kick(&self) -> &Option { + &self.kick + } +} + +/// Struct to maintain state information and manipulate a vhost-user queue. +#[derive(Clone)] +pub struct Vring> { + state: Arc>>, +} + +impl Vring { + /// Get a immutable guard to the underlying raw `VringState` object. + pub fn get_ref(&self) -> RwLockReadGuard> { + self.state.read().unwrap() + } + + /// Get a mutable guard to the underlying raw `VringState` object. + pub fn get_mut(&self) -> RwLockWriteGuard> { + self.state.write().unwrap() + } + + /// Add an used descriptor into the used queue. + pub fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError> { + self.get_mut().get_queue_mut().add_used(desc_index, len) + } + + /// Notify the vhost-user master that used descriptors have been put into the used queue. + pub fn signal_used_queue(&self) -> io::Result<()> { + if let Some(call) = self.get_ref().call.as_ref() { + call.write(1) + } else { + Ok(()) + } + } + + /// Enable event notification for queue. + pub fn enable_notification(&self) -> Result { + self.get_mut().get_queue_mut().enable_notification() + } + + /// Disable event notification for queue. + pub fn disable_notification(&self) -> Result<(), VirtQueError> { + self.get_mut().get_queue_mut().disable_notification() + } + + /// Check whether a notification to the guest is needed. + pub fn needs_notification(&self) -> Result { + self.get_mut().get_queue_mut().needs_notification() + } + + pub(crate) fn new(mem: M, max_queue_size: u16) -> Self { + Vring { + state: Arc::new(RwLock::new(VringState::new(mem, max_queue_size))), + } + } + + pub(crate) fn set_enabled(&self, enabled: bool) { + self.get_mut().enabled = enabled; + } + + pub(crate) fn set_queue_info(&self, desc_table: u64, avail_ring: u64, used_ring: u64) { + let mut state = self.get_mut(); + + state.queue.desc_table = GuestAddress(desc_table); + state.queue.avail_ring = GuestAddress(avail_ring); + state.queue.used_ring = GuestAddress(used_ring); + } + + pub(crate) fn queue_next_avail(&self) -> u16 { + self.get_ref().queue.next_avail() + } + + pub(crate) fn set_queue_next_avail(&self, base: u16) { + self.get_mut().queue.set_next_avail(base); + } + + pub(crate) fn set_queue_size(&self, num: u16) { + self.get_mut().queue.size = num; + } + + pub(crate) fn set_queue_event_idx(&self, enabled: bool) { + self.get_mut().queue.set_event_idx(enabled); + } + + pub(crate) fn set_queue_ready(&self, ready: bool) { + self.get_mut().queue.ready = ready; + } + + pub(crate) fn set_kick(&self, file: Option) { + // SAFETY: + // EventFd requires that it has sole ownership of its fd. So does File, so this is safe. + // Ideally, we'd have a generic way to refer to a uniquely-owned fd, such as that proposed + // by Rust RFC #3128. + self.get_mut().kick = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) }); + } + + pub(crate) fn read_kick(&self) -> io::Result { + let state = self.get_ref(); + + if let Some(kick) = &state.kick { + kick.read()?; + } + + Ok(state.enabled) + } + + pub(crate) fn set_call(&self, file: Option) { + // SAFETY: see comment in set_kick() + self.get_mut().call = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) }); + } + + pub(crate) fn set_err(&self, file: Option) { + // SAFETY: see comment in set_kick() + self.get_mut().err = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::os::unix::io::AsRawFd; + use vm_memory::bitmap::AtomicBitmap; + use vmm_sys_util::eventfd::EventFd; + + #[test] + fn test_new_vring() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::::from_ranges(&[(GuestAddress(0x100000), 0x10000)]) + .unwrap(), + ); + let vring = Vring::new(mem, 0x1000); + + assert!(vring.get_ref().get_kick().is_none()); + assert_eq!(vring.get_ref().enabled, false); + assert_eq!(vring.get_mut().get_queue_mut().ready, false); + assert_eq!(vring.get_mut().get_queue_mut().event_idx_enabled, false); + + vring.set_enabled(true); + assert_eq!(vring.get_ref().enabled, true); + + vring.set_queue_info(0x100100, 0x100200, 0x100300); + assert_eq!( + vring.get_mut().get_queue_mut().desc_table, + GuestAddress(0x100100) + ); + assert_eq!( + vring.get_mut().get_queue_mut().avail_ring, + GuestAddress(0x100200) + ); + assert_eq!( + vring.get_mut().get_queue_mut().used_ring, + GuestAddress(0x100300) + ); + + assert_eq!(vring.queue_next_avail(), 0); + vring.set_queue_next_avail(0x20); + assert_eq!(vring.queue_next_avail(), 0x20); + + vring.set_queue_size(0x200); + assert_eq!(vring.get_mut().get_queue_mut().size, 0x200); + + vring.set_queue_event_idx(true); + assert_eq!(vring.get_mut().get_queue_mut().event_idx_enabled, true); + + vring.set_queue_ready(true); + assert_eq!(vring.get_mut().get_queue_mut().ready, true); + } + + #[test] + fn test_vring_set_fd() { + let mem = GuestMemoryAtomic::new( + GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(), + ); + let vring = Vring::new(mem, 0x1000); + + vring.set_enabled(true); + assert_eq!(vring.get_ref().enabled, true); + + let eventfd = EventFd::new(0).unwrap(); + let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + assert!(vring.get_ref().kick.is_none()); + assert_eq!(vring.read_kick().unwrap(), true); + vring.set_kick(Some(file)); + eventfd.write(1).unwrap(); + assert_eq!(vring.read_kick().unwrap(), true); + assert!(vring.get_ref().kick.is_some()); + vring.set_kick(None); + assert!(vring.get_ref().kick.is_none()); + std::mem::forget(eventfd); + + let eventfd = EventFd::new(0).unwrap(); + let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + assert!(vring.get_ref().call.is_none()); + vring.set_call(Some(file)); + assert!(vring.get_ref().call.is_some()); + vring.set_call(None); + assert!(vring.get_ref().call.is_none()); + std::mem::forget(eventfd); + + let eventfd = EventFd::new(0).unwrap(); + let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) }; + assert!(vring.get_ref().err.is_none()); + vring.set_err(Some(file)); + assert!(vring.get_ref().err.is_some()); + vring.set_err(None); + assert!(vring.get_ref().err.is_none()); + std::mem::forget(eventfd); + } +}