commit
77bc4174bc
7 changed files with 1672 additions and 891 deletions
13
Cargo.toml
13
Cargo.toml
|
|
@ -2,6 +2,8 @@
|
|||
name = "vhost-user-backend"
|
||||
version = "0.1.0"
|
||||
authors = ["The Cloud Hypervisor Authors"]
|
||||
keywords = ["vhost-user", "virtio"]
|
||||
description = "A framework to build vhost-user backend service daemon"
|
||||
edition = "2018"
|
||||
license = "Apache-2.0"
|
||||
|
||||
|
|
@ -9,8 +11,11 @@ license = "Apache-2.0"
|
|||
epoll = ">=4.0.1"
|
||||
libc = ">=0.2.39"
|
||||
log = ">=0.4.6"
|
||||
vhost = { git = "https://github.com/rust-vmm/vhost", features = ["vhost-user-slave"] }
|
||||
virtio-bindings = "0.1.0"
|
||||
vm-memory = {version = ">=0.2.0", features = ["backend-mmap", "backend-atomic"]}
|
||||
vhost = { version = "0.1", features = ["vhost-user-slave"] }
|
||||
virtio-bindings = "0.1"
|
||||
virtio-queue = { git = "https://github.com/rust-vmm/vm-virtio" }
|
||||
vmm-sys-util = ">=0.3.1"
|
||||
vm-memory = {version = "0.6", features = ["backend-mmap", "backend-atomic"]}
|
||||
vmm-sys-util = "0.8"
|
||||
|
||||
[dev-dependencies]
|
||||
vm-memory = {version = "0.6", features = ["backend-mmap", "backend-atomic", "backend-bitmap"]}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
{
|
||||
"coverage_score": 0,
|
||||
"coverage_score": 77.4,
|
||||
"exclude_path": "",
|
||||
"crate_features": ""
|
||||
}
|
||||
|
|
|
|||
520
src/backend.rs
Normal file
520
src/backend.rs
Normal file
|
|
@ -0,0 +1,520 @@
|
|||
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
//! Traits for vhost user backend servers to implement virtio data plain services.
|
||||
//!
|
||||
//! Define two traits for vhost user backend servers to implement virtio data plane services.
|
||||
//! The only difference between the two traits is mutability. The [VhostUserBackend] trait is
|
||||
//! designed with interior mutability, so the implementor may choose the suitable way to protect
|
||||
//! itself from concurrent accesses. The [VhostUserBackendMut] is designed without interior
|
||||
//! mutability, and an implementation of:
|
||||
//! ```ignore
|
||||
//! impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> { }
|
||||
//! ```
|
||||
//! is provided for convenience.
|
||||
//!
|
||||
//! [VhostUserBackend]: trait.VhostUserBackend.html
|
||||
//! [VhostUserBackendMut]: trait.VhostUserBackendMut.html
|
||||
|
||||
use std::io;
|
||||
use std::ops::Deref;
|
||||
use std::result;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
use vhost::vhost_user::message::VhostUserProtocolFeatures;
|
||||
use vhost::vhost_user::SlaveFsCacheReq;
|
||||
use vm_memory::bitmap::Bitmap;
|
||||
use vmm_sys_util::eventfd::EventFd;
|
||||
|
||||
use super::{Vring, GM};
|
||||
|
||||
/// Trait with interior mutability for vhost user backend servers to implement concrete services.
|
||||
///
|
||||
/// To support multi-threading and asynchronous IO, we enforce `the Send + Sync + 'static`.
|
||||
/// So there's no plan for support of "Rc<T>" and "RefCell<T>".
|
||||
pub trait VhostUserBackend<B: Bitmap + 'static = ()>: Send + Sync + 'static {
|
||||
/// Get number of queues supported.
|
||||
fn num_queues(&self) -> usize;
|
||||
|
||||
/// Get maximum queue size supported.
|
||||
fn max_queue_size(&self) -> usize;
|
||||
|
||||
/// Get available virtio features.
|
||||
fn features(&self) -> u64;
|
||||
|
||||
/// Set acknowledged virtio features.
|
||||
fn acked_features(&self, _features: u64) {}
|
||||
|
||||
/// Get available vhost protocol features.
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures;
|
||||
|
||||
/// Enable or disable the virtio EVENT_IDX feature
|
||||
fn set_event_idx(&self, enabled: bool);
|
||||
|
||||
/// Get virtio device configuration.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Set virtio device configuration.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn set_config(&self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update guest memory regions.
|
||||
fn update_memory(&self, mem: GM<B>) -> result::Result<(), io::Error>;
|
||||
|
||||
/// Set handler for communicating with the master by the slave communication channel.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
///
|
||||
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
|
||||
fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {}
|
||||
|
||||
/// Get the map to map queue index to worker thread index.
|
||||
///
|
||||
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
|
||||
/// the following two queues will be handled by worker thread 1, and the last four queues will
|
||||
/// be handled by worker thread 2.
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
vec![0xffff_ffff]
|
||||
}
|
||||
|
||||
/// Provide an optional exit EventFd for the specified worker thread.
|
||||
///
|
||||
/// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO
|
||||
/// events by using epoll with the specified `token`. When the returned EventFd is written to,
|
||||
/// the worker thread will exit.
|
||||
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Handle IO events for backend registered file descriptors.
|
||||
///
|
||||
/// This function gets called if the backend registered some additional listeners onto specific
|
||||
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
|
||||
/// do with events happening on custom listeners.
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Vring<GM<B>>],
|
||||
thread_id: usize,
|
||||
) -> result::Result<bool, io::Error>;
|
||||
}
|
||||
|
||||
/// Trait without interior mutability for vhost user backend servers to implement concrete services.
|
||||
pub trait VhostUserBackendMut<B: Bitmap + 'static = ()>: Send + Sync + 'static {
|
||||
/// Get number of queues supported.
|
||||
fn num_queues(&self) -> usize;
|
||||
|
||||
/// Get maximum queue size supported.
|
||||
fn max_queue_size(&self) -> usize;
|
||||
|
||||
/// Get available virtio features.
|
||||
fn features(&self) -> u64;
|
||||
|
||||
/// Set acknowledged virtio features.
|
||||
fn acked_features(&mut self, _features: u64) {}
|
||||
|
||||
/// Get available vhost protocol features.
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures;
|
||||
|
||||
/// Enable or disable the virtio EVENT_IDX feature
|
||||
fn set_event_idx(&mut self, enabled: bool);
|
||||
|
||||
/// Get virtio device configuration.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Set virtio device configuration.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> result::Result<(), io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update guest memory regions.
|
||||
fn update_memory(&mut self, mem: GM<B>) -> result::Result<(), io::Error>;
|
||||
|
||||
/// Set handler for communicating with the master by the slave communication channel.
|
||||
///
|
||||
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||
/// function.
|
||||
///
|
||||
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
|
||||
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
|
||||
|
||||
/// Get the map to map queue index to worker thread index.
|
||||
///
|
||||
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
|
||||
/// the following two queues will be handled by worker thread 1, and the last four queues will
|
||||
/// be handled by worker thread 2.
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
vec![0xffff_ffff]
|
||||
}
|
||||
|
||||
/// Provide an optional exit EventFd for the specified worker thread.
|
||||
///
|
||||
/// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO
|
||||
/// events by using epoll with the specified `token`. When the returned EventFd is written to,
|
||||
/// the worker thread will exit.
|
||||
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Handle IO events for backend registered file descriptors.
|
||||
///
|
||||
/// This function gets called if the backend registered some additional listeners onto specific
|
||||
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
|
||||
/// do with events happening on custom listeners.
|
||||
fn handle_event(
|
||||
&mut self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Vring<GM<B>>],
|
||||
thread_id: usize,
|
||||
) -> result::Result<bool, io::Error>;
|
||||
}
|
||||
|
||||
impl<T: VhostUserBackend<B>, B: Bitmap + 'static> VhostUserBackend<B> for Arc<T> {
|
||||
fn num_queues(&self) -> usize {
|
||||
self.deref().num_queues()
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
self.deref().max_queue_size()
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
self.deref().features()
|
||||
}
|
||||
|
||||
fn acked_features(&self, features: u64) {
|
||||
self.deref().acked_features(features)
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
self.deref().protocol_features()
|
||||
}
|
||||
|
||||
fn set_event_idx(&self, enabled: bool) {
|
||||
self.deref().set_event_idx(enabled)
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
self.deref().get_config(offset, size)
|
||||
}
|
||||
|
||||
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
self.deref().set_config(offset, buf)
|
||||
}
|
||||
|
||||
fn update_memory(&self, mem: GM<B>) -> Result<(), io::Error> {
|
||||
self.deref().update_memory(mem)
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||
self.deref().set_slave_req_fd(vu_req)
|
||||
}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
self.deref().queues_per_thread()
|
||||
}
|
||||
|
||||
fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
self.deref().exit_event(thread_index)
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Vring<GM<B>>],
|
||||
thread_id: usize,
|
||||
) -> Result<bool, io::Error> {
|
||||
self.deref()
|
||||
.handle_event(device_event, evset, vrings, thread_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: VhostUserBackendMut<B>, B: Bitmap + 'static> VhostUserBackend<B> for Mutex<T> {
|
||||
fn num_queues(&self) -> usize {
|
||||
self.lock().unwrap().num_queues()
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
self.lock().unwrap().max_queue_size()
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
self.lock().unwrap().features()
|
||||
}
|
||||
|
||||
fn acked_features(&self, features: u64) {
|
||||
self.lock().unwrap().acked_features(features)
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
self.lock().unwrap().protocol_features()
|
||||
}
|
||||
|
||||
fn set_event_idx(&self, enabled: bool) {
|
||||
self.lock().unwrap().set_event_idx(enabled)
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
self.lock().unwrap().get_config(offset, size)
|
||||
}
|
||||
|
||||
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
self.lock().unwrap().set_config(offset, buf)
|
||||
}
|
||||
|
||||
fn update_memory(&self, mem: GM<B>) -> Result<(), io::Error> {
|
||||
self.lock().unwrap().update_memory(mem)
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||
self.lock().unwrap().set_slave_req_fd(vu_req)
|
||||
}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
self.lock().unwrap().queues_per_thread()
|
||||
}
|
||||
|
||||
fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
self.lock().unwrap().exit_event(thread_index)
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Vring<GM<B>>],
|
||||
thread_id: usize,
|
||||
) -> Result<bool, io::Error> {
|
||||
self.lock()
|
||||
.unwrap()
|
||||
.handle_event(device_event, evset, vrings, thread_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: VhostUserBackendMut<B>, B: Bitmap + 'static> VhostUserBackend<B> for RwLock<T> {
|
||||
fn num_queues(&self) -> usize {
|
||||
self.read().unwrap().num_queues()
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
self.read().unwrap().max_queue_size()
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
self.read().unwrap().features()
|
||||
}
|
||||
|
||||
fn acked_features(&self, features: u64) {
|
||||
self.write().unwrap().acked_features(features)
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
self.read().unwrap().protocol_features()
|
||||
}
|
||||
|
||||
fn set_event_idx(&self, enabled: bool) {
|
||||
self.write().unwrap().set_event_idx(enabled)
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
self.read().unwrap().get_config(offset, size)
|
||||
}
|
||||
|
||||
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
self.write().unwrap().set_config(offset, buf)
|
||||
}
|
||||
|
||||
fn update_memory(&self, mem: GM<B>) -> Result<(), io::Error> {
|
||||
self.write().unwrap().update_memory(mem)
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||
self.write().unwrap().set_slave_req_fd(vu_req)
|
||||
}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
self.read().unwrap().queues_per_thread()
|
||||
}
|
||||
|
||||
fn exit_event(&self, thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
self.read().unwrap().exit_event(thread_index)
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&self,
|
||||
device_event: u16,
|
||||
evset: epoll::Events,
|
||||
vrings: &[Vring<GM<B>>],
|
||||
thread_id: usize,
|
||||
) -> Result<bool, io::Error> {
|
||||
self.write()
|
||||
.unwrap()
|
||||
.handle_event(device_event, evset, vrings, thread_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use super::*;
|
||||
use epoll::Events;
|
||||
use std::io::Error;
|
||||
use std::sync::Mutex;
|
||||
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
|
||||
|
||||
pub struct MockVhostBackend {
|
||||
events: u64,
|
||||
event_idx: bool,
|
||||
acked_features: u64,
|
||||
}
|
||||
|
||||
impl MockVhostBackend {
|
||||
pub fn new() -> Self {
|
||||
MockVhostBackend {
|
||||
events: 0,
|
||||
event_idx: false,
|
||||
acked_features: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl VhostUserBackendMut<()> for MockVhostBackend {
|
||||
fn num_queues(&self) -> usize {
|
||||
2
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
256
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
0xffff_ffff_ffff_ffff
|
||||
}
|
||||
|
||||
fn acked_features(&mut self, features: u64) {
|
||||
self.acked_features = features;
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
VhostUserProtocolFeatures::all()
|
||||
}
|
||||
|
||||
fn set_event_idx(&mut self, enabled: bool) {
|
||||
self.event_idx = enabled;
|
||||
}
|
||||
|
||||
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||
assert_eq!(offset, 0x200);
|
||||
assert_eq!(size, 8);
|
||||
|
||||
vec![0xa5u8; 8]
|
||||
}
|
||||
|
||||
fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<(), Error> {
|
||||
assert_eq!(offset, 0x200);
|
||||
assert_eq!(buf.len(), 8);
|
||||
assert_eq!(buf, &[0xa5u8; 8]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_memory(
|
||||
&mut self,
|
||||
_atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
|
||||
|
||||
fn queues_per_thread(&self) -> Vec<u64> {
|
||||
vec![1, 1]
|
||||
}
|
||||
|
||||
fn exit_event(&self, _thread_index: usize) -> Option<(EventFd, u16)> {
|
||||
let event_fd = EventFd::new(0).unwrap();
|
||||
|
||||
Some((event_fd, 0x100))
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&mut self,
|
||||
_device_event: u16,
|
||||
_evset: Events,
|
||||
_vrings: &[Vring],
|
||||
_thread_id: usize,
|
||||
) -> Result<bool, Error> {
|
||||
self.events += 1;
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_mock_backend_mutex() {
|
||||
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||
|
||||
assert_eq!(backend.num_queues(), 2);
|
||||
assert_eq!(backend.max_queue_size(), 256);
|
||||
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
|
||||
assert_eq!(
|
||||
backend.protocol_features(),
|
||||
VhostUserProtocolFeatures::all()
|
||||
);
|
||||
assert_eq!(backend.queues_per_thread(), [1, 1]);
|
||||
|
||||
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
|
||||
backend.set_config(0x200, &[0xa5; 8]).unwrap();
|
||||
|
||||
backend.acked_features(0xffff);
|
||||
assert_eq!(backend.lock().unwrap().acked_features, 0xffff);
|
||||
|
||||
backend.set_event_idx(true);
|
||||
assert_eq!(backend.lock().unwrap().event_idx, true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_mock_backend_rwlock() {
|
||||
let backend = Arc::new(RwLock::new(MockVhostBackend::new()));
|
||||
|
||||
assert_eq!(backend.num_queues(), 2);
|
||||
assert_eq!(backend.max_queue_size(), 256);
|
||||
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
|
||||
assert_eq!(
|
||||
backend.protocol_features(),
|
||||
VhostUserProtocolFeatures::all()
|
||||
);
|
||||
assert_eq!(backend.queues_per_thread(), [1, 1]);
|
||||
|
||||
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
|
||||
backend.set_config(0x200, &[0xa5; 8]).unwrap();
|
||||
|
||||
backend.acked_features(0xffff);
|
||||
assert_eq!(backend.read().unwrap().acked_features, 0xffff);
|
||||
|
||||
backend.set_event_idx(true);
|
||||
assert_eq!(backend.read().unwrap().event_idx, true);
|
||||
}
|
||||
}
|
||||
260
src/event_loop.rs
Normal file
260
src/event_loop.rs
Normal file
|
|
@ -0,0 +1,260 @@
|
|||
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
|
||||
use std::result;
|
||||
|
||||
use vm_memory::bitmap::Bitmap;
|
||||
use vmm_sys_util::eventfd::EventFd;
|
||||
|
||||
use super::{VhostUserBackend, Vring, GM};
|
||||
|
||||
/// Errors related to vring epoll event handling.
|
||||
#[derive(Debug)]
|
||||
pub enum VringEpollError {
|
||||
/// Failed to create epoll file descriptor.
|
||||
EpollCreateFd(io::Error),
|
||||
/// Failed while waiting for events.
|
||||
EpollWait(io::Error),
|
||||
/// Could not register exit event
|
||||
RegisterExitEvent(io::Error),
|
||||
/// Failed to read the event from kick EventFd.
|
||||
HandleEventReadKick(io::Error),
|
||||
/// Failed to handle the event from the backend.
|
||||
HandleEventBackendHandling(io::Error),
|
||||
}
|
||||
|
||||
impl Display for VringEpollError {
|
||||
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
VringEpollError::EpollCreateFd(e) => write!(f, "cannot create epoll fd: {}", e),
|
||||
VringEpollError::EpollWait(e) => write!(f, "failed to wait for epoll event: {}", e),
|
||||
VringEpollError::RegisterExitEvent(e) => write!(f, "cannot register exit event: {}", e),
|
||||
VringEpollError::HandleEventReadKick(e) => {
|
||||
write!(f, "cannot read vring kick event: {}", e)
|
||||
}
|
||||
VringEpollError::HandleEventBackendHandling(e) => {
|
||||
write!(f, "failed to handle epoll event: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for VringEpollError {}
|
||||
|
||||
/// Result of vring epoll operations.
|
||||
pub type VringEpollResult<T> = std::result::Result<T, VringEpollError>;
|
||||
|
||||
/// Epoll event handler to manage and process epoll events for registered file descriptor.
|
||||
///
|
||||
/// The `VringEpollHandler` structure provides interfaces to:
|
||||
/// - add file descriptors to be monitored by the epoll fd
|
||||
/// - remove registered file descriptors from the epoll fd
|
||||
/// - run the event loop to handle pending events on the epoll fd
|
||||
pub struct VringEpollHandler<S: VhostUserBackend<B>, B: Bitmap + 'static> {
|
||||
epoll_file: File,
|
||||
backend: S,
|
||||
vrings: Vec<Vring<GM<B>>>,
|
||||
thread_id: usize,
|
||||
exit_event_fd: Option<EventFd>,
|
||||
exit_event_id: Option<u16>,
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend<B>, B: Bitmap + 'static> VringEpollHandler<S, B> {
|
||||
/// Create a `VringEpollHandler` instance.
|
||||
pub(crate) fn new(
|
||||
backend: S,
|
||||
vrings: Vec<Vring<GM<B>>>,
|
||||
thread_id: usize,
|
||||
) -> VringEpollResult<Self> {
|
||||
let epoll_fd = epoll::create(true).map_err(VringEpollError::EpollCreateFd)?;
|
||||
let epoll_file = unsafe { File::from_raw_fd(epoll_fd) };
|
||||
|
||||
let handler = match backend.exit_event(thread_id) {
|
||||
Some((exit_event_fd, exit_event_id)) => {
|
||||
epoll::ctl(
|
||||
epoll_file.as_raw_fd(),
|
||||
epoll::ControlOptions::EPOLL_CTL_ADD,
|
||||
exit_event_fd.as_raw_fd(),
|
||||
epoll::Event::new(epoll::Events::EPOLLIN, u64::from(exit_event_id)),
|
||||
)
|
||||
.map_err(VringEpollError::RegisterExitEvent)?;
|
||||
|
||||
VringEpollHandler {
|
||||
epoll_file,
|
||||
backend,
|
||||
vrings,
|
||||
thread_id,
|
||||
exit_event_fd: Some(exit_event_fd),
|
||||
exit_event_id: Some(exit_event_id),
|
||||
}
|
||||
}
|
||||
None => VringEpollHandler {
|
||||
epoll_file,
|
||||
backend,
|
||||
vrings,
|
||||
thread_id,
|
||||
exit_event_fd: None,
|
||||
exit_event_id: None,
|
||||
},
|
||||
};
|
||||
|
||||
Ok(handler)
|
||||
}
|
||||
|
||||
/// Send `exit event` to break the event loop.
|
||||
pub fn send_exit_event(&self) {
|
||||
if let Some(eventfd) = self.exit_event_fd.as_ref() {
|
||||
let _ = eventfd.write(1);
|
||||
}
|
||||
}
|
||||
|
||||
/// Register an event into the epoll fd.
|
||||
///
|
||||
/// When this event is later triggered, the backend implementation of `handle_event` will be
|
||||
/// called.
|
||||
pub fn register_listener(
|
||||
&self,
|
||||
fd: RawFd,
|
||||
ev_type: epoll::Events,
|
||||
data: u64,
|
||||
) -> result::Result<(), io::Error> {
|
||||
epoll::ctl(
|
||||
self.epoll_file.as_raw_fd(),
|
||||
epoll::ControlOptions::EPOLL_CTL_ADD,
|
||||
fd,
|
||||
epoll::Event::new(ev_type, data),
|
||||
)
|
||||
}
|
||||
|
||||
/// Unregister an event from the epoll fd.
|
||||
///
|
||||
/// If the event is triggered after this function has been called, the event will be silently
|
||||
/// dropped.
|
||||
pub fn unregister_listener(
|
||||
&self,
|
||||
fd: RawFd,
|
||||
ev_type: epoll::Events,
|
||||
data: u64,
|
||||
) -> result::Result<(), io::Error> {
|
||||
epoll::ctl(
|
||||
self.epoll_file.as_raw_fd(),
|
||||
epoll::ControlOptions::EPOLL_CTL_DEL,
|
||||
fd,
|
||||
epoll::Event::new(ev_type, data),
|
||||
)
|
||||
}
|
||||
|
||||
/// Run the event poll loop to handle all pending events on registered fds.
|
||||
///
|
||||
/// The event loop will be terminated once an event is received from the `exit event fd`
|
||||
/// associated with the backend.
|
||||
pub(crate) fn run(&self) -> VringEpollResult<()> {
|
||||
const EPOLL_EVENTS_LEN: usize = 100;
|
||||
let mut events = vec![epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN];
|
||||
|
||||
'epoll: loop {
|
||||
let num_events = match epoll::wait(self.epoll_file.as_raw_fd(), -1, &mut events[..]) {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::Interrupted {
|
||||
// It's well defined from the epoll_wait() syscall
|
||||
// documentation that the epoll loop can be interrupted
|
||||
// before any of the requested events occurred or the
|
||||
// timeout expired. In both those cases, epoll_wait()
|
||||
// returns an error of type EINTR, but this should not
|
||||
// be considered as a regular error. Instead it is more
|
||||
// appropriate to retry, by calling into epoll_wait().
|
||||
continue;
|
||||
}
|
||||
return Err(VringEpollError::EpollWait(e));
|
||||
}
|
||||
};
|
||||
|
||||
for event in events.iter().take(num_events) {
|
||||
let evset = match epoll::Events::from_bits(event.events) {
|
||||
Some(evset) => evset,
|
||||
None => {
|
||||
let evbits = event.events;
|
||||
println!("epoll: ignoring unknown event set: 0x{:x}", evbits);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let ev_type = event.data as u16;
|
||||
|
||||
// handle_event() returns true if an event is received from the exit event fd.
|
||||
if self.handle_event(ev_type, evset)? {
|
||||
break 'epoll;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_event(&self, device_event: u16, evset: epoll::Events) -> VringEpollResult<bool> {
|
||||
if self.exit_event_id == Some(device_event) {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
if (device_event as usize) < self.vrings.len() {
|
||||
let vring = &self.vrings[device_event as usize];
|
||||
let enabled = vring
|
||||
.read_kick()
|
||||
.map_err(VringEpollError::HandleEventReadKick)?;
|
||||
|
||||
// If the vring is not enabled, it should not be processed.
|
||||
if !enabled {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
self.backend
|
||||
.handle_event(device_event, evset, &self.vrings, self.thread_id)
|
||||
.map_err(VringEpollError::HandleEventBackendHandling)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::super::backend::tests::MockVhostBackend;
|
||||
use super::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
|
||||
use vmm_sys_util::eventfd::EventFd;
|
||||
|
||||
#[test]
|
||||
fn test_vring_epoll_handler() {
|
||||
let mem = GuestMemoryAtomic::new(
|
||||
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
|
||||
);
|
||||
let vring = Vring::new(mem, 0x1000);
|
||||
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||
|
||||
let handler = VringEpollHandler::new(backend, vec![vring], 0x1).unwrap();
|
||||
assert!(handler.exit_event_id.is_some());
|
||||
|
||||
let eventfd = EventFd::new(0).unwrap();
|
||||
handler
|
||||
.register_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1)
|
||||
.unwrap();
|
||||
// Register an already registered fd.
|
||||
handler
|
||||
.register_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1)
|
||||
.unwrap_err();
|
||||
|
||||
handler
|
||||
.unregister_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1)
|
||||
.unwrap();
|
||||
// unregister an already unregistered fd.
|
||||
handler
|
||||
.unregister_listener(eventfd.as_raw_fd(), epoll::Events::EPOLLIN, 1)
|
||||
.unwrap_err();
|
||||
}
|
||||
}
|
||||
544
src/handler.rs
Normal file
544
src/handler.rs
Normal file
|
|
@ -0,0 +1,544 @@
|
|||
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::error;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use vhost::vhost_user::message::{
|
||||
VhostUserConfigFlags, VhostUserMemoryRegion, VhostUserProtocolFeatures,
|
||||
VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags,
|
||||
VhostUserVringState,
|
||||
};
|
||||
use vhost::vhost_user::{Error as VhostUserError, Result as VhostUserResult, SlaveFsCacheReq};
|
||||
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
|
||||
use vm_memory::bitmap::Bitmap;
|
||||
use vm_memory::mmap::NewBitmap;
|
||||
use vm_memory::{FileOffset, GuestAddress, GuestMemoryMmap, GuestRegionMmap};
|
||||
|
||||
use super::event_loop::{VringEpollError, VringEpollResult};
|
||||
use super::*;
|
||||
|
||||
const MAX_MEM_SLOTS: u64 = 32;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Errors related to vhost-user handler.
|
||||
pub enum VhostUserHandlerError {
|
||||
/// Failed to create vring worker.
|
||||
CreateEpollHandler(VringEpollError),
|
||||
/// Failed to spawn vring worker.
|
||||
SpawnVringWorker(io::Error),
|
||||
/// Could not find the mapping from memory regions.
|
||||
MissingMemoryMapping,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for VhostUserHandlerError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
VhostUserHandlerError::CreateEpollHandler(e) => {
|
||||
write!(f, "failed to create vring epoll handler: {}", e)
|
||||
}
|
||||
VhostUserHandlerError::SpawnVringWorker(e) => {
|
||||
write!(f, "failed spawning the vring worker: {}", e)
|
||||
}
|
||||
VhostUserHandlerError::MissingMemoryMapping => write!(f, "Missing memory mapping"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl error::Error for VhostUserHandlerError {}
|
||||
|
||||
/// Result of vhost-user handler operations.
|
||||
pub type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
|
||||
|
||||
struct AddrMapping {
|
||||
vmm_addr: u64,
|
||||
size: u64,
|
||||
gpa_base: u64,
|
||||
}
|
||||
|
||||
pub struct VhostUserHandler<S: VhostUserBackend<B>, B: Bitmap + 'static> {
|
||||
backend: S,
|
||||
handlers: Vec<Arc<VringEpollHandler<S, B>>>,
|
||||
owned: bool,
|
||||
features_acked: bool,
|
||||
acked_features: u64,
|
||||
acked_protocol_features: u64,
|
||||
num_queues: usize,
|
||||
max_queue_size: usize,
|
||||
queues_per_thread: Vec<u64>,
|
||||
mappings: Vec<AddrMapping>,
|
||||
atomic_mem: GM<B>,
|
||||
vrings: Vec<Vring<GM<B>>>,
|
||||
worker_threads: Vec<thread::JoinHandle<VringEpollResult<()>>>,
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend<B> + Clone, B: Bitmap + Clone + Send + Sync> VhostUserHandler<S, B> {
    /// Create a new `VhostUserHandler` for `backend`, spawning one worker
    /// thread (each with its own `VringEpollHandler`) per entry of
    /// `backend.queues_per_thread()`.
    ///
    /// Returns an error if an epoll handler cannot be created or a worker
    /// thread cannot be spawned.
    pub(crate) fn new(backend: S, atomic_mem: GM<B>) -> VhostUserHandlerResult<Self> {
        let num_queues = backend.num_queues();
        let max_queue_size = backend.max_queue_size();
        let queues_per_thread = backend.queues_per_thread();

        // One vring per queue; all share the same atomic memory object, so a
        // later memory update is visible to every vring.
        let mut vrings = Vec::new();
        for _ in 0..num_queues {
            let vring = Vring::new(atomic_mem.clone(), max_queue_size as u16);
            vrings.push(vring);
        }

        let mut handlers = Vec::new();
        let mut worker_threads = Vec::new();
        for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
            // Bit N of the mask assigns queue N to this worker thread.
            let mut thread_vrings = Vec::new();
            for (index, vring) in vrings.iter().enumerate() {
                if (queues_mask >> index) & 1u64 == 1u64 {
                    thread_vrings.push(vring.clone());
                }
            }

            let handler = Arc::new(
                VringEpollHandler::new(backend.clone(), thread_vrings, thread_id)
                    .map_err(VhostUserHandlerError::CreateEpollHandler)?,
            );
            let handler2 = handler.clone();
            let worker_thread = thread::Builder::new()
                .name("vring_worker".to_string())
                .spawn(move || handler2.run())
                .map_err(VhostUserHandlerError::SpawnVringWorker)?;

            handlers.push(handler);
            worker_threads.push(worker_thread);
        }

        Ok(VhostUserHandler {
            backend,
            handlers,
            owned: false,
            features_acked: false,
            acked_features: 0,
            acked_protocol_features: 0,
            num_queues,
            max_queue_size,
            queues_per_thread,
            mappings: Vec::new(),
            atomic_mem,
            vrings,
            worker_threads,
        })
    }
}
|
||||
|
||||
impl<S: VhostUserBackend<B>, B: Bitmap> VhostUserHandler<S, B> {
|
||||
pub(crate) fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, B>>> {
|
||||
self.handlers.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn send_exit_event(&self) {
|
||||
for handler in self.handlers.iter() {
|
||||
handler.send_exit_event();
|
||||
}
|
||||
}
|
||||
|
||||
fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
|
||||
for mapping in self.mappings.iter() {
|
||||
if vmm_va >= mapping.vmm_addr && vmm_va < mapping.vmm_addr + mapping.size {
|
||||
return Ok(vmm_va - mapping.vmm_addr + mapping.gpa_base);
|
||||
}
|
||||
}
|
||||
|
||||
Err(VhostUserHandlerError::MissingMemoryMapping)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend<B>, B: NewBitmap + Clone> VhostUserSlaveReqHandlerMut
|
||||
for VhostUserHandler<S, B>
|
||||
{
|
||||
fn set_owner(&mut self) -> VhostUserResult<()> {
|
||||
if self.owned {
|
||||
return Err(VhostUserError::InvalidOperation);
|
||||
}
|
||||
self.owned = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reset_owner(&mut self) -> VhostUserResult<()> {
|
||||
self.owned = false;
|
||||
self.features_acked = false;
|
||||
self.acked_features = 0;
|
||||
self.acked_protocol_features = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_features(&mut self) -> VhostUserResult<u64> {
|
||||
Ok(self.backend.features())
|
||||
}
|
||||
|
||||
fn set_features(&mut self, features: u64) -> VhostUserResult<()> {
|
||||
if (features & !self.backend.features()) != 0 {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
|
||||
self.acked_features = features;
|
||||
self.features_acked = true;
|
||||
|
||||
// If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated,
|
||||
// the ring is initialized in an enabled state.
|
||||
// If VHOST_USER_F_PROTOCOL_FEATURES has been negotiated,
|
||||
// the ring is initialized in a disabled state. Client must not
|
||||
// pass data to/from the backend until ring is enabled by
|
||||
// VHOST_USER_SET_VRING_ENABLE with parameter 1, or after it has
|
||||
// been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0.
|
||||
let vring_enabled =
|
||||
self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0;
|
||||
for vring in self.vrings.iter() {
|
||||
vring.set_enabled(vring_enabled);
|
||||
}
|
||||
|
||||
self.backend.acked_features(self.acked_features);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_protocol_features(&mut self) -> VhostUserResult<VhostUserProtocolFeatures> {
|
||||
Ok(self.backend.protocol_features())
|
||||
}
|
||||
|
||||
fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> {
|
||||
// Note: slave that reported VHOST_USER_F_PROTOCOL_FEATURES must
|
||||
// support this message even before VHOST_USER_SET_FEATURES was
|
||||
// called.
|
||||
self.acked_protocol_features = features;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_mem_table(
|
||||
&mut self,
|
||||
ctx: &[VhostUserMemoryRegion],
|
||||
files: Vec<File>,
|
||||
) -> VhostUserResult<()> {
|
||||
// We need to create tuple of ranges from the list of VhostUserMemoryRegion
|
||||
// that we get from the caller.
|
||||
let mut regions: Vec<(GuestAddress, usize, Option<FileOffset>)> = Vec::new();
|
||||
let mut mappings: Vec<AddrMapping> = Vec::new();
|
||||
|
||||
for (region, file) in ctx.iter().zip(files) {
|
||||
let g_addr = GuestAddress(region.guest_phys_addr);
|
||||
let len = region.memory_size as usize;
|
||||
let f_off = FileOffset::new(file, region.mmap_offset);
|
||||
|
||||
regions.push((g_addr, len, Some(f_off)));
|
||||
mappings.push(AddrMapping {
|
||||
vmm_addr: region.user_addr,
|
||||
size: region.memory_size,
|
||||
gpa_base: region.guest_phys_addr,
|
||||
});
|
||||
}
|
||||
|
||||
let mem = GuestMemoryMmap::from_ranges_with_files(regions).map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
|
||||
// Updating the inner GuestMemory object here will cause all our vrings to
|
||||
// see the new one the next time they call to `atomic_mem.memory()`.
|
||||
self.atomic_mem.lock().unwrap().replace(mem);
|
||||
|
||||
self.backend
|
||||
.update_memory(self.atomic_mem.clone())
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
self.mappings = mappings;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_queue_num(&mut self) -> VhostUserResult<u64> {
|
||||
Ok(self.num_queues as u64)
|
||||
}
|
||||
|
||||
fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> {
|
||||
if index as usize >= self.num_queues || num == 0 || num as usize > self.max_queue_size {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
self.vrings[index as usize].set_queue_size(num as u16);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_vring_addr(
|
||||
&mut self,
|
||||
index: u32,
|
||||
_flags: VhostUserVringAddrFlags,
|
||||
descriptor: u64,
|
||||
used: u64,
|
||||
available: u64,
|
||||
_log: u64,
|
||||
) -> VhostUserResult<()> {
|
||||
if index as usize >= self.num_queues {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
|
||||
if !self.mappings.is_empty() {
|
||||
let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
let used_ring = self.vmm_va_to_gpa(used).map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
self.vrings[index as usize].set_queue_info(desc_table, avail_ring, used_ring);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(VhostUserError::InvalidParam)
|
||||
}
|
||||
}
|
||||
|
||||
fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> {
|
||||
let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0;
|
||||
|
||||
self.vrings[index as usize].set_queue_next_avail(base as u16);
|
||||
self.vrings[index as usize].set_queue_event_idx(event_idx);
|
||||
self.backend.set_event_idx(event_idx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_vring_base(&mut self, index: u32) -> VhostUserResult<VhostUserVringState> {
|
||||
if index as usize >= self.num_queues {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
// Quote from vhost-user specification:
|
||||
// Client must start ring upon receiving a kick (that is, detecting
|
||||
// that file descriptor is readable) on the descriptor specified by
|
||||
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
|
||||
// VHOST_USER_GET_VRING_BASE.
|
||||
self.vrings[index as usize].set_queue_ready(false);
|
||||
if let Some(fd) = self.vrings[index as usize].get_ref().get_kick() {
|
||||
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
|
||||
let shifted_queues_mask = queues_mask >> index;
|
||||
if shifted_queues_mask & 1u64 == 1u64 {
|
||||
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
|
||||
self.handlers[thread_index]
|
||||
.unregister_listener(
|
||||
fd.as_raw_fd(),
|
||||
epoll::Events::EPOLLIN,
|
||||
u64::from(evt_idx),
|
||||
)
|
||||
.map_err(VhostUserError::ReqHandlerError)?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let next_avail = self.vrings[index as usize].queue_next_avail();
|
||||
|
||||
Ok(VhostUserVringState::new(index, u32::from(next_avail)))
|
||||
}
|
||||
|
||||
fn set_vring_kick(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
|
||||
if index as usize >= self.num_queues {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
|
||||
// SAFETY: EventFd requires that it has sole ownership of its fd. So
|
||||
// does File, so this is safe.
|
||||
// Ideally, we'd have a generic way to refer to a uniquely-owned fd,
|
||||
// such as that proposed by Rust RFC #3128.
|
||||
self.vrings[index as usize].set_kick(file);
|
||||
|
||||
// Quote from vhost-user specification:
|
||||
// Client must start ring upon receiving a kick (that is, detecting
|
||||
// that file descriptor is readable) on the descriptor specified by
|
||||
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
|
||||
// VHOST_USER_GET_VRING_BASE.
|
||||
self.vrings[index as usize].set_queue_ready(true);
|
||||
if let Some(fd) = self.vrings[index as usize].get_ref().get_kick() {
|
||||
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
|
||||
let shifted_queues_mask = queues_mask >> index;
|
||||
if shifted_queues_mask & 1u64 == 1u64 {
|
||||
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
|
||||
self.handlers[thread_index]
|
||||
.register_listener(
|
||||
fd.as_raw_fd(),
|
||||
epoll::Events::EPOLLIN,
|
||||
u64::from(evt_idx),
|
||||
)
|
||||
.map_err(VhostUserError::ReqHandlerError)?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_vring_call(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
|
||||
if index as usize >= self.num_queues {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
|
||||
self.vrings[index as usize].set_call(file);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_vring_err(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
|
||||
if index as usize >= self.num_queues {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
|
||||
self.vrings[index as usize].set_err(file);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> {
|
||||
// This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES
|
||||
// has been negotiated.
|
||||
if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 {
|
||||
return Err(VhostUserError::InvalidOperation);
|
||||
} else if index as usize >= self.num_queues {
|
||||
return Err(VhostUserError::InvalidParam);
|
||||
}
|
||||
|
||||
// Slave must not pass data to/from the backend until ring is
|
||||
// enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1,
|
||||
// or after it has been disabled by VHOST_USER_SET_VRING_ENABLE
|
||||
// with parameter 0.
|
||||
self.vrings[index as usize].set_enabled(enable);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_config(
|
||||
&mut self,
|
||||
offset: u32,
|
||||
size: u32,
|
||||
_flags: VhostUserConfigFlags,
|
||||
) -> VhostUserResult<Vec<u8>> {
|
||||
Ok(self.backend.get_config(offset, size))
|
||||
}
|
||||
|
||||
fn set_config(
|
||||
&mut self,
|
||||
offset: u32,
|
||||
buf: &[u8],
|
||||
_flags: VhostUserConfigFlags,
|
||||
) -> VhostUserResult<()> {
|
||||
self.backend
|
||||
.set_config(offset, buf)
|
||||
.map_err(VhostUserError::ReqHandlerError)
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) {
|
||||
if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 {
|
||||
vu_req.set_reply_ack_flag(true);
|
||||
}
|
||||
|
||||
self.backend.set_slave_req_fd(vu_req);
|
||||
}
|
||||
|
||||
fn get_max_mem_slots(&mut self) -> VhostUserResult<u64> {
|
||||
Ok(MAX_MEM_SLOTS)
|
||||
}
|
||||
|
||||
fn add_mem_region(
|
||||
&mut self,
|
||||
region: &VhostUserSingleMemoryRegion,
|
||||
file: File,
|
||||
) -> VhostUserResult<()> {
|
||||
let mmap_region = MmapRegion::from_file(
|
||||
FileOffset::new(file, region.mmap_offset),
|
||||
region.memory_size as usize,
|
||||
)
|
||||
.map_err(|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)))?;
|
||||
let guest_region = Arc::new(
|
||||
GuestRegionMmap::new(mmap_region, GuestAddress(region.guest_phys_addr)).map_err(
|
||||
|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)),
|
||||
)?,
|
||||
);
|
||||
|
||||
let mem = self
|
||||
.atomic_mem
|
||||
.memory()
|
||||
.insert_region(guest_region)
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
|
||||
self.atomic_mem.lock().unwrap().replace(mem);
|
||||
|
||||
self.backend
|
||||
.update_memory(self.atomic_mem.clone())
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
|
||||
self.mappings.push(AddrMapping {
|
||||
vmm_addr: region.user_addr,
|
||||
size: region.memory_size,
|
||||
gpa_base: region.guest_phys_addr,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> {
|
||||
let (mem, _) = self
|
||||
.atomic_mem
|
||||
.memory()
|
||||
.remove_region(GuestAddress(region.guest_phys_addr), region.memory_size)
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
|
||||
self.atomic_mem.lock().unwrap().replace(mem);
|
||||
|
||||
self.backend
|
||||
.update_memory(self.atomic_mem.clone())
|
||||
.map_err(|e| {
|
||||
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||
})?;
|
||||
|
||||
self.mappings
|
||||
.retain(|mapping| mapping.gpa_base != region.guest_phys_addr);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_inflight_fd(
|
||||
&mut self,
|
||||
_inflight: &vhost::vhost_user::message::VhostUserInflight,
|
||||
) -> VhostUserResult<(vhost::vhost_user::message::VhostUserInflight, File)> {
|
||||
// Assume the backend hasn't negotiated the inflight feature; it
|
||||
// wouldn't be correct for the backend to do so, as we don't (yet)
|
||||
// provide a way for it to handle such requests.
|
||||
Err(VhostUserError::InvalidOperation)
|
||||
}
|
||||
|
||||
fn set_inflight_fd(
|
||||
&mut self,
|
||||
_inflight: &vhost::vhost_user::message::VhostUserInflight,
|
||||
_file: File,
|
||||
) -> VhostUserResult<()> {
|
||||
Err(VhostUserError::InvalidOperation)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: VhostUserBackend<B>, B: Bitmap> Drop for VhostUserHandler<S, B> {
    // Shut down all worker threads before the handler (and the vrings it
    // owns) go away.
    fn drop(&mut self) {
        // Signal all working threads to exit.
        self.send_exit_event();

        // Join every worker so no thread outlives the handler; a thread that
        // panicked is logged rather than re-panicking in drop.
        for thread in self.worker_threads.drain(..) {
            if let Err(e) = thread.join() {
                error!("Error in vring worker: {:?}", e);
            }
        }
    }
}
|
||||
971
src/lib.rs
971
src/lib.rs
File diff suppressed because it is too large
Load diff
253
src/vring.rs
Normal file
253
src/vring.rs
Normal file
|
|
@ -0,0 +1,253 @@
|
|||
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||
// Copyright 2021 Alibaba Cloud Computing. All rights reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
//! Struct to maintain state information and manipulate vhost-user queues.
|
||||
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::os::unix::io::{FromRawFd, IntoRawFd};
|
||||
use std::result::Result;
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use virtio_queue::{Error as VirtQueError, Queue};
|
||||
use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
|
||||
use vmm_sys_util::eventfd::EventFd;
|
||||
|
||||
/// Struct to maintain raw state information for a vhost-user queue.
pub struct VringState<M: GuestAddressSpace> {
    // The underlying virtio descriptor queue.
    queue: Queue<M>,
    // Eventfd the master signals when descriptors become available.
    kick: Option<EventFd>,
    // Eventfd used to notify the master of used descriptors.
    call: Option<EventFd>,
    // Eventfd used to report errors on this queue.
    err: Option<EventFd>,
    // Whether data processing on this vring is currently enabled.
    enabled: bool,
}
|
||||
|
||||
impl<M: GuestAddressSpace> VringState<M> {
    // Create a new state object backed by `mem`, with no eventfds installed
    // and the vring disabled.
    fn new(mem: M, max_queue_size: u16) -> Self {
        VringState {
            queue: Queue::new(mem, max_queue_size),
            kick: None,
            call: None,
            err: None,
            enabled: false,
        }
    }

    /// Get a mutable reference to the underlying raw `Queue` object.
    pub fn get_queue_mut(&mut self) -> &mut Queue<M> {
        &mut self.queue
    }

    /// Get an immutable reference to the kick event fd, if one is installed.
    pub fn get_kick(&self) -> &Option<EventFd> {
        &self.kick
    }
}
|
||||
|
||||
/// Struct to maintain state information and manipulate a vhost-user queue.
///
/// Cloning a `Vring` is cheap: all clones share the same underlying
/// `VringState` through an `Arc<RwLock<..>>`.
#[derive(Clone)]
pub struct Vring<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
    // Shared, lock-protected queue state.
    state: Arc<RwLock<VringState<M>>>,
}
|
||||
|
||||
impl<M: GuestAddressSpace> Vring<M> {
|
||||
/// Get a immutable guard to the underlying raw `VringState` object.
|
||||
pub fn get_ref(&self) -> RwLockReadGuard<VringState<M>> {
|
||||
self.state.read().unwrap()
|
||||
}
|
||||
|
||||
/// Get a mutable guard to the underlying raw `VringState` object.
|
||||
pub fn get_mut(&self) -> RwLockWriteGuard<VringState<M>> {
|
||||
self.state.write().unwrap()
|
||||
}
|
||||
|
||||
/// Add an used descriptor into the used queue.
|
||||
pub fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError> {
|
||||
self.get_mut().get_queue_mut().add_used(desc_index, len)
|
||||
}
|
||||
|
||||
/// Notify the vhost-user master that used descriptors have been put into the used queue.
|
||||
pub fn signal_used_queue(&self) -> io::Result<()> {
|
||||
if let Some(call) = self.get_ref().call.as_ref() {
|
||||
call.write(1)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Enable event notification for queue.
|
||||
pub fn enable_notification(&self) -> Result<bool, VirtQueError> {
|
||||
self.get_mut().get_queue_mut().enable_notification()
|
||||
}
|
||||
|
||||
/// Disable event notification for queue.
|
||||
pub fn disable_notification(&self) -> Result<(), VirtQueError> {
|
||||
self.get_mut().get_queue_mut().disable_notification()
|
||||
}
|
||||
|
||||
/// Check whether a notification to the guest is needed.
|
||||
pub fn needs_notification(&self) -> Result<bool, VirtQueError> {
|
||||
self.get_mut().get_queue_mut().needs_notification()
|
||||
}
|
||||
|
||||
pub(crate) fn new(mem: M, max_queue_size: u16) -> Self {
|
||||
Vring {
|
||||
state: Arc::new(RwLock::new(VringState::new(mem, max_queue_size))),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_enabled(&self, enabled: bool) {
|
||||
self.get_mut().enabled = enabled;
|
||||
}
|
||||
|
||||
pub(crate) fn set_queue_info(&self, desc_table: u64, avail_ring: u64, used_ring: u64) {
|
||||
let mut state = self.get_mut();
|
||||
|
||||
state.queue.desc_table = GuestAddress(desc_table);
|
||||
state.queue.avail_ring = GuestAddress(avail_ring);
|
||||
state.queue.used_ring = GuestAddress(used_ring);
|
||||
}
|
||||
|
||||
pub(crate) fn queue_next_avail(&self) -> u16 {
|
||||
self.get_ref().queue.next_avail()
|
||||
}
|
||||
|
||||
pub(crate) fn set_queue_next_avail(&self, base: u16) {
|
||||
self.get_mut().queue.set_next_avail(base);
|
||||
}
|
||||
|
||||
pub(crate) fn set_queue_size(&self, num: u16) {
|
||||
self.get_mut().queue.size = num;
|
||||
}
|
||||
|
||||
pub(crate) fn set_queue_event_idx(&self, enabled: bool) {
|
||||
self.get_mut().queue.set_event_idx(enabled);
|
||||
}
|
||||
|
||||
pub(crate) fn set_queue_ready(&self, ready: bool) {
|
||||
self.get_mut().queue.ready = ready;
|
||||
}
|
||||
|
||||
pub(crate) fn set_kick(&self, file: Option<File>) {
|
||||
// SAFETY:
|
||||
// EventFd requires that it has sole ownership of its fd. So does File, so this is safe.
|
||||
// Ideally, we'd have a generic way to refer to a uniquely-owned fd, such as that proposed
|
||||
// by Rust RFC #3128.
|
||||
self.get_mut().kick = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
|
||||
}
|
||||
|
||||
pub(crate) fn read_kick(&self) -> io::Result<bool> {
|
||||
let state = self.get_ref();
|
||||
|
||||
if let Some(kick) = &state.kick {
|
||||
kick.read()?;
|
||||
}
|
||||
|
||||
Ok(state.enabled)
|
||||
}
|
||||
|
||||
pub(crate) fn set_call(&self, file: Option<File>) {
|
||||
// SAFETY: see comment in set_kick()
|
||||
self.get_mut().call = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
|
||||
}
|
||||
|
||||
pub(crate) fn set_err(&self, file: Option<File>) {
|
||||
// SAFETY: see comment in set_kick()
|
||||
self.get_mut().err = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::os::unix::io::AsRawFd;
    use vm_memory::bitmap::AtomicBitmap;
    use vmm_sys_util::eventfd::EventFd;

    #[test]
    fn test_new_vring() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<AtomicBitmap>::from_ranges(&[(GuestAddress(0x100000), 0x10000)])
                .unwrap(),
        );
        let vring = Vring::new(mem, 0x1000);

        // A freshly created vring has no kick fd and is disabled/not ready.
        assert!(vring.get_ref().get_kick().is_none());
        assert!(!vring.get_ref().enabled);
        assert!(!vring.get_mut().get_queue_mut().ready);
        assert!(!vring.get_mut().get_queue_mut().event_idx_enabled);

        vring.set_enabled(true);
        assert!(vring.get_ref().enabled);

        // The three ring addresses must land on the underlying queue.
        vring.set_queue_info(0x100100, 0x100200, 0x100300);
        assert_eq!(
            vring.get_mut().get_queue_mut().desc_table,
            GuestAddress(0x100100)
        );
        assert_eq!(
            vring.get_mut().get_queue_mut().avail_ring,
            GuestAddress(0x100200)
        );
        assert_eq!(
            vring.get_mut().get_queue_mut().used_ring,
            GuestAddress(0x100300)
        );

        assert_eq!(vring.queue_next_avail(), 0);
        vring.set_queue_next_avail(0x20);
        assert_eq!(vring.queue_next_avail(), 0x20);

        vring.set_queue_size(0x200);
        assert_eq!(vring.get_mut().get_queue_mut().size, 0x200);

        vring.set_queue_event_idx(true);
        assert!(vring.get_mut().get_queue_mut().event_idx_enabled);

        vring.set_queue_ready(true);
        assert!(vring.get_mut().get_queue_mut().ready);
    }

    #[test]
    fn test_vring_set_fd() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let vring = Vring::new(mem, 0x1000);

        vring.set_enabled(true);
        assert!(vring.get_ref().enabled);

        let eventfd = EventFd::new(0).unwrap();
        // SAFETY: `file` intentionally aliases `eventfd`'s fd. Ownership is
        // handed to the vring, and the original `eventfd` is leaked with
        // `mem::forget` below so the fd is not closed twice.
        let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
        assert!(vring.get_ref().kick.is_none());
        // With no kick fd installed, read_kick just reports the enabled flag.
        assert!(vring.read_kick().unwrap());
        vring.set_kick(Some(file));
        eventfd.write(1).unwrap();
        assert!(vring.read_kick().unwrap());
        assert!(vring.get_ref().kick.is_some());
        vring.set_kick(None);
        assert!(vring.get_ref().kick.is_none());
        std::mem::forget(eventfd);

        let eventfd = EventFd::new(0).unwrap();
        // SAFETY: same fd-aliasing pattern as above; `eventfd` is forgotten.
        let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
        assert!(vring.get_ref().call.is_none());
        vring.set_call(Some(file));
        assert!(vring.get_ref().call.is_some());
        vring.set_call(None);
        assert!(vring.get_ref().call.is_none());
        std::mem::forget(eventfd);

        let eventfd = EventFd::new(0).unwrap();
        // SAFETY: same fd-aliasing pattern as above; `eventfd` is forgotten.
        let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
        assert!(vring.get_ref().err.is_none());
        vring.set_err(Some(file));
        assert!(vring.get_ref().err.is_some());
        vring.set_err(None);
        assert!(vring.get_ref().err.is_none());
        std::mem::forget(eventfd);
    }
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue