From 17171647f7e447698f0d4733a3dbb144ded53466 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 5 Oct 2025 14:36:27 +0200 Subject: [PATCH] aya/maps/ring_buf: fix producer position initialization The RingBuf caches the last value it read of the producer so it doesn't need to constantly contend on the actual producer cache line if lots of messages have yet to be consumed. It was bogus to initialize this cache at 0. This patch initializes it properly and adds testing. Fixes: #1309 --- aya/src/maps/ring_buf.rs | 34 +++++--- test/integration-test/src/tests/ring_buf.rs | 88 ++++++++++++++++++++- 2 files changed, 106 insertions(+), 16 deletions(-) diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index c93a7118..4b08eb11 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -303,6 +303,10 @@ impl ProducerData { let len = page_size + 2 * usize::try_from(byte_size).unwrap(); let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?; + // The producer position may be non-zero if the map is being loaded from a pin, or the map + // has been used previously; load the initial value of the producer position from the mmap. + let pos_cache = load_producer_pos(&mmap); + // byte_size is required to be a power of two multiple of page_size (which implicitly is a // power of 2), so subtracting one will create a bitmask for values less than byte_size. 
debug_assert!(byte_size.is_power_of_two()); @@ -310,7 +314,7 @@ impl ProducerData { Ok(Self { mmap, data_offset: page_size, - pos_cache: 0, + pos_cache, mask, }) } @@ -322,7 +326,6 @@ impl ProducerData { ref mut pos_cache, ref mut mask, } = self; - let pos = unsafe { mmap.ptr().cast().as_ref() }; let mmap_data = mmap.as_ref(); let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| { panic!( @@ -331,7 +334,7 @@ impl ProducerData { mmap_data.len() ) }); - while data_available(pos, pos_cache, consumer) { + while data_available(mmap, pos_cache, consumer) { match read_item(data_pages, *mask, consumer) { Item::Busy => return None, Item::Discard { len } => consumer.consume(len), @@ -347,17 +350,15 @@ impl ProducerData { } fn data_available( - producer: &AtomicUsize, - cache: &mut usize, + producer: &MMap, + producer_cache: &mut usize, consumer: &ConsumerPos, ) -> bool { let ConsumerPos { pos: consumer, .. } = consumer; - if consumer == cache { - // This value is written using Release by the kernel [1], and should be read with - // Acquire to ensure that the prior writes to the entry header are visible. - // - // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 - *cache = producer.load(Ordering::Acquire); + // Refresh the producer position cache if it appears that the consumer is caught up + // with the producer position. + if consumer == producer_cache { + *producer_cache = load_producer_pos(producer); } // Note that we don't compare the order of the values because the producer position may @@ -369,7 +370,7 @@ impl ProducerData { // producer position has wrapped around. // // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 - consumer != cache + consumer != producer_cache } fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> { @@ -403,3 +404,12 @@ impl ProducerData { } } } + +// Loads the producer position from the shared memory mmap. 
+fn load_producer_pos(producer: &MMap) -> usize { + // This value is written using Release by the kernel [1], and should be read with + // Acquire to ensure that the prior writes to the entry header are visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 + unsafe { producer.ptr().cast::<AtomicUsize>().as_ref() }.load(Ordering::Acquire) +} diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index 2c56f736..f91b8698 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -1,6 +1,7 @@ use std::{ mem, os::fd::AsRawFd as _, + path::Path, sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -19,6 +20,7 @@ use aya::{ use aya_obj::generated::BPF_RINGBUF_HDR_SZ; use integration_common::ring_buf::Registers; use rand::Rng as _; +use scopeguard::defer; use tokio::io::{Interest, unix::AsyncFd}; struct RingBufTest { @@ -35,14 +37,25 @@ const RING_BUF_MAX_ENTRIES: usize = 512; impl RingBufTest { fn new() -> Self { + Self::new_with_mutators(|_loader| {}, |_bpf| {}) + } + + // Allows the test to mutate the EbpfLoader before it loads the object file from disk, and to + // mutate the loaded Ebpf object after it has been loaded from disk but before it is loaded + // into the kernel. + fn new_with_mutators<'loader>( + loader_fn: impl FnOnce(&mut EbpfLoader<'loader>), + bpf_fn: impl FnOnce(&mut Ebpf), + ) -> Self { const RING_BUF_BYTE_SIZE: u32 = (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32; // Use the loader API to control the size of the ring_buf. 
- let mut bpf = EbpfLoader::new() - .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) - .load(crate::RING_BUF) - .unwrap(); + let mut loader = EbpfLoader::new(); + loader.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE); + loader_fn(&mut loader); + let mut bpf = loader.load(crate::RING_BUF).unwrap(); + bpf_fn(&mut bpf); let ring_buf = bpf.take_map("RING_BUF").unwrap(); let ring_buf = RingBuf::try_from(ring_buf).unwrap(); let regs = bpf.take_map("REGISTERS").unwrap(); @@ -412,3 +425,70 @@ impl WriterThread { thread.join().unwrap(); } } + +// This tests that a ring buffer can be pinned and then re-opened and attached to a subsequent +// program. It ensures that the producer position is properly synchronized between the two +// programs, and that no unread data is lost. +#[tokio::test(flavor = "multi_thread")] +#[test_log::test] +async fn ring_buf_pinned() { + let pin_path = + Path::new("/sys/fs/bpf/").join(format!("ring_buf_{}", rand::rng().random::<u64>())); + + let RingBufTest { + mut ring_buf, + regs: _, + _bpf, + } = RingBufTest::new_with_mutators( + |_loader| {}, + |bpf| { + let ring_buf = bpf.map_mut("RING_BUF").unwrap(); + ring_buf.pin(&pin_path).unwrap(); + }, + ); + defer! { std::fs::remove_file(&pin_path).unwrap() } + + // Write a few items to the ring buffer. + let to_write_before_reopen = [2, 4, 6, 8]; + for v in to_write_before_reopen { + ring_buf_trigger_ebpf_program(v); + } + let (to_read_before_reopen, to_read_after_reopen) = to_write_before_reopen.split_at(2); + for v in to_read_before_reopen { + let item = ring_buf.next().unwrap(); + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + } + drop(ring_buf); + drop(_bpf); + + // Reopen the ring buffer using the pinned map. + let RingBufTest { + mut ring_buf, + regs: _, + _bpf, + } = RingBufTest::new_with_mutators( + |loader| { + loader.map_pin_path("RING_BUF", &pin_path); + }, + |_bpf| {}, + ); + let to_write_after_reopen = [10, 12]; + + // Write some more data. 
+ for v in to_write_after_reopen { + ring_buf_trigger_ebpf_program(v); + } + // Read both the data that was written before the ring buffer was reopened and the data that + // was written after it was reopened. + for v in to_read_after_reopen + .iter() + .chain(to_write_after_reopen.iter()) + { + let item = ring_buf.next().unwrap(); + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + } + // Make sure there is nothing else in the ring buffer. + assert_matches!(ring_buf.next(), None); +}