feat: cork PulseAudio streams on pause to reduce CPU wakeups

When the guest VM stops audio playback/capture, the PCM worker was
continuously writing silence to PulseAudio every ~10ms, keeping the
PA mainloop and audio hardware clock active. This adds PulseAudio
stream corking: on pause the stream is corked so the PA server stops
requesting data, and on resume it is uncorked. This eliminates idle
CPU wakeups and improves battery life.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Davíð Steinn Geirsson 2026-02-25 22:54:07 +00:00
parent 6c0da08597
commit acf39cf91d
6 changed files with 461 additions and 124 deletions

View file

@ -331,6 +331,17 @@ pub trait AsyncPlaybackBufferStream: Send {
&'a mut self,
_ex: &dyn AudioStreamsExecutor,
) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
/// Cork (pause) the stream. The audio server stops requesting data,
/// reducing CPU wakeups when no audio is being played.
async fn cork(&mut self) -> Result<(), BoxError> {
Ok(())
}
/// Uncork (resume) the stream. The audio server resumes requesting data.
async fn uncork(&mut self) -> Result<(), BoxError> {
Ok(())
}
}
#[async_trait(?Send)]
@ -341,6 +352,14 @@ impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S
) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
(**self).next_playback_buffer(ex).await
}
async fn cork(&mut self) -> Result<(), BoxError> {
(**self).cork().await
}
async fn uncork(&mut self) -> Result<(), BoxError> {
(**self).uncork().await
}
}
/// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should

View file

@ -78,6 +78,17 @@ pub trait AsyncCaptureBufferStream: Send {
&'a mut self,
_ex: &dyn AudioStreamsExecutor,
) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
/// Cork (pause) the capture stream. The audio server stops sending data,
/// reducing CPU wakeups when no audio is being captured.
async fn cork(&mut self) -> Result<(), BoxError> {
Ok(())
}
/// Uncork (resume) the capture stream. The audio server resumes sending data.
async fn uncork(&mut self) -> Result<(), BoxError> {
Ok(())
}
}
#[async_trait(?Send)]
@ -88,6 +99,14 @@ impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
(**self).next_capture_buffer(ex).await
}
async fn cork(&mut self) -> Result<(), BoxError> {
(**self).cork().await
}
async fn uncork(&mut self) -> Result<(), BoxError> {
(**self).uncork().await
}
}
/// `CaptureBuffer` contains a block of audio samples got from capture stream. It provides

View file

@ -58,6 +58,16 @@ pub trait CaptureBufferReader {
&mut self,
ex: &Executor,
) -> Result<AsyncCaptureBuffer, BoxError>;
/// Cork (pause) the underlying capture stream.
async fn cork(&mut self) -> Result<(), BoxError> {
Ok(())
}
/// Uncork (resume) the underlying capture stream.
async fn uncork(&mut self) -> Result<(), BoxError> {
Ok(())
}
}
/// Trait to wrap system specific helpers for writing to endpoint playback buffers.
@ -381,140 +391,315 @@ async fn pcm_worker_loop(
pin_mut!(on_release);
match dstream {
DirectionalStream::Output(mut sys_direction_output) => loop {
#[cfg(windows)]
let (mut stream, mut buffer_writer_lock) = (
sys_direction_output
.async_playback_buffer_stream
.lock()
.await,
sys_direction_output.buffer_writer.lock().await,
);
#[cfg(windows)]
let buffer_writer = &mut buffer_writer_lock;
#[cfg(any(target_os = "android", target_os = "linux"))]
let (stream, buffer_writer) = (
&mut sys_direction_output.async_playback_buffer_stream,
&mut sys_direction_output.buffer_writer,
);
DirectionalStream::Output(mut sys_direction_output) => {
let mut is_corked = false;
loop {
// While corked, avoid calling next_playback_buffer() which would
// wake the PulseAudio mainloop. Instead, poll for status changes.
if is_corked {
let wait_for_uncork = async {
loop {
let status = status_mutex.lock().await;
if *status != WorkerStatus::Pause {
return *status;
}
drop(status);
if let Err(e) = TimerAsync::sleep(&ex, period_dur).await {
error!(
"[Card {}] Sleep error while corked: {}",
card_index, e
);
}
}
}
.fuse();
pin_mut!(wait_for_uncork);
let next_buf = stream.next_playback_buffer(&ex).fuse();
pin_mut!(next_buf);
let new_status = select! {
_ = on_release => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
},
status = wait_for_uncork => status,
};
let dst_buf = select! {
_ = on_release => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
},
buf = next_buf => buf.map_err(Error::FetchBuffer)?,
};
let worker_status = status_mutex.lock().await;
match *worker_status {
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
if let Err(e) = write_data(dst_buf, None, buffer_writer).await {
error!(
"[Card {}] Error on write_data after worker quit: {}",
card_index, e
)
match new_status {
WorkerStatus::Running => {
#[cfg(any(target_os = "android", target_os = "linux"))]
if let Err(e) = sys_direction_output
.async_playback_buffer_stream
.uncork()
.await
{
error!(
"[Card {}] Failed to uncork playback stream: {}",
card_index, e
);
}
is_corked = false;
continue;
}
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
}
WorkerStatus::Pause => unreachable!(),
}
break Ok(());
}
WorkerStatus::Pause => {
write_data(dst_buf, None, buffer_writer).await?;
// Use an enum to communicate cork request out of the borrow scope.
// The stream borrow from next_playback_buffer() prevents calling
// cork() in the same scope, so we defer it.
enum LoopAction {
Continue,
Cork,
Break(Result<(), Error>),
}
WorkerStatus::Running => match desc_receiver.try_next() {
Err(e) => {
error!(
"[Card {}] Underrun. No new DescriptorChain while running: {}",
card_index, e
);
write_data(dst_buf, None, buffer_writer).await?;
let action = {
#[cfg(windows)]
let (mut stream, mut buffer_writer_lock) = (
sys_direction_output
.async_playback_buffer_stream
.lock()
.await,
sys_direction_output.buffer_writer.lock().await,
);
#[cfg(windows)]
let buffer_writer = &mut buffer_writer_lock;
#[cfg(any(target_os = "android", target_os = "linux"))]
let (stream, buffer_writer) = (
&mut sys_direction_output.async_playback_buffer_stream,
&mut sys_direction_output.buffer_writer,
);
let next_buf = stream.next_playback_buffer(&ex).fuse();
pin_mut!(next_buf);
let dst_buf = select! {
_ = on_release => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
},
buf = next_buf => buf.map_err(Error::FetchBuffer)?,
};
let worker_status = status_mutex.lock().await;
match *worker_status {
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
if let Err(e) = write_data(dst_buf, None, buffer_writer).await
{
error!(
"[Card {}] Error on write_data after worker quit: {}",
card_index, e
)
}
LoopAction::Break(Ok(()))
}
WorkerStatus::Pause => {
write_data(dst_buf, None, buffer_writer).await?;
LoopAction::Cork
}
WorkerStatus::Running => match desc_receiver.try_next() {
Err(e) => {
error!(
"[Card {}] Underrun. No new DescriptorChain while running: {}",
card_index, e
);
write_data(dst_buf, None, buffer_writer).await?;
LoopAction::Continue
}
Ok(None) => {
error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
write_data(dst_buf, None, buffer_writer).await?;
LoopAction::Break(Err(Error::InvalidPCMWorkerState))
}
Ok(Some(mut desc_chain)) => {
let reader = if muted.load(Ordering::Relaxed) {
None
} else {
// stream_id was already read in handle_pcm_queue
Some(&mut desc_chain.reader)
};
let status =
write_data(dst_buf, reader, buffer_writer).await.into();
sender
.send(PcmResponse {
desc_chain,
status,
done: None,
})
.await
.map_err(Error::MpscSend)?;
LoopAction::Continue
}
},
}
Ok(None) => {
error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
write_data(dst_buf, None, buffer_writer).await?;
return Err(Error::InvalidPCMWorkerState);
}
Ok(Some(mut desc_chain)) => {
let reader = if muted.load(Ordering::Relaxed) {
None
} else {
// stream_id was already read in handle_pcm_queue
Some(&mut desc_chain.reader)
};
let status = write_data(dst_buf, reader, buffer_writer).await.into();
sender
.send(PcmResponse {
desc_chain,
status,
done: None,
})
};
match action {
LoopAction::Continue => {}
LoopAction::Cork => {
// Cork the stream so PulseAudio stops requesting data,
// reducing CPU wakeups while paused.
#[cfg(any(target_os = "android", target_os = "linux"))]
if let Err(e) = sys_direction_output
.async_playback_buffer_stream
.cork()
.await
.map_err(Error::MpscSend)?;
{
error!(
"[Card {}] Failed to cork playback stream: {}",
card_index, e
);
}
is_corked = true;
}
},
}
},
DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {
let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
pin_mut!(next_buf);
let src_buf = select! {
_ = on_release => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
},
buf = next_buf => buf.map_err(Error::FetchBuffer)?,
};
let worker_status = status_mutex.lock().await;
match *worker_status {
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
if let Err(e) = read_data(src_buf, None, period_bytes).await {
error!(
"[Card {}] Error on read_data after worker quit: {}",
card_index, e
)
}
break Ok(());
LoopAction::Break(result) => break result,
}
WorkerStatus::Pause => {
read_data(src_buf, None, period_bytes).await?;
}
WorkerStatus::Running => match desc_receiver.try_next() {
Err(e) => {
error!(
"[Card {}] Overrun. No new DescriptorChain while running: {}",
card_index, e
);
read_data(src_buf, None, period_bytes).await?;
}
Ok(None) => {
error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
read_data(src_buf, None, period_bytes).await?;
return Err(Error::InvalidPCMWorkerState);
}
Ok(Some(mut desc_chain)) => {
let writer = if muted.load(Ordering::Relaxed) {
None
} else {
Some(&mut desc_chain.writer)
};
let status = read_data(src_buf, writer, period_bytes).await.into();
sender
.send(PcmResponse {
desc_chain,
status,
done: None,
})
.await
.map_err(Error::MpscSend)?;
}
},
}
},
}
DirectionalStream::Input(period_bytes, mut buffer_reader) => {
let mut is_corked = false;
loop {
// While corked, avoid calling get_next_capture_period() which would
// wake the PulseAudio mainloop. Instead, poll for status changes.
if is_corked {
let wait_for_uncork = async {
loop {
let status = status_mutex.lock().await;
if *status != WorkerStatus::Pause {
return *status;
}
drop(status);
if let Err(e) = TimerAsync::sleep(&ex, period_dur).await {
error!(
"[Card {}] Sleep error while corked: {}",
card_index, e
);
}
}
}
.fuse();
pin_mut!(wait_for_uncork);
let new_status = select! {
_ = on_release => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
},
status = wait_for_uncork => status,
};
match new_status {
WorkerStatus::Running => {
if let Err(e) = buffer_reader.uncork().await {
error!(
"[Card {}] Failed to uncork capture stream: {}",
card_index, e
);
}
is_corked = false;
continue;
}
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
}
WorkerStatus::Pause => unreachable!(),
}
}
// Use an enum to communicate cork request out of the borrow scope.
// The buffer_reader borrow from get_next_capture_period() prevents
// calling cork() in the same scope, so we defer it.
enum CaptureAction {
Continue,
Cork,
Break(Result<(), Error>),
}
let action = {
let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();
pin_mut!(next_buf);
let src_buf = select! {
_ = on_release => {
drain_desc_receiver(desc_receiver, sender).await?;
break Ok(());
},
buf = next_buf => buf.map_err(Error::FetchBuffer)?,
};
let worker_status = status_mutex.lock().await;
match *worker_status {
WorkerStatus::Quit => {
drain_desc_receiver(desc_receiver, sender).await?;
if let Err(e) = read_data(src_buf, None, period_bytes).await {
error!(
"[Card {}] Error on read_data after worker quit: {}",
card_index, e
)
}
CaptureAction::Break(Ok(()))
}
WorkerStatus::Pause => {
read_data(src_buf, None, period_bytes).await?;
CaptureAction::Cork
}
WorkerStatus::Running => match desc_receiver.try_next() {
Err(e) => {
error!(
"[Card {}] Overrun. No new DescriptorChain while running: {}",
card_index, e
);
read_data(src_buf, None, period_bytes).await?;
CaptureAction::Continue
}
Ok(None) => {
error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);
read_data(src_buf, None, period_bytes).await?;
CaptureAction::Break(Err(Error::InvalidPCMWorkerState))
}
Ok(Some(mut desc_chain)) => {
let writer = if muted.load(Ordering::Relaxed) {
None
} else {
Some(&mut desc_chain.writer)
};
let status =
read_data(src_buf, writer, period_bytes).await.into();
sender
.send(PcmResponse {
desc_chain,
status,
done: None,
})
.await
.map_err(Error::MpscSend)?;
CaptureAction::Continue
}
},
}
};
match action {
CaptureAction::Continue => {}
CaptureAction::Cork => {
// Cork the stream so PulseAudio stops sending data,
// reducing CPU wakeups while paused.
if let Err(e) = buffer_reader.cork().await {
error!(
"[Card {}] Failed to cork capture stream: {}",
card_index, e
);
}
is_corked = true;
}
CaptureAction::Break(result) => break result,
}
}
}
}
}

