Convert vhost repo as a workspace (#114)

The workspace will contain vhost and vhost-user-backend crates for now
and will get vhost-user-vmm later on.
Andreea Florescu 2022-10-17 15:22:11 +03:00 committed by GitHub
commit 6078c590b6
38 changed files with 2842 additions and 67 deletions

View file

@@ -1,5 +1,13 @@
version: 2
updates:
- package-ecosystem: cargo
directory: "/"
schedule:
interval: weekly
open-pull-requests-limit: 3
allow:
- dependency-type: direct
- dependency-type: indirect
- package-ecosystem: gitsubmodule
directory: "/"
schedule:

View file

@@ -1,36 +1,6 @@
[package]
name = "vhost"
version = "0.5.0"
keywords = ["vhost", "vhost-user", "virtio", "vdpa"]
description = "a pure rust library for vdpa, vhost and vhost-user"
authors = ["Liu Jiang <gerry@linux.alibaba.com>"]
repository = "https://github.com/rust-vmm/vhost"
documentation = "https://docs.rs/vhost"
readme = "README.md"
license = "Apache-2.0 OR BSD-3-Clause"
edition = "2018"
[workspace]
[package.metadata.docs.rs]
all-features = true
[features]
default = []
vhost-vsock = []
vhost-kern = []
vhost-vdpa = ["vhost-kern"]
vhost-net = ["vhost-kern"]
vhost-user = []
vhost-user-master = ["vhost-user"]
vhost-user-slave = ["vhost-user"]
[dependencies]
bitflags = "1.0"
libc = "0.2.39"
vmm-sys-util = "0.10.0"
vm-memory = "0.9.0"
[dev-dependencies]
tempfile = "3.2.0"
vm-memory = { version = "0.9.0", features=["backend-mmap"] }
serial_test = "0.5"
members = [
"crates/vhost",
"crates/vhost-user-backend",
]
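Because the manifest diff above interleaves the removed single-crate sections with the new workspace lines, the resulting top-level manifest is easier to read in one piece. Reconstructed from the added lines (a sketch, not the full file):

```toml
[workspace]
members = [
    "crates/vhost",
    "crates/vhost-user-backend",
]
```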

View file

@@ -1,32 +1,8 @@
# vHost
A pure rust library for vDPA, vhost and vhost-user.
# vhost
The `vhost` crate aims to help implement the data plane for virtio backend drivers. It supports three different types of data-plane drivers:
- vhost: the data plane is implemented by the Linux kernel
- vhost-user: the data plane is implemented by dedicated vhost-user servers
- vDPA (vhost DataPath Accelerator): the data plane is implemented by hardware
The `vhost` workspace hosts libraries related to the vhost and vhost-user
protocols. It currently consists of the following crates:
The main relationships among the traits and structs exported by the `vhost` crate are shown below:
![vhost Architecture](/docs/vhost_architecture.png)
## Kernel-based vHost Backend Drivers
The vhost drivers in Linux provide in-kernel virtio device emulation. Normally
the hypervisor userspace process emulates I/O accesses from the guest.
Vhost puts virtio emulation code into the kernel, taking hypervisor userspace
out of the picture. This allows device emulation code to directly call into
kernel subsystems instead of performing system calls from userspace.
The hypervisor relies on ioctl-based interfaces to control these in-kernel
vhost drivers, such as vhost-net, vhost-scsi and vhost-vsock.
## vHost-user Backend Drivers
The [vhost-user protocol](https://qemu.readthedocs.io/en/latest/interop/vhost-user.html#communication) aims to implement vhost backend drivers in
userspace, which complements the ioctl interface used to control the vhost
implementation in the Linux kernel. It implements the control plane needed
to establish virtqueue sharing with a user space process on the same host.
It uses communication over a Unix domain socket to share file descriptors in
the ancillary data of the message.
The protocol defines two sides of the communication: master and slave.
The master is the application that shares its virtqueues; the slave is the consumer
of the virtqueues. Either side can act as a client (i.e. connecting)
or a server (listening) in the socket communication.
- `vhost` -> A pure rust library for vDPA, vhost and vhost-user.
- `vhost-user-backend` -> A framework for implementing `vhost-user` backend services.
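A downstream crate opts into the data-plane types it needs through the feature flags listed in the `vhost` manifest above. For example (the version number follows this diff; the feature choice is illustrative):

```toml
[dependencies]
vhost = { version = "0.5", features = ["vhost-user-slave"] }
```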

View file

@@ -1 +1,5 @@
{"coverage_score": 39.8, "exclude_path": "", "crate_features": "vhost-vsock,vhost-kern,vhost-user-master,vhost-user-slave"}
{
"coverage_score": 39.8,
"exclude_path": "",
"crate_features": "vhost/vhost-vsock,vhost/vhost-kern,vhost/vhost-user-master,vhost/vhost-user-slave"
}

View file

@@ -1 +1,5 @@
{"coverage_score": 80.5, "exclude_path": "src/vhost_kern/", "crate_features": "vhost-user-master,vhost-user-slave"}
{
"coverage_score": 83.3,
"exclude_path": "vhost/src/vhost_kern/",
"crate_features": "vhost/vhost-user-master,vhost/vhost-user-slave"
}

View file

@@ -0,0 +1,66 @@
# Changelog
## [Unreleased]
### Added
### Changed
### Fixed
### Deprecated
## v0.7.0
### Changed
- Started using caret dependencies
- Updated dependency nix 0.24 -> 0.25
- Updated dependency log 0.4.6 -> 0.4.17
- Updated dependency vhost 0.4 -> 0.5
- Updated dependency virtio-queue 0.5.0 -> 0.6
- Updated dependency vm-memory 0.7 -> 0.9
## v0.6.0
### Changed
- Moved to rust-vmm/virtio-queue v0.5.0
### Fixed
- Fixed vring initialization logic
## v0.5.1
### Changed
- Moved to rust-vmm/vmm-sys-util 0.10.0
## v0.5.0
### Changed
- Moved to rust-vmm/virtio-queue v0.4.0
## v0.4.0
### Changed
- Moved to rust-vmm/virtio-queue v0.3.0
- Relaxed rust-vmm/vm-memory dependency to require ">=0.7"
## v0.3.0
### Changed
- Moved to rust-vmm/vhost v0.4.0
## v0.2.0
### Added
- Ability to run the daemon as a client
- VringEpollHandler implements AsRawFd
## v0.1.0
First release

View file

@@ -0,0 +1,23 @@
[package]
name = "vhost-user-backend"
version = "0.7.0"
authors = ["The Cloud Hypervisor Authors"]
keywords = ["vhost-user", "virtio"]
description = "A framework to build vhost-user backend service daemon"
edition = "2018"
license = "Apache-2.0"
[dependencies]
libc = "0.2.39"
log = "0.4.17"
vhost = { path = "../vhost", version = "0.5", features = ["vhost-user-slave"] }
virtio-bindings = "0.1"
virtio-queue = "0.6"
vm-memory = { version = "0.9", features = ["backend-mmap", "backend-atomic"] }
vmm-sys-util = "0.10"
[dev-dependencies]
nix = "0.25"
vhost = { path = "../vhost", version = "0.5", features = ["vhost-user-master", "vhost-user-slave"] }
vm-memory = { version = "0.9", features = ["backend-mmap", "backend-atomic", "backend-bitmap"] }
tempfile = "3.2.0"

View file

@@ -0,0 +1,105 @@
# vhost-user-backend
## Design
The `vhost-user-backend` crate provides a framework to implement `vhost-user` backend services,
which includes the following external public APIs:
- A daemon control object (`VhostUserDaemon`) to start and stop the service daemon.
- A vhost-user backend trait (`VhostUserBackendMut`) to handle vhost-user control messages and virtio
messages.
- A vring access trait (`VringT`) to access virtio queues, and three implementations of the trait:
`VringState`, `VringMutex` and `VringRwLock`.
## Usage
The main interface provided by the `vhost-user-backend` library is the `VhostUserDaemon` struct:
```rust
pub struct VhostUserDaemon<S, V, B = ()>
where
S: VhostUserBackend<V, B>,
V: VringT<GM<B>> + Clone + Send + Sync + 'static,
B: Bitmap + 'static,
{
pub fn new(name: String, backend: S, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>) -> Result<Self>;
pub fn start(&mut self, listener: Listener) -> Result<()>;
pub fn wait(&mut self) -> Result<()>;
pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>>;
}
```
### Create a `VhostUserDaemon` Instance
`VhostUserDaemon::new()` creates an instance of the `VhostUserDaemon` object. The client needs to
pass in a `VhostUserBackend` object, which will be used to configure the `VhostUserDaemon`
instance, handle control messages from the vhost-user master, and handle virtio requests from
virtio queues. A group of worker threads will be created to handle virtio requests from the
configured virtio queues.
### Start the `VhostUserDaemon`
The `VhostUserDaemon::start()` method waits for an incoming connection from a vhost-user master
on the `listener`. Once a connection is ready, a main thread will be created to handle vhost-user
messages from the vhost-user master.
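The accept-then-dispatch flow described above can be modeled with the standard library alone. This sketch mirrors only the control flow (a Unix socket listener, one handler thread per connection), none of the vhost-user protocol itself, and every name in it is hypothetical:

```rust
use std::io::{Read, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::thread;

// Models VhostUserDaemon::start(): block on accept(), then hand the
// connection to a "main thread" that handles messages until hangup.
fn start(listener: UnixListener) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let (mut stream, _) = listener.accept().expect("accept failed");
        let mut handled = 0u32;
        let mut buf = [0u8; 1];
        // Each byte stands in for one vhost-user control message.
        while let Ok(1) = stream.read(&mut buf) {
            handled += 1;
        }
        handled
    })
}

fn run_demo() -> u32 {
    let path = std::env::temp_dir().join(format!("vud-demo-{}.sock", std::process::id()));
    let _ = std::fs::remove_file(&path);
    let listener = UnixListener::bind(&path).expect("bind failed");
    let handle = start(listener);

    // The "vhost-user master" side: connect and send three messages.
    let mut master = UnixStream::connect(&path).expect("connect failed");
    master.write_all(&[1, 2, 3]).expect("write failed");
    drop(master); // hang up so the handler thread exits its loop

    let handled = handle.join().expect("join failed"); // analogous to wait()
    let _ = std::fs::remove_file(&path);
    handled
}

fn main() {
    assert_eq!(run_demo(), 3);
}
```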
### Stop the `VhostUserDaemon`
To stop the daemon, an exit event must be sent to the main thread by writing to the `exit_event`
EventFd; the `VhostUserDaemon::wait()` method then waits for the main thread to exit.
### Threading Model
The main thread and the virtio queue worker threads concurrently access the underlying virtio
queues, so all virtio queue accesses must be safe under multi-threading. The main thread only
accesses virtio queues for configuration, so clients can adopt locking policies optimized for
the virtio queue worker threads.
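That split can be sketched with the standard library (a simplified stand-in, not the crate's actual types): configuration from the "main thread" and queue processing from worker threads both go through one shared lock, which is exactly the kind of policy a backend implementor is free to tune:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Stand-in for a backend guarded the way `VhostUserBackend for RwLock<T>` is:
// both configuration and per-queue processing serialize through one lock.
struct Backend {
    event_idx: bool,
    processed: u64,
}

fn run() -> (bool, u64) {
    let backend = Arc::new(RwLock::new(Backend { event_idx: false, processed: 0 }));

    // "Main thread": configuration takes exclusive access.
    backend.write().unwrap().event_idx = true;

    // "Queue worker threads": each processes requests under the same lock.
    let mut workers = Vec::new();
    for _queue in 0..2 {
        let b = Arc::clone(&backend);
        workers.push(thread::spawn(move || {
            for _ in 0..100 {
                b.write().unwrap().processed += 1;
            }
        }));
    }
    for w in workers {
        w.join().unwrap();
    }
    let guard = backend.read().unwrap();
    (guard.event_idx, guard.processed)
}

fn main() {
    assert_eq!(run(), (true, 200));
}
```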
## Example
Example code to handle virtio messages from a virtio queue:
```rust
impl VhostUserBackendMut for VhostUserService {
    fn process_queue(&mut self, vring: &VringMutex) -> Result<bool> {
        let mut used_any = false;
        let mem = match &self.mem {
            Some(m) => m.memory(),
            None => return Err(Error::NoMemoryConfigured),
        };
        let mut vring_state = vring.get_mut();
        while let Some(avail_desc) = vring_state
            .get_queue_mut()
            .iter()
            .map_err(|_| Error::IterateQueue)?
            .next()
        {
            // Remember the head index so the descriptor chain can be returned
            // to the used ring once the request has been processed.
            let head_index = avail_desc.head_index();
            // Process the request...
            used_any = true;
            if self.event_idx {
                if vring_state.add_used(head_index, 0).is_err() {
                    warn!("Couldn't return used descriptors to the ring");
                }
                match vring_state.needs_notification() {
                    Err(_) => {
                        warn!("Couldn't check if queue needs to be notified");
                        vring_state.signal_used_queue().unwrap();
                    }
                    Ok(needs_notification) => {
                        if needs_notification {
                            vring_state.signal_used_queue().unwrap();
                        }
                    }
                }
            } else {
                if vring_state.add_used(head_index, 0).is_err() {
                    warn!("Couldn't return used descriptors to the ring");
                }
                vring_state.signal_used_queue().unwrap();
            }
        }
        Ok(used_any)
    }
}
```
## License
This project is licensed under
- [Apache License](http://www.apache.org/licenses/LICENSE-2.0), Version 2.0

View file

@@ -0,0 +1,555 @@
// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//! Traits for vhost-user backend servers to implement virtio data plane services.
//!
//! This module defines two traits for vhost-user backend servers to implement virtio data plane services.
//! The only difference between the two traits is mutability. The [VhostUserBackend] trait is
//! designed with interior mutability, so the implementor may choose the suitable way to protect
//! itself from concurrent accesses. The [VhostUserBackendMut] is designed without interior
//! mutability, and an implementation of:
//! ```ignore
//! impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> { }
//! ```
//! is provided for convenience.
//!
//! [VhostUserBackend]: trait.VhostUserBackend.html
//! [VhostUserBackendMut]: trait.VhostUserBackendMut.html
use std::io::Result;
use std::ops::Deref;
use std::sync::{Arc, Mutex, RwLock};
use vhost::vhost_user::message::VhostUserProtocolFeatures;
use vhost::vhost_user::SlaveFsCacheReq;
use vm_memory::bitmap::Bitmap;
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;
use super::vring::VringT;
use super::GM;
/// Trait with interior mutability for vhost user backend servers to implement concrete services.
///
/// To support multi-threading and asynchronous IO, we enforce the `Send + Sync` bound.
pub trait VhostUserBackend<V, B = ()>: Send + Sync
where
V: VringT<GM<B>>,
B: Bitmap + 'static,
{
/// Get number of queues supported.
fn num_queues(&self) -> usize;
/// Get maximum queue size supported.
fn max_queue_size(&self) -> usize;
/// Get available virtio features.
fn features(&self) -> u64;
/// Set acknowledged virtio features.
fn acked_features(&self, _features: u64) {}
/// Get available vhost protocol features.
fn protocol_features(&self) -> VhostUserProtocolFeatures;
/// Enable or disable the virtio EVENT_IDX feature
fn set_event_idx(&self, enabled: bool);
/// Get virtio device configuration.
///
/// A default implementation is provided as we cannot expect all backends to implement this
/// function.
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
Vec::new()
}
/// Set virtio device configuration.
///
/// A default implementation is provided as we cannot expect all backends to implement this
/// function.
fn set_config(&self, _offset: u32, _buf: &[u8]) -> Result<()> {
Ok(())
}
/// Update guest memory regions.
fn update_memory(&self, mem: GM<B>) -> Result<()>;
/// Set handler for communicating with the master by the slave communication channel.
///
/// A default implementation is provided as we cannot expect all backends to implement this
/// function.
///
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {}
/// Get the mapping from queue index to worker thread index.
///
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
/// the following two queues will be handled by worker thread 1, and the last four queues will
/// be handled by worker thread 2.
fn queues_per_thread(&self) -> Vec<u64> {
vec![0xffff_ffff]
}
/// Provide an optional exit EventFd for the specified worker thread.
///
/// If an `EventFd` is returned, it will be monitored for IO events through epoll. When the
/// returned `EventFd` is written to, the worker thread will exit.
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
None
}
/// Handle IO events for backend registered file descriptors.
///
/// This function gets called if the backend registered some additional listeners onto specific
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
/// do with events happening on custom listeners.
fn handle_event(
&self,
device_event: u16,
evset: EventSet,
vrings: &[V],
thread_id: usize,
) -> Result<bool>;
}
/// Trait without interior mutability for vhost user backend servers to implement concrete services.
pub trait VhostUserBackendMut<V, B = ()>: Send + Sync
where
V: VringT<GM<B>>,
B: Bitmap + 'static,
{
/// Get number of queues supported.
fn num_queues(&self) -> usize;
/// Get maximum queue size supported.
fn max_queue_size(&self) -> usize;
/// Get available virtio features.
fn features(&self) -> u64;
/// Set acknowledged virtio features.
fn acked_features(&mut self, _features: u64) {}
/// Get available vhost protocol features.
fn protocol_features(&self) -> VhostUserProtocolFeatures;
/// Enable or disable the virtio EVENT_IDX feature
fn set_event_idx(&mut self, enabled: bool);
/// Get virtio device configuration.
///
/// A default implementation is provided as we cannot expect all backends to implement this
/// function.
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
Vec::new()
}
/// Set virtio device configuration.
///
/// A default implementation is provided as we cannot expect all backends to implement this
/// function.
fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> Result<()> {
Ok(())
}
/// Update guest memory regions.
fn update_memory(&mut self, mem: GM<B>) -> Result<()>;
/// Set handler for communicating with the master by the slave communication channel.
///
/// A default implementation is provided as we cannot expect all backends to implement this
/// function.
///
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
/// Get the mapping from queue index to worker thread index.
///
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
/// the following two queues will be handled by worker thread 1, and the last four queues will
/// be handled by worker thread 2.
fn queues_per_thread(&self) -> Vec<u64> {
vec![0xffff_ffff]
}
/// Provide an optional exit EventFd for the specified worker thread.
///
/// If an `EventFd` is returned, it will be monitored for IO events through epoll. When the
/// returned `EventFd` is written to, the worker thread will exit.
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
None
}
/// Handle IO events for backend registered file descriptors.
///
/// This function gets called if the backend registered some additional listeners onto specific
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
/// do with events happening on custom listeners.
fn handle_event(
&mut self,
device_event: u16,
evset: EventSet,
vrings: &[V],
thread_id: usize,
) -> Result<bool>;
}
impl<T: VhostUserBackend<V, B>, V, B> VhostUserBackend<V, B> for Arc<T>
where
V: VringT<GM<B>>,
B: Bitmap + 'static,
{
fn num_queues(&self) -> usize {
self.deref().num_queues()
}
fn max_queue_size(&self) -> usize {
self.deref().max_queue_size()
}
fn features(&self) -> u64 {
self.deref().features()
}
fn acked_features(&self, features: u64) {
self.deref().acked_features(features)
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
self.deref().protocol_features()
}
fn set_event_idx(&self, enabled: bool) {
self.deref().set_event_idx(enabled)
}
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
self.deref().get_config(offset, size)
}
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> {
self.deref().set_config(offset, buf)
}
fn update_memory(&self, mem: GM<B>) -> Result<()> {
self.deref().update_memory(mem)
}
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
self.deref().set_slave_req_fd(vu_req)
}
fn queues_per_thread(&self) -> Vec<u64> {
self.deref().queues_per_thread()
}
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
self.deref().exit_event(thread_index)
}
fn handle_event(
&self,
device_event: u16,
evset: EventSet,
vrings: &[V],
thread_id: usize,
) -> Result<bool> {
self.deref()
.handle_event(device_event, evset, vrings, thread_id)
}
}
impl<T: VhostUserBackendMut<V, B>, V, B> VhostUserBackend<V, B> for Mutex<T>
where
V: VringT<GM<B>>,
B: Bitmap + 'static,
{
fn num_queues(&self) -> usize {
self.lock().unwrap().num_queues()
}
fn max_queue_size(&self) -> usize {
self.lock().unwrap().max_queue_size()
}
fn features(&self) -> u64 {
self.lock().unwrap().features()
}
fn acked_features(&self, features: u64) {
self.lock().unwrap().acked_features(features)
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
self.lock().unwrap().protocol_features()
}
fn set_event_idx(&self, enabled: bool) {
self.lock().unwrap().set_event_idx(enabled)
}
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
self.lock().unwrap().get_config(offset, size)
}
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> {
self.lock().unwrap().set_config(offset, buf)
}
fn update_memory(&self, mem: GM<B>) -> Result<()> {
self.lock().unwrap().update_memory(mem)
}
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
self.lock().unwrap().set_slave_req_fd(vu_req)
}
fn queues_per_thread(&self) -> Vec<u64> {
self.lock().unwrap().queues_per_thread()
}
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
self.lock().unwrap().exit_event(thread_index)
}
fn handle_event(
&self,
device_event: u16,
evset: EventSet,
vrings: &[V],
thread_id: usize,
) -> Result<bool> {
self.lock()
.unwrap()
.handle_event(device_event, evset, vrings, thread_id)
}
}
impl<T: VhostUserBackendMut<V, B>, V, B> VhostUserBackend<V, B> for RwLock<T>
where
V: VringT<GM<B>>,
B: Bitmap + 'static,
{
fn num_queues(&self) -> usize {
self.read().unwrap().num_queues()
}
fn max_queue_size(&self) -> usize {
self.read().unwrap().max_queue_size()
}
fn features(&self) -> u64 {
self.read().unwrap().features()
}
fn acked_features(&self, features: u64) {
self.write().unwrap().acked_features(features)
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
self.read().unwrap().protocol_features()
}
fn set_event_idx(&self, enabled: bool) {
self.write().unwrap().set_event_idx(enabled)
}
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
self.read().unwrap().get_config(offset, size)
}
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> {
self.write().unwrap().set_config(offset, buf)
}
fn update_memory(&self, mem: GM<B>) -> Result<()> {
self.write().unwrap().update_memory(mem)
}
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
self.write().unwrap().set_slave_req_fd(vu_req)
}
fn queues_per_thread(&self) -> Vec<u64> {
self.read().unwrap().queues_per_thread()
}
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
self.read().unwrap().exit_event(thread_index)
}
fn handle_event(
&self,
device_event: u16,
evset: EventSet,
vrings: &[V],
thread_id: usize,
) -> Result<bool> {
self.write()
.unwrap()
.handle_event(device_event, evset, vrings, thread_id)
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::VringRwLock;
use std::sync::Mutex;
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
pub struct MockVhostBackend {
events: u64,
event_idx: bool,
acked_features: u64,
}
impl MockVhostBackend {
pub fn new() -> Self {
MockVhostBackend {
events: 0,
event_idx: false,
acked_features: 0,
}
}
}
impl VhostUserBackendMut<VringRwLock, ()> for MockVhostBackend {
fn num_queues(&self) -> usize {
2
}
fn max_queue_size(&self) -> usize {
256
}
fn features(&self) -> u64 {
0xffff_ffff_ffff_ffff
}
fn acked_features(&mut self, features: u64) {
self.acked_features = features;
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
VhostUserProtocolFeatures::all()
}
fn set_event_idx(&mut self, enabled: bool) {
self.event_idx = enabled;
}
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
assert_eq!(offset, 0x200);
assert_eq!(size, 8);
vec![0xa5u8; 8]
}
fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<()> {
assert_eq!(offset, 0x200);
assert_eq!(buf.len(), 8);
assert_eq!(buf, &[0xa5u8; 8]);
Ok(())
}
fn update_memory(&mut self, _atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>) -> Result<()> {
Ok(())
}
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
fn queues_per_thread(&self) -> Vec<u64> {
vec![1, 1]
}
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
let event_fd = EventFd::new(0).unwrap();
Some(event_fd)
}
fn handle_event(
&mut self,
_device_event: u16,
_evset: EventSet,
_vrings: &[VringRwLock],
_thread_id: usize,
) -> Result<bool> {
self.events += 1;
Ok(false)
}
}
#[test]
fn test_new_mock_backend_mutex() {
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
assert_eq!(backend.num_queues(), 2);
assert_eq!(backend.max_queue_size(), 256);
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
assert_eq!(
backend.protocol_features(),
VhostUserProtocolFeatures::all()
);
assert_eq!(backend.queues_per_thread(), [1, 1]);
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
backend.set_config(0x200, &[0xa5; 8]).unwrap();
backend.acked_features(0xffff);
assert_eq!(backend.lock().unwrap().acked_features, 0xffff);
backend.set_event_idx(true);
assert!(backend.lock().unwrap().event_idx);
let _ = backend.exit_event(0).unwrap();
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
backend.update_memory(mem).unwrap();
}
#[test]
fn test_new_mock_backend_rwlock() {
let backend = Arc::new(RwLock::new(MockVhostBackend::new()));
assert_eq!(backend.num_queues(), 2);
assert_eq!(backend.max_queue_size(), 256);
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
assert_eq!(
backend.protocol_features(),
VhostUserProtocolFeatures::all()
);
assert_eq!(backend.queues_per_thread(), [1, 1]);
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
backend.set_config(0x200, &[0xa5; 8]).unwrap();
backend.acked_features(0xffff);
assert_eq!(backend.read().unwrap().acked_features, 0xffff);
backend.set_event_idx(true);
assert!(backend.read().unwrap().event_idx);
let _ = backend.exit_event(0).unwrap();
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
backend.update_memory(mem.clone()).unwrap();
let vring = VringRwLock::new(mem, 0x1000).unwrap();
backend
.handle_event(0x1, EventSet::IN, &[vring], 0)
.unwrap();
}
}

View file

@@ -0,0 +1,281 @@
// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
use std::fmt::{Display, Formatter};
use std::io::{self, Result};
use std::marker::PhantomData;
use std::os::unix::io::{AsRawFd, RawFd};
use vm_memory::bitmap::Bitmap;
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use vmm_sys_util::eventfd::EventFd;
use super::backend::VhostUserBackend;
use super::vring::VringT;
use super::GM;
/// Errors related to vring epoll event handling.
#[derive(Debug)]
pub enum VringEpollError {
/// Failed to create epoll file descriptor.
EpollCreateFd(io::Error),
/// Failed while waiting for events.
EpollWait(io::Error),
/// Could not register exit event
RegisterExitEvent(io::Error),
/// Failed to read the event from kick EventFd.
HandleEventReadKick(io::Error),
/// Failed to handle the event from the backend.
HandleEventBackendHandling(io::Error),
}
impl Display for VringEpollError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
VringEpollError::EpollCreateFd(e) => write!(f, "cannot create epoll fd: {}", e),
VringEpollError::EpollWait(e) => write!(f, "failed to wait for epoll event: {}", e),
VringEpollError::RegisterExitEvent(e) => write!(f, "cannot register exit event: {}", e),
VringEpollError::HandleEventReadKick(e) => {
write!(f, "cannot read vring kick event: {}", e)
}
VringEpollError::HandleEventBackendHandling(e) => {
write!(f, "failed to handle epoll event: {}", e)
}
}
}
}
impl std::error::Error for VringEpollError {}
/// Result of vring epoll operations.
pub type VringEpollResult<T> = std::result::Result<T, VringEpollError>;
/// Epoll event handler to manage and process epoll events for registered file descriptor.
///
/// The `VringEpollHandler` structure provides interfaces to:
/// - add file descriptors to be monitored by the epoll fd
/// - remove registered file descriptors from the epoll fd
/// - run the event loop to handle pending events on the epoll fd
pub struct VringEpollHandler<S, V, B> {
epoll: Epoll,
backend: S,
vrings: Vec<V>,
thread_id: usize,
exit_event_fd: Option<EventFd>,
phantom: PhantomData<B>,
}
impl<S, V, B> VringEpollHandler<S, V, B> {
/// Send `exit event` to break the event loop.
pub fn send_exit_event(&self) {
if let Some(eventfd) = self.exit_event_fd.as_ref() {
let _ = eventfd.write(1);
}
}
}
impl<S, V, B> VringEpollHandler<S, V, B>
where
S: VhostUserBackend<V, B>,
V: VringT<GM<B>>,
B: Bitmap + 'static,
{
/// Create a `VringEpollHandler` instance.
pub(crate) fn new(backend: S, vrings: Vec<V>, thread_id: usize) -> VringEpollResult<Self> {
let epoll = Epoll::new().map_err(VringEpollError::EpollCreateFd)?;
let handler = match backend.exit_event(thread_id) {
Some(exit_event_fd) => {
let id = backend.num_queues();
epoll
.ctl(
ControlOperation::Add,
exit_event_fd.as_raw_fd(),
EpollEvent::new(EventSet::IN, id as u64),
)
.map_err(VringEpollError::RegisterExitEvent)?;
VringEpollHandler {
epoll,
backend,
vrings,
thread_id,
exit_event_fd: Some(exit_event_fd),
phantom: PhantomData,
}
}
None => VringEpollHandler {
epoll,
backend,
vrings,
thread_id,
exit_event_fd: None,
phantom: PhantomData,
},
};
Ok(handler)
}
/// Register an event into the epoll fd.
///
/// When this event is later triggered, the backend implementation of `handle_event` will be
/// called.
pub fn register_listener(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
// `data` range [0...num_queues] is reserved for queues and exit event.
if data <= self.backend.num_queues() as u64 {
Err(io::Error::from_raw_os_error(libc::EINVAL))
} else {
self.register_event(fd, ev_type, data)
}
}
/// Unregister an event from the epoll fd.
///
/// If the event is triggered after this function has been called, the event will be silently
/// dropped.
pub fn unregister_listener(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
// `data` range [0...num_queues] is reserved for queues and exit event.
if data <= self.backend.num_queues() as u64 {
Err(io::Error::from_raw_os_error(libc::EINVAL))
} else {
self.unregister_event(fd, ev_type, data)
}
}
pub(crate) fn register_event(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
self.epoll
.ctl(ControlOperation::Add, fd, EpollEvent::new(ev_type, data))
}
pub(crate) fn unregister_event(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
self.epoll
.ctl(ControlOperation::Delete, fd, EpollEvent::new(ev_type, data))
}
/// Run the event poll loop to handle all pending events on registered fds.
///
/// The event loop will be terminated once an event is received from the `exit event fd`
/// associated with the backend.
pub(crate) fn run(&self) -> VringEpollResult<()> {
const EPOLL_EVENTS_LEN: usize = 100;
let mut events = vec![EpollEvent::new(EventSet::empty(), 0); EPOLL_EVENTS_LEN];
'epoll: loop {
let num_events = match self.epoll.wait(-1, &mut events[..]) {
Ok(res) => res,
Err(e) => {
if e.kind() == io::ErrorKind::Interrupted {
// It's well defined from the epoll_wait() syscall
// documentation that the epoll loop can be interrupted
// before any of the requested events occurred or the
// timeout expired. In both those cases, epoll_wait()
// returns an error of type EINTR, but this should not
// be considered as a regular error. Instead it is more
// appropriate to retry, by calling into epoll_wait().
continue;
}
return Err(VringEpollError::EpollWait(e));
}
};
for event in events.iter().take(num_events) {
let evset = match EventSet::from_bits(event.events) {
Some(evset) => evset,
None => {
let evbits = event.events;
println!("epoll: ignoring unknown event set: 0x{:x}", evbits);
continue;
}
};
let ev_type = event.data() as u16;
// handle_event() returns true if an event is received from the exit event fd.
if self.handle_event(ev_type, evset)? {
break 'epoll;
}
}
}
Ok(())
}
fn handle_event(&self, device_event: u16, evset: EventSet) -> VringEpollResult<bool> {
if self.exit_event_fd.is_some() && device_event as usize == self.backend.num_queues() {
return Ok(true);
}
if (device_event as usize) < self.vrings.len() {
let vring = &self.vrings[device_event as usize];
let enabled = vring
.read_kick()
.map_err(VringEpollError::HandleEventReadKick)?;
// If the vring is not enabled, it should not be processed.
if !enabled {
return Ok(false);
}
}
self.backend
.handle_event(device_event, evset, &self.vrings, self.thread_id)
.map_err(VringEpollError::HandleEventBackendHandling)
}
}
impl<S, V, B> AsRawFd for VringEpollHandler<S, V, B> {
fn as_raw_fd(&self) -> RawFd {
self.epoll.as_raw_fd()
}
}
#[cfg(test)]
mod tests {
use super::super::backend::tests::MockVhostBackend;
use super::super::vring::VringRwLock;
use super::*;
use std::sync::{Arc, Mutex};
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::eventfd::EventFd;
#[test]
fn test_vring_epoll_handler() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
let vring = VringRwLock::new(mem, 0x1000).unwrap();
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
let handler = VringEpollHandler::new(backend, vec![vring], 0x1).unwrap();
let eventfd = EventFd::new(0).unwrap();
handler
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.unwrap();
// Register an already registered fd.
handler
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.unwrap_err();
// Register with an invalid data value.
handler
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 1)
.unwrap_err();
handler
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.unwrap();
// Unregister an already unregistered fd.
handler
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
.unwrap_err();
// Unregister with an invalid data value.
handler
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 1)
.unwrap_err();
// Check we retrieve the correct file descriptor
assert_eq!(handler.as_raw_fd(), handler.epoll.as_raw_fd());
}
}

View file

@@ -0,0 +1,601 @@
// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
use std::error;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use std::thread;
use vhost::vhost_user::message::{
VhostUserConfigFlags, VhostUserMemoryRegion, VhostUserProtocolFeatures,
VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags,
VhostUserVringState,
};
use vhost::vhost_user::{
Error as VhostUserError, Result as VhostUserResult, SlaveFsCacheReq,
VhostUserSlaveReqHandlerMut,
};
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::{Error as VirtQueError, QueueT};
use vm_memory::bitmap::Bitmap;
use vm_memory::mmap::NewBitmap;
use vm_memory::{
FileOffset, GuestAddress, GuestAddressSpace, GuestMemoryMmap, GuestRegionMmap, MmapRegion,
};
use vmm_sys_util::epoll::EventSet;
use super::backend::VhostUserBackend;
use super::event_loop::VringEpollHandler;
use super::event_loop::{VringEpollError, VringEpollResult};
use super::vring::VringT;
use super::GM;
const MAX_MEM_SLOTS: u64 = 32;
/// Errors related to vhost-user handler.
#[derive(Debug)]
pub enum VhostUserHandlerError {
/// Failed to create a `Vring`.
CreateVring(VirtQueError),
/// Failed to create vring worker.
CreateEpollHandler(VringEpollError),
/// Failed to spawn vring worker.
SpawnVringWorker(io::Error),
/// Could not find the mapping from memory regions.
MissingMemoryMapping,
}
impl std::fmt::Display for VhostUserHandlerError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
VhostUserHandlerError::CreateVring(e) => {
write!(f, "failed to create vring: {}", e)
}
VhostUserHandlerError::CreateEpollHandler(e) => {
write!(f, "failed to create vring epoll handler: {}", e)
}
VhostUserHandlerError::SpawnVringWorker(e) => {
write!(f, "failed spawning the vring worker: {}", e)
}
VhostUserHandlerError::MissingMemoryMapping => write!(f, "Missing memory mapping"),
}
}
}
impl error::Error for VhostUserHandlerError {}
/// Result of vhost-user handler operations.
pub type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
struct AddrMapping {
vmm_addr: u64,
size: u64,
gpa_base: u64,
}
pub struct VhostUserHandler<S, V, B: Bitmap + 'static> {
backend: S,
handlers: Vec<Arc<VringEpollHandler<S, V, B>>>,
owned: bool,
features_acked: bool,
acked_features: u64,
acked_protocol_features: u64,
num_queues: usize,
max_queue_size: usize,
queues_per_thread: Vec<u64>,
mappings: Vec<AddrMapping>,
atomic_mem: GM<B>,
vrings: Vec<V>,
worker_threads: Vec<thread::JoinHandle<VringEpollResult<()>>>,
}
// Ensure VhostUserHandler: Clone + Send + Sync + 'static.
impl<S, V, B> VhostUserHandler<S, V, B>
where
S: VhostUserBackend<V, B> + Clone + 'static,
V: VringT<GM<B>> + Clone + Send + Sync + 'static,
B: Bitmap + Clone + Send + Sync + 'static,
{
pub(crate) fn new(backend: S, atomic_mem: GM<B>) -> VhostUserHandlerResult<Self> {
let num_queues = backend.num_queues();
let max_queue_size = backend.max_queue_size();
let queues_per_thread = backend.queues_per_thread();
let mut vrings = Vec::new();
for _ in 0..num_queues {
let vring = V::new(atomic_mem.clone(), max_queue_size as u16)
.map_err(VhostUserHandlerError::CreateVring)?;
vrings.push(vring);
}
let mut handlers = Vec::new();
let mut worker_threads = Vec::new();
for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
let mut thread_vrings = Vec::new();
for (index, vring) in vrings.iter().enumerate() {
if (queues_mask >> index) & 1u64 == 1u64 {
thread_vrings.push(vring.clone());
}
}
let handler = Arc::new(
VringEpollHandler::new(backend.clone(), thread_vrings, thread_id)
.map_err(VhostUserHandlerError::CreateEpollHandler)?,
);
let handler2 = handler.clone();
let worker_thread = thread::Builder::new()
.name("vring_worker".to_string())
.spawn(move || handler2.run())
.map_err(VhostUserHandlerError::SpawnVringWorker)?;
handlers.push(handler);
worker_threads.push(worker_thread);
}
Ok(VhostUserHandler {
backend,
handlers,
owned: false,
features_acked: false,
acked_features: 0,
acked_protocol_features: 0,
num_queues,
max_queue_size,
queues_per_thread,
mappings: Vec::new(),
atomic_mem,
vrings,
worker_threads,
})
}
}
impl<S, V, B: Bitmap> VhostUserHandler<S, V, B> {
pub(crate) fn send_exit_event(&self) {
for handler in self.handlers.iter() {
handler.send_exit_event();
}
}
fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
for mapping in self.mappings.iter() {
if vmm_va >= mapping.vmm_addr && vmm_va < mapping.vmm_addr + mapping.size {
return Ok(vmm_va - mapping.vmm_addr + mapping.gpa_base);
}
}
Err(VhostUserHandlerError::MissingMemoryMapping)
}
}
impl<S, V, B> VhostUserHandler<S, V, B>
where
S: VhostUserBackend<V, B>,
V: VringT<GM<B>>,
B: Bitmap,
{
pub(crate) fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> {
self.handlers.clone()
}
fn vring_needs_init(&self, vring: &V) -> bool {
let vring_state = vring.get_ref();
// If the vring wasn't initialized and we already have an EventFd for
// VRING_KICK, initialize it now.
!vring_state.get_queue().ready() && vring_state.get_kick().is_some()
}
fn initialize_vring(&self, vring: &V, index: u8) -> VhostUserResult<()> {
assert!(vring.get_ref().get_kick().is_some());
if let Some(fd) = vring.get_ref().get_kick() {
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
let shifted_queues_mask = queues_mask >> index;
if shifted_queues_mask & 1u64 == 1u64 {
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
self.handlers[thread_index]
.register_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
.map_err(VhostUserError::ReqHandlerError)?;
break;
}
}
}
self.vrings[index as usize].set_queue_ready(true);
Ok(())
}
}
impl<S, V, B> VhostUserSlaveReqHandlerMut for VhostUserHandler<S, V, B>
where
S: VhostUserBackend<V, B>,
V: VringT<GM<B>>,
B: NewBitmap + Clone,
{
fn set_owner(&mut self) -> VhostUserResult<()> {
if self.owned {
return Err(VhostUserError::InvalidOperation("already claimed"));
}
self.owned = true;
Ok(())
}
fn reset_owner(&mut self) -> VhostUserResult<()> {
self.owned = false;
self.features_acked = false;
self.acked_features = 0;
self.acked_protocol_features = 0;
Ok(())
}
fn get_features(&mut self) -> VhostUserResult<u64> {
Ok(self.backend.features())
}
fn set_features(&mut self, features: u64) -> VhostUserResult<()> {
if (features & !self.backend.features()) != 0 {
return Err(VhostUserError::InvalidParam);
}
self.acked_features = features;
self.features_acked = true;
// If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated,
// the ring is initialized in an enabled state.
// If VHOST_USER_F_PROTOCOL_FEATURES has been negotiated,
// the ring is initialized in a disabled state. Client must not
// pass data to/from the backend until ring is enabled by
// VHOST_USER_SET_VRING_ENABLE with parameter 1, or after it has
// been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0.
let vring_enabled =
self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0;
for vring in self.vrings.iter_mut() {
vring.set_enabled(vring_enabled);
}
self.backend.acked_features(self.acked_features);
Ok(())
}
fn set_mem_table(
&mut self,
ctx: &[VhostUserMemoryRegion],
files: Vec<File>,
) -> VhostUserResult<()> {
        // Build a (GuestAddress, len, FileOffset) tuple for each
        // VhostUserMemoryRegion received from the caller.
let mut regions: Vec<(GuestAddress, usize, Option<FileOffset>)> = Vec::new();
let mut mappings: Vec<AddrMapping> = Vec::new();
for (region, file) in ctx.iter().zip(files) {
let g_addr = GuestAddress(region.guest_phys_addr);
let len = region.memory_size as usize;
let f_off = FileOffset::new(file, region.mmap_offset);
regions.push((g_addr, len, Some(f_off)));
mappings.push(AddrMapping {
vmm_addr: region.user_addr,
size: region.memory_size,
gpa_base: region.guest_phys_addr,
});
}
let mem = GuestMemoryMmap::from_ranges_with_files(regions).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
// Updating the inner GuestMemory object here will cause all our vrings to
// see the new one the next time they call to `atomic_mem.memory()`.
self.atomic_mem.lock().unwrap().replace(mem);
self.backend
.update_memory(self.atomic_mem.clone())
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.mappings = mappings;
Ok(())
}
fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> {
if index as usize >= self.num_queues || num == 0 || num as usize > self.max_queue_size {
return Err(VhostUserError::InvalidParam);
}
self.vrings[index as usize].set_queue_size(num as u16);
Ok(())
}
fn set_vring_addr(
&mut self,
index: u32,
_flags: VhostUserVringAddrFlags,
descriptor: u64,
used: u64,
available: u64,
_log: u64,
) -> VhostUserResult<()> {
if index as usize >= self.num_queues {
return Err(VhostUserError::InvalidParam);
}
if !self.mappings.is_empty() {
let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
let used_ring = self.vmm_va_to_gpa(used).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.vrings[index as usize]
.set_queue_info(desc_table, avail_ring, used_ring)
.map_err(|_| VhostUserError::InvalidParam)?;
Ok(())
} else {
Err(VhostUserError::InvalidParam)
}
}
    fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> {
        if index as usize >= self.num_queues {
            return Err(VhostUserError::InvalidParam);
        }
        let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0;
self.vrings[index as usize].set_queue_next_avail(base as u16);
self.vrings[index as usize].set_queue_event_idx(event_idx);
self.backend.set_event_idx(event_idx);
Ok(())
}
fn get_vring_base(&mut self, index: u32) -> VhostUserResult<VhostUserVringState> {
if index as usize >= self.num_queues {
return Err(VhostUserError::InvalidParam);
}
// Quote from vhost-user specification:
// Client must start ring upon receiving a kick (that is, detecting
// that file descriptor is readable) on the descriptor specified by
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
// VHOST_USER_GET_VRING_BASE.
self.vrings[index as usize].set_queue_ready(false);
if let Some(fd) = self.vrings[index as usize].get_ref().get_kick() {
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
let shifted_queues_mask = queues_mask >> index;
if shifted_queues_mask & 1u64 == 1u64 {
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
self.handlers[thread_index]
.unregister_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
.map_err(VhostUserError::ReqHandlerError)?;
break;
}
}
}
self.vrings[index as usize].set_kick(None);
self.vrings[index as usize].set_call(None);
// Strictly speaking, we should do this upon receiving the first kick,
// but it's actually easier to just do it here so we're ready in case
// the vring gets re-initialized by the guest.
self.vrings[index as usize]
.get_mut()
.get_queue_mut()
.reset();
let next_avail = self.vrings[index as usize].queue_next_avail();
Ok(VhostUserVringState::new(index, u32::from(next_avail)))
}
fn set_vring_kick(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
if index as usize >= self.num_queues {
return Err(VhostUserError::InvalidParam);
}
// SAFETY: EventFd requires that it has sole ownership of its fd. So
// does File, so this is safe.
// Ideally, we'd have a generic way to refer to a uniquely-owned fd,
// such as that proposed by Rust RFC #3128.
self.vrings[index as usize].set_kick(file);
if self.vring_needs_init(&self.vrings[index as usize]) {
self.initialize_vring(&self.vrings[index as usize], index)?;
}
Ok(())
}
fn set_vring_call(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
if index as usize >= self.num_queues {
return Err(VhostUserError::InvalidParam);
}
self.vrings[index as usize].set_call(file);
if self.vring_needs_init(&self.vrings[index as usize]) {
self.initialize_vring(&self.vrings[index as usize], index)?;
}
Ok(())
}
fn set_vring_err(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
if index as usize >= self.num_queues {
return Err(VhostUserError::InvalidParam);
}
self.vrings[index as usize].set_err(file);
Ok(())
}
fn get_protocol_features(&mut self) -> VhostUserResult<VhostUserProtocolFeatures> {
Ok(self.backend.protocol_features())
}
fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> {
        // Note: a slave that reported VHOST_USER_F_PROTOCOL_FEATURES must
        // support this message even before VHOST_USER_SET_FEATURES has been
        // called.
self.acked_protocol_features = features;
Ok(())
}
fn get_queue_num(&mut self) -> VhostUserResult<u64> {
Ok(self.num_queues as u64)
}
fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> {
// This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES
// has been negotiated.
if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 {
return Err(VhostUserError::InvalidOperation(
"protocol features not set",
));
} else if index as usize >= self.num_queues {
return Err(VhostUserError::InvalidParam);
}
// Slave must not pass data to/from the backend until ring is
// enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1,
// or after it has been disabled by VHOST_USER_SET_VRING_ENABLE
// with parameter 0.
self.vrings[index as usize].set_enabled(enable);
Ok(())
}
fn get_config(
&mut self,
offset: u32,
size: u32,
_flags: VhostUserConfigFlags,
) -> VhostUserResult<Vec<u8>> {
Ok(self.backend.get_config(offset, size))
}
fn set_config(
&mut self,
offset: u32,
buf: &[u8],
_flags: VhostUserConfigFlags,
) -> VhostUserResult<()> {
self.backend
.set_config(offset, buf)
.map_err(VhostUserError::ReqHandlerError)
}
fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) {
if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 {
vu_req.set_reply_ack_flag(true);
}
self.backend.set_slave_req_fd(vu_req);
}
fn get_inflight_fd(
&mut self,
_inflight: &vhost::vhost_user::message::VhostUserInflight,
) -> VhostUserResult<(vhost::vhost_user::message::VhostUserInflight, File)> {
// Assume the backend hasn't negotiated the inflight feature; it
// wouldn't be correct for the backend to do so, as we don't (yet)
// provide a way for it to handle such requests.
Err(VhostUserError::InvalidOperation("not supported"))
}
fn set_inflight_fd(
&mut self,
_inflight: &vhost::vhost_user::message::VhostUserInflight,
_file: File,
) -> VhostUserResult<()> {
Err(VhostUserError::InvalidOperation("not supported"))
}
fn get_max_mem_slots(&mut self) -> VhostUserResult<u64> {
Ok(MAX_MEM_SLOTS)
}
fn add_mem_region(
&mut self,
region: &VhostUserSingleMemoryRegion,
file: File,
) -> VhostUserResult<()> {
let mmap_region = MmapRegion::from_file(
FileOffset::new(file, region.mmap_offset),
region.memory_size as usize,
)
.map_err(|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)))?;
let guest_region = Arc::new(
GuestRegionMmap::new(mmap_region, GuestAddress(region.guest_phys_addr)).map_err(
|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)),
)?,
);
let mem = self
.atomic_mem
.memory()
.insert_region(guest_region)
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.atomic_mem.lock().unwrap().replace(mem);
self.backend
.update_memory(self.atomic_mem.clone())
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.mappings.push(AddrMapping {
vmm_addr: region.user_addr,
size: region.memory_size,
gpa_base: region.guest_phys_addr,
});
Ok(())
}
fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> {
let (mem, _) = self
.atomic_mem
.memory()
.remove_region(GuestAddress(region.guest_phys_addr), region.memory_size)
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.atomic_mem.lock().unwrap().replace(mem);
self.backend
.update_memory(self.atomic_mem.clone())
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.mappings
.retain(|mapping| mapping.gpa_base != region.guest_phys_addr);
Ok(())
}
}
impl<S, V, B: Bitmap> Drop for VhostUserHandler<S, V, B> {
fn drop(&mut self) {
// Signal all working threads to exit.
self.send_exit_event();
for thread in self.worker_threads.drain(..) {
if let Err(e) = thread.join() {
error!("Error in vring worker: {:?}", e);
}
}
}
}
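The `vmm_va_to_gpa` helper above translates a VMM virtual address (as sent in `VHOST_USER_SET_VRING_ADDR`) to a guest physical address by finding the mapping recorded from `VHOST_USER_SET_MEM_TABLE` that contains it and rebasing the offset. A self-contained sketch of the same logic (struct and function names mirror the handler's but are standalone here):

```rust
/// A single VMM-address-to-guest-physical mapping, as recorded from
/// VHOST_USER_SET_MEM_TABLE (fields mirror `AddrMapping`).
struct Mapping {
    vmm_addr: u64,
    size: u64,
    gpa_base: u64,
}

/// Translate a VMM virtual address to a guest physical address by
/// locating the mapping that contains it and rebasing the offset.
fn vmm_va_to_gpa(mappings: &[Mapping], vmm_va: u64) -> Option<u64> {
    mappings
        .iter()
        .find(|m| vmm_va >= m.vmm_addr && vmm_va < m.vmm_addr + m.size)
        .map(|m| vmm_va - m.vmm_addr + m.gpa_base)
}

fn main() {
    let maps = [Mapping {
        vmm_addr: 0x7f00_0000,
        size: 0x1000,
        gpa_base: 0x10_0000,
    }];
    // 0x10 bytes into the mapping -> 0x10 past the GPA base.
    assert_eq!(vmm_va_to_gpa(&maps, 0x7f00_0010), Some(0x10_0010));
    // One byte past the end of the mapping: no translation.
    assert_eq!(vmm_va_to_gpa(&maps, 0x7f00_1000), None);
}
```

The real handler returns `VhostUserHandlerError::MissingMemoryMapping` instead of `None` when no mapping matches.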


@@ -0,0 +1,270 @@
// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//! A simple framework to run a vhost-user backend service.
#[macro_use]
extern crate log;
use std::fmt::{Display, Formatter};
use std::sync::{Arc, Mutex};
use std::thread;
use vhost::vhost_user::{Error as VhostUserError, Listener, SlaveListener, SlaveReqHandler};
use vm_memory::bitmap::Bitmap;
use vm_memory::mmap::NewBitmap;
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
use self::handler::VhostUserHandler;
mod backend;
pub use self::backend::{VhostUserBackend, VhostUserBackendMut};
mod event_loop;
pub use self::event_loop::VringEpollHandler;
mod handler;
pub use self::handler::VhostUserHandlerError;
mod vring;
pub use self::vring::{
VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT,
};
/// An alias for `GuestMemoryAtomic<GuestMemoryMmap<B>>` to simplify code.
type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>;
/// Errors related to vhost-user daemon.
#[derive(Debug)]
pub enum Error {
/// Failed to create a new vhost-user handler.
NewVhostUserHandler(VhostUserHandlerError),
/// Failed creating vhost-user slave listener.
CreateSlaveListener(VhostUserError),
/// Failed creating vhost-user slave handler.
CreateSlaveReqHandler(VhostUserError),
/// Failed starting daemon thread.
StartDaemon(std::io::Error),
/// Failed waiting for daemon thread.
WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>),
/// Failed handling a vhost-user request.
HandleRequest(VhostUserError),
}
impl Display for Error {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e),
Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e),
Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e),
Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e),
Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"),
Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e),
}
}
}
/// Result of vhost-user daemon operations.
pub type Result<T> = std::result::Result<T, Error>;
/// Implement a simple framework to run a vhost-user service daemon.
///
/// This structure is the public API the backend is allowed to interact with in order to run
/// a fully functional vhost-user daemon.
pub struct VhostUserDaemon<S, V, B: Bitmap + 'static = ()> {
name: String,
handler: Arc<Mutex<VhostUserHandler<S, V, B>>>,
main_thread: Option<thread::JoinHandle<Result<()>>>,
}
impl<S, V, B> VhostUserDaemon<S, V, B>
where
S: VhostUserBackend<V, B> + Clone + 'static,
V: VringT<GM<B>> + Clone + Send + Sync + 'static,
B: NewBitmap + Clone + Send + Sync,
{
/// Create the daemon instance, providing the backend implementation of `VhostUserBackend`.
///
    /// Under the hood, this will start a dedicated thread responsible for listening for
    /// registered events. Those events can be vring events or custom events from the backend,
    /// and they are registered later in the sequence.
pub fn new(
name: String,
backend: S,
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>,
) -> Result<Self> {
let handler = Arc::new(Mutex::new(
VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?,
));
Ok(VhostUserDaemon {
name,
handler,
main_thread: None,
})
}
/// Run a dedicated thread handling all requests coming through the socket.
    /// This runs in an infinite loop that should terminate once the other
/// end of the socket (the VMM) hangs up.
///
/// This function is the common code for starting a new daemon, no matter if
/// it acts as a client or a server.
fn start_daemon(
&mut self,
mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>,
) -> Result<()> {
let handle = thread::Builder::new()
.name(self.name.clone())
.spawn(move || loop {
handler.handle_request().map_err(Error::HandleRequest)?;
})
.map_err(Error::StartDaemon)?;
self.main_thread = Some(handle);
Ok(())
}
/// Connect to the vhost-user socket and run a dedicated thread handling
/// all requests coming through this socket. This runs in an infinite loop
    /// that should terminate once the other end of the socket (the VMM)
/// hangs up.
pub fn start_client(&mut self, socket_path: &str) -> Result<()> {
let slave_handler = SlaveReqHandler::connect(socket_path, self.handler.clone())
.map_err(Error::CreateSlaveReqHandler)?;
self.start_daemon(slave_handler)
}
/// Listen to the vhost-user socket and run a dedicated thread handling all requests coming
/// through this socket.
///
    /// This runs in an infinite loop that should terminate once the other end of the socket
/// (the VMM) disconnects.
    // TODO: the current implementation has a limitation: only one incoming connection is
    // handled from the listener. Should it be enhanced to support reconnection?
pub fn start(&mut self, listener: Listener) -> Result<()> {
let mut slave_listener = SlaveListener::new(listener, self.handler.clone())
.map_err(Error::CreateSlaveListener)?;
let slave_handler = self.accept(&mut slave_listener)?;
self.start_daemon(slave_handler)
}
fn accept(
&self,
slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>,
) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>> {
loop {
match slave_listener.accept() {
Err(e) => return Err(Error::CreateSlaveListener(e)),
Ok(Some(v)) => return Ok(v),
Ok(None) => continue,
}
}
}
/// Wait for the thread handling the vhost-user socket connection to terminate.
pub fn wait(&mut self) -> Result<()> {
if let Some(handle) = self.main_thread.take() {
match handle.join().map_err(Error::WaitDaemon)? {
Ok(()) => Ok(()),
Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
Err(e) => Err(e),
}
} else {
Ok(())
}
}
/// Retrieve the vring epoll handler.
///
/// This is necessary to perform further actions like registering and unregistering some extra
/// event file descriptors.
pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> {
// Do not expect poisoned lock.
self.handler.lock().unwrap().get_epoll_handlers()
}
}
#[cfg(test)]
mod tests {
use super::backend::tests::MockVhostBackend;
use super::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::sync::Barrier;
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
#[test]
fn test_new_daemon() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
let handlers = daemon.get_epoll_handlers();
assert_eq!(handlers.len(), 2);
let barrier = Arc::new(Barrier::new(2));
let tmpdir = tempfile::tempdir().unwrap();
let mut path = tmpdir.path().to_path_buf();
path.push("socket");
let barrier2 = barrier.clone();
let path1 = path.clone();
let thread = thread::spawn(move || {
barrier2.wait();
let socket = UnixStream::connect(&path1).unwrap();
barrier2.wait();
drop(socket)
});
let listener = Listener::new(&path, false).unwrap();
barrier.wait();
daemon.start(listener).unwrap();
barrier.wait();
// Above process generates a `HandleRequest(PartialMessage)` error.
daemon.wait().unwrap_err();
daemon.wait().unwrap();
thread.join().unwrap();
}
#[test]
fn test_new_daemon_client() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
let handlers = daemon.get_epoll_handlers();
assert_eq!(handlers.len(), 2);
let barrier = Arc::new(Barrier::new(2));
let tmpdir = tempfile::tempdir().unwrap();
let mut path = tmpdir.path().to_path_buf();
path.push("socket");
let barrier2 = barrier.clone();
let path1 = path.clone();
let thread = thread::spawn(move || {
let listener = UnixListener::bind(&path1).unwrap();
barrier2.wait();
let (stream, _) = listener.accept().unwrap();
barrier2.wait();
drop(stream)
});
barrier.wait();
daemon
.start_client(path.as_path().to_str().unwrap())
.unwrap();
barrier.wait();
// Above process generates a `HandleRequest(PartialMessage)` error.
daemon.wait().unwrap_err();
daemon.wait().unwrap();
thread.join().unwrap();
}
}
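The initial enabled state of the vrings, computed in `VhostUserHandler::set_features` above, follows the vhost-user specification: rings start enabled only when `VHOST_USER_F_PROTOCOL_FEATURES` was not negotiated; otherwise they stay disabled until `VHOST_USER_SET_VRING_ENABLE`. A standalone sketch of that check, assuming the feature bit value of 30 from the vhost-user spec (the handler itself uses `VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()`):

```rust
/// VHOST_USER_F_PROTOCOL_FEATURES feature bit (bit 30 per the
/// vhost-user specification).
const VHOST_USER_F_PROTOCOL_FEATURES: u64 = 1 << 30;

/// Initial vring enabled state after VHOST_USER_SET_FEATURES: rings
/// start enabled only if protocol features were NOT negotiated.
fn vrings_start_enabled(acked_features: u64) -> bool {
    acked_features & VHOST_USER_F_PROTOCOL_FEATURES == 0
}

fn main() {
    // No protocol features negotiated: rings start enabled.
    assert!(vrings_start_enabled(0));
    // Protocol features negotiated: rings start disabled.
    assert!(!vrings_start_enabled(VHOST_USER_F_PROTOCOL_FEATURES));
}
```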


@@ -0,0 +1,547 @@
// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2021 Alibaba Cloud Computing. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//! Struct to maintain state information and manipulate vhost-user queues.
use std::fs::File;
use std::io;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::result::Result;
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use virtio_queue::{Error as VirtQueError, Queue, QueueT};
use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
use vmm_sys_util::eventfd::EventFd;
/// Trait for objects returned by `VringT::get_ref()`.
pub trait VringStateGuard<'a, M: GuestAddressSpace> {
/// Type for guard returned by `VringT::get_ref()`.
type G: Deref<Target = VringState<M>>;
}
/// Trait for objects returned by `VringT::get_mut()`.
pub trait VringStateMutGuard<'a, M: GuestAddressSpace> {
/// Type for guard returned by `VringT::get_mut()`.
type G: DerefMut<Target = VringState<M>>;
}
/// Trait for vring objects to maintain state information and manipulate vhost-user queues.
pub trait VringT<M: GuestAddressSpace>:
for<'a> VringStateGuard<'a, M> + for<'a> VringStateMutGuard<'a, M>
{
/// Create a new instance of Vring.
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError>
where
Self: Sized;
    /// Get an immutable guard to the underlying `VringState` object.
    fn get_ref(&self) -> <Self as VringStateGuard<M>>::G;
    /// Get a mutable guard to the underlying `VringState` object.
    fn get_mut(&self) -> <Self as VringStateMutGuard<M>>::G;
    /// Add a used descriptor to the used queue.
fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError>;
/// Notify the vhost-user master that used descriptors have been put into the used queue.
fn signal_used_queue(&self) -> io::Result<()>;
/// Enable event notification for queue.
fn enable_notification(&self) -> Result<bool, VirtQueError>;
/// Disable event notification for queue.
fn disable_notification(&self) -> Result<(), VirtQueError>;
/// Check whether a notification to the guest is needed.
fn needs_notification(&self) -> Result<bool, VirtQueError>;
/// Set vring enabled state.
fn set_enabled(&self, enabled: bool);
/// Set queue addresses for descriptor table, available ring and used ring.
fn set_queue_info(
&self,
desc_table: u64,
avail_ring: u64,
used_ring: u64,
) -> Result<(), VirtQueError>;
/// Get queue next avail head.
fn queue_next_avail(&self) -> u16;
/// Set queue next avail head.
fn set_queue_next_avail(&self, base: u16);
/// Set configured queue size.
fn set_queue_size(&self, num: u16);
/// Enable/disable queue event index feature.
fn set_queue_event_idx(&self, enabled: bool);
/// Set queue enabled state.
fn set_queue_ready(&self, ready: bool);
/// Set `EventFd` for kick.
fn set_kick(&self, file: Option<File>);
/// Read event from the kick `EventFd`.
fn read_kick(&self) -> io::Result<bool>;
/// Set `EventFd` for call.
fn set_call(&self, file: Option<File>);
/// Set `EventFd` for err.
fn set_err(&self, file: Option<File>);
}
/// Struct to maintain raw state information for a vhost-user queue.
///
/// This struct maintains all information of a virtio queue, and can be used as a `VringT`
/// object in a single-threaded context.
pub struct VringState<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
queue: Queue,
kick: Option<EventFd>,
call: Option<EventFd>,
err: Option<EventFd>,
enabled: bool,
mem: M,
}
impl<M: GuestAddressSpace> VringState<M> {
/// Create a new instance of Vring.
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError> {
Ok(VringState {
queue: Queue::new(max_queue_size)?,
kick: None,
call: None,
err: None,
enabled: false,
mem,
})
}
/// Get an immutable reference to the underlying raw `Queue` object.
pub fn get_queue(&self) -> &Queue {
&self.queue
}
/// Get a mutable reference to the underlying raw `Queue` object.
pub fn get_queue_mut(&mut self) -> &mut Queue {
&mut self.queue
}
    /// Add a used descriptor to the used queue.
pub fn add_used(&mut self, desc_index: u16, len: u32) -> Result<(), VirtQueError> {
self.queue
.add_used(self.mem.memory().deref(), desc_index, len)
}
/// Notify the vhost-user master that used descriptors have been put into the used queue.
pub fn signal_used_queue(&self) -> io::Result<()> {
if let Some(call) = self.call.as_ref() {
call.write(1)
} else {
Ok(())
}
}
/// Enable event notification for queue.
pub fn enable_notification(&mut self) -> Result<bool, VirtQueError> {
self.queue.enable_notification(self.mem.memory().deref())
}
/// Disable event notification for queue.
pub fn disable_notification(&mut self) -> Result<(), VirtQueError> {
self.queue.disable_notification(self.mem.memory().deref())
}
/// Check whether a notification to the guest is needed.
pub fn needs_notification(&mut self) -> Result<bool, VirtQueError> {
self.queue.needs_notification(self.mem.memory().deref())
}
/// Set vring enabled state.
pub fn set_enabled(&mut self, enabled: bool) {
self.enabled = enabled;
}
/// Set queue addresses for descriptor table, available ring and used ring.
pub fn set_queue_info(
&mut self,
desc_table: u64,
avail_ring: u64,
used_ring: u64,
) -> Result<(), VirtQueError> {
self.queue
.try_set_desc_table_address(GuestAddress(desc_table))?;
self.queue
.try_set_avail_ring_address(GuestAddress(avail_ring))?;
self.queue
.try_set_used_ring_address(GuestAddress(used_ring))
}
/// Get queue next avail head.
fn queue_next_avail(&self) -> u16 {
self.queue.next_avail()
}
/// Set queue next avail head.
fn set_queue_next_avail(&mut self, base: u16) {
self.queue.set_next_avail(base);
}
/// Set configured queue size.
fn set_queue_size(&mut self, num: u16) {
self.queue.set_size(num);
}
/// Enable/disable queue event index feature.
fn set_queue_event_idx(&mut self, enabled: bool) {
self.queue.set_event_idx(enabled);
}
/// Set queue enabled state.
fn set_queue_ready(&mut self, ready: bool) {
self.queue.set_ready(ready);
}
/// Get the `EventFd` for kick.
pub fn get_kick(&self) -> &Option<EventFd> {
&self.kick
}
/// Set `EventFd` for kick.
fn set_kick(&mut self, file: Option<File>) {
// SAFETY:
// EventFd requires that it has sole ownership of its fd. So does File, so this is safe.
// Ideally, we'd have a generic way to refer to a uniquely-owned fd, such as that proposed
// by Rust RFC #3128.
self.kick = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
}
/// Read event from the kick `EventFd`.
fn read_kick(&self) -> io::Result<bool> {
if let Some(kick) = &self.kick {
kick.read()?;
}
Ok(self.enabled)
}
/// Set `EventFd` for call.
fn set_call(&mut self, file: Option<File>) {
// SAFETY: see comment in set_kick()
self.call = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
}
/// Get the `EventFd` for call.
pub fn get_call(&self) -> &Option<EventFd> {
&self.call
}
/// Set `EventFd` for err.
fn set_err(&mut self, file: Option<File>) {
// SAFETY: see comment in set_kick()
self.err = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
}
}
/// A `VringState` object protected by Mutex for multi-threading context.
#[derive(Clone)]
pub struct VringMutex<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
state: Arc<Mutex<VringState<M>>>,
}
impl<M: GuestAddressSpace> VringMutex<M> {
/// Get a mutable guard to the underlying raw `VringState` object.
fn lock(&self) -> MutexGuard<VringState<M>> {
self.state.lock().unwrap()
}
}
impl<'a, M: 'a + GuestAddressSpace> VringStateGuard<'a, M> for VringMutex<M> {
type G = MutexGuard<'a, VringState<M>>;
}
impl<'a, M: 'a + GuestAddressSpace> VringStateMutGuard<'a, M> for VringMutex<M> {
type G = MutexGuard<'a, VringState<M>>;
}
impl<M: 'static + GuestAddressSpace> VringT<M> for VringMutex<M> {
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError> {
Ok(VringMutex {
state: Arc::new(Mutex::new(VringState::new(mem, max_queue_size)?)),
})
}
fn get_ref(&self) -> <Self as VringStateGuard<M>>::G {
self.state.lock().unwrap()
}
fn get_mut(&self) -> <Self as VringStateMutGuard<M>>::G {
self.lock()
}
fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError> {
self.lock().add_used(desc_index, len)
}
fn signal_used_queue(&self) -> io::Result<()> {
self.get_ref().signal_used_queue()
}
fn enable_notification(&self) -> Result<bool, VirtQueError> {
self.lock().enable_notification()
}
fn disable_notification(&self) -> Result<(), VirtQueError> {
self.lock().disable_notification()
}
fn needs_notification(&self) -> Result<bool, VirtQueError> {
self.lock().needs_notification()
}
fn set_enabled(&self, enabled: bool) {
self.lock().set_enabled(enabled)
}
fn set_queue_info(
&self,
desc_table: u64,
avail_ring: u64,
used_ring: u64,
) -> Result<(), VirtQueError> {
self.lock()
.set_queue_info(desc_table, avail_ring, used_ring)
}
fn queue_next_avail(&self) -> u16 {
self.get_ref().queue_next_avail()
}
fn set_queue_next_avail(&self, base: u16) {
self.lock().set_queue_next_avail(base)
}
fn set_queue_size(&self, num: u16) {
self.lock().set_queue_size(num);
}
fn set_queue_event_idx(&self, enabled: bool) {
self.lock().set_queue_event_idx(enabled);
}
fn set_queue_ready(&self, ready: bool) {
self.lock().set_queue_ready(ready);
}
fn set_kick(&self, file: Option<File>) {
self.lock().set_kick(file);
}
fn read_kick(&self) -> io::Result<bool> {
self.get_ref().read_kick()
}
fn set_call(&self, file: Option<File>) {
self.lock().set_call(file)
}
fn set_err(&self, file: Option<File>) {
self.lock().set_err(file)
}
}
/// A `VringState` object protected by RwLock for multi-threading context.
#[derive(Clone)]
pub struct VringRwLock<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
state: Arc<RwLock<VringState<M>>>,
}
impl<M: GuestAddressSpace> VringRwLock<M> {
/// Get a mutable guard to the underlying raw `VringState` object.
fn write_lock(&self) -> RwLockWriteGuard<VringState<M>> {
self.state.write().unwrap()
}
}
impl<'a, M: 'a + GuestAddressSpace> VringStateGuard<'a, M> for VringRwLock<M> {
type G = RwLockReadGuard<'a, VringState<M>>;
}
impl<'a, M: 'a + GuestAddressSpace> VringStateMutGuard<'a, M> for VringRwLock<M> {
type G = RwLockWriteGuard<'a, VringState<M>>;
}
impl<M: 'static + GuestAddressSpace> VringT<M> for VringRwLock<M> {
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError> {
Ok(VringRwLock {
state: Arc::new(RwLock::new(VringState::new(mem, max_queue_size)?)),
})
}
fn get_ref(&self) -> <Self as VringStateGuard<M>>::G {
self.state.read().unwrap()
}
fn get_mut(&self) -> <Self as VringStateMutGuard<M>>::G {
self.write_lock()
}
fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError> {
self.write_lock().add_used(desc_index, len)
}
fn signal_used_queue(&self) -> io::Result<()> {
self.get_ref().signal_used_queue()
}
fn enable_notification(&self) -> Result<bool, VirtQueError> {
self.write_lock().enable_notification()
}
fn disable_notification(&self) -> Result<(), VirtQueError> {
self.write_lock().disable_notification()
}
fn needs_notification(&self) -> Result<bool, VirtQueError> {
self.write_lock().needs_notification()
}
fn set_enabled(&self, enabled: bool) {
self.write_lock().set_enabled(enabled)
}
fn set_queue_info(
&self,
desc_table: u64,
avail_ring: u64,
used_ring: u64,
) -> Result<(), VirtQueError> {
self.write_lock()
.set_queue_info(desc_table, avail_ring, used_ring)
}
fn queue_next_avail(&self) -> u16 {
self.get_ref().queue_next_avail()
}
fn set_queue_next_avail(&self, base: u16) {
self.write_lock().set_queue_next_avail(base)
}
fn set_queue_size(&self, num: u16) {
self.write_lock().set_queue_size(num);
}
fn set_queue_event_idx(&self, enabled: bool) {
self.write_lock().set_queue_event_idx(enabled);
}
fn set_queue_ready(&self, ready: bool) {
self.write_lock().set_queue_ready(ready);
}
fn set_kick(&self, file: Option<File>) {
self.write_lock().set_kick(file);
}
fn read_kick(&self) -> io::Result<bool> {
self.get_ref().read_kick()
}
fn set_call(&self, file: Option<File>) {
self.write_lock().set_call(file)
}
fn set_err(&self, file: Option<File>) {
self.write_lock().set_err(file)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::io::AsRawFd;
use vm_memory::bitmap::AtomicBitmap;
use vmm_sys_util::eventfd::EventFd;
#[test]
fn test_new_vring() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<AtomicBitmap>::from_ranges(&[(GuestAddress(0x100000), 0x10000)])
.unwrap(),
);
let vring = VringMutex::new(mem, 0x1000).unwrap();
assert!(vring.get_ref().get_kick().is_none());
assert!(!vring.get_mut().enabled);
assert!(!vring.lock().queue.ready());
assert!(!vring.lock().queue.event_idx_enabled());
vring.set_enabled(true);
assert!(vring.get_ref().enabled);
vring.set_queue_info(0x100100, 0x100200, 0x100300).unwrap();
assert_eq!(vring.lock().get_queue().desc_table(), 0x100100);
assert_eq!(vring.lock().get_queue().avail_ring(), 0x100200);
assert_eq!(vring.lock().get_queue().used_ring(), 0x100300);
assert_eq!(vring.queue_next_avail(), 0);
vring.set_queue_next_avail(0x20);
assert_eq!(vring.queue_next_avail(), 0x20);
vring.set_queue_size(0x200);
assert_eq!(vring.lock().queue.size(), 0x200);
vring.set_queue_event_idx(true);
assert!(vring.lock().queue.event_idx_enabled());
vring.set_queue_ready(true);
assert!(vring.lock().queue.ready());
}
#[test]
fn test_vring_set_fd() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
let vring = VringMutex::new(mem, 0x1000).unwrap();
vring.set_enabled(true);
assert!(vring.get_ref().enabled);
let eventfd = EventFd::new(0).unwrap();
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
assert!(vring.get_mut().kick.is_none());
assert!(vring.read_kick().unwrap());
vring.set_kick(Some(file));
eventfd.write(1).unwrap();
assert!(vring.read_kick().unwrap());
assert!(vring.get_ref().kick.is_some());
vring.set_kick(None);
assert!(vring.get_ref().kick.is_none());
std::mem::forget(eventfd);
let eventfd = EventFd::new(0).unwrap();
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
assert!(vring.get_ref().call.is_none());
vring.set_call(Some(file));
assert!(vring.get_ref().call.is_some());
vring.set_call(None);
assert!(vring.get_ref().call.is_none());
std::mem::forget(eventfd);
let eventfd = EventFd::new(0).unwrap();
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
assert!(vring.get_ref().err.is_none());
vring.set_err(Some(file));
assert!(vring.get_ref().err.is_some());
vring.set_err(None);
assert!(vring.get_ref().err.is_none());
std::mem::forget(eventfd);
}
}
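The `set_kick`/`set_call`/`set_err` helpers above rely on transferring unique ownership of a file descriptor from a `File` into an `EventFd`. A std-only sketch of that ownership-transfer pattern (not part of the crate; `/dev/null` is just a convenient fd source):

```rust
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};

fn main() {
    // `File` uniquely owns its descriptor; into_raw_fd() hands that
    // ownership to the caller without closing the fd.
    let file = File::open("/dev/null").expect("open /dev/null");
    let fd = file.into_raw_fd(); // `file` is consumed here, the fd stays open
    assert!(fd >= 0);
    // SAFETY: into_raw_fd() gave us sole ownership of `fd`, so wrapping it
    // in a new owning type is sound -- the same argument set_kick() makes
    // when it builds an EventFd from a File.
    let owner = unsafe { File::from_raw_fd(fd) };
    assert_eq!(owner.as_raw_fd(), fd);
    // Dropping `owner` closes the descriptor exactly once.
}
```

The key invariant is that exactly one owner exists at any time, so the fd is closed exactly once on drop.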

use std::ffi::CString;
use std::fs::File;
use std::io::Result;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use vhost::vhost_user::message::{
VhostUserConfigFlags, VhostUserHeaderFlag, VhostUserInflight, VhostUserProtocolFeatures,
};
use vhost::vhost_user::{Listener, Master, SlaveFsCacheReq, VhostUserMaster};
use vhost::{VhostBackend, VhostUserMemoryRegionInfo, VringConfigData};
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock};
use vm_memory::{
FileOffset, GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryMmap,
};
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;
struct MockVhostBackend {
events: u64,
event_idx: bool,
acked_features: u64,
}
impl MockVhostBackend {
fn new() -> Self {
MockVhostBackend {
events: 0,
event_idx: false,
acked_features: 0,
}
}
}
impl VhostUserBackendMut<VringRwLock, ()> for MockVhostBackend {
fn num_queues(&self) -> usize {
2
}
fn max_queue_size(&self) -> usize {
256
}
fn features(&self) -> u64 {
0xffff_ffff_ffff_ffff
}
fn acked_features(&mut self, features: u64) {
self.acked_features = features;
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
VhostUserProtocolFeatures::all()
}
fn set_event_idx(&mut self, enabled: bool) {
self.event_idx = enabled;
}
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
assert_eq!(offset, 0x200);
assert_eq!(size, 8);
vec![0xa5u8; 8]
}
fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<()> {
assert_eq!(offset, 0x200);
assert_eq!(buf, &[0xa5u8; 8]);
Ok(())
}
fn update_memory(&mut self, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>) -> Result<()> {
let mem = atomic_mem.memory();
let region = mem.find_region(GuestAddress(0x100000)).unwrap();
assert_eq!(region.size(), 0x100000);
Ok(())
}
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
fn queues_per_thread(&self) -> Vec<u64> {
vec![1, 1]
}
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
let event_fd = EventFd::new(0).unwrap();
Some(event_fd)
}
fn handle_event(
&mut self,
_device_event: u16,
_evset: EventSet,
_vrings: &[VringRwLock],
_thread_id: usize,
) -> Result<bool> {
self.events += 1;
Ok(false)
}
}
fn setup_master(path: &Path, barrier: Arc<Barrier>) -> Master {
barrier.wait();
let mut master = Master::connect(path, 1).unwrap();
master.set_hdr_flags(VhostUserHeaderFlag::NEED_REPLY);
    // Wait before issuing service requests.
barrier.wait();
let features = master.get_features().unwrap();
let proto = master.get_protocol_features().unwrap();
master.set_features(features).unwrap();
master.set_protocol_features(proto).unwrap();
assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK));
master
}
fn vhost_user_client(path: &Path, barrier: Arc<Barrier>) {
barrier.wait();
let mut master = Master::connect(path, 1).unwrap();
master.set_hdr_flags(VhostUserHeaderFlag::NEED_REPLY);
    // Wait before issuing service requests.
barrier.wait();
let features = master.get_features().unwrap();
let proto = master.get_protocol_features().unwrap();
master.set_features(features).unwrap();
master.set_protocol_features(proto).unwrap();
assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK));
let queue_num = master.get_queue_num().unwrap();
assert_eq!(queue_num, 2);
master.set_owner().unwrap();
//master.set_owner().unwrap_err();
master.reset_owner().unwrap();
master.reset_owner().unwrap();
master.set_owner().unwrap();
master.set_features(features).unwrap();
master.set_protocol_features(proto).unwrap();
assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK));
let memfd = nix::sys::memfd::memfd_create(
&CString::new("test").unwrap(),
nix::sys::memfd::MemFdCreateFlag::empty(),
)
.unwrap();
let file = unsafe { File::from_raw_fd(memfd) };
file.set_len(0x100000).unwrap();
let file_offset = FileOffset::new(file, 0);
let mem = GuestMemoryMmap::<()>::from_ranges_with_files(&[(
GuestAddress(0x100000),
0x100000,
Some(file_offset),
)])
.unwrap();
let addr = mem.get_host_address(GuestAddress(0x100000)).unwrap() as u64;
let reg = mem.find_region(GuestAddress(0x100000)).unwrap();
let fd = reg.file_offset().unwrap();
let regions = [VhostUserMemoryRegionInfo {
guest_phys_addr: 0x100000,
memory_size: 0x100000,
userspace_addr: addr,
mmap_offset: 0,
mmap_handle: fd.file().as_raw_fd(),
}];
master.set_mem_table(&regions).unwrap();
master.set_vring_num(0, 256).unwrap();
let config = VringConfigData {
queue_max_size: 256,
queue_size: 256,
flags: 0,
desc_table_addr: addr,
used_ring_addr: addr + 0x10000,
avail_ring_addr: addr + 0x20000,
log_addr: None,
};
master.set_vring_addr(0, &config).unwrap();
let eventfd = EventFd::new(0).unwrap();
master.set_vring_kick(0, &eventfd).unwrap();
master.set_vring_call(0, &eventfd).unwrap();
master.set_vring_err(0, &eventfd).unwrap();
master.set_vring_enable(0, true).unwrap();
let buf = [0u8; 8];
let (_cfg, data) = master
.get_config(0x200, 8, VhostUserConfigFlags::empty(), &buf)
.unwrap();
assert_eq!(&data, &[0xa5u8; 8]);
master
.set_config(0x200, VhostUserConfigFlags::empty(), &data)
.unwrap();
let (tx, _rx) = UnixStream::pair().unwrap();
master.set_slave_request_fd(&tx).unwrap();
let state = master.get_vring_base(0).unwrap();
master.set_vring_base(0, state as u16).unwrap();
assert_eq!(master.get_max_mem_slots().unwrap(), 32);
let region = VhostUserMemoryRegionInfo {
guest_phys_addr: 0x800000,
memory_size: 0x100000,
userspace_addr: addr,
mmap_offset: 0,
mmap_handle: fd.file().as_raw_fd(),
};
master.add_mem_region(&region).unwrap();
master.remove_mem_region(&region).unwrap();
}
fn vhost_user_server(cb: fn(&Path, Arc<Barrier>)) {
let mem = GuestMemoryAtomic::new(GuestMemoryMmap::<()>::new());
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
let barrier = Arc::new(Barrier::new(2));
let tmpdir = tempfile::tempdir().unwrap();
let mut path = tmpdir.path().to_path_buf();
path.push("socket");
let barrier2 = barrier.clone();
let path1 = path.clone();
let thread = thread::spawn(move || cb(&path1, barrier2));
let listener = Listener::new(&path, false).unwrap();
barrier.wait();
daemon.start(listener).unwrap();
barrier.wait();
// handle service requests from clients.
thread.join().unwrap();
}
#[test]
fn test_vhost_user_server() {
vhost_user_server(vhost_user_client);
}
fn vhost_user_enable(path: &Path, barrier: Arc<Barrier>) {
let master = setup_master(path, barrier);
master.set_owner().unwrap();
master.set_owner().unwrap_err();
}
#[test]
fn test_vhost_user_enable() {
vhost_user_server(vhost_user_enable);
}
fn vhost_user_set_inflight(path: &Path, barrier: Arc<Barrier>) {
let mut master = setup_master(path, barrier);
let eventfd = EventFd::new(0).unwrap();
// No implementation for inflight_fd yet.
let inflight = VhostUserInflight {
mmap_size: 0x100000,
mmap_offset: 0,
num_queues: 1,
queue_size: 256,
};
master
.set_inflight_fd(&inflight, eventfd.as_raw_fd())
.unwrap_err();
}
#[test]
fn test_vhost_user_set_inflight() {
vhost_user_server(vhost_user_set_inflight);
}
fn vhost_user_get_inflight(path: &Path, barrier: Arc<Barrier>) {
let mut master = setup_master(path, barrier);
// No implementation for inflight_fd yet.
let inflight = VhostUserInflight {
mmap_size: 0x100000,
mmap_offset: 0,
num_queues: 1,
queue_size: 256,
};
assert!(master.get_inflight_fd(&inflight).is_err());
}
#[test]
fn test_vhost_user_get_inflight() {
vhost_user_server(vhost_user_get_inflight);
}

crates/vhost/Cargo.toml
[package]
name = "vhost"
version = "0.5.0"
keywords = ["vhost", "vhost-user", "virtio", "vdpa"]
description = "a pure rust library for vdpa, vhost and vhost-user"
authors = ["Liu Jiang <gerry@linux.alibaba.com>"]
repository = "https://github.com/rust-vmm/vhost"
documentation = "https://docs.rs/vhost"
readme = "README.md"
license = "Apache-2.0 OR BSD-3-Clause"
edition = "2018"
[package.metadata.docs.rs]
all-features = true
[features]
default = []
vhost-vsock = []
vhost-kern = []
vhost-vdpa = ["vhost-kern"]
vhost-net = ["vhost-kern"]
vhost-user = []
vhost-user-master = ["vhost-user"]
vhost-user-slave = ["vhost-user"]
[dependencies]
bitflags = "1.0"
libc = "0.2.39"
vmm-sys-util = "0.10.0"
vm-memory = "0.9.0"
[dev-dependencies]
tempfile = "3.2.0"
vm-memory = { version = "0.9.0", features=["backend-mmap"] }
serial_test = "0.5"

crates/vhost/README.md
# vHost
A pure Rust library for vDPA, vhost and vhost-user.
The `vhost` crate aims to help implement the data plane for virtio backend drivers. It supports three types of data-plane drivers:
- vhost: the data plane is implemented by the Linux kernel
- vhost-user: the data plane is implemented by dedicated vhost-user servers
- vDPA (vhost Data Path Accelerator): the data plane is implemented in hardware
The main relationships among the traits and structs exported by the `vhost` crate are shown below:
![vhost Architecture](/docs/vhost_architecture.png)
## Kernel-based vHost Backend Drivers
The vhost drivers in Linux provide in-kernel virtio device emulation. Normally
the hypervisor userspace process emulates I/O accesses from the guest.
Vhost puts virtio emulation code into the kernel, taking hypervisor userspace
out of the picture. This allows device emulation code to directly call into
kernel subsystems instead of performing system calls from userspace.
The hypervisor relies on ioctl-based interfaces to control these in-kernel
vhost drivers, such as vhost-net, vhost-scsi and vhost-vsock.
## vHost-user Backend Drivers
The [vhost-user protocol](https://qemu.readthedocs.io/en/latest/interop/vhost-user.html#communication) aims to implement vhost backend drivers in
userspace, which complements the ioctl interface used to control the vhost
implementation in the Linux kernel. It implements the control plane needed
to establish virtqueue sharing with a user space process on the same host.
It uses communication over a Unix domain socket to share file descriptors in
the ancillary data of the message.
The protocol defines two sides of the communication: master and slave.
The master is the application that shares its virtqueues; the slave is the
consumer of the virtqueues. Either side can act as the client (i.e. connecting)
or the server (listening) in the socket communication.
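A minimal std-only illustration of these roles over a Unix domain socket (the request/reply strings here are made up for the sketch; the real vhost-user protocol uses binary framed messages and file-descriptor passing in ancillary data):

```rust
use std::io::{Read, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::thread;

fn main() {
    let path = std::env::temp_dir().join("vhost-user-demo.sock");
    let _ = std::fs::remove_file(&path);
    // Either end may listen; here the slave (device side) is the server.
    let listener = UnixListener::bind(&path).unwrap();
    let client_path = path.clone();
    let master = thread::spawn(move || {
        // The master (VMM side) connects and drives request/reply exchanges.
        let mut stream = UnixStream::connect(&client_path).unwrap();
        stream.write_all(b"GET_FEATURES").unwrap();
        let mut reply = [0u8; 8];
        stream.read_exact(&mut reply).unwrap();
        assert_eq!(&reply, b"FEATURES");
    });
    let (mut conn, _) = listener.accept().unwrap();
    let mut request = [0u8; 12];
    conn.read_exact(&mut request).unwrap();
    assert_eq!(&request, b"GET_FEATURES");
    conn.write_all(b"FEATURES").unwrap();
    master.join().unwrap();
    let _ = std::fs::remove_file(&path);
}
```

The roles are independent of who listens: a slave may equally well connect to a socket the master created, which is the configuration the `Listener`/`Master` pair in this repository's tests exercises.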

(image diff: architecture diagram, 143 KiB before and after, content unchanged)