Merge remote-tracking branch 'vhost-user-backend/main' into workspace
This merges vhost-user-backend into this workspace along with its git history. Signed-off-by: Viresh Kumar <viresh.kumar@linaro.org>
This commit is contained in:
commit
40006d0b39
12 changed files with 2755 additions and 1 deletions
8
.github/dependabot.yml
vendored
8
.github/dependabot.yml
vendored
|
|
@ -1,5 +1,13 @@
|
||||||
version: 2
|
version: 2
|
||||||
updates:
|
updates:
|
||||||
|
- package-ecosystem: cargo
|
||||||
|
directory: "/"
|
||||||
|
schedule:
|
||||||
|
interval: weekly
|
||||||
|
open-pull-requests-limit: 3
|
||||||
|
allow:
|
||||||
|
- dependency-type: direct
|
||||||
|
- dependency-type: indirect
|
||||||
- package-ecosystem: gitsubmodule
|
- package-ecosystem: gitsubmodule
|
||||||
directory: "/"
|
directory: "/"
|
||||||
schedule:
|
schedule:
|
||||||
|
|
|
||||||
|
|
@ -2,4 +2,5 @@
|
||||||
|
|
||||||
members = [
|
members = [
|
||||||
"crates/vhost",
|
"crates/vhost",
|
||||||
|
"crates/vhost-user-backend",
|
||||||
]
|
]
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
{
|
{
|
||||||
"coverage_score": 80.5,
|
"coverage_score": 83.3,
|
||||||
"exclude_path": "vhost/src/vhost_kern/",
|
"exclude_path": "vhost/src/vhost_kern/",
|
||||||
"crate_features": "vhost/vhost-user-master,vhost/vhost-user-slave"
|
"crate_features": "vhost/vhost-user-master,vhost/vhost-user-slave"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
66
crates/vhost-user-backend/CHANGELOG.md
Normal file
66
crates/vhost-user-backend/CHANGELOG.md
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
# Changelog
|
||||||
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
### Deprecated
|
||||||
|
|
||||||
|
## v0.7.0
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Started using caret dependencies
|
||||||
|
- Updated dependency nix 0.24 -> 0.25
|
||||||
|
- Updated depepdency log 0.4.6 -> 0.4.17
|
||||||
|
- Updated dependency vhost 0.4 -> 0.5
|
||||||
|
- Updated dependency virtio-queue 0.5.0 -> 0.6
|
||||||
|
- Updated dependency vm-memory 0.7 -> 0.9
|
||||||
|
|
||||||
|
## v0.6.0
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Moved to rust-vmm/virtio-queue v0.5.0
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- Fixed vring initialization logic
|
||||||
|
|
||||||
|
## v0.5.1
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
- Moved to rust-vmm/vmm-sys-util 0.10.0
|
||||||
|
|
||||||
|
## v0.5.0
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Moved to rust-vmm/virtio-queue v0.4.0
|
||||||
|
|
||||||
|
## v0.4.0
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Moved to rust-vmm/virtio-queue v0.3.0
|
||||||
|
- Relaxed rust-vmm/vm-memory dependency to require ">=0.7"
|
||||||
|
|
||||||
|
## v0.3.0
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Moved to rust-vmm/vhost v0.4.0
|
||||||
|
|
||||||
|
## v0.2.0
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- Ability to run the daemon as a client
|
||||||
|
- VringEpollHandler implements AsRawFd
|
||||||
|
|
||||||
|
## v0.1.0
|
||||||
|
|
||||||
|
First release
|
||||||
23
crates/vhost-user-backend/Cargo.toml
Normal file
23
crates/vhost-user-backend/Cargo.toml
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
[package]
|
||||||
|
name = "vhost-user-backend"
|
||||||
|
version = "0.7.0"
|
||||||
|
authors = ["The Cloud Hypervisor Authors"]
|
||||||
|
keywords = ["vhost-user", "virtio"]
|
||||||
|
description = "A framework to build vhost-user backend service daemon"
|
||||||
|
edition = "2018"
|
||||||
|
license = "Apache-2.0"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
libc = "0.2.39"
|
||||||
|
log = "0.4.17"
|
||||||
|
vhost = { version = "0.5", features = ["vhost-user-slave"] }
|
||||||
|
virtio-bindings = "0.1"
|
||||||
|
virtio-queue = "0.6"
|
||||||
|
vm-memory = { version = "0.9", features = ["backend-mmap", "backend-atomic"] }
|
||||||
|
vmm-sys-util = "0.10"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
nix = "0.25"
|
||||||
|
vhost = { version = "0.5", features = ["vhost-user-master", "vhost-user-slave"] }
|
||||||
|
vm-memory = { version = "0.9", features = ["backend-mmap", "backend-atomic", "backend-bitmap"] }
|
||||||
|
tempfile = "3.2.0"
|
||||||
105
crates/vhost-user-backend/README.md
Normal file
105
crates/vhost-user-backend/README.md
Normal file
|
|
@ -0,0 +1,105 @@
|
||||||
|
# vhost-user-backend
|
||||||
|
|
||||||
|
## Design
|
||||||
|
|
||||||
|
The `vhost-user-backend` crate provides a framework to implement `vhost-user` backend services,
|
||||||
|
which includes following external public APIs:
|
||||||
|
- A daemon control object (`VhostUserDaemon`) to start and stop the service daemon.
|
||||||
|
- A vhost-user backend trait (`VhostUserBackendMut`) to handle vhost-user control messages and virtio
|
||||||
|
messages.
|
||||||
|
- A vring access trait (`VringT`) to access virtio queues, and three implementations of the trait:
|
||||||
|
`VringState`, `VringMutex` and `VringRwLock`.
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
The `vhost-user-backend` crate provides a framework to implement vhost-user backend services. The main interface provided by `vhost-user-backend` library is the `struct VhostUserDaemon`:
|
||||||
|
```rust
|
||||||
|
pub struct VhostUserDaemon<S, V, B = ()>
|
||||||
|
where
|
||||||
|
S: VhostUserBackend<V, B>,
|
||||||
|
V: VringT<GM<B>> + Clone + Send + Sync + 'static,
|
||||||
|
B: Bitmap + 'static,
|
||||||
|
{
|
||||||
|
pub fn new(name: String, backend: S, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>) -> Result<Self>;
|
||||||
|
pub fn start(&mut self, listener: Listener) -> Result<()>;
|
||||||
|
pub fn wait(&mut self) -> Result<()>;
|
||||||
|
pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>>;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Create a `VhostUserDaemon` Instance
|
||||||
|
The `VhostUserDaemon::new()` creates an instance of `VhostUserDaemon` object. The client needs to
|
||||||
|
pass in an `VhostUserBackend` object, which will be used to configure the `VhostUserDaemon`
|
||||||
|
instance, handle control messages from the vhost-user master and handle virtio requests from
|
||||||
|
virtio queues. A group of working threads will be created to handle virtio requests from configured
|
||||||
|
virtio queues.
|
||||||
|
|
||||||
|
### Start the `VhostUserDaemon`
|
||||||
|
The `VhostUserDaemon::start()` method waits for an incoming connection from the vhost-user masters
|
||||||
|
on the `listener`. Once a connection is ready, a main thread will be created to handle vhost-user
|
||||||
|
messages from the vhost-user master.
|
||||||
|
|
||||||
|
### Stop the `VhostUserDaemon`
|
||||||
|
The `VhostUserDaemon::stop()` method waits for the main thread to exit. An exit event must be sent
|
||||||
|
to the main thread by writing to the `exit_event` EventFd before waiting for it to exit.
|
||||||
|
|
||||||
|
### Threading Model
|
||||||
|
The main thread and virtio queue working threads will concurrently access the underlying virtio
|
||||||
|
queues, so all virtio queue in multi-threading model. But the main thread only accesses virtio
|
||||||
|
queues for configuration, so client could adopt locking policies to optimize for the virtio queue
|
||||||
|
working threads.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
Example code to handle virtio messages from a virtio queue:
|
||||||
|
```rust
|
||||||
|
impl VhostUserBackendMut for VhostUserService {
|
||||||
|
fn process_queue(&mut self, vring: &VringMutex) -> Result<bool> {
|
||||||
|
let mut used_any = false;
|
||||||
|
let mem = match &self.mem {
|
||||||
|
Some(m) => m.memory(),
|
||||||
|
None => return Err(Error::NoMemoryConfigured),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut vring_state = vring.get_mut();
|
||||||
|
|
||||||
|
while let Some(avail_desc) = vring_state
|
||||||
|
.get_queue_mut()
|
||||||
|
.iter()
|
||||||
|
.map_err(|_| Error::IterateQueue)?
|
||||||
|
.next()
|
||||||
|
{
|
||||||
|
// Process the request...
|
||||||
|
|
||||||
|
if self.event_idx {
|
||||||
|
if vring_state.add_used(head_index, 0).is_err() {
|
||||||
|
warn!("Couldn't return used descriptors to the ring");
|
||||||
|
}
|
||||||
|
|
||||||
|
match vring_state.needs_notification() {
|
||||||
|
Err(_) => {
|
||||||
|
warn!("Couldn't check if queue needs to be notified");
|
||||||
|
vring_state.signal_used_queue().unwrap();
|
||||||
|
}
|
||||||
|
Ok(needs_notification) => {
|
||||||
|
if needs_notification {
|
||||||
|
vring_state.signal_used_queue().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if vring_state.add_used(head_index, 0).is_err() {
|
||||||
|
warn!("Couldn't return used descriptors to the ring");
|
||||||
|
}
|
||||||
|
vring_state.signal_used_queue().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(used_any)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
This project is licensed under
|
||||||
|
|
||||||
|
- [Apache License](http://www.apache.org/licenses/LICENSE-2.0), Version 2.0
|
||||||
555
crates/vhost-user-backend/src/backend.rs
Normal file
555
crates/vhost-user-backend/src/backend.rs
Normal file
|
|
@ -0,0 +1,555 @@
|
||||||
|
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||||
|
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
//! Traits for vhost user backend servers to implement virtio data plain services.
|
||||||
|
//!
|
||||||
|
//! Define two traits for vhost user backend servers to implement virtio data plane services.
|
||||||
|
//! The only difference between the two traits is mutability. The [VhostUserBackend] trait is
|
||||||
|
//! designed with interior mutability, so the implementor may choose the suitable way to protect
|
||||||
|
//! itself from concurrent accesses. The [VhostUserBackendMut] is designed without interior
|
||||||
|
//! mutability, and an implementation of:
|
||||||
|
//! ```ignore
|
||||||
|
//! impl<T: VhostUserBackendMut> VhostUserBackend for RwLock<T> { }
|
||||||
|
//! ```
|
||||||
|
//! is provided for convenience.
|
||||||
|
//!
|
||||||
|
//! [VhostUserBackend]: trait.VhostUserBackend.html
|
||||||
|
//! [VhostUserBackendMut]: trait.VhostUserBackendMut.html
|
||||||
|
|
||||||
|
use std::io::Result;
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
|
|
||||||
|
use vhost::vhost_user::message::VhostUserProtocolFeatures;
|
||||||
|
use vhost::vhost_user::SlaveFsCacheReq;
|
||||||
|
use vm_memory::bitmap::Bitmap;
|
||||||
|
use vmm_sys_util::epoll::EventSet;
|
||||||
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
|
use super::vring::VringT;
|
||||||
|
use super::GM;
|
||||||
|
|
||||||
|
/// Trait with interior mutability for vhost user backend servers to implement concrete services.
|
||||||
|
///
|
||||||
|
/// To support multi-threading and asynchronous IO, we enforce `Send + Sync` bound.
|
||||||
|
pub trait VhostUserBackend<V, B = ()>: Send + Sync
|
||||||
|
where
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: Bitmap + 'static,
|
||||||
|
{
|
||||||
|
/// Get number of queues supported.
|
||||||
|
fn num_queues(&self) -> usize;
|
||||||
|
|
||||||
|
/// Get maximum queue size supported.
|
||||||
|
fn max_queue_size(&self) -> usize;
|
||||||
|
|
||||||
|
/// Get available virtio features.
|
||||||
|
fn features(&self) -> u64;
|
||||||
|
|
||||||
|
/// Set acknowledged virtio features.
|
||||||
|
fn acked_features(&self, _features: u64) {}
|
||||||
|
|
||||||
|
/// Get available vhost protocol features.
|
||||||
|
fn protocol_features(&self) -> VhostUserProtocolFeatures;
|
||||||
|
|
||||||
|
/// Enable or disable the virtio EVENT_IDX feature
|
||||||
|
fn set_event_idx(&self, enabled: bool);
|
||||||
|
|
||||||
|
/// Get virtio device configuration.
|
||||||
|
///
|
||||||
|
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||||
|
/// function.
|
||||||
|
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set virtio device configuration.
|
||||||
|
///
|
||||||
|
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||||
|
/// function.
|
||||||
|
fn set_config(&self, _offset: u32, _buf: &[u8]) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update guest memory regions.
|
||||||
|
fn update_memory(&self, mem: GM<B>) -> Result<()>;
|
||||||
|
|
||||||
|
/// Set handler for communicating with the master by the slave communication channel.
|
||||||
|
///
|
||||||
|
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||||
|
/// function.
|
||||||
|
///
|
||||||
|
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
|
||||||
|
fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {}
|
||||||
|
|
||||||
|
/// Get the map to map queue index to worker thread index.
|
||||||
|
///
|
||||||
|
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
|
||||||
|
/// the following two queues will be handled by worker thread 1, and the last four queues will
|
||||||
|
/// be handled by worker thread 2.
|
||||||
|
fn queues_per_thread(&self) -> Vec<u64> {
|
||||||
|
vec![0xffff_ffff]
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Provide an optional exit EventFd for the specified worker thread.
|
||||||
|
///
|
||||||
|
/// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO
|
||||||
|
/// events by using epoll with the specified `token`. When the returned EventFd is written to,
|
||||||
|
/// the worker thread will exit.
|
||||||
|
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle IO events for backend registered file descriptors.
|
||||||
|
///
|
||||||
|
/// This function gets called if the backend registered some additional listeners onto specific
|
||||||
|
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
|
||||||
|
/// do with events happening on custom listeners.
|
||||||
|
fn handle_event(
|
||||||
|
&self,
|
||||||
|
device_event: u16,
|
||||||
|
evset: EventSet,
|
||||||
|
vrings: &[V],
|
||||||
|
thread_id: usize,
|
||||||
|
) -> Result<bool>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trait without interior mutability for vhost user backend servers to implement concrete services.
|
||||||
|
pub trait VhostUserBackendMut<V, B = ()>: Send + Sync
|
||||||
|
where
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: Bitmap + 'static,
|
||||||
|
{
|
||||||
|
/// Get number of queues supported.
|
||||||
|
fn num_queues(&self) -> usize;
|
||||||
|
|
||||||
|
/// Get maximum queue size supported.
|
||||||
|
fn max_queue_size(&self) -> usize;
|
||||||
|
|
||||||
|
/// Get available virtio features.
|
||||||
|
fn features(&self) -> u64;
|
||||||
|
|
||||||
|
/// Set acknowledged virtio features.
|
||||||
|
fn acked_features(&mut self, _features: u64) {}
|
||||||
|
|
||||||
|
/// Get available vhost protocol features.
|
||||||
|
fn protocol_features(&self) -> VhostUserProtocolFeatures;
|
||||||
|
|
||||||
|
/// Enable or disable the virtio EVENT_IDX feature
|
||||||
|
fn set_event_idx(&mut self, enabled: bool);
|
||||||
|
|
||||||
|
/// Get virtio device configuration.
|
||||||
|
///
|
||||||
|
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||||
|
/// function.
|
||||||
|
fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set virtio device configuration.
|
||||||
|
///
|
||||||
|
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||||
|
/// function.
|
||||||
|
fn set_config(&mut self, _offset: u32, _buf: &[u8]) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update guest memory regions.
|
||||||
|
fn update_memory(&mut self, mem: GM<B>) -> Result<()>;
|
||||||
|
|
||||||
|
/// Set handler for communicating with the master by the slave communication channel.
|
||||||
|
///
|
||||||
|
/// A default implementation is provided as we cannot expect all backends to implement this
|
||||||
|
/// function.
|
||||||
|
///
|
||||||
|
/// TODO: this interface is designed only for vhost-user-fs, it should be refined.
|
||||||
|
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
|
||||||
|
|
||||||
|
/// Get the map to map queue index to worker thread index.
|
||||||
|
///
|
||||||
|
/// A return value of [2, 2, 4] means: the first two queues will be handled by worker thread 0,
|
||||||
|
/// the following two queues will be handled by worker thread 1, and the last four queues will
|
||||||
|
/// be handled by worker thread 2.
|
||||||
|
fn queues_per_thread(&self) -> Vec<u64> {
|
||||||
|
vec![0xffff_ffff]
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Provide an optional exit EventFd for the specified worker thread.
|
||||||
|
///
|
||||||
|
/// If an (`EventFd`, `token`) pair is returned, the returned `EventFd` will be monitored for IO
|
||||||
|
/// events by using epoll with the specified `token`. When the returned EventFd is written to,
|
||||||
|
/// the worker thread will exit.
|
||||||
|
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle IO events for backend registered file descriptors.
|
||||||
|
///
|
||||||
|
/// This function gets called if the backend registered some additional listeners onto specific
|
||||||
|
/// file descriptors. The library can handle virtqueues on its own, but does not know what to
|
||||||
|
/// do with events happening on custom listeners.
|
||||||
|
fn handle_event(
|
||||||
|
&mut self,
|
||||||
|
device_event: u16,
|
||||||
|
evset: EventSet,
|
||||||
|
vrings: &[V],
|
||||||
|
thread_id: usize,
|
||||||
|
) -> Result<bool>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: VhostUserBackend<V, B>, V, B> VhostUserBackend<V, B> for Arc<T>
|
||||||
|
where
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: Bitmap + 'static,
|
||||||
|
{
|
||||||
|
fn num_queues(&self) -> usize {
|
||||||
|
self.deref().num_queues()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn max_queue_size(&self) -> usize {
|
||||||
|
self.deref().max_queue_size()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn features(&self) -> u64 {
|
||||||
|
self.deref().features()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn acked_features(&self, features: u64) {
|
||||||
|
self.deref().acked_features(features)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||||
|
self.deref().protocol_features()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_event_idx(&self, enabled: bool) {
|
||||||
|
self.deref().set_event_idx(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||||
|
self.deref().get_config(offset, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> {
|
||||||
|
self.deref().set_config(offset, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_memory(&self, mem: GM<B>) -> Result<()> {
|
||||||
|
self.deref().update_memory(mem)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||||
|
self.deref().set_slave_req_fd(vu_req)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queues_per_thread(&self) -> Vec<u64> {
|
||||||
|
self.deref().queues_per_thread()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
|
||||||
|
self.deref().exit_event(thread_index)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(
|
||||||
|
&self,
|
||||||
|
device_event: u16,
|
||||||
|
evset: EventSet,
|
||||||
|
vrings: &[V],
|
||||||
|
thread_id: usize,
|
||||||
|
) -> Result<bool> {
|
||||||
|
self.deref()
|
||||||
|
.handle_event(device_event, evset, vrings, thread_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: VhostUserBackendMut<V, B>, V, B> VhostUserBackend<V, B> for Mutex<T>
|
||||||
|
where
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: Bitmap + 'static,
|
||||||
|
{
|
||||||
|
fn num_queues(&self) -> usize {
|
||||||
|
self.lock().unwrap().num_queues()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn max_queue_size(&self) -> usize {
|
||||||
|
self.lock().unwrap().max_queue_size()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn features(&self) -> u64 {
|
||||||
|
self.lock().unwrap().features()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn acked_features(&self, features: u64) {
|
||||||
|
self.lock().unwrap().acked_features(features)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||||
|
self.lock().unwrap().protocol_features()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_event_idx(&self, enabled: bool) {
|
||||||
|
self.lock().unwrap().set_event_idx(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||||
|
self.lock().unwrap().get_config(offset, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> {
|
||||||
|
self.lock().unwrap().set_config(offset, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_memory(&self, mem: GM<B>) -> Result<()> {
|
||||||
|
self.lock().unwrap().update_memory(mem)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||||
|
self.lock().unwrap().set_slave_req_fd(vu_req)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queues_per_thread(&self) -> Vec<u64> {
|
||||||
|
self.lock().unwrap().queues_per_thread()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
|
||||||
|
self.lock().unwrap().exit_event(thread_index)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(
|
||||||
|
&self,
|
||||||
|
device_event: u16,
|
||||||
|
evset: EventSet,
|
||||||
|
vrings: &[V],
|
||||||
|
thread_id: usize,
|
||||||
|
) -> Result<bool> {
|
||||||
|
self.lock()
|
||||||
|
.unwrap()
|
||||||
|
.handle_event(device_event, evset, vrings, thread_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: VhostUserBackendMut<V, B>, V, B> VhostUserBackend<V, B> for RwLock<T>
|
||||||
|
where
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: Bitmap + 'static,
|
||||||
|
{
|
||||||
|
fn num_queues(&self) -> usize {
|
||||||
|
self.read().unwrap().num_queues()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn max_queue_size(&self) -> usize {
|
||||||
|
self.read().unwrap().max_queue_size()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn features(&self) -> u64 {
|
||||||
|
self.read().unwrap().features()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn acked_features(&self, features: u64) {
|
||||||
|
self.write().unwrap().acked_features(features)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||||
|
self.read().unwrap().protocol_features()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_event_idx(&self, enabled: bool) {
|
||||||
|
self.write().unwrap().set_event_idx(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||||
|
self.read().unwrap().get_config(offset, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_config(&self, offset: u32, buf: &[u8]) -> Result<()> {
|
||||||
|
self.write().unwrap().set_config(offset, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_memory(&self, mem: GM<B>) -> Result<()> {
|
||||||
|
self.write().unwrap().update_memory(mem)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_slave_req_fd(&self, vu_req: SlaveFsCacheReq) {
|
||||||
|
self.write().unwrap().set_slave_req_fd(vu_req)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queues_per_thread(&self) -> Vec<u64> {
|
||||||
|
self.read().unwrap().queues_per_thread()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
|
||||||
|
self.read().unwrap().exit_event(thread_index)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(
|
||||||
|
&self,
|
||||||
|
device_event: u16,
|
||||||
|
evset: EventSet,
|
||||||
|
vrings: &[V],
|
||||||
|
thread_id: usize,
|
||||||
|
) -> Result<bool> {
|
||||||
|
self.write()
|
||||||
|
.unwrap()
|
||||||
|
.handle_event(device_event, evset, vrings, thread_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::VringRwLock;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
|
||||||
|
|
||||||
|
pub struct MockVhostBackend {
|
||||||
|
events: u64,
|
||||||
|
event_idx: bool,
|
||||||
|
acked_features: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MockVhostBackend {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
MockVhostBackend {
|
||||||
|
events: 0,
|
||||||
|
event_idx: false,
|
||||||
|
acked_features: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VhostUserBackendMut<VringRwLock, ()> for MockVhostBackend {
|
||||||
|
fn num_queues(&self) -> usize {
|
||||||
|
2
|
||||||
|
}
|
||||||
|
|
||||||
|
fn max_queue_size(&self) -> usize {
|
||||||
|
256
|
||||||
|
}
|
||||||
|
|
||||||
|
fn features(&self) -> u64 {
|
||||||
|
0xffff_ffff_ffff_ffff
|
||||||
|
}
|
||||||
|
|
||||||
|
fn acked_features(&mut self, features: u64) {
|
||||||
|
self.acked_features = features;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||||
|
VhostUserProtocolFeatures::all()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_event_idx(&mut self, enabled: bool) {
|
||||||
|
self.event_idx = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||||
|
assert_eq!(offset, 0x200);
|
||||||
|
assert_eq!(size, 8);
|
||||||
|
|
||||||
|
vec![0xa5u8; 8]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<()> {
|
||||||
|
assert_eq!(offset, 0x200);
|
||||||
|
assert_eq!(buf.len(), 8);
|
||||||
|
assert_eq!(buf, &[0xa5u8; 8]);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_memory(&mut self, _atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
|
||||||
|
|
||||||
|
fn queues_per_thread(&self) -> Vec<u64> {
|
||||||
|
vec![1, 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
|
||||||
|
let event_fd = EventFd::new(0).unwrap();
|
||||||
|
|
||||||
|
Some(event_fd)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(
|
||||||
|
&mut self,
|
||||||
|
_device_event: u16,
|
||||||
|
_evset: EventSet,
|
||||||
|
_vrings: &[VringRwLock],
|
||||||
|
_thread_id: usize,
|
||||||
|
) -> Result<bool> {
|
||||||
|
self.events += 1;
|
||||||
|
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_mock_backend_mutex() {
|
||||||
|
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||||
|
|
||||||
|
assert_eq!(backend.num_queues(), 2);
|
||||||
|
assert_eq!(backend.max_queue_size(), 256);
|
||||||
|
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
|
||||||
|
assert_eq!(
|
||||||
|
backend.protocol_features(),
|
||||||
|
VhostUserProtocolFeatures::all()
|
||||||
|
);
|
||||||
|
assert_eq!(backend.queues_per_thread(), [1, 1]);
|
||||||
|
|
||||||
|
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
|
||||||
|
backend.set_config(0x200, &[0xa5; 8]).unwrap();
|
||||||
|
|
||||||
|
backend.acked_features(0xffff);
|
||||||
|
assert_eq!(backend.lock().unwrap().acked_features, 0xffff);
|
||||||
|
|
||||||
|
backend.set_event_idx(true);
|
||||||
|
assert!(backend.lock().unwrap().event_idx);
|
||||||
|
|
||||||
|
let _ = backend.exit_event(0).unwrap();
|
||||||
|
|
||||||
|
let mem = GuestMemoryAtomic::new(
|
||||||
|
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
|
||||||
|
);
|
||||||
|
backend.update_memory(mem).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_mock_backend_rwlock() {
|
||||||
|
let backend = Arc::new(RwLock::new(MockVhostBackend::new()));
|
||||||
|
|
||||||
|
assert_eq!(backend.num_queues(), 2);
|
||||||
|
assert_eq!(backend.max_queue_size(), 256);
|
||||||
|
assert_eq!(backend.features(), 0xffff_ffff_ffff_ffff);
|
||||||
|
assert_eq!(
|
||||||
|
backend.protocol_features(),
|
||||||
|
VhostUserProtocolFeatures::all()
|
||||||
|
);
|
||||||
|
assert_eq!(backend.queues_per_thread(), [1, 1]);
|
||||||
|
|
||||||
|
assert_eq!(backend.get_config(0x200, 8), vec![0xa5; 8]);
|
||||||
|
backend.set_config(0x200, &[0xa5; 8]).unwrap();
|
||||||
|
|
||||||
|
backend.acked_features(0xffff);
|
||||||
|
assert_eq!(backend.read().unwrap().acked_features, 0xffff);
|
||||||
|
|
||||||
|
backend.set_event_idx(true);
|
||||||
|
assert!(backend.read().unwrap().event_idx);
|
||||||
|
|
||||||
|
let _ = backend.exit_event(0).unwrap();
|
||||||
|
|
||||||
|
let mem = GuestMemoryAtomic::new(
|
||||||
|
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
|
||||||
|
);
|
||||||
|
backend.update_memory(mem.clone()).unwrap();
|
||||||
|
|
||||||
|
let vring = VringRwLock::new(mem, 0x1000).unwrap();
|
||||||
|
backend
|
||||||
|
.handle_event(0x1, EventSet::IN, &[vring], 0)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
281
crates/vhost-user-backend/src/event_loop.rs
Normal file
281
crates/vhost-user-backend/src/event_loop.rs
Normal file
|
|
@ -0,0 +1,281 @@
|
||||||
|
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||||
|
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
|
use std::io::{self, Result};
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
|
|
||||||
|
use vm_memory::bitmap::Bitmap;
|
||||||
|
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
|
||||||
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
|
use super::backend::VhostUserBackend;
|
||||||
|
use super::vring::VringT;
|
||||||
|
use super::GM;
|
||||||
|
|
||||||
|
/// Errors related to vring epoll event handling.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum VringEpollError {
|
||||||
|
/// Failed to create epoll file descriptor.
|
||||||
|
EpollCreateFd(io::Error),
|
||||||
|
/// Failed while waiting for events.
|
||||||
|
EpollWait(io::Error),
|
||||||
|
/// Could not register exit event
|
||||||
|
RegisterExitEvent(io::Error),
|
||||||
|
/// Failed to read the event from kick EventFd.
|
||||||
|
HandleEventReadKick(io::Error),
|
||||||
|
/// Failed to handle the event from the backend.
|
||||||
|
HandleEventBackendHandling(io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for VringEpollError {
|
||||||
|
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
VringEpollError::EpollCreateFd(e) => write!(f, "cannot create epoll fd: {}", e),
|
||||||
|
VringEpollError::EpollWait(e) => write!(f, "failed to wait for epoll event: {}", e),
|
||||||
|
VringEpollError::RegisterExitEvent(e) => write!(f, "cannot register exit event: {}", e),
|
||||||
|
VringEpollError::HandleEventReadKick(e) => {
|
||||||
|
write!(f, "cannot read vring kick event: {}", e)
|
||||||
|
}
|
||||||
|
VringEpollError::HandleEventBackendHandling(e) => {
|
||||||
|
write!(f, "failed to handle epoll event: {}", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for VringEpollError {}
|
||||||
|
|
||||||
|
/// Result of vring epoll operations.
|
||||||
|
pub type VringEpollResult<T> = std::result::Result<T, VringEpollError>;
|
||||||
|
|
||||||
|
/// Epoll event handler to manage and process epoll events for registered file descriptor.
|
||||||
|
///
|
||||||
|
/// The `VringEpollHandler` structure provides interfaces to:
|
||||||
|
/// - add file descriptors to be monitored by the epoll fd
|
||||||
|
/// - remove registered file descriptors from the epoll fd
|
||||||
|
/// - run the event loop to handle pending events on the epoll fd
|
||||||
|
pub struct VringEpollHandler<S, V, B> {
|
||||||
|
epoll: Epoll,
|
||||||
|
backend: S,
|
||||||
|
vrings: Vec<V>,
|
||||||
|
thread_id: usize,
|
||||||
|
exit_event_fd: Option<EventFd>,
|
||||||
|
phantom: PhantomData<B>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B> VringEpollHandler<S, V, B> {
|
||||||
|
/// Send `exit event` to break the event loop.
|
||||||
|
pub fn send_exit_event(&self) {
|
||||||
|
if let Some(eventfd) = self.exit_event_fd.as_ref() {
|
||||||
|
let _ = eventfd.write(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B> VringEpollHandler<S, V, B>
|
||||||
|
where
|
||||||
|
S: VhostUserBackend<V, B>,
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: Bitmap + 'static,
|
||||||
|
{
|
||||||
|
/// Create a `VringEpollHandler` instance.
|
||||||
|
pub(crate) fn new(backend: S, vrings: Vec<V>, thread_id: usize) -> VringEpollResult<Self> {
|
||||||
|
let epoll = Epoll::new().map_err(VringEpollError::EpollCreateFd)?;
|
||||||
|
|
||||||
|
let handler = match backend.exit_event(thread_id) {
|
||||||
|
Some(exit_event_fd) => {
|
||||||
|
let id = backend.num_queues();
|
||||||
|
epoll
|
||||||
|
.ctl(
|
||||||
|
ControlOperation::Add,
|
||||||
|
exit_event_fd.as_raw_fd(),
|
||||||
|
EpollEvent::new(EventSet::IN, id as u64),
|
||||||
|
)
|
||||||
|
.map_err(VringEpollError::RegisterExitEvent)?;
|
||||||
|
|
||||||
|
VringEpollHandler {
|
||||||
|
epoll,
|
||||||
|
backend,
|
||||||
|
vrings,
|
||||||
|
thread_id,
|
||||||
|
exit_event_fd: Some(exit_event_fd),
|
||||||
|
phantom: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => VringEpollHandler {
|
||||||
|
epoll,
|
||||||
|
backend,
|
||||||
|
vrings,
|
||||||
|
thread_id,
|
||||||
|
exit_event_fd: None,
|
||||||
|
phantom: PhantomData,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register an event into the epoll fd.
|
||||||
|
///
|
||||||
|
/// When this event is later triggered, the backend implementation of `handle_event` will be
|
||||||
|
/// called.
|
||||||
|
pub fn register_listener(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
|
||||||
|
// `data` range [0...num_queues] is reserved for queues and exit event.
|
||||||
|
if data <= self.backend.num_queues() as u64 {
|
||||||
|
Err(io::Error::from_raw_os_error(libc::EINVAL))
|
||||||
|
} else {
|
||||||
|
self.register_event(fd, ev_type, data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unregister an event from the epoll fd.
|
||||||
|
///
|
||||||
|
/// If the event is triggered after this function has been called, the event will be silently
|
||||||
|
/// dropped.
|
||||||
|
pub fn unregister_listener(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
|
||||||
|
// `data` range [0...num_queues] is reserved for queues and exit event.
|
||||||
|
if data <= self.backend.num_queues() as u64 {
|
||||||
|
Err(io::Error::from_raw_os_error(libc::EINVAL))
|
||||||
|
} else {
|
||||||
|
self.unregister_event(fd, ev_type, data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn register_event(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
|
||||||
|
self.epoll
|
||||||
|
.ctl(ControlOperation::Add, fd, EpollEvent::new(ev_type, data))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn unregister_event(&self, fd: RawFd, ev_type: EventSet, data: u64) -> Result<()> {
|
||||||
|
self.epoll
|
||||||
|
.ctl(ControlOperation::Delete, fd, EpollEvent::new(ev_type, data))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run the event poll loop to handle all pending events on registered fds.
|
||||||
|
///
|
||||||
|
/// The event loop will be terminated once an event is received from the `exit event fd`
|
||||||
|
/// associated with the backend.
|
||||||
|
pub(crate) fn run(&self) -> VringEpollResult<()> {
|
||||||
|
const EPOLL_EVENTS_LEN: usize = 100;
|
||||||
|
let mut events = vec![EpollEvent::new(EventSet::empty(), 0); EPOLL_EVENTS_LEN];
|
||||||
|
|
||||||
|
'epoll: loop {
|
||||||
|
let num_events = match self.epoll.wait(-1, &mut events[..]) {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(e) => {
|
||||||
|
if e.kind() == io::ErrorKind::Interrupted {
|
||||||
|
// It's well defined from the epoll_wait() syscall
|
||||||
|
// documentation that the epoll loop can be interrupted
|
||||||
|
// before any of the requested events occurred or the
|
||||||
|
// timeout expired. In both those cases, epoll_wait()
|
||||||
|
// returns an error of type EINTR, but this should not
|
||||||
|
// be considered as a regular error. Instead it is more
|
||||||
|
// appropriate to retry, by calling into epoll_wait().
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return Err(VringEpollError::EpollWait(e));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for event in events.iter().take(num_events) {
|
||||||
|
let evset = match EventSet::from_bits(event.events) {
|
||||||
|
Some(evset) => evset,
|
||||||
|
None => {
|
||||||
|
let evbits = event.events;
|
||||||
|
println!("epoll: ignoring unknown event set: 0x{:x}", evbits);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let ev_type = event.data() as u16;
|
||||||
|
|
||||||
|
// handle_event() returns true if an event is received from the exit event fd.
|
||||||
|
if self.handle_event(ev_type, evset)? {
|
||||||
|
break 'epoll;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(&self, device_event: u16, evset: EventSet) -> VringEpollResult<bool> {
|
||||||
|
if self.exit_event_fd.is_some() && device_event as usize == self.backend.num_queues() {
|
||||||
|
return Ok(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (device_event as usize) < self.vrings.len() {
|
||||||
|
let vring = &self.vrings[device_event as usize];
|
||||||
|
let enabled = vring
|
||||||
|
.read_kick()
|
||||||
|
.map_err(VringEpollError::HandleEventReadKick)?;
|
||||||
|
|
||||||
|
// If the vring is not enabled, it should not be processed.
|
||||||
|
if !enabled {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.backend
|
||||||
|
.handle_event(device_event, evset, &self.vrings, self.thread_id)
|
||||||
|
.map_err(VringEpollError::HandleEventBackendHandling)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B> AsRawFd for VringEpollHandler<S, V, B> {
|
||||||
|
fn as_raw_fd(&self) -> RawFd {
|
||||||
|
self.epoll.as_raw_fd()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::super::backend::tests::MockVhostBackend;
|
||||||
|
use super::super::vring::VringRwLock;
|
||||||
|
use super::*;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
|
||||||
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_vring_epoll_handler() {
|
||||||
|
let mem = GuestMemoryAtomic::new(
|
||||||
|
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
|
||||||
|
);
|
||||||
|
let vring = VringRwLock::new(mem, 0x1000).unwrap();
|
||||||
|
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||||
|
|
||||||
|
let handler = VringEpollHandler::new(backend, vec![vring], 0x1).unwrap();
|
||||||
|
|
||||||
|
let eventfd = EventFd::new(0).unwrap();
|
||||||
|
handler
|
||||||
|
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
|
||||||
|
.unwrap();
|
||||||
|
// Register an already registered fd.
|
||||||
|
handler
|
||||||
|
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
|
||||||
|
.unwrap_err();
|
||||||
|
// Register an invalid data.
|
||||||
|
handler
|
||||||
|
.register_listener(eventfd.as_raw_fd(), EventSet::IN, 1)
|
||||||
|
.unwrap_err();
|
||||||
|
|
||||||
|
handler
|
||||||
|
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
|
||||||
|
.unwrap();
|
||||||
|
// unregister an already unregistered fd.
|
||||||
|
handler
|
||||||
|
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 3)
|
||||||
|
.unwrap_err();
|
||||||
|
// unregister an invalid data.
|
||||||
|
handler
|
||||||
|
.unregister_listener(eventfd.as_raw_fd(), EventSet::IN, 1)
|
||||||
|
.unwrap_err();
|
||||||
|
// Check we retrieve the correct file descriptor
|
||||||
|
assert_eq!(handler.as_raw_fd(), handler.epoll.as_raw_fd());
|
||||||
|
}
|
||||||
|
}
|
||||||
601
crates/vhost-user-backend/src/handler.rs
Normal file
601
crates/vhost-user-backend/src/handler.rs
Normal file
|
|
@ -0,0 +1,601 @@
|
||||||
|
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||||
|
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use std::error;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io;
|
||||||
|
use std::os::unix::io::AsRawFd;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use vhost::vhost_user::message::{
|
||||||
|
VhostUserConfigFlags, VhostUserMemoryRegion, VhostUserProtocolFeatures,
|
||||||
|
VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags,
|
||||||
|
VhostUserVringState,
|
||||||
|
};
|
||||||
|
use vhost::vhost_user::{
|
||||||
|
Error as VhostUserError, Result as VhostUserResult, SlaveFsCacheReq,
|
||||||
|
VhostUserSlaveReqHandlerMut,
|
||||||
|
};
|
||||||
|
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
|
||||||
|
use virtio_queue::{Error as VirtQueError, QueueT};
|
||||||
|
use vm_memory::bitmap::Bitmap;
|
||||||
|
use vm_memory::mmap::NewBitmap;
|
||||||
|
use vm_memory::{
|
||||||
|
FileOffset, GuestAddress, GuestAddressSpace, GuestMemoryMmap, GuestRegionMmap, MmapRegion,
|
||||||
|
};
|
||||||
|
use vmm_sys_util::epoll::EventSet;
|
||||||
|
|
||||||
|
use super::backend::VhostUserBackend;
|
||||||
|
use super::event_loop::VringEpollHandler;
|
||||||
|
use super::event_loop::{VringEpollError, VringEpollResult};
|
||||||
|
use super::vring::VringT;
|
||||||
|
use super::GM;
|
||||||
|
|
||||||
|
const MAX_MEM_SLOTS: u64 = 32;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
/// Errors related to vhost-user handler.
|
||||||
|
pub enum VhostUserHandlerError {
|
||||||
|
/// Failed to create a `Vring`.
|
||||||
|
CreateVring(VirtQueError),
|
||||||
|
/// Failed to create vring worker.
|
||||||
|
CreateEpollHandler(VringEpollError),
|
||||||
|
/// Failed to spawn vring worker.
|
||||||
|
SpawnVringWorker(io::Error),
|
||||||
|
/// Could not find the mapping from memory regions.
|
||||||
|
MissingMemoryMapping,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for VhostUserHandlerError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
VhostUserHandlerError::CreateVring(e) => {
|
||||||
|
write!(f, "failed to create vring: {}", e)
|
||||||
|
}
|
||||||
|
VhostUserHandlerError::CreateEpollHandler(e) => {
|
||||||
|
write!(f, "failed to create vring epoll handler: {}", e)
|
||||||
|
}
|
||||||
|
VhostUserHandlerError::SpawnVringWorker(e) => {
|
||||||
|
write!(f, "failed spawning the vring worker: {}", e)
|
||||||
|
}
|
||||||
|
VhostUserHandlerError::MissingMemoryMapping => write!(f, "Missing memory mapping"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl error::Error for VhostUserHandlerError {}
|
||||||
|
|
||||||
|
/// Result of vhost-user handler operations.
|
||||||
|
pub type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
|
||||||
|
|
||||||
|
struct AddrMapping {
|
||||||
|
vmm_addr: u64,
|
||||||
|
size: u64,
|
||||||
|
gpa_base: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct VhostUserHandler<S, V, B: Bitmap + 'static> {
|
||||||
|
backend: S,
|
||||||
|
handlers: Vec<Arc<VringEpollHandler<S, V, B>>>,
|
||||||
|
owned: bool,
|
||||||
|
features_acked: bool,
|
||||||
|
acked_features: u64,
|
||||||
|
acked_protocol_features: u64,
|
||||||
|
num_queues: usize,
|
||||||
|
max_queue_size: usize,
|
||||||
|
queues_per_thread: Vec<u64>,
|
||||||
|
mappings: Vec<AddrMapping>,
|
||||||
|
atomic_mem: GM<B>,
|
||||||
|
vrings: Vec<V>,
|
||||||
|
worker_threads: Vec<thread::JoinHandle<VringEpollResult<()>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure VhostUserHandler: Clone + Send + Sync + 'static.
|
||||||
|
impl<S, V, B> VhostUserHandler<S, V, B>
|
||||||
|
where
|
||||||
|
S: VhostUserBackend<V, B> + Clone + 'static,
|
||||||
|
V: VringT<GM<B>> + Clone + Send + Sync + 'static,
|
||||||
|
B: Bitmap + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
pub(crate) fn new(backend: S, atomic_mem: GM<B>) -> VhostUserHandlerResult<Self> {
|
||||||
|
let num_queues = backend.num_queues();
|
||||||
|
let max_queue_size = backend.max_queue_size();
|
||||||
|
let queues_per_thread = backend.queues_per_thread();
|
||||||
|
|
||||||
|
let mut vrings = Vec::new();
|
||||||
|
for _ in 0..num_queues {
|
||||||
|
let vring = V::new(atomic_mem.clone(), max_queue_size as u16)
|
||||||
|
.map_err(VhostUserHandlerError::CreateVring)?;
|
||||||
|
vrings.push(vring);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut handlers = Vec::new();
|
||||||
|
let mut worker_threads = Vec::new();
|
||||||
|
for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
|
||||||
|
let mut thread_vrings = Vec::new();
|
||||||
|
for (index, vring) in vrings.iter().enumerate() {
|
||||||
|
if (queues_mask >> index) & 1u64 == 1u64 {
|
||||||
|
thread_vrings.push(vring.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let handler = Arc::new(
|
||||||
|
VringEpollHandler::new(backend.clone(), thread_vrings, thread_id)
|
||||||
|
.map_err(VhostUserHandlerError::CreateEpollHandler)?,
|
||||||
|
);
|
||||||
|
let handler2 = handler.clone();
|
||||||
|
let worker_thread = thread::Builder::new()
|
||||||
|
.name("vring_worker".to_string())
|
||||||
|
.spawn(move || handler2.run())
|
||||||
|
.map_err(VhostUserHandlerError::SpawnVringWorker)?;
|
||||||
|
|
||||||
|
handlers.push(handler);
|
||||||
|
worker_threads.push(worker_thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(VhostUserHandler {
|
||||||
|
backend,
|
||||||
|
handlers,
|
||||||
|
owned: false,
|
||||||
|
features_acked: false,
|
||||||
|
acked_features: 0,
|
||||||
|
acked_protocol_features: 0,
|
||||||
|
num_queues,
|
||||||
|
max_queue_size,
|
||||||
|
queues_per_thread,
|
||||||
|
mappings: Vec::new(),
|
||||||
|
atomic_mem,
|
||||||
|
vrings,
|
||||||
|
worker_threads,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B: Bitmap> VhostUserHandler<S, V, B> {
|
||||||
|
pub(crate) fn send_exit_event(&self) {
|
||||||
|
for handler in self.handlers.iter() {
|
||||||
|
handler.send_exit_event();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
|
||||||
|
for mapping in self.mappings.iter() {
|
||||||
|
if vmm_va >= mapping.vmm_addr && vmm_va < mapping.vmm_addr + mapping.size {
|
||||||
|
return Ok(vmm_va - mapping.vmm_addr + mapping.gpa_base);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(VhostUserHandlerError::MissingMemoryMapping)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B> VhostUserHandler<S, V, B>
|
||||||
|
where
|
||||||
|
S: VhostUserBackend<V, B>,
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: Bitmap,
|
||||||
|
{
|
||||||
|
pub(crate) fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> {
|
||||||
|
self.handlers.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vring_needs_init(&self, vring: &V) -> bool {
|
||||||
|
let vring_state = vring.get_ref();
|
||||||
|
|
||||||
|
// If the vring wasn't initialized and we already have an EventFd for
|
||||||
|
// VRING_KICK, initialize it now.
|
||||||
|
!vring_state.get_queue().ready() && vring_state.get_kick().is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn initialize_vring(&self, vring: &V, index: u8) -> VhostUserResult<()> {
|
||||||
|
assert!(vring.get_ref().get_kick().is_some());
|
||||||
|
|
||||||
|
if let Some(fd) = vring.get_ref().get_kick() {
|
||||||
|
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
|
||||||
|
let shifted_queues_mask = queues_mask >> index;
|
||||||
|
if shifted_queues_mask & 1u64 == 1u64 {
|
||||||
|
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
|
||||||
|
self.handlers[thread_index]
|
||||||
|
.register_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
|
||||||
|
.map_err(VhostUserError::ReqHandlerError)?;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.vrings[index as usize].set_queue_ready(true);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B> VhostUserSlaveReqHandlerMut for VhostUserHandler<S, V, B>
|
||||||
|
where
|
||||||
|
S: VhostUserBackend<V, B>,
|
||||||
|
V: VringT<GM<B>>,
|
||||||
|
B: NewBitmap + Clone,
|
||||||
|
{
|
||||||
|
fn set_owner(&mut self) -> VhostUserResult<()> {
|
||||||
|
if self.owned {
|
||||||
|
return Err(VhostUserError::InvalidOperation("already claimed"));
|
||||||
|
}
|
||||||
|
self.owned = true;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset_owner(&mut self) -> VhostUserResult<()> {
|
||||||
|
self.owned = false;
|
||||||
|
self.features_acked = false;
|
||||||
|
self.acked_features = 0;
|
||||||
|
self.acked_protocol_features = 0;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_features(&mut self) -> VhostUserResult<u64> {
|
||||||
|
Ok(self.backend.features())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_features(&mut self, features: u64) -> VhostUserResult<()> {
|
||||||
|
if (features & !self.backend.features()) != 0 {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.acked_features = features;
|
||||||
|
self.features_acked = true;
|
||||||
|
|
||||||
|
// If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated,
|
||||||
|
// the ring is initialized in an enabled state.
|
||||||
|
// If VHOST_USER_F_PROTOCOL_FEATURES has been negotiated,
|
||||||
|
// the ring is initialized in a disabled state. Client must not
|
||||||
|
// pass data to/from the backend until ring is enabled by
|
||||||
|
// VHOST_USER_SET_VRING_ENABLE with parameter 1, or after it has
|
||||||
|
// been disabled by VHOST_USER_SET_VRING_ENABLE with parameter 0.
|
||||||
|
let vring_enabled =
|
||||||
|
self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0;
|
||||||
|
for vring in self.vrings.iter_mut() {
|
||||||
|
vring.set_enabled(vring_enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.backend.acked_features(self.acked_features);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_mem_table(
|
||||||
|
&mut self,
|
||||||
|
ctx: &[VhostUserMemoryRegion],
|
||||||
|
files: Vec<File>,
|
||||||
|
) -> VhostUserResult<()> {
|
||||||
|
// We need to create tuple of ranges from the list of VhostUserMemoryRegion
|
||||||
|
// that we get from the caller.
|
||||||
|
let mut regions: Vec<(GuestAddress, usize, Option<FileOffset>)> = Vec::new();
|
||||||
|
let mut mappings: Vec<AddrMapping> = Vec::new();
|
||||||
|
|
||||||
|
for (region, file) in ctx.iter().zip(files) {
|
||||||
|
let g_addr = GuestAddress(region.guest_phys_addr);
|
||||||
|
let len = region.memory_size as usize;
|
||||||
|
let f_off = FileOffset::new(file, region.mmap_offset);
|
||||||
|
|
||||||
|
regions.push((g_addr, len, Some(f_off)));
|
||||||
|
mappings.push(AddrMapping {
|
||||||
|
vmm_addr: region.user_addr,
|
||||||
|
size: region.memory_size,
|
||||||
|
gpa_base: region.guest_phys_addr,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mem = GuestMemoryMmap::from_ranges_with_files(regions).map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Updating the inner GuestMemory object here will cause all our vrings to
|
||||||
|
// see the new one the next time they call to `atomic_mem.memory()`.
|
||||||
|
self.atomic_mem.lock().unwrap().replace(mem);
|
||||||
|
|
||||||
|
self.backend
|
||||||
|
.update_memory(self.atomic_mem.clone())
|
||||||
|
.map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
self.mappings = mappings;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> {
|
||||||
|
if index as usize >= self.num_queues || num == 0 || num as usize > self.max_queue_size {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
self.vrings[index as usize].set_queue_size(num as u16);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_vring_addr(
|
||||||
|
&mut self,
|
||||||
|
index: u32,
|
||||||
|
_flags: VhostUserVringAddrFlags,
|
||||||
|
descriptor: u64,
|
||||||
|
used: u64,
|
||||||
|
available: u64,
|
||||||
|
_log: u64,
|
||||||
|
) -> VhostUserResult<()> {
|
||||||
|
if index as usize >= self.num_queues {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.mappings.is_empty() {
|
||||||
|
let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
let used_ring = self.vmm_va_to_gpa(used).map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
self.vrings[index as usize]
|
||||||
|
.set_queue_info(desc_table, avail_ring, used_ring)
|
||||||
|
.map_err(|_| VhostUserError::InvalidParam)?;
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(VhostUserError::InvalidParam)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> {
|
||||||
|
let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0;
|
||||||
|
|
||||||
|
self.vrings[index as usize].set_queue_next_avail(base as u16);
|
||||||
|
self.vrings[index as usize].set_queue_event_idx(event_idx);
|
||||||
|
self.backend.set_event_idx(event_idx);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_vring_base(&mut self, index: u32) -> VhostUserResult<VhostUserVringState> {
|
||||||
|
if index as usize >= self.num_queues {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
// Quote from vhost-user specification:
|
||||||
|
// Client must start ring upon receiving a kick (that is, detecting
|
||||||
|
// that file descriptor is readable) on the descriptor specified by
|
||||||
|
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
|
||||||
|
// VHOST_USER_GET_VRING_BASE.
|
||||||
|
self.vrings[index as usize].set_queue_ready(false);
|
||||||
|
if let Some(fd) = self.vrings[index as usize].get_ref().get_kick() {
|
||||||
|
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
|
||||||
|
let shifted_queues_mask = queues_mask >> index;
|
||||||
|
if shifted_queues_mask & 1u64 == 1u64 {
|
||||||
|
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
|
||||||
|
self.handlers[thread_index]
|
||||||
|
.unregister_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
|
||||||
|
.map_err(VhostUserError::ReqHandlerError)?;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.vrings[index as usize].set_kick(None);
|
||||||
|
self.vrings[index as usize].set_call(None);
|
||||||
|
|
||||||
|
// Strictly speaking, we should do this upon receiving the first kick,
|
||||||
|
// but it's actually easier to just do it here so we're ready in case
|
||||||
|
// the vring gets re-initialized by the guest.
|
||||||
|
self.vrings[index as usize]
|
||||||
|
.get_mut()
|
||||||
|
.get_queue_mut()
|
||||||
|
.reset();
|
||||||
|
|
||||||
|
let next_avail = self.vrings[index as usize].queue_next_avail();
|
||||||
|
|
||||||
|
Ok(VhostUserVringState::new(index, u32::from(next_avail)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_vring_kick(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
|
||||||
|
if index as usize >= self.num_queues {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
// SAFETY: EventFd requires that it has sole ownership of its fd. So
|
||||||
|
// does File, so this is safe.
|
||||||
|
// Ideally, we'd have a generic way to refer to a uniquely-owned fd,
|
||||||
|
// such as that proposed by Rust RFC #3128.
|
||||||
|
self.vrings[index as usize].set_kick(file);
|
||||||
|
|
||||||
|
if self.vring_needs_init(&self.vrings[index as usize]) {
|
||||||
|
self.initialize_vring(&self.vrings[index as usize], index)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_vring_call(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
|
||||||
|
if index as usize >= self.num_queues {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.vrings[index as usize].set_call(file);
|
||||||
|
|
||||||
|
if self.vring_needs_init(&self.vrings[index as usize]) {
|
||||||
|
self.initialize_vring(&self.vrings[index as usize], index)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_vring_err(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
|
||||||
|
if index as usize >= self.num_queues {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.vrings[index as usize].set_err(file);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_protocol_features(&mut self) -> VhostUserResult<VhostUserProtocolFeatures> {
|
||||||
|
Ok(self.backend.protocol_features())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> {
|
||||||
|
// Note: slave that reported VHOST_USER_F_PROTOCOL_FEATURES must
|
||||||
|
// support this message even before VHOST_USER_SET_FEATURES was
|
||||||
|
// called.
|
||||||
|
self.acked_protocol_features = features;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_queue_num(&mut self) -> VhostUserResult<u64> {
|
||||||
|
Ok(self.num_queues as u64)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> {
|
||||||
|
// This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES
|
||||||
|
// has been negotiated.
|
||||||
|
if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 {
|
||||||
|
return Err(VhostUserError::InvalidOperation(
|
||||||
|
"protocol features not set",
|
||||||
|
));
|
||||||
|
} else if index as usize >= self.num_queues {
|
||||||
|
return Err(VhostUserError::InvalidParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slave must not pass data to/from the backend until ring is
|
||||||
|
// enabled by VHOST_USER_SET_VRING_ENABLE with parameter 1,
|
||||||
|
// or after it has been disabled by VHOST_USER_SET_VRING_ENABLE
|
||||||
|
// with parameter 0.
|
||||||
|
self.vrings[index as usize].set_enabled(enable);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_config(
|
||||||
|
&mut self,
|
||||||
|
offset: u32,
|
||||||
|
size: u32,
|
||||||
|
_flags: VhostUserConfigFlags,
|
||||||
|
) -> VhostUserResult<Vec<u8>> {
|
||||||
|
Ok(self.backend.get_config(offset, size))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_config(
|
||||||
|
&mut self,
|
||||||
|
offset: u32,
|
||||||
|
buf: &[u8],
|
||||||
|
_flags: VhostUserConfigFlags,
|
||||||
|
) -> VhostUserResult<()> {
|
||||||
|
self.backend
|
||||||
|
.set_config(offset, buf)
|
||||||
|
.map_err(VhostUserError::ReqHandlerError)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_slave_req_fd(&mut self, vu_req: SlaveFsCacheReq) {
|
||||||
|
if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 {
|
||||||
|
vu_req.set_reply_ack_flag(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.backend.set_slave_req_fd(vu_req);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_inflight_fd(
|
||||||
|
&mut self,
|
||||||
|
_inflight: &vhost::vhost_user::message::VhostUserInflight,
|
||||||
|
) -> VhostUserResult<(vhost::vhost_user::message::VhostUserInflight, File)> {
|
||||||
|
// Assume the backend hasn't negotiated the inflight feature; it
|
||||||
|
// wouldn't be correct for the backend to do so, as we don't (yet)
|
||||||
|
// provide a way for it to handle such requests.
|
||||||
|
Err(VhostUserError::InvalidOperation("not supported"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_inflight_fd(
|
||||||
|
&mut self,
|
||||||
|
_inflight: &vhost::vhost_user::message::VhostUserInflight,
|
||||||
|
_file: File,
|
||||||
|
) -> VhostUserResult<()> {
|
||||||
|
Err(VhostUserError::InvalidOperation("not supported"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_max_mem_slots(&mut self) -> VhostUserResult<u64> {
|
||||||
|
Ok(MAX_MEM_SLOTS)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_mem_region(
|
||||||
|
&mut self,
|
||||||
|
region: &VhostUserSingleMemoryRegion,
|
||||||
|
file: File,
|
||||||
|
) -> VhostUserResult<()> {
|
||||||
|
let mmap_region = MmapRegion::from_file(
|
||||||
|
FileOffset::new(file, region.mmap_offset),
|
||||||
|
region.memory_size as usize,
|
||||||
|
)
|
||||||
|
.map_err(|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)))?;
|
||||||
|
let guest_region = Arc::new(
|
||||||
|
GuestRegionMmap::new(mmap_region, GuestAddress(region.guest_phys_addr)).map_err(
|
||||||
|
|e| VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e)),
|
||||||
|
)?,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mem = self
|
||||||
|
.atomic_mem
|
||||||
|
.memory()
|
||||||
|
.insert_region(guest_region)
|
||||||
|
.map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.atomic_mem.lock().unwrap().replace(mem);
|
||||||
|
|
||||||
|
self.backend
|
||||||
|
.update_memory(self.atomic_mem.clone())
|
||||||
|
.map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.mappings.push(AddrMapping {
|
||||||
|
vmm_addr: region.user_addr,
|
||||||
|
size: region.memory_size,
|
||||||
|
gpa_base: region.guest_phys_addr,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> {
|
||||||
|
let (mem, _) = self
|
||||||
|
.atomic_mem
|
||||||
|
.memory()
|
||||||
|
.remove_region(GuestAddress(region.guest_phys_addr), region.memory_size)
|
||||||
|
.map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.atomic_mem.lock().unwrap().replace(mem);
|
||||||
|
|
||||||
|
self.backend
|
||||||
|
.update_memory(self.atomic_mem.clone())
|
||||||
|
.map_err(|e| {
|
||||||
|
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.mappings
|
||||||
|
.retain(|mapping| mapping.gpa_base != region.guest_phys_addr);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B: Bitmap> Drop for VhostUserHandler<S, V, B> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// Signal all working threads to exit.
|
||||||
|
self.send_exit_event();
|
||||||
|
|
||||||
|
for thread in self.worker_threads.drain(..) {
|
||||||
|
if let Err(e) = thread.join() {
|
||||||
|
error!("Error in vring worker: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
270
crates/vhost-user-backend/src/lib.rs
Normal file
270
crates/vhost-user-backend/src/lib.rs
Normal file
|
|
@ -0,0 +1,270 @@
|
||||||
|
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||||
|
// Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
//! A simple framework to run a vhost-user backend service.
|
||||||
|
|
||||||
|
#[macro_use]
|
||||||
|
extern crate log;
|
||||||
|
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use vhost::vhost_user::{Error as VhostUserError, Listener, SlaveListener, SlaveReqHandler};
|
||||||
|
use vm_memory::bitmap::Bitmap;
|
||||||
|
use vm_memory::mmap::NewBitmap;
|
||||||
|
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
|
||||||
|
|
||||||
|
use self::handler::VhostUserHandler;
|
||||||
|
|
||||||
|
mod backend;
|
||||||
|
pub use self::backend::{VhostUserBackend, VhostUserBackendMut};
|
||||||
|
|
||||||
|
mod event_loop;
|
||||||
|
pub use self::event_loop::VringEpollHandler;
|
||||||
|
|
||||||
|
mod handler;
|
||||||
|
pub use self::handler::VhostUserHandlerError;
|
||||||
|
|
||||||
|
mod vring;
|
||||||
|
pub use self::vring::{
|
||||||
|
VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// An alias for `GuestMemoryAtomic<GuestMemoryMmap<B>>` to simplify code.
|
||||||
|
type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
/// Errors related to vhost-user daemon.
|
||||||
|
pub enum Error {
|
||||||
|
/// Failed to create a new vhost-user handler.
|
||||||
|
NewVhostUserHandler(VhostUserHandlerError),
|
||||||
|
/// Failed creating vhost-user slave listener.
|
||||||
|
CreateSlaveListener(VhostUserError),
|
||||||
|
/// Failed creating vhost-user slave handler.
|
||||||
|
CreateSlaveReqHandler(VhostUserError),
|
||||||
|
/// Failed starting daemon thread.
|
||||||
|
StartDaemon(std::io::Error),
|
||||||
|
/// Failed waiting for daemon thread.
|
||||||
|
WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>),
|
||||||
|
/// Failed handling a vhost-user request.
|
||||||
|
HandleRequest(VhostUserError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for Error {
|
||||||
|
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e),
|
||||||
|
Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e),
|
||||||
|
Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e),
|
||||||
|
Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e),
|
||||||
|
Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"),
|
||||||
|
Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Result of vhost-user daemon operations.
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
/// Implement a simple framework to run a vhost-user service daemon.
|
||||||
|
///
|
||||||
|
/// This structure is the public API the backend is allowed to interact with in order to run
|
||||||
|
/// a fully functional vhost-user daemon.
|
||||||
|
pub struct VhostUserDaemon<S, V, B: Bitmap + 'static = ()> {
|
||||||
|
name: String,
|
||||||
|
handler: Arc<Mutex<VhostUserHandler<S, V, B>>>,
|
||||||
|
main_thread: Option<thread::JoinHandle<Result<()>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V, B> VhostUserDaemon<S, V, B>
|
||||||
|
where
|
||||||
|
S: VhostUserBackend<V, B> + Clone + 'static,
|
||||||
|
V: VringT<GM<B>> + Clone + Send + Sync + 'static,
|
||||||
|
B: NewBitmap + Clone + Send + Sync,
|
||||||
|
{
|
||||||
|
/// Create the daemon instance, providing the backend implementation of `VhostUserBackend`.
|
||||||
|
///
|
||||||
|
/// Under the hood, this will start a dedicated thread responsible for listening onto
|
||||||
|
/// registered event. Those events can be vring events or custom events from the backend,
|
||||||
|
/// but they get to be registered later during the sequence.
|
||||||
|
pub fn new(
|
||||||
|
name: String,
|
||||||
|
backend: S,
|
||||||
|
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>,
|
||||||
|
) -> Result<Self> {
|
||||||
|
let handler = Arc::new(Mutex::new(
|
||||||
|
VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?,
|
||||||
|
));
|
||||||
|
|
||||||
|
Ok(VhostUserDaemon {
|
||||||
|
name,
|
||||||
|
handler,
|
||||||
|
main_thread: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run a dedicated thread handling all requests coming through the socket.
|
||||||
|
/// This runs in an infinite loop that should be terminating once the other
|
||||||
|
/// end of the socket (the VMM) hangs up.
|
||||||
|
///
|
||||||
|
/// This function is the common code for starting a new daemon, no matter if
|
||||||
|
/// it acts as a client or a server.
|
||||||
|
fn start_daemon(
|
||||||
|
&mut self,
|
||||||
|
mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let handle = thread::Builder::new()
|
||||||
|
.name(self.name.clone())
|
||||||
|
.spawn(move || loop {
|
||||||
|
handler.handle_request().map_err(Error::HandleRequest)?;
|
||||||
|
})
|
||||||
|
.map_err(Error::StartDaemon)?;
|
||||||
|
|
||||||
|
self.main_thread = Some(handle);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connect to the vhost-user socket and run a dedicated thread handling
|
||||||
|
/// all requests coming through this socket. This runs in an infinite loop
|
||||||
|
/// that should be terminating once the other end of the socket (the VMM)
|
||||||
|
/// hangs up.
|
||||||
|
pub fn start_client(&mut self, socket_path: &str) -> Result<()> {
|
||||||
|
let slave_handler = SlaveReqHandler::connect(socket_path, self.handler.clone())
|
||||||
|
.map_err(Error::CreateSlaveReqHandler)?;
|
||||||
|
self.start_daemon(slave_handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Listen to the vhost-user socket and run a dedicated thread handling all requests coming
|
||||||
|
/// through this socket.
|
||||||
|
///
|
||||||
|
/// This runs in an infinite loop that should be terminating once the other end of the socket
|
||||||
|
/// (the VMM) disconnects.
|
||||||
|
// TODO: the current implementation has limitations that only one incoming connection will be
|
||||||
|
// handled from the listener. Should it be enhanced to support reconnection?
|
||||||
|
pub fn start(&mut self, listener: Listener) -> Result<()> {
|
||||||
|
let mut slave_listener = SlaveListener::new(listener, self.handler.clone())
|
||||||
|
.map_err(Error::CreateSlaveListener)?;
|
||||||
|
let slave_handler = self.accept(&mut slave_listener)?;
|
||||||
|
self.start_daemon(slave_handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn accept(
|
||||||
|
&self,
|
||||||
|
slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>,
|
||||||
|
) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>> {
|
||||||
|
loop {
|
||||||
|
match slave_listener.accept() {
|
||||||
|
Err(e) => return Err(Error::CreateSlaveListener(e)),
|
||||||
|
Ok(Some(v)) => return Ok(v),
|
||||||
|
Ok(None) => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for the thread handling the vhost-user socket connection to terminate.
|
||||||
|
pub fn wait(&mut self) -> Result<()> {
|
||||||
|
if let Some(handle) = self.main_thread.take() {
|
||||||
|
match handle.join().map_err(Error::WaitDaemon)? {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retrieve the vring epoll handler.
|
||||||
|
///
|
||||||
|
/// This is necessary to perform further actions like registering and unregistering some extra
|
||||||
|
/// event file descriptors.
|
||||||
|
pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> {
|
||||||
|
// Do not expect poisoned lock.
|
||||||
|
self.handler.lock().unwrap().get_epoll_handlers()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::backend::tests::MockVhostBackend;
|
||||||
|
use super::*;
|
||||||
|
use std::os::unix::net::{UnixListener, UnixStream};
|
||||||
|
use std::sync::Barrier;
|
||||||
|
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_daemon() {
|
||||||
|
let mem = GuestMemoryAtomic::new(
|
||||||
|
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
|
||||||
|
);
|
||||||
|
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||||
|
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
|
||||||
|
|
||||||
|
let handlers = daemon.get_epoll_handlers();
|
||||||
|
assert_eq!(handlers.len(), 2);
|
||||||
|
|
||||||
|
let barrier = Arc::new(Barrier::new(2));
|
||||||
|
let tmpdir = tempfile::tempdir().unwrap();
|
||||||
|
let mut path = tmpdir.path().to_path_buf();
|
||||||
|
path.push("socket");
|
||||||
|
|
||||||
|
let barrier2 = barrier.clone();
|
||||||
|
let path1 = path.clone();
|
||||||
|
let thread = thread::spawn(move || {
|
||||||
|
barrier2.wait();
|
||||||
|
let socket = UnixStream::connect(&path1).unwrap();
|
||||||
|
barrier2.wait();
|
||||||
|
drop(socket)
|
||||||
|
});
|
||||||
|
|
||||||
|
let listener = Listener::new(&path, false).unwrap();
|
||||||
|
barrier.wait();
|
||||||
|
daemon.start(listener).unwrap();
|
||||||
|
barrier.wait();
|
||||||
|
// Above process generates a `HandleRequest(PartialMessage)` error.
|
||||||
|
daemon.wait().unwrap_err();
|
||||||
|
daemon.wait().unwrap();
|
||||||
|
thread.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_daemon_client() {
|
||||||
|
let mem = GuestMemoryAtomic::new(
|
||||||
|
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
|
||||||
|
);
|
||||||
|
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||||
|
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
|
||||||
|
|
||||||
|
let handlers = daemon.get_epoll_handlers();
|
||||||
|
assert_eq!(handlers.len(), 2);
|
||||||
|
|
||||||
|
let barrier = Arc::new(Barrier::new(2));
|
||||||
|
let tmpdir = tempfile::tempdir().unwrap();
|
||||||
|
let mut path = tmpdir.path().to_path_buf();
|
||||||
|
path.push("socket");
|
||||||
|
|
||||||
|
let barrier2 = barrier.clone();
|
||||||
|
let path1 = path.clone();
|
||||||
|
let thread = thread::spawn(move || {
|
||||||
|
let listener = UnixListener::bind(&path1).unwrap();
|
||||||
|
barrier2.wait();
|
||||||
|
let (stream, _) = listener.accept().unwrap();
|
||||||
|
barrier2.wait();
|
||||||
|
drop(stream)
|
||||||
|
});
|
||||||
|
|
||||||
|
barrier.wait();
|
||||||
|
daemon
|
||||||
|
.start_client(path.as_path().to_str().unwrap())
|
||||||
|
.unwrap();
|
||||||
|
barrier.wait();
|
||||||
|
// Above process generates a `HandleRequest(PartialMessage)` error.
|
||||||
|
daemon.wait().unwrap_err();
|
||||||
|
daemon.wait().unwrap();
|
||||||
|
thread.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
547
crates/vhost-user-backend/src/vring.rs
Normal file
547
crates/vhost-user-backend/src/vring.rs
Normal file
|
|
@ -0,0 +1,547 @@
|
||||||
|
// Copyright 2019 Intel Corporation. All Rights Reserved.
|
||||||
|
// Copyright 2021 Alibaba Cloud Computing. All rights reserved.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
//! Struct to maintain state information and manipulate vhost-user queues.
|
||||||
|
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io;
|
||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
|
use std::os::unix::io::{FromRawFd, IntoRawFd};
|
||||||
|
use std::result::Result;
|
||||||
|
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
|
|
||||||
|
use virtio_queue::{Error as VirtQueError, Queue, QueueT};
|
||||||
|
use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
|
||||||
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
|
/// Trait for objects returned by `VringT::get_ref()`.
|
||||||
|
pub trait VringStateGuard<'a, M: GuestAddressSpace> {
|
||||||
|
/// Type for guard returned by `VringT::get_ref()`.
|
||||||
|
type G: Deref<Target = VringState<M>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trait for objects returned by `VringT::get_mut()`.
|
||||||
|
pub trait VringStateMutGuard<'a, M: GuestAddressSpace> {
|
||||||
|
/// Type for guard returned by `VringT::get_mut()`.
|
||||||
|
type G: DerefMut<Target = VringState<M>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait VringT<M: GuestAddressSpace>:
|
||||||
|
for<'a> VringStateGuard<'a, M> + for<'a> VringStateMutGuard<'a, M>
|
||||||
|
{
|
||||||
|
/// Create a new instance of Vring.
|
||||||
|
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError>
|
||||||
|
where
|
||||||
|
Self: Sized;
|
||||||
|
|
||||||
|
/// Get an immutable reference to the kick event fd.
|
||||||
|
fn get_ref(&self) -> <Self as VringStateGuard<M>>::G;
|
||||||
|
|
||||||
|
/// Get a mutable reference to the kick event fd.
|
||||||
|
fn get_mut(&self) -> <Self as VringStateMutGuard<M>>::G;
|
||||||
|
|
||||||
|
/// Add an used descriptor into the used queue.
|
||||||
|
fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError>;
|
||||||
|
|
||||||
|
/// Notify the vhost-user master that used descriptors have been put into the used queue.
|
||||||
|
fn signal_used_queue(&self) -> io::Result<()>;
|
||||||
|
|
||||||
|
/// Enable event notification for queue.
|
||||||
|
fn enable_notification(&self) -> Result<bool, VirtQueError>;
|
||||||
|
|
||||||
|
/// Disable event notification for queue.
|
||||||
|
fn disable_notification(&self) -> Result<(), VirtQueError>;
|
||||||
|
|
||||||
|
/// Check whether a notification to the guest is needed.
|
||||||
|
fn needs_notification(&self) -> Result<bool, VirtQueError>;
|
||||||
|
|
||||||
|
/// Set vring enabled state.
|
||||||
|
fn set_enabled(&self, enabled: bool);
|
||||||
|
|
||||||
|
/// Set queue addresses for descriptor table, available ring and used ring.
|
||||||
|
fn set_queue_info(
|
||||||
|
&self,
|
||||||
|
desc_table: u64,
|
||||||
|
avail_ring: u64,
|
||||||
|
used_ring: u64,
|
||||||
|
) -> Result<(), VirtQueError>;
|
||||||
|
|
||||||
|
/// Get queue next avail head.
|
||||||
|
fn queue_next_avail(&self) -> u16;
|
||||||
|
|
||||||
|
/// Set queue next avail head.
|
||||||
|
fn set_queue_next_avail(&self, base: u16);
|
||||||
|
|
||||||
|
/// Set configured queue size.
|
||||||
|
fn set_queue_size(&self, num: u16);
|
||||||
|
|
||||||
|
/// Enable/disable queue event index feature.
|
||||||
|
fn set_queue_event_idx(&self, enabled: bool);
|
||||||
|
|
||||||
|
/// Set queue enabled state.
|
||||||
|
fn set_queue_ready(&self, ready: bool);
|
||||||
|
|
||||||
|
/// Set `EventFd` for kick.
|
||||||
|
fn set_kick(&self, file: Option<File>);
|
||||||
|
|
||||||
|
/// Read event from the kick `EventFd`.
|
||||||
|
fn read_kick(&self) -> io::Result<bool>;
|
||||||
|
|
||||||
|
/// Set `EventFd` for call.
|
||||||
|
fn set_call(&self, file: Option<File>);
|
||||||
|
|
||||||
|
/// Set `EventFd` for err.
|
||||||
|
fn set_err(&self, file: Option<File>);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Struct to maintain raw state information for a vhost-user queue.
|
||||||
|
///
|
||||||
|
/// This struct maintains all information of a virito queue, and could be used as an `VringT`
|
||||||
|
/// object for single-threaded context.
|
||||||
|
pub struct VringState<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
|
||||||
|
queue: Queue,
|
||||||
|
kick: Option<EventFd>,
|
||||||
|
call: Option<EventFd>,
|
||||||
|
err: Option<EventFd>,
|
||||||
|
enabled: bool,
|
||||||
|
mem: M,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: GuestAddressSpace> VringState<M> {
|
||||||
|
/// Create a new instance of Vring.
|
||||||
|
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError> {
|
||||||
|
Ok(VringState {
|
||||||
|
queue: Queue::new(max_queue_size)?,
|
||||||
|
kick: None,
|
||||||
|
call: None,
|
||||||
|
err: None,
|
||||||
|
enabled: false,
|
||||||
|
mem,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get an immutable reference to the underlying raw `Queue` object.
|
||||||
|
pub fn get_queue(&self) -> &Queue {
|
||||||
|
&self.queue
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a mutable reference to the underlying raw `Queue` object.
|
||||||
|
pub fn get_queue_mut(&mut self) -> &mut Queue {
|
||||||
|
&mut self.queue
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add an used descriptor into the used queue.
|
||||||
|
pub fn add_used(&mut self, desc_index: u16, len: u32) -> Result<(), VirtQueError> {
|
||||||
|
self.queue
|
||||||
|
.add_used(self.mem.memory().deref(), desc_index, len)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Notify the vhost-user master that used descriptors have been put into the used queue.
|
||||||
|
pub fn signal_used_queue(&self) -> io::Result<()> {
|
||||||
|
if let Some(call) = self.call.as_ref() {
|
||||||
|
call.write(1)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enable event notification for queue.
|
||||||
|
pub fn enable_notification(&mut self) -> Result<bool, VirtQueError> {
|
||||||
|
self.queue.enable_notification(self.mem.memory().deref())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Disable event notification for queue.
|
||||||
|
pub fn disable_notification(&mut self) -> Result<(), VirtQueError> {
|
||||||
|
self.queue.disable_notification(self.mem.memory().deref())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether a notification to the guest is needed.
|
||||||
|
pub fn needs_notification(&mut self) -> Result<bool, VirtQueError> {
|
||||||
|
self.queue.needs_notification(self.mem.memory().deref())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set vring enabled state.
|
||||||
|
pub fn set_enabled(&mut self, enabled: bool) {
|
||||||
|
self.enabled = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set queue addresses for descriptor table, available ring and used ring.
|
||||||
|
pub fn set_queue_info(
|
||||||
|
&mut self,
|
||||||
|
desc_table: u64,
|
||||||
|
avail_ring: u64,
|
||||||
|
used_ring: u64,
|
||||||
|
) -> Result<(), VirtQueError> {
|
||||||
|
self.queue
|
||||||
|
.try_set_desc_table_address(GuestAddress(desc_table))?;
|
||||||
|
self.queue
|
||||||
|
.try_set_avail_ring_address(GuestAddress(avail_ring))?;
|
||||||
|
self.queue
|
||||||
|
.try_set_used_ring_address(GuestAddress(used_ring))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get queue next avail head.
|
||||||
|
fn queue_next_avail(&self) -> u16 {
|
||||||
|
self.queue.next_avail()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set queue next avail head.
|
||||||
|
fn set_queue_next_avail(&mut self, base: u16) {
|
||||||
|
self.queue.set_next_avail(base);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set configured queue size.
|
||||||
|
fn set_queue_size(&mut self, num: u16) {
|
||||||
|
self.queue.set_size(num);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enable/disable queue event index feature.
|
||||||
|
fn set_queue_event_idx(&mut self, enabled: bool) {
|
||||||
|
self.queue.set_event_idx(enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set queue enabled state.
|
||||||
|
fn set_queue_ready(&mut self, ready: bool) {
|
||||||
|
self.queue.set_ready(ready);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the `EventFd` for kick.
|
||||||
|
pub fn get_kick(&self) -> &Option<EventFd> {
|
||||||
|
&self.kick
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set `EventFd` for kick.
|
||||||
|
fn set_kick(&mut self, file: Option<File>) {
|
||||||
|
// SAFETY:
|
||||||
|
// EventFd requires that it has sole ownership of its fd. So does File, so this is safe.
|
||||||
|
// Ideally, we'd have a generic way to refer to a uniquely-owned fd, such as that proposed
|
||||||
|
// by Rust RFC #3128.
|
||||||
|
self.kick = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read event from the kick `EventFd`.
|
||||||
|
fn read_kick(&self) -> io::Result<bool> {
|
||||||
|
if let Some(kick) = &self.kick {
|
||||||
|
kick.read()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self.enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set `EventFd` for call.
|
||||||
|
fn set_call(&mut self, file: Option<File>) {
|
||||||
|
// SAFETY: see comment in set_kick()
|
||||||
|
self.call = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the `EventFd` for call.
|
||||||
|
pub fn get_call(&self) -> &Option<EventFd> {
|
||||||
|
&self.call
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set `EventFd` for err.
|
||||||
|
fn set_err(&mut self, file: Option<File>) {
|
||||||
|
// SAFETY: see comment in set_kick()
|
||||||
|
self.err = file.map(|f| unsafe { EventFd::from_raw_fd(f.into_raw_fd()) });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A `VringState` object protected by Mutex for multi-threading context.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct VringMutex<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
|
||||||
|
state: Arc<Mutex<VringState<M>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: GuestAddressSpace> VringMutex<M> {
|
||||||
|
/// Get a mutable guard to the underlying raw `VringState` object.
|
||||||
|
fn lock(&self) -> MutexGuard<VringState<M>> {
|
||||||
|
self.state.lock().unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: 'a + GuestAddressSpace> VringStateGuard<'a, M> for VringMutex<M> {
|
||||||
|
type G = MutexGuard<'a, VringState<M>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: 'a + GuestAddressSpace> VringStateMutGuard<'a, M> for VringMutex<M> {
|
||||||
|
type G = MutexGuard<'a, VringState<M>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: 'static + GuestAddressSpace> VringT<M> for VringMutex<M> {
|
||||||
|
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError> {
|
||||||
|
Ok(VringMutex {
|
||||||
|
state: Arc::new(Mutex::new(VringState::new(mem, max_queue_size)?)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_ref(&self) -> <Self as VringStateGuard<M>>::G {
|
||||||
|
self.state.lock().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_mut(&self) -> <Self as VringStateMutGuard<M>>::G {
|
||||||
|
self.lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError> {
|
||||||
|
self.lock().add_used(desc_index, len)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn signal_used_queue(&self) -> io::Result<()> {
|
||||||
|
self.get_ref().signal_used_queue()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn enable_notification(&self) -> Result<bool, VirtQueError> {
|
||||||
|
self.lock().enable_notification()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disable_notification(&self) -> Result<(), VirtQueError> {
|
||||||
|
self.lock().disable_notification()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn needs_notification(&self) -> Result<bool, VirtQueError> {
|
||||||
|
self.lock().needs_notification()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_enabled(&self, enabled: bool) {
|
||||||
|
self.lock().set_enabled(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_info(
|
||||||
|
&self,
|
||||||
|
desc_table: u64,
|
||||||
|
avail_ring: u64,
|
||||||
|
used_ring: u64,
|
||||||
|
) -> Result<(), VirtQueError> {
|
||||||
|
self.lock()
|
||||||
|
.set_queue_info(desc_table, avail_ring, used_ring)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queue_next_avail(&self) -> u16 {
|
||||||
|
self.get_ref().queue_next_avail()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_next_avail(&self, base: u16) {
|
||||||
|
self.lock().set_queue_next_avail(base)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_size(&self, num: u16) {
|
||||||
|
self.lock().set_queue_size(num);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_event_idx(&self, enabled: bool) {
|
||||||
|
self.lock().set_queue_event_idx(enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_ready(&self, ready: bool) {
|
||||||
|
self.lock().set_queue_ready(ready);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_kick(&self, file: Option<File>) {
|
||||||
|
self.lock().set_kick(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_kick(&self) -> io::Result<bool> {
|
||||||
|
self.get_ref().read_kick()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_call(&self, file: Option<File>) {
|
||||||
|
self.lock().set_call(file)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_err(&self, file: Option<File>) {
|
||||||
|
self.lock().set_err(file)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A `VringState` object protected by RwLock for multi-threading context.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct VringRwLock<M: GuestAddressSpace = GuestMemoryAtomic<GuestMemoryMmap>> {
|
||||||
|
state: Arc<RwLock<VringState<M>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: GuestAddressSpace> VringRwLock<M> {
|
||||||
|
/// Get a mutable guard to the underlying raw `VringState` object.
|
||||||
|
fn write_lock(&self) -> RwLockWriteGuard<VringState<M>> {
|
||||||
|
self.state.write().unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: 'a + GuestAddressSpace> VringStateGuard<'a, M> for VringRwLock<M> {
|
||||||
|
type G = RwLockReadGuard<'a, VringState<M>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, M: 'a + GuestAddressSpace> VringStateMutGuard<'a, M> for VringRwLock<M> {
|
||||||
|
type G = RwLockWriteGuard<'a, VringState<M>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M: 'static + GuestAddressSpace> VringT<M> for VringRwLock<M> {
|
||||||
|
fn new(mem: M, max_queue_size: u16) -> Result<Self, VirtQueError> {
|
||||||
|
Ok(VringRwLock {
|
||||||
|
state: Arc::new(RwLock::new(VringState::new(mem, max_queue_size)?)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_ref(&self) -> <Self as VringStateGuard<M>>::G {
|
||||||
|
self.state.read().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_mut(&self) -> <Self as VringStateMutGuard<M>>::G {
|
||||||
|
self.write_lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_used(&self, desc_index: u16, len: u32) -> Result<(), VirtQueError> {
|
||||||
|
self.write_lock().add_used(desc_index, len)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn signal_used_queue(&self) -> io::Result<()> {
|
||||||
|
self.get_ref().signal_used_queue()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn enable_notification(&self) -> Result<bool, VirtQueError> {
|
||||||
|
self.write_lock().enable_notification()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disable_notification(&self) -> Result<(), VirtQueError> {
|
||||||
|
self.write_lock().disable_notification()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn needs_notification(&self) -> Result<bool, VirtQueError> {
|
||||||
|
self.write_lock().needs_notification()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_enabled(&self, enabled: bool) {
|
||||||
|
self.write_lock().set_enabled(enabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_info(
|
||||||
|
&self,
|
||||||
|
desc_table: u64,
|
||||||
|
avail_ring: u64,
|
||||||
|
used_ring: u64,
|
||||||
|
) -> Result<(), VirtQueError> {
|
||||||
|
self.write_lock()
|
||||||
|
.set_queue_info(desc_table, avail_ring, used_ring)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn queue_next_avail(&self) -> u16 {
|
||||||
|
self.get_ref().queue_next_avail()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_next_avail(&self, base: u16) {
|
||||||
|
self.write_lock().set_queue_next_avail(base)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_size(&self, num: u16) {
|
||||||
|
self.write_lock().set_queue_size(num);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_event_idx(&self, enabled: bool) {
|
||||||
|
self.write_lock().set_queue_event_idx(enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queue_ready(&self, ready: bool) {
|
||||||
|
self.write_lock().set_queue_ready(ready);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_kick(&self, file: Option<File>) {
|
||||||
|
self.write_lock().set_kick(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_kick(&self) -> io::Result<bool> {
|
||||||
|
self.get_ref().read_kick()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_call(&self, file: Option<File>) {
|
||||||
|
self.write_lock().set_call(file)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_err(&self, file: Option<File>) {
|
||||||
|
self.write_lock().set_err(file)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::os::unix::io::AsRawFd;
|
||||||
|
use vm_memory::bitmap::AtomicBitmap;
|
||||||
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_vring() {
|
||||||
|
let mem = GuestMemoryAtomic::new(
|
||||||
|
GuestMemoryMmap::<AtomicBitmap>::from_ranges(&[(GuestAddress(0x100000), 0x10000)])
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
|
let vring = VringMutex::new(mem, 0x1000).unwrap();
|
||||||
|
|
||||||
|
assert!(vring.get_ref().get_kick().is_none());
|
||||||
|
assert!(!vring.get_mut().enabled);
|
||||||
|
assert!(!vring.lock().queue.ready());
|
||||||
|
assert!(!vring.lock().queue.event_idx_enabled());
|
||||||
|
|
||||||
|
vring.set_enabled(true);
|
||||||
|
assert!(vring.get_ref().enabled);
|
||||||
|
|
||||||
|
vring.set_queue_info(0x100100, 0x100200, 0x100300).unwrap();
|
||||||
|
assert_eq!(vring.lock().get_queue().desc_table(), 0x100100);
|
||||||
|
assert_eq!(vring.lock().get_queue().avail_ring(), 0x100200);
|
||||||
|
assert_eq!(vring.lock().get_queue().used_ring(), 0x100300);
|
||||||
|
|
||||||
|
assert_eq!(vring.queue_next_avail(), 0);
|
||||||
|
vring.set_queue_next_avail(0x20);
|
||||||
|
assert_eq!(vring.queue_next_avail(), 0x20);
|
||||||
|
|
||||||
|
vring.set_queue_size(0x200);
|
||||||
|
assert_eq!(vring.lock().queue.size(), 0x200);
|
||||||
|
|
||||||
|
vring.set_queue_event_idx(true);
|
||||||
|
assert!(vring.lock().queue.event_idx_enabled());
|
||||||
|
|
||||||
|
vring.set_queue_ready(true);
|
||||||
|
assert!(vring.lock().queue.ready());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_vring_set_fd() {
|
||||||
|
let mem = GuestMemoryAtomic::new(
|
||||||
|
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
|
||||||
|
);
|
||||||
|
let vring = VringMutex::new(mem, 0x1000).unwrap();
|
||||||
|
|
||||||
|
vring.set_enabled(true);
|
||||||
|
assert!(vring.get_ref().enabled);
|
||||||
|
|
||||||
|
let eventfd = EventFd::new(0).unwrap();
|
||||||
|
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
|
||||||
|
assert!(vring.get_mut().kick.is_none());
|
||||||
|
assert!(vring.read_kick().unwrap());
|
||||||
|
vring.set_kick(Some(file));
|
||||||
|
eventfd.write(1).unwrap();
|
||||||
|
assert!(vring.read_kick().unwrap());
|
||||||
|
assert!(vring.get_ref().kick.is_some());
|
||||||
|
vring.set_kick(None);
|
||||||
|
assert!(vring.get_ref().kick.is_none());
|
||||||
|
std::mem::forget(eventfd);
|
||||||
|
|
||||||
|
let eventfd = EventFd::new(0).unwrap();
|
||||||
|
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
|
||||||
|
assert!(vring.get_ref().call.is_none());
|
||||||
|
vring.set_call(Some(file));
|
||||||
|
assert!(vring.get_ref().call.is_some());
|
||||||
|
vring.set_call(None);
|
||||||
|
assert!(vring.get_ref().call.is_none());
|
||||||
|
std::mem::forget(eventfd);
|
||||||
|
|
||||||
|
let eventfd = EventFd::new(0).unwrap();
|
||||||
|
let file = unsafe { File::from_raw_fd(eventfd.as_raw_fd()) };
|
||||||
|
assert!(vring.get_ref().err.is_none());
|
||||||
|
vring.set_err(Some(file));
|
||||||
|
assert!(vring.get_ref().err.is_some());
|
||||||
|
vring.set_err(None);
|
||||||
|
assert!(vring.get_ref().err.is_none());
|
||||||
|
std::mem::forget(eventfd);
|
||||||
|
}
|
||||||
|
}
|
||||||
297
crates/vhost-user-backend/tests/vhost-user-server.rs
Normal file
297
crates/vhost-user-backend/tests/vhost-user-server.rs
Normal file
|
|
@ -0,0 +1,297 @@
|
||||||
|
use std::ffi::CString;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::Result;
|
||||||
|
use std::os::unix::io::{AsRawFd, FromRawFd};
|
||||||
|
use std::os::unix::net::UnixStream;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::{Arc, Barrier, Mutex};
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use vhost::vhost_user::message::{
|
||||||
|
VhostUserConfigFlags, VhostUserHeaderFlag, VhostUserInflight, VhostUserProtocolFeatures,
|
||||||
|
};
|
||||||
|
use vhost::vhost_user::{Listener, Master, SlaveFsCacheReq, VhostUserMaster};
|
||||||
|
use vhost::{VhostBackend, VhostUserMemoryRegionInfo, VringConfigData};
|
||||||
|
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock};
|
||||||
|
use vm_memory::{
|
||||||
|
FileOffset, GuestAddress, GuestAddressSpace, GuestMemory, GuestMemoryAtomic, GuestMemoryMmap,
|
||||||
|
};
|
||||||
|
use vmm_sys_util::epoll::EventSet;
|
||||||
|
use vmm_sys_util::eventfd::EventFd;
|
||||||
|
|
||||||
|
struct MockVhostBackend {
|
||||||
|
events: u64,
|
||||||
|
event_idx: bool,
|
||||||
|
acked_features: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MockVhostBackend {
|
||||||
|
fn new() -> Self {
|
||||||
|
MockVhostBackend {
|
||||||
|
events: 0,
|
||||||
|
event_idx: false,
|
||||||
|
acked_features: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VhostUserBackendMut<VringRwLock, ()> for MockVhostBackend {
|
||||||
|
fn num_queues(&self) -> usize {
|
||||||
|
2
|
||||||
|
}
|
||||||
|
|
||||||
|
fn max_queue_size(&self) -> usize {
|
||||||
|
256
|
||||||
|
}
|
||||||
|
|
||||||
|
fn features(&self) -> u64 {
|
||||||
|
0xffff_ffff_ffff_ffff
|
||||||
|
}
|
||||||
|
|
||||||
|
fn acked_features(&mut self, features: u64) {
|
||||||
|
self.acked_features = features;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||||
|
VhostUserProtocolFeatures::all()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_event_idx(&mut self, enabled: bool) {
|
||||||
|
self.event_idx = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
|
||||||
|
assert_eq!(offset, 0x200);
|
||||||
|
assert_eq!(size, 8);
|
||||||
|
|
||||||
|
vec![0xa5u8; 8]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_config(&mut self, offset: u32, buf: &[u8]) -> Result<()> {
|
||||||
|
assert_eq!(offset, 0x200);
|
||||||
|
assert_eq!(buf, &[0xa5u8; 8]);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_memory(&mut self, atomic_mem: GuestMemoryAtomic<GuestMemoryMmap>) -> Result<()> {
|
||||||
|
let mem = atomic_mem.memory();
|
||||||
|
let region = mem.find_region(GuestAddress(0x100000)).unwrap();
|
||||||
|
assert_eq!(region.size(), 0x100000);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
|
||||||
|
|
||||||
|
fn queues_per_thread(&self) -> Vec<u64> {
|
||||||
|
vec![1, 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
|
||||||
|
let event_fd = EventFd::new(0).unwrap();
|
||||||
|
|
||||||
|
Some(event_fd)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(
|
||||||
|
&mut self,
|
||||||
|
_device_event: u16,
|
||||||
|
_evset: EventSet,
|
||||||
|
_vrings: &[VringRwLock],
|
||||||
|
_thread_id: usize,
|
||||||
|
) -> Result<bool> {
|
||||||
|
self.events += 1;
|
||||||
|
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_master(path: &Path, barrier: Arc<Barrier>) -> Master {
|
||||||
|
barrier.wait();
|
||||||
|
let mut master = Master::connect(path, 1).unwrap();
|
||||||
|
master.set_hdr_flags(VhostUserHeaderFlag::NEED_REPLY);
|
||||||
|
// Wait before issue service requests.
|
||||||
|
barrier.wait();
|
||||||
|
|
||||||
|
let features = master.get_features().unwrap();
|
||||||
|
let proto = master.get_protocol_features().unwrap();
|
||||||
|
master.set_features(features).unwrap();
|
||||||
|
master.set_protocol_features(proto).unwrap();
|
||||||
|
assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK));
|
||||||
|
|
||||||
|
master
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vhost_user_client(path: &Path, barrier: Arc<Barrier>) {
|
||||||
|
barrier.wait();
|
||||||
|
let mut master = Master::connect(path, 1).unwrap();
|
||||||
|
master.set_hdr_flags(VhostUserHeaderFlag::NEED_REPLY);
|
||||||
|
// Wait before issue service requests.
|
||||||
|
barrier.wait();
|
||||||
|
|
||||||
|
let features = master.get_features().unwrap();
|
||||||
|
let proto = master.get_protocol_features().unwrap();
|
||||||
|
master.set_features(features).unwrap();
|
||||||
|
master.set_protocol_features(proto).unwrap();
|
||||||
|
assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK));
|
||||||
|
|
||||||
|
let queue_num = master.get_queue_num().unwrap();
|
||||||
|
assert_eq!(queue_num, 2);
|
||||||
|
|
||||||
|
master.set_owner().unwrap();
|
||||||
|
//master.set_owner().unwrap_err();
|
||||||
|
master.reset_owner().unwrap();
|
||||||
|
master.reset_owner().unwrap();
|
||||||
|
master.set_owner().unwrap();
|
||||||
|
|
||||||
|
master.set_features(features).unwrap();
|
||||||
|
master.set_protocol_features(proto).unwrap();
|
||||||
|
assert!(proto.contains(VhostUserProtocolFeatures::REPLY_ACK));
|
||||||
|
|
||||||
|
let memfd = nix::sys::memfd::memfd_create(
|
||||||
|
&CString::new("test").unwrap(),
|
||||||
|
nix::sys::memfd::MemFdCreateFlag::empty(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let file = unsafe { File::from_raw_fd(memfd) };
|
||||||
|
file.set_len(0x100000).unwrap();
|
||||||
|
let file_offset = FileOffset::new(file, 0);
|
||||||
|
let mem = GuestMemoryMmap::<()>::from_ranges_with_files(&[(
|
||||||
|
GuestAddress(0x100000),
|
||||||
|
0x100000,
|
||||||
|
Some(file_offset),
|
||||||
|
)])
|
||||||
|
.unwrap();
|
||||||
|
let addr = mem.get_host_address(GuestAddress(0x100000)).unwrap() as u64;
|
||||||
|
let reg = mem.find_region(GuestAddress(0x100000)).unwrap();
|
||||||
|
let fd = reg.file_offset().unwrap();
|
||||||
|
let regions = [VhostUserMemoryRegionInfo {
|
||||||
|
guest_phys_addr: 0x100000,
|
||||||
|
memory_size: 0x100000,
|
||||||
|
userspace_addr: addr,
|
||||||
|
mmap_offset: 0,
|
||||||
|
mmap_handle: fd.file().as_raw_fd(),
|
||||||
|
}];
|
||||||
|
master.set_mem_table(®ions).unwrap();
|
||||||
|
|
||||||
|
master.set_vring_num(0, 256).unwrap();
|
||||||
|
|
||||||
|
let config = VringConfigData {
|
||||||
|
queue_max_size: 256,
|
||||||
|
queue_size: 256,
|
||||||
|
flags: 0,
|
||||||
|
desc_table_addr: addr,
|
||||||
|
used_ring_addr: addr + 0x10000,
|
||||||
|
avail_ring_addr: addr + 0x20000,
|
||||||
|
log_addr: None,
|
||||||
|
};
|
||||||
|
master.set_vring_addr(0, &config).unwrap();
|
||||||
|
|
||||||
|
let eventfd = EventFd::new(0).unwrap();
|
||||||
|
master.set_vring_kick(0, &eventfd).unwrap();
|
||||||
|
master.set_vring_call(0, &eventfd).unwrap();
|
||||||
|
master.set_vring_err(0, &eventfd).unwrap();
|
||||||
|
master.set_vring_enable(0, true).unwrap();
|
||||||
|
|
||||||
|
let buf = [0u8; 8];
|
||||||
|
let (_cfg, data) = master
|
||||||
|
.get_config(0x200, 8, VhostUserConfigFlags::empty(), &buf)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(&data, &[0xa5u8; 8]);
|
||||||
|
master
|
||||||
|
.set_config(0x200, VhostUserConfigFlags::empty(), &data)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let (tx, _rx) = UnixStream::pair().unwrap();
|
||||||
|
master.set_slave_request_fd(&tx).unwrap();
|
||||||
|
|
||||||
|
let state = master.get_vring_base(0).unwrap();
|
||||||
|
master.set_vring_base(0, state as u16).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(master.get_max_mem_slots().unwrap(), 32);
|
||||||
|
let region = VhostUserMemoryRegionInfo {
|
||||||
|
guest_phys_addr: 0x800000,
|
||||||
|
memory_size: 0x100000,
|
||||||
|
userspace_addr: addr,
|
||||||
|
mmap_offset: 0,
|
||||||
|
mmap_handle: fd.file().as_raw_fd(),
|
||||||
|
};
|
||||||
|
master.add_mem_region(®ion).unwrap();
|
||||||
|
master.remove_mem_region(®ion).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vhost_user_server(cb: fn(&Path, Arc<Barrier>)) {
|
||||||
|
let mem = GuestMemoryAtomic::new(GuestMemoryMmap::<()>::new());
|
||||||
|
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
|
||||||
|
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
|
||||||
|
|
||||||
|
let barrier = Arc::new(Barrier::new(2));
|
||||||
|
let tmpdir = tempfile::tempdir().unwrap();
|
||||||
|
let mut path = tmpdir.path().to_path_buf();
|
||||||
|
path.push("socket");
|
||||||
|
|
||||||
|
let barrier2 = barrier.clone();
|
||||||
|
let path1 = path.clone();
|
||||||
|
let thread = thread::spawn(move || cb(&path1, barrier2));
|
||||||
|
|
||||||
|
let listener = Listener::new(&path, false).unwrap();
|
||||||
|
barrier.wait();
|
||||||
|
daemon.start(listener).unwrap();
|
||||||
|
barrier.wait();
|
||||||
|
|
||||||
|
// handle service requests from clients.
|
||||||
|
thread.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_vhost_user_server() {
|
||||||
|
vhost_user_server(vhost_user_client);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vhost_user_enable(path: &Path, barrier: Arc<Barrier>) {
|
||||||
|
let master = setup_master(path, barrier);
|
||||||
|
master.set_owner().unwrap();
|
||||||
|
master.set_owner().unwrap_err();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_vhost_user_enable() {
|
||||||
|
vhost_user_server(vhost_user_enable);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vhost_user_set_inflight(path: &Path, barrier: Arc<Barrier>) {
|
||||||
|
let mut master = setup_master(path, barrier);
|
||||||
|
let eventfd = EventFd::new(0).unwrap();
|
||||||
|
// No implementation for inflight_fd yet.
|
||||||
|
let inflight = VhostUserInflight {
|
||||||
|
mmap_size: 0x100000,
|
||||||
|
mmap_offset: 0,
|
||||||
|
num_queues: 1,
|
||||||
|
queue_size: 256,
|
||||||
|
};
|
||||||
|
master
|
||||||
|
.set_inflight_fd(&inflight, eventfd.as_raw_fd())
|
||||||
|
.unwrap_err();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_vhost_user_set_inflight() {
|
||||||
|
vhost_user_server(vhost_user_set_inflight);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vhost_user_get_inflight(path: &Path, barrier: Arc<Barrier>) {
|
||||||
|
let mut master = setup_master(path, barrier);
|
||||||
|
// No implementation for inflight_fd yet.
|
||||||
|
let inflight = VhostUserInflight {
|
||||||
|
mmap_size: 0x100000,
|
||||||
|
mmap_offset: 0,
|
||||||
|
num_queues: 1,
|
||||||
|
queue_size: 256,
|
||||||
|
};
|
||||||
|
assert!(master.get_inflight_fd(&inflight).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_vhost_user_get_inflight() {
|
||||||
|
vhost_user_server(vhost_user_get_inflight);
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue