Refine VhostUserBackend for multi-threading
Refine VhostUserBackend to support interior mutability to better
support multi-threading. The previous version of VhostUserBackend
has been renamed as VhostUserBackendMut, with an implementation of
impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> {
}
to ease transition.
The change also improves code readability.
Signed-off-by: Liu Jiang <gerry@linux.alibaba.com>
This commit is contained in:
parent
a001824c19
commit
cecd4ba565
2 changed files with 510 additions and 77 deletions
508
src/backend.rs
508
src/backend.rs
|
|
@ -3,9 +3,25 @@
|
|||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
//! Traits for vhost user backend servers to implement virtio data plane services.
|
||||
//!
|
||||
//! Define two traits for vhost user backend servers to implement virtio data plane services.
|
||||
//! The only difference between the two traits is mutability. The [VhostUserBackend] trait is
|
||||
//! designed with interior mutability, so the implementor may choose the suitable way to protect
|
||||
//! itself from concurrent accesses. The [VhostUserBackendMut] is designed without interior
|
||||
//! mutability, and an implementation of:
|
||||
//! ```ignore
|
||||
//! impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> { }
|
||||
//! ```
|
||||
//! is provided for convenience.
|
||||
//!
|
||||
//! [VhostUserBackend]: trait.VhostUserBackend.html
|
||||
//! [VhostUserBackendMut]: trait.VhostUserBackendMut.html
|
||||
|
||||
use std::io;
|
||||
use std::ops::Deref;
|
||||
use std::result;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
use vhost::vhost_user::message::VhostUserProtocolFeatures;
|
||||
use vhost::vhost_user::SlaveFsCacheReq;
|
||||
|
|
@ -14,37 +30,82 @@ use vmm_sys_util::eventfd::EventFd;
|
|||
|
||||
use super::Vring;
|
||||
|
||||
/// This trait must be implemented by the caller in order to provide backend
|
||||
/// specific implementation.
|
||||
/// Trait with interior mutability for vhost user backend servers to implement concrete services.
|
||||
///
|
||||
/// To support multi-threading and asynchronous IO, we enforce the `Send + Sync + 'static` bound.
|
||||
/// So there's no plan for support of `Rc<T>` and `RefCell<T>`.
|
||||
pub trait VhostUserBackend: Send + Sync + 'static {
|
||||
/// Number of queues.
|
||||
/// Get number of queues supported.
|
||||
fn num_queues(&self) -> usize;
|
||||
|
||||
/// Depth of each queue.
|
||||
/// Get maximum queue size supported.
|
||||
fn max_queue_size(&self) -> usize;
|
||||
|
||||
/// Available virtio features.
|
||||
/// Get available virtio features.
|
||||
fn features(&self) -> u64;
|
||||
|
||||
/// Acked virtio features.
|
||||
fn acked_features(&mut self, _features: u64) {}
|
||||
/// Set acknowledged virtio features.
|
||||
fn acked_features(&self, _features: u64) {}
|
||||
|
||||
/// Virtio protocol features.
|
||||
/// Get available vhost protocol features.
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures;
|
||||
|
||||
/// Tell the backend if EVENT_IDX has been negotiated.
|
||||
fn set_event_idx(&mut self, enabled: bool);
|
||||
/// Enable or disable the virtio EVENT_IDX feature
|
||||
fn set_event_idx(&self, enabled: bool);
|
||||
|
||||
/// Get virtio device configuration.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Set virtio device configuration.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn set_config(&self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update guest memory regions.
|
||||
fn update_memory(
|
||||
&mut self,
|
||||
&self,
|
||||
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
) -> result::Result<(), io::Error>;
|
||||
|
||||
/// This function gets called if the backend registered some additional
|
||||
/// listeners onto specific file descriptors. The library can handle
|
||||
/// virtqueues on its own, but does not know what to do with events
|
||||
/// happening on custom listeners.
|
||||
/// Set handler for communicating with the master by the slave communication channel.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
///
|
||||
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
|
||||
fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {}
|
||||
|
||||
/// Get the map to map queue index to worker thread index.
|
||||
///
|
||||
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
|
||||
/// the following two queues will be handled by worker thread 1, and the last four queues will
|
||||
/// be handled by worker thread 2.
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
vec![0xffff_ffff]
|
||||
}
|
||||
|
||||
/// Provide an optional exit EventFd for the specified worker thread.
|
||||
///
|
||||
/// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO
|
||||
/// events by using epoll with the specified `token`. When the returned EventFd is written to,
|
||||
/// the worker thread will exit.
|
||||
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Handle IO events for backend registered file descriptors.
|
||||
///
|
||||
/// This function gets called if the backend registered some additional listeners onto specific
|
||||
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
|
||||
/// do with events happening on custom listeners.
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
|
|
@ -52,35 +113,422 @@ pub trait VhostUserBackend: Send + Sync + 'static {
|
|||
vrings: &[Arc<RwLock<Vring>>],
|
||||
thread_id: usize,
|
||||
) -> result::Result<bool, io::Error>;
|
||||
}
|
||||
|
||||
/// Trait without interior mutability for vhost user backend servers to implement concrete services.
|
||||
pub trait VhostUserBackendMut: Send + Sync + 'static {
|
||||
/// Get number of queues supported.
|
||||
fn num_queues(&self) -> usize;
|
||||
|
||||
/// Get maximum queue size supported.
|
||||
fn max_queue_size(&self) -> usize;
|
||||
|
||||
/// Get available virtio features.
|
||||
fn features(&self) -> u64;
|
||||
|
||||
/// Set acknowledged virtio features.
|
||||
fn acked_features(&mut self, _features: u64) {}
|
||||
|
||||
/// Get available vhost protocol features.
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures;
|
||||
|
||||
/// Enable or disable the virtio EVENT_IDX feature
|
||||
fn set_event_idx(&mut self, enabled: bool);
|
||||
|
||||
/// Get virtio device configuration.
|
||||
/// A default implementation is provided as we cannot expect all backends
|
||||
/// to implement this function.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Set virtio device configuration.
|
||||
/// A default implementation is provided as we cannot expect all backends
|
||||
/// to implement this function.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Provide an exit EventFd
|
||||
/// When this EventFd is written to the worker thread will exit. An optional id may
|
||||
/// also be provided; if it is not provided, the exit event will be the first event id
|
||||
/// after the last queue
|
||||
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, Option<u16>)> {
|
||||
None
|
||||
}
|
||||
/// Update guest memory regions.
|
||||
fn update_memory(
|
||||
&mut self,
|
||||
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
) -> result::Result<(), io::Error>;
|
||||
|
||||
/// Set slave fd.
|
||||
/// A default implementation is provided as we cannot expect all backends
|
||||
/// to implement this function.
|
||||
/// Set handler for communicating with the master by the slave communication channel.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
///
|
||||
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
|
||||
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
|
||||
|
||||
/// Get the map to map queue index to worker thread index.
|
||||
///
|
||||
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
|
||||
/// the following two queues will be handled by worker thread 1, and the last four queues will
|
||||
/// be handled by worker thread 2.
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
vec![0xffff_ffff]
|
||||
}
|
||||
|
||||
/// Provide an optional exit EventFd for the specified worker thread.
|
||||
///
|
||||
/// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO
|
||||
/// events by using epoll with the specified `token`. When the returned EventFd is written to,
|
||||
/// the worker thread will exit.
|
||||
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Handle IO events for backend registered file descriptors.
|
||||
///
|
||||
/// This function gets called if the backend registered some additional listeners onto specific
|
||||
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
|
||||
/// do with events happening on custom listeners.
|
||||
fn handle_event(
|
||||
&mut self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Arc<RwLock<Vring>>],
|
||||
thread_id: usize,
|
||||
) -> result::Result<bool, io::Error>;
|
||||
}
|
||||
|
||||
impl<T: VhostUserBackend> VhostUserBackend for Arc<T> {
|
||||
fn num_queues(&self) -> usize {
|
||||
self.deref().num_queues()
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
self.deref().max_queue_size()
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
self.deref().features()
|
||||
}
|
||||
|
||||
fn acked_features(&self, features: u64) {
|
||||
self.deref().acked_features(features)
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
self.deref().protocol_features()
|
||||
}
|
||||
|
||||
fn set_event_idx(&self, enabled: bool) {
|
||||
self.deref().set_event_idx(enabled)
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
self.deref().get_config(offset, size)
|
||||
}
|
||||
|
||||
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
self.deref().set_config(offset, buf)
|
||||
}
|
||||
|
||||
fn update_memory(
|
||||
&self,
|
||||
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
) -> Result<(), io::Error> {
|
||||
self.deref().update_memory(atomic_mem)
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||
self.deref().set_slave_req_fd(vu_req)
|
||||
}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
self.deref().queues_per_thread()
|
||||
}
|
||||
|
||||
fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
self.deref().exit_event(thread_index)
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Arc<RwLock<Vring>>],
|
||||
thread_id: usize,
|
||||
) -> Result<bool, io::Error> {
|
||||
self.deref()
|
||||
.handle_event(device_event, evset, vrings, thread_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: VhostUserBackendMut> VhostUserBackend for Mutex<T> {
|
||||
fn num_queues(&self) -> usize {
|
||||
self.lock().unwrap().num_queues()
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
self.lock().unwrap().max_queue_size()
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
self.lock().unwrap().features()
|
||||
}
|
||||
|
||||
fn acked_features(&self, features: u64) {
|
||||
self.lock().unwrap().acked_features(features)
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
self.lock().unwrap().protocol_features()
|
||||
}
|
||||
|
||||
fn set_event_idx(&self, enabled: bool) {
|
||||
self.lock().unwrap().set_event_idx(enabled)
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
self.lock().unwrap().get_config(offset, size)
|
||||
}
|
||||
|
||||
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
self.lock().unwrap().set_config(offset, buf)
|
||||
}
|
||||
|
||||
fn update_memory(
|
||||
&self,
|
||||
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
) -> Result<(), io::Error> {
|
||||
self.lock().unwrap().update_memory(atomic_mem)
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||
self.lock().unwrap().set_slave_req_fd(vu_req)
|
||||
}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
self.lock().unwrap().queues_per_thread()
|
||||
}
|
||||
|
||||
fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
self.lock().unwrap().exit_event(thread_index)
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Arc<RwLock<Vring>>],
|
||||
thread_id: usize,
|
||||
) -> Result<bool, io::Error> {
|
||||
self.lock()
|
||||
.unwrap()
|
||||
.handle_event(device_event, evset, vrings, thread_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> {
|
||||
fn num_queues(&self) -> usize {
|
||||
self.read().unwrap().num_queues()
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
self.read().unwrap().max_queue_size()
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
self.read().unwrap().features()
|
||||
}
|
||||
|
||||
fn acked_features(&self, features: u64) {
|
||||
self.write().unwrap().acked_features(features)
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
self.read().unwrap().protocol_features()
|
||||
}
|
||||
|
||||
fn set_event_idx(&self, enabled: bool) {
|
||||
self.write().unwrap().set_event_idx(enabled)
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
self.read().unwrap().get_config(offset, size)
|
||||
}
|
||||
|
||||
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
self.write().unwrap().set_config(offset, buf)
|
||||
}
|
||||
|
||||
fn update_memory(
|
||||
&self,
|
||||
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
) -> Result<(), io::Error> {
|
||||
self.write().unwrap().update_memory(atomic_mem)
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||
self.write().unwrap().set_slave_req_fd(vu_req)
|
||||
}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
self.read().unwrap().queues_per_thread()
|
||||
}
|
||||
|
||||
fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
self.read().unwrap().exit_event(thread_index)
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Arc<RwLock<Vring>>],
|
||||
thread_id: usize,
|
||||
) -> Result<bool, io::Error> {
|
||||
self.write()
|
||||
.unwrap()
|
||||
.handle_event(device_event, evset, vrings, thread_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use epoll::Events;
|
||||
use std::io::Error;
|
||||
use std::sync::Mutex;
|
||||
|
||||
struct MockVhostBackend {
|
||||
events: u64,
|
||||
event_idx: bool,
|
||||
acked_features: u64,
|
||||
}
|
||||
|
||||
impl MockVhostBackend {
|
||||
fn new() -> Self {
|
||||
MockVhostBackend {
|
||||
events: 0,
|
||||
event_idx: false,
|
||||
acked_features: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl VhostUserBackendMut for MockVhostBackend {
|
||||
fn num_queues(&self) -> usize {
|
||||
2
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
256
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
0xffff_ffff_ffff_ffff
|
||||
}
|
||||
|
||||
fn acked_features(&mut self, features: u64) {
|
||||
self.acked_features = features;
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
VhostUserProtocolFeatures::all()
|
||||
}
|
||||
|
||||
fn set_event_idx(&mut self, enabled: bool) {
|
||||
self.event_idx = enabled;
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
assert_eq!(offset, 0x200);
|
||||
assert_eq!(size, 8);
|
||||
|
||||
vec![0xa5u8; 8]
|
||||
}
|
||||
|
||||
fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<(), Error> {
|
||||
assert_eq!(offset, 0x200);
|
||||
assert_eq!(buf.len(), 8);
|
||||
assert_eq!(buf, &[0xa5u8; 8]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_memory(
|
||||
&mut self,
|
||||
_atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
vec![1, 1]
|
||||
}
|
||||
|
||||
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
let event_fd = EventFd::new(0).unwrap();
|
||||
|
||||
Some((event_fd, 0x100))
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&mut self,
|
||||
_device_event: u16,
|
||||
_evset: Events,
|
||||
_vrings: &[Arc<RwLock<Vring>>],
|
||||
_thread_id: usize,
|
||||
) -> Result<bool, Error> {
|
||||
self.events += 1;
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_mock_backend_mutex() {
|
||||
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||
|
||||
assert_eq!(backend.num_queues(), 2);
|
||||
assert_eq!(backend.max_queue_size(), 256);
|
||||
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
|
||||
assert_eq!(
|
||||
backend.protocol_features(),
|
||||
VhostUserProtocolFeatures::all()
|
||||
);
|
||||
assert_eq!(backend.queues_per_thread(), [1, 1]);
|
||||
|
||||
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
|
||||
backend.set_config(0x200, &vec![0xa5; 8]).unwrap();
|
||||
|
||||
backend.acked_features(0xffff);
|
||||
assert_eq!(backend.lock().unwrap().acked_features, 0xffff);
|
||||
|
||||
backend.set_event_idx(true);
|
||||
assert_eq!(backend.lock().unwrap().event_idx, true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_mock_backend_rwlock() {
|
||||
let backend = Arc::new(RwLock::new(MockVhostBackend::new()));
|
||||
|
||||
assert_eq!(backend.num_queues(), 2);
|
||||
assert_eq!(backend.max_queue_size(), 256);
|
||||
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
|
||||
assert_eq!(
|
||||
backend.protocol_features(),
|
||||
VhostUserProtocolFeatures::all()
|
||||
);
|
||||
assert_eq!(backend.queues_per_thread(), [1, 1]);
|
||||
|
||||
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
|
||||
backend.set_config(0x200, &vec![0xa5; 8]).unwrap();
|
||||
|
||||
backend.acked_features(0xffff);
|
||||
assert_eq!(backend.read().unwrap().acked_features, 0xffff);
|
||||
|
||||
backend.set_event_idx(true);
|
||||
assert_eq!(backend.read().unwrap().event_idx, true);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
79
src/lib.rs
79
src/lib.rs
|
|
@ -34,8 +34,8 @@ use vm_memory::{
|
|||
};
|
||||
use vmm_sys_util::eventfd::EventFd;
|
||||
|
||||
mod backend;
|
||||
pub use backend::VhostUserBackend;
|
||||
pub mod backend;
|
||||
pub use backend::{VhostUserBackend, VhostUserBackendMut};
|
||||
|
||||
const MAX_MEM_SLOTS: u64 = 32;
|
||||
|
||||
|
|
@ -73,14 +73,14 @@ pub struct VhostUserDaemon<S: VhostUserBackend> {
|
|||
main_thread: Option<thread::JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend> VhostUserDaemon<S> {
|
||||
impl<S: VhostUserBackend + Clone> VhostUserDaemon<S> {
|
||||
/// Create the daemon instance, providing the backend implementation of
|
||||
/// VhostUserBackend.
|
||||
/// Under the hood, this will start a dedicated thread responsible for
|
||||
/// listening onto registered event. Those events can be vring events or
|
||||
/// custom events from the backend, but they get to be registered later
|
||||
/// during the sequence.
|
||||
pub fn new(name: String, backend: Arc<RwLock<S>>) -> Result<Self> {
|
||||
pub fn new(name: String, backend: S) -> Result<Self> {
|
||||
let handler = Arc::new(Mutex::new(
|
||||
VhostUserHandler::new(backend).map_err(Error::NewVhostUserHandler)?,
|
||||
));
|
||||
|
|
@ -194,7 +194,7 @@ pub enum VringEpollHandlerError {
|
|||
type VringEpollHandlerResult<T> = std::result::Result<T, VringEpollHandlerError>;
|
||||
|
||||
struct VringEpollHandler<S: VhostUserBackend> {
|
||||
backend: Arc<RwLock<S>>,
|
||||
backend: S,
|
||||
vrings: Vec<Arc<RwLock<Vring>>>,
|
||||
exit_event_id: Option<u16>,
|
||||
thread_id: usize,
|
||||
|
|
@ -225,8 +225,6 @@ impl<S: VhostUserBackend> VringEpollHandler<S> {
|
|||
}
|
||||
|
||||
self.backend
|
||||
.read()
|
||||
.unwrap()
|
||||
.handle_event(device_event, evset, &self.vrings, self.thread_id)
|
||||
.map_err(VringEpollHandlerError::HandleEventBackendHandling)
|
||||
}
|
||||
|
|
@ -373,7 +371,7 @@ impl error::Error for VhostUserHandlerError {}
|
|||
type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
|
||||
|
||||
struct VhostUserHandler<S: VhostUserBackend> {
|
||||
backend: Arc<RwLock<S>>,
|
||||
backend: S,
|
||||
workers: Vec<Arc<VringWorker>>,
|
||||
owned: bool,
|
||||
features_acked: bool,
|
||||
|
|
@ -388,11 +386,11 @@ struct VhostUserHandler<S: VhostUserBackend> {
|
|||
worker_threads: Vec<thread::JoinHandle<VringWorkerResult<()>>>,
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend> VhostUserHandler<S> {
|
||||
fn new(backend: Arc<RwLock<S>>) -> VhostUserHandlerResult<Self> {
|
||||
let num_queues = backend.read().unwrap().num_queues();
|
||||
let max_queue_size = backend.read().unwrap().max_queue_size();
|
||||
let queues_per_thread = backend.read().unwrap().queues_per_thread();
|
||||
impl<S: VhostUserBackend + Clone> VhostUserHandler<S> {
|
||||
fn new(backend: S) -> VhostUserHandlerResult<Self> {
|
||||
let num_queues = backend.num_queues();
|
||||
let max_queue_size = backend.max_queue_size();
|
||||
let queues_per_thread = backend.queues_per_thread();
|
||||
|
||||
let atomic_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());
|
||||
|
||||
|
|
@ -416,21 +414,19 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
|
|||
let vring_worker = Arc::new(VringWorker { epoll_file });
|
||||
let worker = vring_worker.clone();
|
||||
|
||||
let exit_event_id = if let Some((exit_event_fd, exit_event_id)) =
|
||||
backend.read().unwrap().exit_event(thread_id)
|
||||
{
|
||||
let exit_event_id = exit_event_id.unwrap_or(num_queues as u16);
|
||||
worker
|
||||
.register_listener(
|
||||
exit_event_fd.as_raw_fd(),
|
||||
epoll::Events::EPOLLIN,
|
||||
u64::from(exit_event_id),
|
||||
)
|
||||
.map_err(VhostUserHandlerError::RegisterExitEvent)?;
|
||||
Some(exit_event_id)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let exit_event_id =
|
||||
if let Some((exit_event_fd, exit_event_id)) = backend.exit_event(thread_id) {
|
||||
worker
|
||||
.register_listener(
|
||||
exit_event_fd.as_raw_fd(),
|
||||
epoll::Events::EPOLLIN,
|
||||
u64::from(exit_event_id),
|
||||
)
|
||||
.map_err(VhostUserHandlerError::RegisterExitEvent)?;
|
||||
Some(exit_event_id)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut thread_vrings: Vec<Arc<RwLock<Vring>>> = Vec::new();
|
||||
for (index, vring) in vrings.iter().enumerate() {
|
||||
|
|
@ -487,7 +483,7 @@ impl<S: VhostUserBackend> VhostUserHandler<S> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
||||
impl<S: VhostUserBackend + Clone> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
||||
fn set_owner(&mut self) -> VhostUserResult<()> {
|
||||
if self.owned {
|
||||
return Err(VhostUserError::InvalidOperation);
|
||||
|
|
@ -505,11 +501,11 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
}
|
||||
|
||||
fn get_features(&mut self) -> VhostUserResult<u64> {
|
||||
Ok(self.backend.read().unwrap().features())
|
||||
Ok(self.backend.features())
|
||||
}
|
||||
|
||||
fn set_features(&mut self, features: u64) -> VhostUserResult<()> {
|
||||
if (features & !self.backend.read().unwrap().features()) != 0 {
|
||||
if (features & !self.backend.features()) != 0 {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
|
||||
|
|
@ -529,16 +525,13 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
vring.write().unwrap().enabled = vring_enabled;
|
||||
}
|
||||
|
||||
self.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.acked_features(self.acked_features);
|
||||
self.backend.acked_features(self.acked_features);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_protocol_features(&mut self) -> VhostUserResult<VhostUserProtocolFeatures> {
|
||||
Ok(self.backend.read().unwrap().protocol_features())
|
||||
Ok(self.backend.protocol_features())
|
||||
}
|
||||
|
||||
fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> {
|
||||
|
|
@ -581,8 +574,6 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
self.atomic_mem.lock().unwrap().replace(mem);
|
||||
|
||||
self.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_memory(self.atomic_mem.clone())
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
|
|
@ -657,7 +648,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
.unwrap()
|
||||
.mut_queue()
|
||||
.set_event_idx(event_idx);
|
||||
self.backend.write().unwrap().set_event_idx(event_idx);
|
||||
self.backend.set_event_idx(event_idx);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -783,7 +774,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
size: u32,
|
||||
_flags: VhostUserConfigFlags,
|
||||
) -> VhostUserResult<Vec<u8>> {
|
||||
Ok(self.backend.read().unwrap().get_config(offset, size))
|
||||
Ok(self.backend.get_config(offset, size))
|
||||
}
|
||||
|
||||
fn set_config(
|
||||
|
|
@ -793,8 +784,6 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
_flags: VhostUserConfigFlags,
|
||||
) -> VhostUserResult<()> {
|
||||
self.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.set_config(offset, buf)
|
||||
.map_err(VhostUserError::ReqHandlerError)
|
||||
}
|
||||
|
|
@ -804,7 +793,7 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
vu_req.set_reply_ack_flag(true);
|
||||
}
|
||||
|
||||
self.backend.write().unwrap().set_slave_req_fd(vu_req);
|
||||
self.backend.set_slave_req_fd(vu_req);
|
||||
}
|
||||
|
||||
fn get_max_mem_slots(&mut self) -> VhostUserResult<u64> {
|
||||
|
|
@ -838,8 +827,6 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
self.atomic_mem.lock().unwrap().replace(mem);
|
||||
|
||||
self.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_memory(self.atomic_mem.clone())
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
|
|
@ -866,8 +853,6 @@ impl<S: VhostUserBackend> VhostUserSlaveReqHandlerMut for VhostUserHandler<S> {
|
|||
self.atomic_mem.lock().unwrap().replace(mem);
|
||||
|
||||
self.backend
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_memory(self.atomic_mem.clone())
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue