block, virtio-devices: Support request submission in batch

Cache and batch IO requests after parsing all
items in the queue, improving performance—especially
for small block sizes—by reducing per-request overhead.

Introduced two methods in the AsyncIo trait for batch
submission, with implementation in the raw disk backend.
This method should be called during/after parsing all block IO requests
in the available queue. If the batch submission is not enabled, by
default it does the old way of submitting requests.

Signed-off-by: Bo Chen <bchen@crusoe.ai>
Signed-off-by: Muminul Islam <muislam@microsoft.com>
This commit is contained in:
Muminul Islam 2025-08-13 00:43:36 +00:00 committed by Bo Chen
parent 67ab81874a
commit 245bce23fa
3 changed files with 81 additions and 10 deletions

View file

@ -8,7 +8,7 @@ use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use thiserror::Error;
use vmm_sys_util::eventfd::EventFd;
use crate::DiskTopology;
use crate::{BatchRequest, DiskTopology};
#[derive(Error, Debug)]
pub enum DiskFileError {
@ -99,4 +99,10 @@ pub trait AsyncIo: Send {
) -> AsyncIoResult<()>;
fn fsync(&mut self, user_data: Option<u64>) -> AsyncIoResult<()>;
fn next_completed_request(&mut self) -> Option<(u64, i32)>;
fn batch_requests_enabled(&self) -> bool {
false
}
fn submit_batch_requests(&mut self, _batch_request: &[BatchRequest]) -> AsyncIoResult<()> {
Ok(())
}
}

View file

@ -232,9 +232,18 @@ pub struct AlignedOperation {
layout: Layout,
}
pub struct BatchRequest {
pub offset: libc::off_t,
pub iovecs: SmallVec<[libc::iovec; DEFAULT_DESCRIPTOR_VEC_SIZE]>,
pub user_data: u64,
pub request_type: RequestType,
}
pub struct ExecuteAsync {
// `true` if the execution will complete asynchronously
pub async_complete: bool,
// request need to be batched for submission if any
pub batch_request: Option<BatchRequest>,
}
#[derive(Debug)]
@ -480,6 +489,7 @@ impl Request {
let mut ret = ExecuteAsync {
async_complete: true,
batch_request: None,
};
// Queue operations expected to be submitted.
match request_type {
@ -490,14 +500,32 @@ impl Request {
.bitmap()
.mark_dirty(0, *data_len as usize);
}
disk_image
.read_vectored(offset, &iovecs, user_data)
.map_err(ExecuteError::AsyncRead)?;
if disk_image.batch_requests_enabled() {
ret.batch_request = Some(BatchRequest {
offset,
iovecs,
user_data,
request_type,
});
} else {
disk_image
.read_vectored(offset, &iovecs, user_data)
.map_err(ExecuteError::AsyncRead)?;
}
}
RequestType::Out => {
disk_image
.write_vectored(offset, &iovecs, user_data)
.map_err(ExecuteError::AsyncWrite)?;
if disk_image.batch_requests_enabled() {
ret.batch_request = Some(BatchRequest {
offset,
iovecs,
user_data,
request_type,
});
} else {
disk_image
.write_vectored(offset, &iovecs, user_data)
.map_err(ExecuteError::AsyncWrite)?;
}
}
RequestType::Flush => {
disk_image

View file

@ -167,6 +167,8 @@ impl BlockEpollHandler {
fn process_queue_submit(&mut self) -> Result<()> {
let queue = &mut self.queue;
let mut batch_requests = Vec::new();
let mut batch_inflight_requests = Vec::new();
while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
@ -236,11 +238,21 @@ impl BlockEpollHandler {
if let Ok(ExecuteAsync {
async_complete: true,
..
batch_request,
}) = result
{
self.inflight_requests
.push_back((desc_chain.head_index(), request));
if let Some(batch_request) = batch_request {
match batch_request.request_type {
RequestType::In | RequestType::Out => batch_requests.push(batch_request),
_ => {
unreachable!(
"Unexpected batch request type: {:?}",
request.request_type
)
}
}
}
batch_inflight_requests.push((desc_chain.head_index(), request));
} else {
let status = match result {
Ok(_) => VIRTIO_BLK_S_OK,
@ -266,6 +278,31 @@ impl BlockEpollHandler {
}
}
match self.disk_image.submit_batch_requests(&batch_requests) {
Ok(()) => {
self.inflight_requests.extend(batch_inflight_requests);
}
Err(e) => {
// If batch submission fails, report VIRTIO_BLK_S_IOERR for all requests.
for (user_data, request) in batch_inflight_requests {
warn!(
"Request failed with batch submission: {:x?} {:?}",
request, e
);
let desc_index = user_data;
let mem = self.mem.memory();
mem.write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
.map_err(Error::RequestStatus)?;
queue
.add_used(mem.deref(), desc_index, 0)
.map_err(Error::QueueAddUsed)?;
queue
.enable_notification(mem.deref())
.map_err(Error::QueueEnableNotification)?;
}
}
}
Ok(())
}