Implement the ability to pause and resume outbound transmission

This commit is contained in:
Jakub Hlusička 2026-02-27 01:14:03 +01:00
parent 583c91add0
commit 8d3763a190
4 changed files with 282 additions and 69 deletions

View file

@ -3,6 +3,6 @@
"editor.formatOnSave": true,
},
// helps remove rust-analyser errors due to requiring a feature flag and target
"rust-analyzer.cargo.features": ["esp32s3"],
"rust-analyzer.cargo.features": ["esp32s3", "log"],
"rust-analyzer.cargo.target": "xtensa-esp32s3-none-elf",
}

View file

@ -13,7 +13,7 @@
"RUSTUP_TOOLCHAIN": "esp",
},
"extraArgs": ["-Zbuild-std=core,alloc"],
"features": ["esp32s3", "esp-hal-unstable"],
"features": ["esp32s3", "esp-hal-unstable", "log"],
},
},
},

View file

@ -14,6 +14,8 @@ targets = ["xtensa-esp32s3-none-elf"]
document-features = "^0.2.12"
esp-hal = { version = "~1.0", features = ["requires-unstable"] }
ouroboros = "0.18.5"
embassy-sync = "0.7.2"
log = { version = "0.4.29", optional = true }
[features]
# A workaround for the Zed editor not being able to use the `esp-hal/unstable` feature.
@ -22,6 +24,11 @@ esp-hal-unstable = ["esp-hal/unstable"]
#! ### Chip Support Feature Flags
## Target the ESP32-S3.
esp32s3 = ["esp-hal/esp32s3"]
# Enable logging via the `log` crate.
# This crate performs logging from within an interrupt handler.
# This can be useful for debugging, but it can have an impact on performance unless
# this feature is disabled or `log`'s `STATIC_MAX_LEVEL` is set to `INFO`.
log = ["dep:log"]
# TODO: Add support for the following targets:
# ## Target the ESP32.

View file

@ -15,12 +15,14 @@
use core::{
alloc::Layout,
cell::UnsafeCell,
cell::RefCell,
fmt::Debug,
ops::{Deref, DerefMut},
sync::atomic::{self, AtomicBool},
};
use alloc::{alloc::Allocator, boxed::Box, sync::Arc, vec};
use embassy_sync::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex};
use esp_hal::{
Blocking,
dma::{
@ -41,14 +43,92 @@ extern crate alloc;
const DMA_CHANNEL_OUTBOUND: usize = 2;
const INTERRUPT_OUTBOUND: Interrupt = Interrupt::DMA_OUT_CH2;
static RUNNING_DMA_BOUNCE: Mutex<CriticalSectionRawMutex, RefCell<Option<DmaBounce>>> =
Mutex::new(RefCell::new(None));
/// A DMA-capable source peripheral paired with the DMA channel that drives it.
///
/// NOTE(review): the concrete types (`AnySpi`, `DMA_CH0`) are hard-coded; the
/// `DmaBounce` struct notes they currently cannot be generic because they lack
/// a `reborrow` method.
struct DmaPeripheralWithChannel<'a> {
    // The source peripheral the memory-to-memory transfer is routed through.
    peripheral: AnySpi<'a>,
    // The DMA channel used for the transfer.
    channel: DMA_CH0<'a>,
}
mod transfer {
use super::*;
// This would be the ideal implementation, but it doesn't work, because
// I'm not sure how I could make lifetime `'d` in `SimpleMem2MemTransfer` invariant.
//
// #[self_referencing]
// struct ReceivingTransfer {
// peripheral: DmaPeripheralWithChannel<'static>,
// #[borrows(mut peripheral)]
// #[covariant]
// mem2mem: SimpleMem2Mem<'this, Blocking>,
// #[borrows(mut mem2mem)]
// #[covariant]
// transfer: Option<SimpleMem2MemTransfer<'this, 'this, Blocking>>,
// }
#[self_referencing]
struct ReceivingTransfer {
struct ReceivingTransferInner {
// This peripheral simultaneously exists cloned in `mem2mem`.
// Care must be taken that it is not accessed before `mem2mem` is dropped.
//
// Implementation note:
// Ideally, this would be referenced by `mem2mem` via `#[borrows(mut peripheral)]`,
// but then ouroboros complains about the invariant lifetime `'d` on `transfer`.
peripheral: DmaPeripheralWithChannel<'static>,
mem2mem: SimpleMem2Mem<'static, Blocking>,
#[borrows(mut mem2mem)]
#[covariant]
transfer: Option<SimpleMem2MemTransfer<'this, 'static, Blocking>>,
}
pub struct ReceivingTransfer(ReceivingTransferInner);
impl ReceivingTransfer {
pub fn new(
peripheral: DmaPeripheralWithChannel<'static>,
mem2mem_builder: impl FnOnce(
DmaPeripheralWithChannel<'static>,
) -> SimpleMem2Mem<'static, Blocking>,
transfer_builder: impl for<'a> FnOnce(
&'a mut SimpleMem2Mem<'static, Blocking>,
)
-> SimpleMem2MemTransfer<'a, 'static, Blocking>,
) -> Self {
let inner = ReceivingTransferInnerBuilder {
// Safety:
// These peripherals are not used until `mem2mem` is dropped.
// This is ensured by making it a private field.
peripheral: unsafe {
DmaPeripheralWithChannel {
peripheral: peripheral.peripheral.clone_unchecked(),
channel: peripheral.channel.clone_unchecked(),
}
},
mem2mem: (mem2mem_builder)(peripheral),
transfer_builder: move |mem2mem| Some((transfer_builder)(mem2mem)),
}
.build();
Self(inner)
}
pub fn with_transfer_mut<R>(
&mut self,
callback: impl FnOnce(&mut Option<SimpleMem2MemTransfer<'_, 'static, Blocking>>) -> R,
) -> R {
self.0.with_transfer_mut(callback)
}
pub fn into_peripheral(self) -> DmaPeripheralWithChannel<'static> {
self.0.into_heads().peripheral
}
}
}
use transfer::*;
/// Holds a pair of statically-allocated framebuffers.
///
/// NOTE(review): presumably used for double buffering (write one while the
/// other is transmitted) — confirm against the guard types elsewhere in this file.
pub struct Swapchain {
    pub framebuffers: [&'static mut [u8]; 2],
}
@ -156,10 +236,8 @@ impl<'a> DerefMut for SwapchainWriteGuard<'a> {
pub struct DmaBounce {
// TODO: Make these generic.
// They currently cannot be generic, because they lacks a `reborrow` method.
channel: DMA_CH0<'static>,
// This can also be more generic, see `DmaEligible` in `Mem2Mem::new`.
peripheral_src: AnySpi<'static>,
// These currently cannot be generic, because they lack a `reborrow` method.
peripheral_src: Option<DmaPeripheralWithChannel<'static>>,
// This can also be more generic, see `DmaEligible` in `Mem2Mem::new`.
peripheral_dst: Option<Dpi<'static, Blocking>>,
// TODO: Combine with peripheral_dst using an enum?
@ -192,6 +270,8 @@ pub struct DmaBounce {
receiving_transfer: Option<ReceivingTransfer>,
}
unsafe impl Send for DmaBounce {}
impl DmaBounce {
/// * `allocator` - The allocator used to allocate the bounce buffers.
/// * `channel` - The DMA channel used to transfer data from the source buffer to the bounce buffers.
@ -289,8 +369,10 @@ impl DmaBounce {
);
Self {
peripheral_src: Some(DmaPeripheralWithChannel {
channel,
peripheral_src,
peripheral: peripheral_src,
}),
peripheral_dst: Some(peripheral_dst),
transfer_dst: None,
burst_config,
@ -598,6 +680,14 @@ impl DmaBounce {
);
}
let peripheral = self
.peripheral_src
.take()
.expect("the source DMA peripheral should be available");
ReceivingTransfer::new(
peripheral,
|peripheral| {
// Extend the lifetime to 'static because it is required by Mem2Mem.
//
// Safety:
@ -605,31 +695,59 @@ impl DmaBounce {
// this is because we `SimpleMem2MemTransfer::wait()` on the transfer to finish.
let bounce_dst_descs: &'static mut [DmaDescriptor] =
unsafe { &mut *(self.bounce_dst_descs as *mut _) };
let src_descs: &'static mut [DmaDescriptor] = unsafe { &mut *(self.src_descs as *mut _) };
let src_descs: &'static mut [DmaDescriptor] =
unsafe { &mut *(self.src_descs as *mut _) };
let mem2mem = unsafe {
Mem2Mem::new(
self.channel.clone_unchecked(),
self.peripheral_src.clone_unchecked(),
)
}
Mem2Mem::new(peripheral.channel, peripheral.peripheral)
.with_descriptors(bounce_dst_descs, src_descs, self.burst_config)
.unwrap();
ReceivingTransferBuilder {
mem2mem,
transfer_builder: |mem2mem| {
Some(
.unwrap()
},
|mem2mem| {
mem2mem
.start_transfer(self.bounce_buffer_dst, buffer_src_window)
.unwrap(),
)
.unwrap()
},
)
}
.build()
/// Blocks until the given inbound (memory-to-memory) transfer into a bounce
/// buffer has finished, then returns the source DMA peripheral to
/// `self.peripheral_src` so a new transfer can be started later.
///
/// * `receiving_transfer` - The in-flight transfer to wait on.
/// * `increase_window_counter` - When `true`, advances the window counter by 1
///   after the transfer completes; pass `false` if the caller has already
///   updated `window_index_next` itself.
///
/// # Panics
/// Panics if the transfer has no ongoing inner transfer, or if an idle source
/// DMA peripheral is already present in `self.peripheral_src`.
fn receive_window_wait(
    &mut self,
    mut receiving_transfer: ReceivingTransfer,
    increase_window_counter: bool,
) {
    receiving_transfer.with_transfer_mut(|receiving_transfer| {
        // TODO: Async
        let receiving_transfer = receiving_transfer
            .take()
            .expect("no ongoing inner transfer to a bounce buffer present");
        if !receiving_transfer.is_done() {
            #[cfg(feature = "log")]
            log::debug!("Inbound transfer not yet finished, waiting via spinlock...");
            receiving_transfer.wait().unwrap();
            // Fix: this message is logged after `wait()` has returned, so the
            // transfer *has* finished at this point; the previous text
            // ("not finished") was misleading.
            #[cfg(feature = "log")]
            log::debug!("Inbound transfer finished.");
        }
    });
    if increase_window_counter {
        self.increase_window_counter(1);
    }
    // Reclaim the source peripheral + channel from the completed transfer.
    let existing_peripheral_src = self
        .peripheral_src
        .replace(receiving_transfer.into_peripheral());
    assert!(
        existing_peripheral_src.is_none(),
        "no idle source DMA peripheral should be present, it should be receiving data instead"
    );
}
/// Starts an inbound transfer into a bounce buffer and blocks until it completes.
///
/// * `increase_window_counter` - Forwarded to `receive_window_wait`; when
///   `true`, the window counter is advanced by 1 after the transfer finishes.
fn receive_window(&mut self, increase_window_counter: bool) {
    // SAFETY: the transfer is fully awaited below via `receive_window_wait`,
    // which satisfies `receive_window_start`'s requirement that the transfer
    // be waited on before the lifetime-extended descriptors are reused.
    let receiving_transfer = unsafe { self.receive_window_start() };
    self.receive_window_wait(receiving_transfer, increase_window_counter);
}
fn increase_window_counter(&mut self, windows: isize) {
// TODO: Updating `self.frame_index_next` without calling this function is error prone.
// Index into an array with `self.window_index_next % 2` instead.
if windows.rem_euclid(2) == 1 {
core::mem::swap(&mut self.bounce_buffer_dst, &mut self.bounce_buffer_src);
}
@ -641,17 +759,24 @@ impl DmaBounce {
self.window_index_next = window_index_next.rem_euclid(self.windows_len as isize) as usize;
}
pub fn launch_interrupt_driven_task(mut self) {
pub async fn launch_interrupt_driven_task(mut self) -> RunningDmaBounceHandle {
Self::enable_interrupts();
// Receive the first 2 windows, so that the outbound transfer can read valid data.
let mut receiving_transfer = unsafe { self.receive_window_start() };
let receiving_transfer = receiving_transfer
.with_mut(|x| x.transfer.take())
.expect("no ongoing inner transfer to a bounce buffer present");
// Reset the existing transfer left over since the outbound transfer was paused.
if let Some(receiving_transfer) = self.receiving_transfer.take() {
self.receive_window_wait(receiving_transfer, false);
}
receiving_transfer.wait().unwrap();
self.increase_window_counter(1);
// Reset window index.
if self.window_index_next != 0 {
self.increase_window_counter((self.windows_len - self.window_index_next) as isize);
}
// Fully receive the first windows, so that the outbound transfer can read valid data.
self.receive_window(true);
#[cfg(feature = "log")]
log::debug!("DmaBounce: Starting outbound transfer.");
let dma_tx_buffer = self.get_dma_tx_buffer();
let transfer = self
@ -663,11 +788,11 @@ impl DmaBounce {
panic!("failed to begin the transmission of the first frame: {error:?}");
});
self.transfer_dst = Some(transfer);
// Start receiving the second window.
self.receiving_transfer = Some(unsafe { self.receive_window_start() });
unsafe {
*DMA_STATE.0.get() = Some(self);
}
RunningDmaBounceHandle::new(self).await
}
fn get_dma_tx_buffer(&mut self) -> DmaTxBounceBuf {
@ -714,12 +839,86 @@ unsafe impl DmaTxBuffer for DmaTxBounceBuf {
}
}
static DMA_STATE: SyncUnsafeCell<Option<DmaBounce>> = SyncUnsafeCell(UnsafeCell::new(None));
/// Handle representing a running, interrupt-driven outbound DMA transmission.
///
/// Stop the transmission and recover the underlying `DmaBounce` via
/// [`RunningDmaBounceHandle::stop`] rather than dropping the handle; the
/// `Drop` implementation falls back to a spinlock.
#[must_use = "the `Drop` implementation uses a spinlock, which can result in a deadlock; use `RunningDmaBounceHandle::stop` instead of letting it be dropped"]
pub struct RunningDmaBounceHandle {
    // Cleared by `stop` so that `Drop` does not shut the transfer down twice.
    perform_drop: bool,
    // Prevent construction.
    _marker: (),
}
#[repr(transparent)]
pub struct SyncUnsafeCell<T>(UnsafeCell<T>);
impl RunningDmaBounceHandle {
async fn new(dma_bounce: DmaBounce) -> Self {
let dma_state = RUNNING_DMA_BOUNCE.lock().await;
*dma_state.borrow_mut() = Some(dma_bounce);
unsafe impl<T> Sync for SyncUnsafeCell<T> {}
Self {
perform_drop: true,
_marker: (),
}
}
/// Stops the outbound transmission and returns the underlying `DmaBounce`,
/// allowing transmission to be resumed later.
///
/// # Panics
/// Panics if no running `DmaBounce` instance is registered, if it has no
/// active destination transfer, or if a destination peripheral is already
/// stored in the `DmaBounce`.
pub async fn stop(mut self) -> DmaBounce {
    #[cfg(feature = "log")]
    log::debug!("DmaBounce: Stopping outbound transfer due to `stop`.");
    // Remove the running instance so the interrupt handler stops driving it.
    let mut dma_bounce = RUNNING_DMA_BOUNCE
        .lock()
        .await
        .borrow_mut()
        .take()
        .expect("an instance of a running `DmaBounce` should be available");
    let transfer_dst = dma_bounce
        .transfer_dst
        .take()
        .expect("an instance of a transfer to the destination peripheral should be available");
    // Stop the outbound transfer and return the destination peripheral to
    // the `DmaBounce` so it can be reused on resume.
    let (peripheral_dst, _dma_tx_buffer) = transfer_dst.stop();
    let previous_peripheral_dst = dma_bounce.peripheral_dst.replace(peripheral_dst);
    // Disarm `Drop`: the transfer has already been stopped here.
    self.perform_drop = false;
    assert!(
        previous_peripheral_dst.is_none(),
        "there should be no existing destination peripheral in `DmaBounce`"
    );
    dma_bounce
}
}
impl Drop for RunningDmaBounceHandle {
    /// Fallback shutdown path for when the handle is dropped without calling
    /// [`RunningDmaBounceHandle::stop`].
    ///
    /// Acquires `RUNNING_DMA_BOUNCE` via a `try_lock` spin loop (``Drop``
    /// cannot be `async`), which can deadlock — hence the `#[must_use]`
    /// warning on the type.
    fn drop(&mut self) {
        // `stop` already performed the shutdown; nothing left to do.
        if !self.perform_drop {
            return;
        }
        #[cfg(feature = "log")]
        log::debug!("DmaBounce: Stopping outbound transfer due to `drop`.");
        // Spin until the mutex becomes free.
        let dma_bounce = loop {
            if let Ok(dma_bounce) = RUNNING_DMA_BOUNCE.try_lock() {
                break dma_bounce;
            }
        };
        let mut dma_bounce = dma_bounce
            .borrow_mut()
            .take()
            .expect("an instance of a running `DmaBounce` should be available");
        let transfer_dst = dma_bounce
            .transfer_dst
            .take()
            .expect("an instance of a transfer to the destination peripheral should be available");
        // Stop the outbound transfer and hand the destination peripheral back
        // to the `DmaBounce` before discarding it.
        let (peripheral_dst, _dma_tx_buffer) = transfer_dst.stop();
        dma_bounce.peripheral_dst = Some(peripheral_dst);
        drop(dma_bounce);
    }
}
impl Debug for RunningDmaBounceHandle {
    /// Formats the handle without exposing its private fields
    /// (rendered as `RunningDmaBounceHandle { .. }`).
    fn fmt(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut builder = formatter.debug_struct("RunningDmaBounceHandle");
        builder.finish_non_exhaustive()
    }
}
#[handler(priority = Priority::Priority3)]
#[ram] // Improves performance.
@ -728,16 +927,27 @@ fn dma_outbound_interrupt_handler() {
let bounce_buffer_sent = interrupt.st().read().out_eof().bit_is_set();
if !bounce_buffer_sent {
// This should never happen.
#[cfg(feature = "log")]
log::warn!("DMA interrupt handler invoked without `OUT_EOF` bit having been set.");
return;
}
// Clear the bit by writing 1 to the clear bits.
interrupt.clr().write(|w| w.out_eof().bit(true));
// SAFETY: This value is only ever read in our interrupt handler,
// and interrupts are disabled, and we only use this in one thread.
let Some(dma_state) = unsafe { &mut *DMA_STATE.0.get() }.as_mut() else {
panic!("no DMA state available when executing DMA interrupt handler");
let Ok(dma_state) = RUNNING_DMA_BOUNCE.try_lock() else {
// If we can't acquire a lock guard, just give up.
#[cfg(feature = "log")]
log::trace!("Failed to lock `RUNNING_DMA_BOUNCE`.");
return;
};
let mut dma_state = dma_state.borrow_mut();
let Some(dma_state) = dma_state.as_mut() else {
// The outbound transmission was stopped.
#[cfg(feature = "log")]
log::trace!("`RUNNING_DMA_BOUNCE` is empty.");
return;
};
// The descriptor of the buffer with an EOF flag that just finished being sent.
@ -766,20 +976,16 @@ fn dma_outbound_interrupt_handler() {
dma_state.window_index_next = window_index_next;
let mut receiving_transfer = dma_state
let receiving_transfer = dma_state
.receiving_transfer
.take()
.expect("no ongoing transfer to a bounce buffer present");
let receiving_transfer = receiving_transfer
.with_mut(|x| x.transfer.take())
.expect("no ongoing inner transfer to a bounce buffer present");
if receiving_transfer.is_done() {
drop(receiving_transfer);
} else {
// error!("the transfer to a bounce buffer has not finished yet, waiting");
receiving_transfer.wait().unwrap();
}
dma_state.receive_window_wait(
receiving_transfer,
// `window_index_next` is already updated above.
false,
);
// If there is any ongoing transfer, cancel it and start a new one.
dma_state.receiving_transfer = Some(unsafe { dma_state.receive_window_start() });