View file

@ -260,6 +260,14 @@ impl CaptureBufferReader for UnixBufferReader {
.await
.map_err(Error::FetchBuffer)?)
}
async fn cork(&mut self) -> Result<(), BoxError> {
self.async_stream.cork().await
}
async fn uncork(&mut self) -> Result<(), BoxError> {
self.async_stream.uncork().await
}
}
pub(crate) struct UnixBufferWriter {

View file

@ -166,6 +166,8 @@ pub struct PulseAudioCaptureStream {
shared_buffer: Arc<Mutex<SharedCaptureBuffer>>,
/// Buffer commit handler
buffer_commit: PulseAudioCaptureBufferCommit,
/// Whether the stream is currently corked (paused at the PA server level)
corked: bool,
}
impl PulseAudioCaptureStream {
@ -213,6 +215,7 @@ impl PulseAudioCaptureStream {
local_buffer,
shared_buffer,
buffer_commit: PulseAudioCaptureBufferCommit,
corked: false,
})
}
@ -270,6 +273,12 @@ impl PulseAudioCaptureStream {
match Self::create_pulse_stream(client, &self.params, self.shared_buffer.clone()) {
Ok(stream) => {
// Re-cork if we were corked before disconnection
if self.corked {
if let Err(e) = block_on(stream.cork()) {
log::warn!("Failed to re-cork capture stream after reconnect: {}", e);
}
}
log::info!("PulseAudio capture stream reconnected");
self.stream = Some(stream);
}
@ -317,6 +326,50 @@ impl AsyncCaptureBufferStream for PulseAudioCaptureStream {
AsyncCaptureBuffer::new(self.params.frame_size, buffer_slice, &mut self.buffer_commit)
.map_err(|e| Box::new(e) as BoxError)
}
async fn cork(&mut self) -> Result<(), BoxError> {
if self.corked {
return Ok(());
}
if let Some(ref stream) = self.stream {
match block_on(stream.cork()) {
Ok(()) => {
log::info!("PulseAudio capture stream corked");
self.corked = true;
self.shared_buffer.lock().unwrap().clear();
}
Err(e) => {
log::warn!("Failed to cork PulseAudio capture stream: {}", e);
self.stream = None;
self.connection.lock().unwrap().mark_disconnected();
}
}
}
Ok(())
}
async fn uncork(&mut self) -> Result<(), BoxError> {
if !self.corked {
return Ok(());
}
if let Some(ref stream) = self.stream {
match block_on(stream.uncork()) {
Ok(()) => {
log::info!("PulseAudio capture stream uncorked");
self.corked = false;
}
Err(e) => {
log::warn!("Failed to uncork PulseAudio capture stream: {}", e);
self.stream = None;
self.connection.lock().unwrap().mark_disconnected();
}
}
} else {
// Stream disconnected; new stream will start uncorked after reconnect.
self.corked = false;
}
Ok(())
}
}
#[cfg(test)]

View file

@ -253,6 +253,8 @@ pub struct PulseAudioPlaybackStream {
shared_buffer: Arc<Mutex<SharedAudioBuffer>>,
/// Buffer commit handler (stored in struct for lifetime management)
buffer_commit: PulseAudioBufferCommit,
/// Whether the stream is currently corked (paused at the PA server level)
corked: bool,
}
impl PulseAudioPlaybackStream {
@ -325,6 +327,7 @@ impl PulseAudioPlaybackStream {
local_buffer,
shared_buffer,
buffer_commit,
corked: false,
})
}
@ -392,6 +395,12 @@ impl PulseAudioPlaybackStream {
match Self::create_pulse_stream(client, &self.params, self.shared_buffer.clone()) {
Ok(stream) => {
// Re-cork if we were corked before disconnection
if self.corked {
if let Err(e) = block_on(stream.cork()) {
log::warn!("Failed to re-cork playback stream after reconnect: {}", e);
}
}
log::info!("PulseAudio stream reconnected");
self.stream = Some(stream);
}
@ -516,6 +525,50 @@ impl AsyncPlaybackBufferStream for PulseAudioPlaybackStream {
AsyncPlaybackBuffer::new(self.params.frame_size, buffer_slice, &mut self.buffer_commit)
.map_err(|e| Box::new(e) as BoxError)
}
async fn cork(&mut self) -> Result<(), BoxError> {
if self.corked {
return Ok(());
}
if let Some(ref stream) = self.stream {
match block_on(stream.cork()) {
Ok(()) => {
log::info!("PulseAudio playback stream corked");
self.corked = true;
self.shared_buffer.lock().unwrap().clear();
}
Err(e) => {
log::warn!("Failed to cork PulseAudio playback stream: {}", e);
self.stream = None;
self.connection.lock().unwrap().mark_disconnected();
}
}
}
Ok(())
}
async fn uncork(&mut self) -> Result<(), BoxError> {
if !self.corked {
return Ok(());
}
if let Some(ref stream) = self.stream {
match block_on(stream.uncork()) {
Ok(()) => {
log::info!("PulseAudio playback stream uncorked");
self.corked = false;
}
Err(e) => {
log::warn!("Failed to uncork PulseAudio playback stream: {}", e);
self.stream = None;
self.connection.lock().unwrap().mark_disconnected();
}
}
} else {
// Stream disconnected; new stream will start uncorked after reconnect.
self.corked = false;
}
Ok(())
}
}
#[cfg(test)]