From aaeda340abb11e27a93bc6fdebcc762befa4cb25 Mon Sep 17 00:00:00 2001 From: Ershaad Basheer Date: Mon, 11 Aug 2025 15:02:38 -0700 Subject: [PATCH] aya/maps/ring_buf: fix producer position initialization The RingBuf caches the last value it read of the producer so it doesn't need to constantly contend on the actual producer cache line if lots of messages have yet to be consumed. It was bogus to initialize this cache at 0. This patch initializes it properly and adds testing. Fixes: #1309 --- aya/src/maps/ring_buf.rs | 29 ++++--- test/integration-test/src/tests/ring_buf.rs | 88 ++++++++++++++++++++- 2 files changed, 104 insertions(+), 13 deletions(-) diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index c6642411..3932ac3f 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -303,6 +303,10 @@ impl ProducerData { let len = page_size + 2 * usize::try_from(byte_size).unwrap(); let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?; + // Load the initial value of the producer position from the shared memory mmap. + let pos = unsafe { mmap.ptr().cast::().as_ref() }; + let pos_cache = load_producer_pos(pos); + // byte_size is required to be a power of two multiple of page_size (which implicitly is a // power of 2), so subtracting one will create a bitmask for values less than byte_size. debug_assert!(byte_size.is_power_of_two()); @@ -310,7 +314,7 @@ impl ProducerData { Ok(Self { mmap, data_offset: page_size, - pos_cache: 0, + pos_cache, mask, }) } @@ -348,16 +352,14 @@ impl ProducerData { fn data_available( producer: &AtomicUsize, - cache: &mut usize, + producer_cache: &mut usize, consumer: &ConsumerPos, ) -> bool { let ConsumerPos { pos: consumer, .. } = consumer; - if consumer == cache { - // This value is written using Release by the kernel [1], and should be read with - // Acquire to ensure that the prior writes to the entry header are visible. - // - // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 - *cache = producer.load(Ordering::Acquire); + // Refresh the producer position cache if it appears that the consumer is caught up + // with the producer position. + if consumer == producer_cache { + *producer_cache = load_producer_pos(producer); } // Note that we don't compare the order of the values because the producer position may @@ -369,7 +371,7 @@ impl ProducerData { // producer position has wrapped around. // // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 - consumer != cache + consumer != producer_cache } fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> { @@ -402,3 +404,12 @@ impl ProducerData { } } } + +// Loads the producer position from the shared memory mmap. +fn load_producer_pos(producer: &AtomicUsize) -> usize { + // This value is written using Release by the kernel [1], and should be read with + // Acquire to ensure that the prior writes to the entry header are visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 + producer.load(Ordering::Acquire) +} diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index 4089bcbe..c5d95207 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -1,6 +1,7 @@ use std::{ mem, os::fd::AsRawFd as _, + path::Path, sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -19,6 +20,7 @@ use aya::{ use aya_obj::generated::BPF_RINGBUF_HDR_SZ; use integration_common::ring_buf::Registers; use rand::Rng as _; +use scopeguard::defer; use tokio::io::{Interest, unix::AsyncFd}; struct RingBufTest { @@ -35,14 +37,25 @@ const RING_BUF_MAX_ENTRIES: usize = 512; impl RingBufTest { fn new() -> Self { + Self::new_with_mutators(|_loader| {}, |_bpf| {}) + } + + // Allows the test to mutate the EbpfLoader before it loads the object file from disk, and to + // mutate the loaded Ebpf object after it has been loaded from disk but before it is loaded + // into the kernel. + fn new_with_mutators<'loader>( + loader_fn: impl FnOnce(&mut EbpfLoader<'loader>), + bpf_fn: impl FnOnce(&mut Ebpf), + ) -> Self { const RING_BUF_BYTE_SIZE: u32 = (RING_BUF_MAX_ENTRIES * (mem::size_of::() + BPF_RINGBUF_HDR_SZ as usize)) as u32; // Use the loader API to control the size of the ring_buf. - let mut bpf = EbpfLoader::new() - .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) - .load(crate::RING_BUF) - .unwrap(); + let mut loader = EbpfLoader::new(); + loader.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE); + loader_fn(&mut loader); + let mut bpf = loader.load(crate::RING_BUF).unwrap(); + bpf_fn(&mut bpf); let ring_buf = bpf.take_map("RING_BUF").unwrap(); let ring_buf = RingBuf::try_from(ring_buf).unwrap(); let regs = bpf.take_map("REGISTERS").unwrap(); @@ -412,3 +425,70 @@ impl WriterThread { thread.join().unwrap(); } } + +// This tests that a ring buffer can be pinned and then re-opened and attached to a subsequent +// program. It ensures that the producer position is properly synchronized between the two +// programs, and that no unread data is lost. +#[tokio::test(flavor = "multi_thread")] +#[test_log::test] +async fn ring_buf_pinned() { + let pin_path = + Path::new("/sys/fs/bpf/").join(format!("ring_buf_{}", rand::rng().random::())); + + let RingBufTest { + mut ring_buf, + regs: _, + _bpf, + } = RingBufTest::new_with_mutators( + |_loader| {}, + |bpf| { + let ring_buf = bpf.map_mut("RING_BUF").unwrap(); + ring_buf.pin(&pin_path).unwrap(); + }, + ); + defer! { std::fs::remove_file(&pin_path).unwrap() } + + // Write a few items to the ring buffer. + let to_write_before_reopen = [2, 4, 6, 8]; + for v in to_write_before_reopen { + ring_buf_trigger_ebpf_program(v); + } + let (to_read_before_reopen, to_read_after_reopen) = to_write_before_reopen.split_at(2); + for v in to_read_before_reopen { + let item = ring_buf.next().unwrap(); + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + } + drop(ring_buf); + drop(_bpf); + + // Reopen the ring buffer using the pinned map. + let RingBufTest { + mut ring_buf, + regs: _, + _bpf, + } = RingBufTest::new_with_mutators( + |loader| { + loader.set_map_pin_path("RING_BUF", &pin_path); + }, + |_bpf| {}, + ); + let to_write_after_reopen = [10, 12]; + + // Write some more data. + for v in to_write_after_reopen { + ring_buf_trigger_ebpf_program(v); + } + // Read both the data that was written before the ring buffer was reopened and the data that + // was written after it was reopened. + for v in to_read_after_reopen + .iter() + .chain(to_write_after_reopen.iter()) + { + let item = ring_buf.next().unwrap(); + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + } + // Make sure there is nothing else in the ring buffer. + assert_matches!(ring_buf.next(), None); +}