vmm: refactor live migration receive into state machine

Previously, state that we accumulated during the migration process in
the receiver was kept in `mut Option` variables or HashMaps. The
problem is that it is unclear in the code when these variables can be
safely used. It's also difficult to add new state, such as the state
we need to handle the upcoming feature for performing the migration
using multiple connections.

To solve this, I've refactored the code to use the state machine
pattern. Each state carries the data it needs to. Any state that
didn't arrive yet (memory_files, memory_manager) cannot be accessed
until we are in the proper state.

Some benefits that fall out of this:

- We now respond to all requests, even invalid ones, at least with an
  error message.
- Any error handling a request will result in an error message being
  sent.
- There is only a single place where responses are sent and thus it's
  very hard to mess this up in the code.
- The main protocol state machine fits on a screen.

I would argue that especially the error cases are now much more
consistent. There is still a lot to be done. There is still state
transfer via self.vm and similar. In an ideal world, this would also
be carried by the state machine. I also want to see better handling of
payloads, which are still handled all over the place, but this change
is already too big. :)

Co-authored-by: Philipp Schuster <philipp.schuster@cyberus-technology.de>
On-behalf-of: SAP julian.stecklina@sap.com
On-behalf-of: SAP philipp.schuster@sap.com
Signed-off-by: Julian Stecklina <julian.stecklina@cyberus-technology.de>
Signed-off-by: Philipp Schuster <philipp.schuster@cyberus-technology.de>
This commit is contained in:
Julian Stecklina 2025-10-09 15:48:31 +02:00 committed by Rob Bradford
parent 1861bc49e7
commit dbb148b216

View file

