diff --git a/Cargo.lock b/Cargo.lock index 0026b5ce5..6a5175e64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1114,6 +1114,8 @@ dependencies = [ "epoll", "libc", "once_cell", + "serde", + "serde_json", "ssh2", "vmm-sys-util", "wait-timeout", diff --git a/performance-metrics/src/main.rs b/performance-metrics/src/main.rs index 67969794f..d078eaa64 100644 --- a/performance-metrics/src/main.rs +++ b/performance-metrics/src/main.rs @@ -20,6 +20,7 @@ use std::{ thread, time::Duration, }; +use test_infra::FioOps; use thiserror::Error; #[derive(Error, Debug)] diff --git a/performance-metrics/src/performance_tests.rs b/performance-metrics/src/performance_tests.rs index eb6388211..69aa47df3 100644 --- a/performance-metrics/src/performance_tests.rs +++ b/performance-metrics/src/performance_tests.rs @@ -6,40 +6,23 @@ // Performance tests use crate::{mean, PerformanceTestControl}; -use serde_json::Value; -use std::path::{Path, PathBuf}; -use std::process::{Child, Command, Stdio}; +use std::fs; +use std::path::PathBuf; use std::string::String; use std::thread; use std::time::Duration; -use std::{fmt, fs}; use test_infra::Error as InfraError; use test_infra::*; -use wait_timeout::ChildExt; #[cfg(target_arch = "x86_64")] pub const FOCAL_IMAGE_NAME: &str = "focal-server-cloudimg-amd64-custom-20210609-0.raw"; #[cfg(target_arch = "aarch64")] pub const FOCAL_IMAGE_NAME: &str = "focal-server-cloudimg-arm64-custom-20210929-0-update-tool.raw"; -#[derive(Debug)] -enum WaitTimeoutError { - Timedout, - ExitStatus, - General(std::io::Error), -} - #[derive(Debug)] enum Error { BootTimeParse, - EthrLogFile(std::io::Error), - EthrLogParse, - FioOutputParse, - Iperf3Parse, Infra(InfraError), - Spawn(std::io::Error), - Scp(SshCommandError), - WaitTimeout(WaitTimeoutError), } impl From for Error { @@ -94,128 +77,6 @@ fn direct_kernel_boot_path() -> PathBuf { kernel_path } -// Wait the child process for a given timeout -fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> { - match child.wait_timeout(Duration::from_secs(timeout)) { - Err(e) => { - return Err(WaitTimeoutError::General(e)); - } - Ok(s) => match s { - None => { - return Err(WaitTimeoutError::Timedout); - } - Some(s) => { - if !s.success() { - return Err(WaitTimeoutError::ExitStatus); - } - } - }, - } - - Ok(()) -} - -fn parse_iperf3_output(output: &[u8], sender: bool) -> Result { - std::panic::catch_unwind(|| { - let s = String::from_utf8_lossy(output); - let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output"); - - let bps: f64 = if sender { - v["end"]["sum_sent"]["bits_per_second"] - .as_f64() - .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'") - } else { - v["end"]["sum_received"]["bits_per_second"] - .as_f64() - .expect("'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'") - }; - - bps - }) - .map_err(|_| { - eprintln!( - "=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n", - String::from_utf8_lossy(output) - ); - Error::Iperf3Parse - }) -} - -fn measure_virtio_net_throughput( - test_timeout: u32, - queue_pairs: u32, - guest: &Guest, - receive: bool, -) -> Result { - let default_port = 5201; - - // 1. start the iperf3 server on the guest - for n in 0..queue_pairs { - guest - .ssh_command(&format!("iperf3 -s -p {} -D", default_port + n)) - .map_err(InfraError::SshCommand)?; - } - - thread::sleep(Duration::new(1, 0)); - - // 2. start the iperf3 client on host to measure RX through-put - let mut clients = Vec::new(); - for n in 0..queue_pairs { - let mut cmd = Command::new("iperf3"); - cmd.args(&[ - "-J", // Output in JSON format - "-c", - &guest.network.guest_ip, - "-p", - &format!("{}", default_port + n), - "-t", - &format!("{}", test_timeout), - ]); - // For measuring the guest transmit throughput (as a sender), - // use reverse mode of the iperf3 client on the host - if !receive { - cmd.args(&["-R"]); - } - let client = cmd - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .map_err(Error::Spawn)?; - - clients.push(client); - } - - let mut err: Option = None; - let mut bps = Vec::new(); - let mut failed = false; - for c in clients { - let mut c = c; - if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) { - err = Some(Error::WaitTimeout(e)); - failed = true; - } - - if !failed { - // Safe to unwrap as we know the child has terminated succesffully - let output = c.wait_with_output().unwrap(); - bps.push(parse_iperf3_output(&output.stdout, receive)?); - } else { - let _ = c.kill(); - let output = c.wait_with_output().unwrap(); - println!( - "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n", - String::from_utf8_lossy(&output.stdout) - ); - } - } - - if let Some(e) = err { - Err(e) - } else { - Ok(bps.iter().sum()) - } -} - pub fn performance_net_throughput(control: &PerformanceTestControl) -> f64 { let test_timeout = control.test_timeout; let rx = control.net_rx.unwrap(); @@ -260,95 +121,6 @@ pub fn performance_net_throughput(control: &PerformanceTestControl) -> f64 { } } -fn parse_ethr_latency_output(output: &[u8]) -> Result, Error> { - std::panic::catch_unwind(|| { - let s = String::from_utf8_lossy(output); - let mut latency = Vec::new(); - for l in s.lines() { - let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line"); - // Skip header/summary lines - if let Some(avg) = v["Avg"].as_str() { - // Assume the latency unit is always "us" - latency.push( - avg.split("us").collect::>()[0] - .parse::() - .expect("'ethr' parse error: invalid 'Avg' entry"), - ); - } - } - - assert!( - !latency.is_empty(), - "'ethr' parse error: no valid latency data found" - ); - - latency - }) - .map_err(|_| { - eprintln!( - "=============== ethr output ===============\n\n{}\n\n===========end============\n\n", - String::from_utf8_lossy(output) - ); - Error::EthrLogParse - }) -} - -fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result, Error> { - // copy the 'ethr' tool to the guest image - let ethr_path = "/usr/local/bin/ethr"; - let ethr_remote_path = "/tmp/ethr"; - scp_to_guest( - Path::new(ethr_path), - Path::new(ethr_remote_path), - &guest.network.guest_ip, - //DEFAULT_SSH_RETRIES, - 1, - DEFAULT_SSH_TIMEOUT, - ) - .map_err(Error::Scp)?; - - // Start the ethr server on the guest - guest - .ssh_command(&format!("{} -s &> /dev/null &", ethr_remote_path)) - .map_err(InfraError::SshCommand)?; - - thread::sleep(Duration::new(10, 0)); - - // Start the ethr client on the host - let log_file = guest - .tmp_dir - .as_path() - .join("ethr.client.log") - .to_str() - .unwrap() - .to_string(); - let mut c = Command::new(ethr_path) - .args(&[ - "-c", - &guest.network.guest_ip, - "-t", - "l", - "-o", - &log_file, // file output is JSON format - "-d", - &format!("{}s", test_timeout), - ]) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .map_err(Error::Spawn)?; - - if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout) - { - let _ = c.kill(); - return Err(e); - } - - // Parse the ethr latency test output - let content = fs::read(log_file).map_err(Error::EthrLogFile)?; - parse_ethr_latency_output(&content) -} - pub fn performance_net_latency(control: &PerformanceTestControl) -> f64 { let focal = UbuntuDiskConfig::new(FOCAL_IMAGE_NAME.to_string()); let guest = performance_test_new_guest(Box::new(focal)); @@ -561,78 +333,6 @@ pub fn performance_boot_time_pmem(control: &PerformanceTestControl) -> f64 { } } -pub enum FioOps { - Read, - RandomRead, - Write, - RandomWrite, -} - -impl fmt::Display for FioOps { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - FioOps::Read => write!(f, "read"), - FioOps::RandomRead => write!(f, "randread"), - FioOps::Write => write!(f, "write"), - FioOps::RandomWrite => write!(f, "randwrite"), - } - } -} - -fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result { - std::panic::catch_unwind(|| { - let v: Value = - serde_json::from_str(output).expect("'fio' parse error: invalid json output"); - let jobs = v["jobs"] - .as_array() - .expect("'fio' parse error: missing entry 'jobs'"); - assert_eq!( - jobs.len(), - num_jobs as usize, - "'fio' parse error: Unexpected number of 'fio' jobs." - ); - - let read = match fio_ops { - FioOps::Read | FioOps::RandomRead => true, - FioOps::Write | FioOps::RandomWrite => false, - }; - - let mut total_bps = 0_f64; - for j in jobs { - if read { - let bytes = j["read"]["io_bytes"] - .as_u64() - .expect("'fio' parse error: missing entry 'read.io_bytes'"); - let runtime = j["read"]["runtime"] - .as_u64() - .expect("'fio' parse error: missing entry 'read.runtime'") - as f64 - / 1000_f64; - total_bps += bytes as f64 / runtime as f64; - } else { - let bytes = j["write"]["io_bytes"] - .as_u64() - .expect("'fio' parse error: missing entry 'write.io_bytes'"); - let runtime = j["write"]["runtime"] - .as_u64() - .expect("'fio' parse error: missing entry 'write.runtime'") - as f64 - / 1000_f64; - total_bps += bytes as f64 / runtime as f64; - } - } - - total_bps - }) - .map_err(|_| { - eprintln!( - "=============== Fio output ===============\n\n{}\n\n===========end============\n\n", - output - ); - Error::FioOutputParse - }) -} - pub fn performance_block_io(control: &PerformanceTestControl) -> f64 { let test_timeout = control.test_timeout; let num_queues = control.num_queues.unwrap(); diff --git a/test_infra/Cargo.toml b/test_infra/Cargo.toml index b95b8e3dc..e7473ff4d 100644 --- a/test_infra/Cargo.toml +++ b/test_infra/Cargo.toml @@ -9,6 +9,8 @@ dirs = "4.0.0" epoll = "4.3.1" libc = "0.2.132" once_cell = "1.14.0" +serde = { version = "1.0.144", features = ["rc", "derive"] } +serde_json = "1.0.85" ssh2 = { version = "0.9.3", features = ["vendored-openssl"] } vmm-sys-util = "0.10.0" wait-timeout = "0.2.0" diff --git a/test_infra/src/lib.rs b/test_infra/src/lib.rs index d1de93888..cc10dc6ce 100644 --- a/test_infra/src/lib.rs +++ b/test_infra/src/lib.rs @@ -4,11 +4,10 @@ // use once_cell::sync::Lazy; +use serde_json::Value; use ssh2::Session; use std::env; use std::ffi::OsStr; -use std::fmt::Debug; -use std::fs; use std::io; use std::io::{Read, Write}; use std::net::TcpListener; @@ -20,13 +19,29 @@ use std::process::{Child, Command, ExitStatus, Output, Stdio}; use std::str::FromStr; use std::sync::Mutex; use std::thread; +use std::time::Duration; +use std::{fmt, fs}; use vmm_sys_util::tempdir::TempDir; +use wait_timeout::ChildExt; + +#[derive(Debug)] +pub enum WaitTimeoutError { + Timedout, + ExitStatus, + General(std::io::Error), +} #[derive(Debug)] pub enum Error { Parsing(std::num::ParseIntError), SshCommand(SshCommandError), WaitForBoot(WaitForBootError), + EthrLogFile(std::io::Error), + EthrLogParse, + FioOutputParse, + Iperf3Parse, + Spawn(std::io::Error), + WaitTimeout(WaitTimeoutError), } impl From for Error { @@ -1320,3 +1335,281 @@ pub fn clh_command(cmd: &str) -> String { |target| format!("target/{}/release/{}", target, cmd), ) } + +pub fn parse_iperf3_output(output: &[u8], sender: bool) -> Result { + std::panic::catch_unwind(|| { + let s = String::from_utf8_lossy(output); + let v: Value = serde_json::from_str(&s).expect("'iperf3' parse error: invalid json output"); + + let bps: f64 = if sender { + v["end"]["sum_sent"]["bits_per_second"] + .as_f64() + .expect("'iperf3' parse error: missing entry 'end.sum_sent.bits_per_second'") + } else { + v["end"]["sum_received"]["bits_per_second"] + .as_f64() + .expect("'iperf3' parse error: missing entry 'end.sum_received.bits_per_second'") + }; + + bps + }) + .map_err(|_| { + eprintln!( + "=============== iperf3 output ===============\n\n{}\n\n===========end============\n\n", + String::from_utf8_lossy(output) + ); + Error::Iperf3Parse + }) +} + +pub enum FioOps { + Read, + RandomRead, + Write, + RandomWrite, +} + +impl fmt::Display for FioOps { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + FioOps::Read => write!(f, "read"), + FioOps::RandomRead => write!(f, "randread"), + FioOps::Write => write!(f, "write"), + FioOps::RandomWrite => write!(f, "randwrite"), + } + } +} + +pub fn parse_fio_output(output: &str, fio_ops: &FioOps, num_jobs: u32) -> Result { + std::panic::catch_unwind(|| { + let v: Value = + serde_json::from_str(output).expect("'fio' parse error: invalid json output"); + let jobs = v["jobs"] + .as_array() + .expect("'fio' parse error: missing entry 'jobs'"); + assert_eq!( + jobs.len(), + num_jobs as usize, + "'fio' parse error: Unexpected number of 'fio' jobs." + ); + + let read = match fio_ops { + FioOps::Read | FioOps::RandomRead => true, + FioOps::Write | FioOps::RandomWrite => false, + }; + + let mut total_bps = 0_f64; + for j in jobs { + if read { + let bytes = j["read"]["io_bytes"] + .as_u64() + .expect("'fio' parse error: missing entry 'read.io_bytes'"); + let runtime = j["read"]["runtime"] + .as_u64() + .expect("'fio' parse error: missing entry 'read.runtime'") + as f64 + / 1000_f64; + total_bps += bytes as f64 / runtime as f64; + } else { + let bytes = j["write"]["io_bytes"] + .as_u64() + .expect("'fio' parse error: missing entry 'write.io_bytes'"); + let runtime = j["write"]["runtime"] + .as_u64() + .expect("'fio' parse error: missing entry 'write.runtime'") + as f64 + / 1000_f64; + total_bps += bytes as f64 / runtime as f64; + } + } + + total_bps + }) + .map_err(|_| { + eprintln!( + "=============== Fio output ===============\n\n{}\n\n===========end============\n\n", + output + ); + Error::FioOutputParse + }) +} + +// Wait the child process for a given timeout +fn child_wait_timeout(child: &mut Child, timeout: u64) -> Result<(), WaitTimeoutError> { + match child.wait_timeout(Duration::from_secs(timeout)) { + Err(e) => { + return Err(WaitTimeoutError::General(e)); + } + Ok(s) => match s { + None => { + return Err(WaitTimeoutError::Timedout); + } + Some(s) => { + if !s.success() { + return Err(WaitTimeoutError::ExitStatus); + } + } + }, + } + + Ok(()) +} + +pub fn measure_virtio_net_throughput( + test_timeout: u32, + queue_pairs: u32, + guest: &Guest, + receive: bool, +) -> Result { + let default_port = 5201; + + // 1. start the iperf3 server on the guest + for n in 0..queue_pairs { + guest.ssh_command(&format!("iperf3 -s -p {} -D", default_port + n))?; + } + + thread::sleep(Duration::new(1, 0)); + + // 2. start the iperf3 client on host to measure RX through-put + let mut clients = Vec::new(); + for n in 0..queue_pairs { + let mut cmd = Command::new("iperf3"); + cmd.args(&[ + "-J", // Output in JSON format + "-c", + &guest.network.guest_ip, + "-p", + &format!("{}", default_port + n), + "-t", + &format!("{}", test_timeout), + ]); + // For measuring the guest transmit throughput (as a sender), + // use reverse mode of the iperf3 client on the host + if !receive { + cmd.args(&["-R"]); + } + let client = cmd + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .map_err(Error::Spawn)?; + + clients.push(client); + } + + let mut err: Option = None; + let mut bps = Vec::new(); + let mut failed = false; + for c in clients { + let mut c = c; + if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5) { + err = Some(Error::WaitTimeout(e)); + failed = true; + } + + if !failed { + // Safe to unwrap as we know the child has terminated succesffully + let output = c.wait_with_output().unwrap(); + bps.push(parse_iperf3_output(&output.stdout, receive)?); + } else { + let _ = c.kill(); + let output = c.wait_with_output().unwrap(); + println!( + "=============== Client output [Error] ===============\n\n{}\n\n===========end============\n\n", + String::from_utf8_lossy(&output.stdout) + ); + } + } + + if let Some(e) = err { + Err(e) + } else { + Ok(bps.iter().sum()) + } +} + +pub fn parse_ethr_latency_output(output: &[u8]) -> Result, Error> { + std::panic::catch_unwind(|| { + let s = String::from_utf8_lossy(output); + let mut latency = Vec::new(); + for l in s.lines() { + let v: Value = serde_json::from_str(l).expect("'ethr' parse error: invalid json line"); + // Skip header/summary lines + if let Some(avg) = v["Avg"].as_str() { + // Assume the latency unit is always "us" + latency.push( + avg.split("us").collect::>()[0] + .parse::() + .expect("'ethr' parse error: invalid 'Avg' entry"), + ); + } + } + + assert!( + !latency.is_empty(), + "'ethr' parse error: no valid latency data found" + ); + + latency + }) + .map_err(|_| { + eprintln!( + "=============== ethr output ===============\n\n{}\n\n===========end============\n\n", + String::from_utf8_lossy(output) + ); + Error::EthrLogParse + }) +} + +pub fn measure_virtio_net_latency(guest: &Guest, test_timeout: u32) -> Result, Error> { + // copy the 'ethr' tool to the guest image + let ethr_path = "/usr/local/bin/ethr"; + let ethr_remote_path = "/tmp/ethr"; + scp_to_guest( + Path::new(ethr_path), + Path::new(ethr_remote_path), + &guest.network.guest_ip, + //DEFAULT_SSH_RETRIES, + 1, + DEFAULT_SSH_TIMEOUT, + )?; + + // Start the ethr server on the guest + guest.ssh_command(&format!("{} -s &> /dev/null &", ethr_remote_path))?; + + thread::sleep(Duration::new(10, 0)); + + // Start the ethr client on the host + let log_file = guest + .tmp_dir + .as_path() + .join("ethr.client.log") + .to_str() + .unwrap() + .to_string(); + let mut c = Command::new(ethr_path) + .args(&[ + "-c", + &guest.network.guest_ip, + "-t", + "l", + "-o", + &log_file, // file output is JSON format + "-d", + &format!("{}s", test_timeout), + ]) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .map_err(Error::Spawn)?; + + if let Err(e) = child_wait_timeout(&mut c, test_timeout as u64 + 5).map_err(Error::WaitTimeout) + { + let _ = c.kill(); + return Err(e); + } + + // Parse the ethr latency test output + let content = fs::read(log_file).map_err(Error::EthrLogFile)?; + parse_ethr_latency_output(&content) +}