@ -27,7 +27,7 @@ use console_devices::{ConsoleInfo, pre_create_console_devices};
use event_monitor::event;
use landlock::LandlockError;
use libc::{EFD_NONBLOCK, SIGINT, SIGTERM, TCSANOW, tcsetattr, termios};
use log::{error, info, warn};
use log::{error, info, trace, warn};
use memory_manager::MemoryManagerSnapshotData;
use pci::PciBdf;
use seccompiler::{SeccompAction, apply_filter};
@ -668,6 +668,39 @@ pub struct Vmm {
console_info: Option<ConsoleInfo>,
}
/// The receiver's state machine behind the migration protocol.
enum ReceiveMigrationState {
/// The connection is established and we haven't received any commands yet.
Established,
/// We received the start command.
Started,
/// We received file descriptors for memory. This can only happen on UNIX domain sockets.
MemoryFdsReceived(Vec<(u32, File)>),
/// We received the VM configuration. We keep the memory configuration around to populate guest memory. From this point on, the sender can start sending memory updates.
Configured(Arc<Mutex<MemoryManager>>),
/// Memory is populated and we received the state. The VM is ready to go.
StateReceived,
/// The migration is successful.
Completed,
/// The migration couldn't complete, either due to an error or because the sender abandoned the migration.
Aborted,
}
impl ReceiveMigrationState {
fn finished(&self) -> bool {
matches!(
self,
ReceiveMigrationState::Completed | ReceiveMigrationState::Aborted
)
}
}
impl Vmm {
pub const HANDLED_SIGNALS: [i32; 2] = [SIGTERM, SIGINT];
@ -839,6 +872,92 @@ impl Vmm {
}
}
/// Handle a migration command and advance the protocol state machine.
///
/// **Note**: This function is responsible for consuming any payloads! It also must
/// _not_ write any response to the socket.
fn vm_receive_migration_step(
&mut self,
socket: &mut SocketStream,
state: ReceiveMigrationState,
req: &Request,
_receive_data_migration: &VmReceiveMigrationData,
) -> std::result::Result<ReceiveMigrationState, MigratableError> {
use ReceiveMigrationState::*;
let invalid_command = || {
Err(MigratableError::MigrateReceive(anyhow!(
"Can't handle command in current state"
)))
};
let mut configure_vm =
|socket: &mut SocketStream,
memory_files: HashMap<u32, File>|
-> std::result::Result<Arc<Mutex<MemoryManager>>, MigratableError> {
let memory_manager = self.vm_receive_config(req, socket, memory_files)?;
Ok(memory_manager)
};
let recv_memory_fd = |socket: &mut SocketStream,
mut memory_files: Vec<(u32, File)>|
-> std::result::Result<Vec<(u32, File)>, MigratableError> {
let (slot, file) = Self::vm_receive_memory_fd(socket)?;
memory_files.push((slot, file));
Ok(memory_files)
};
if req.command() == Command::Abandon {
info!("Abandon Command Received");
return Ok(Aborted);
}
match state {
Established => match req.command() {
Command::Start => Ok(Started),
_ => invalid_command(),
},
Started => match req.command() {
Command::MemoryFd => recv_memory_fd(socket, Vec::new()).map(MemoryFdsReceived),
Command::Config => configure_vm(socket, Default::default()).map(Configured),
_ => invalid_command(),
},
MemoryFdsReceived(memory_files) => match req.command() {
Command::MemoryFd => recv_memory_fd(socket, memory_files).map(MemoryFdsReceived),
Command::Config => {
configure_vm(socket, HashMap::from_iter(memory_files)).map(Configured)
}
_ => invalid_command(),
},
Configured(memory_manager) => match req.command() {
Command::Memory => {
self.vm_receive_memory(req, socket, &mut memory_manager.lock().unwrap())?;
Ok(Configured(memory_manager))
}
Command::State => {
self.vm_receive_state(req, socket, memory_manager.clone())?;
Ok(StateReceived)
}
_ => invalid_command(),
},
StateReceived => match req.command() {
Command::Complete => {
// The unwrap is safe, because the state machine makes sure we called
// vm_receive_state before, which creates the VM.
let vm = self.vm.as_mut().unwrap();
vm.resume()?;
Ok(Completed)
}
_ => invalid_command(),
},
Completed | Aborted => {
unreachable!("Performed a step on the finished state machine")
}
}
}
fn vm_receive_config<T>(
&mut self,
req: &Request,
@ -846,7 +965,7 @@ impl Vmm {
existing_memory_files: HashMap<u32, File>,
) -> std::result::Result<Arc<Mutex<MemoryManager>>, MigratableError>
where
T: Read + Write,
T: Read,
{
// Read in config data along with memory manager data
let mut data: Vec<u8> = Vec::new();
@ -926,8 +1045,6 @@ impl Vmm {
))
})?;
Response::ok().write_to(socket)?;
Ok(memory_manager)
}
@ -938,7 +1055,7 @@ impl Vmm {
mm: Arc<Mutex<MemoryManager>>,
) -> std::result::Result<(), MigratableError>
where
T: Read + Write,
T: Read,
{
// Read in state data
let mut data: Vec<u8> = Vec::new();
@ -991,13 +1108,10 @@ impl Vmm {
// Create VM
vm.restore().map_err(|e| {
Response::error().write_to(socket).ok();
MigratableError::MigrateReceive(anyhow!("Failed restoring the Vm: {e}"))
})?;
self.vm = Some(vm);
Response::ok().write_to(socket)?;
Ok(())
}
@ -1008,18 +1122,13 @@ impl Vmm {
memory_manager: &mut MemoryManager,
) -> std::result::Result<(), MigratableError>
where
T: Read + ReadVolatile + Write,
T: Read + ReadVolatile,
{
// Read table
let table = MemoryRangeTable::read_from(socket, req.length())?;
// And then read the memory itself
memory_manager
.receive_memory_regions(&table, socket)
.inspect_err(|_| {
Response::error().write_to(socket).ok();
})?;
Response::ok().write_to(socket)?;
memory_manager.receive_memory_regions(&table, socket)?;
Ok(())
}
@ -2143,95 +2252,37 @@ impl RequestHandler for Vmm {
// Accept the connection and get the socket
let mut socket = Vmm::receive_migration_socket(&receive_data_migration.receiver_url)?;
let mut started = false;
let mut memory_manager: Option<Arc<Mutex<MemoryManager>>> = None;
let mut existing_memory_files = vec![];
loop {
let mut state = ReceiveMigrationState::Established;
while !state.finished() {
let req = Request::read_from(&mut socket)?;
match req.command() {
Command::Invalid => info!("Invalid Command Received"),
Command::Start => {
info!("Start Command Received");
started = true;
trace!("Command {:?} received", req.command());
Response::ok().write_to(&mut socket)?;
let (response, new_state) = match self.vm_receive_migration_step(
&mut socket,
state,
&req,
&receive_data_migration,
) {
Ok(next_state) => (Response::ok(), next_state),
Err(err) => {
warn!(
"Migration aborted as migration command {:?} failed: {}",
req.command(),
err
);
(Response::error(), ReceiveMigrationState::Aborted)
}
Command::Config => {
info!("Config Command Received");
};
if !started {
warn!("Migration not started yet");
Response::error().write_to(&mut socket)?;
continue;
}
memory_manager = Some(self.vm_receive_config(
&req,
&mut socket,
HashMap::from_iter(existing_memory_files.drain(..)),
)?);
}
Command::State => {
info!("State Command Received");
state = new_state;
assert_eq!(response.length(), 0);
response.write_to(&mut socket)?;
}
if !started {
warn!("Migration not started yet");
Response::error().write_to(&mut socket)?;
continue;
}
if let Some(mm) = memory_manager.take() {
self.vm_receive_state(&req, &mut socket, mm)?;
} else {
warn!("Configuration not sent yet");
Response::error().write_to(&mut socket)?;
}
}
Command::Memory => {
info!("Memory Command Received");
if !started {
warn!("Migration not started yet");
Response::error().write_to(&mut socket)?;
continue;
}
if let Some(mm) = memory_manager.as_ref() {
self.vm_receive_memory(&req, &mut socket, &mut mm.lock().unwrap())?;
} else {
warn!("Configuration not sent yet");
Response::error().write_to(&mut socket)?;
}
}
Command::MemoryFd => {
info!("MemoryFd Command Received");
if !started {
warn!("Migration not started yet");
Response::error().write_to(&mut socket)?;
continue;
}
existing_memory_files.push(Self::vm_receive_memory_fd(&mut socket)?);
Response::ok().write_to(&mut socket)?;
}
Command::Complete => {
info!("Complete Command Received");
if let Some(ref mut vm) = self.vm.as_mut() {
vm.resume()?;
Response::ok().write_to(&mut socket)?;
} else {
warn!("VM not created yet");
Response::error().write_to(&mut socket)?;
}
break;
}
Command::Abandon => {
info!("Abandon Command Received");
self.vm = None;
self.vm_config = None;
Response::ok().write_to(&mut socket).ok();
break;
}
}
if let ReceiveMigrationState::Aborted = state {
self.vm = None;
self.vm_config = None;
}
Ok(())