From fa1fa417334bf63790693845fa1d11c937b74301 Mon Sep 17 00:00:00 2001 From: Ershaad Basheer Date: Mon, 11 Aug 2025 15:02:38 -0700 Subject: [PATCH] aya/maps/ring_buf: fix producer position initialization The RingBuf caches the last value it read of the producer so it doesn't need to constantly contend on the actual producer cache line if lots of messages have yet to be consumed. It was bogus to initialize this cache at 0. This patch initializes it properly and adds testing. Fixes: #1309 --- aya/src/maps/ring_buf.rs | 5 +- test/integration-ebpf/Cargo.toml | 4 + test/integration-ebpf/src/ring_buf_pinned.rs | 26 +++++ test/integration-test/src/lib.rs | 3 +- test/integration-test/src/tests/ring_buf.rs | 117 ++++++++++++++++++- 5 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 test/integration-ebpf/src/ring_buf_pinned.rs diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index c6642411..cbaa07b6 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -303,6 +303,9 @@ impl ProducerData { let len = page_size + 2 * usize::try_from(byte_size).unwrap(); let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?; + let pos = unsafe { mmap.ptr().cast::<AtomicUsize>().as_ref() }; + let pos_cache = pos.load(Ordering::Acquire); + // byte_size is required to be a power of two multiple of page_size (which implicitly is a // power of 2), so subtracting one will create a bitmask for values less than byte_size. 
debug_assert!(byte_size.is_power_of_two()); @@ -310,7 +313,7 @@ impl ProducerData { Ok(Self { mmap, data_offset: page_size, - pos_cache: 0, + pos_cache, mask, }) } diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml index b0573882..a8bba763 100644 --- a/test/integration-ebpf/Cargo.toml +++ b/test/integration-ebpf/Cargo.toml @@ -91,3 +91,7 @@ path = "src/xdp_sec.rs" [[bin]] name = "uprobe_cookie" path = "src/uprobe_cookie.rs" + +[[bin]] +name = "ring_buf_pinned" +path = "src/ring_buf_pinned.rs" diff --git a/test/integration-ebpf/src/ring_buf_pinned.rs b/test/integration-ebpf/src/ring_buf_pinned.rs new file mode 100644 index 00000000..230e7362 --- /dev/null +++ b/test/integration-ebpf/src/ring_buf_pinned.rs @@ -0,0 +1,26 @@ +#![no_std] +#![no_main] + +use aya_ebpf::{ + macros::{map, uprobe}, + maps::RingBuf, + programs::ProbeContext, +}; +#[cfg(not(test))] +extern crate ebpf_panic; + +#[map] +static RING_BUF: RingBuf = RingBuf::pinned(0, 0); + +#[uprobe] +pub fn ring_buf_test(ctx: ProbeContext) { + // Write the first argument to the function back out to RING_BUF if it is even, + // otherwise drop it without writing anything to the ring buffer. 
+ let arg: u64 = match ctx.arg(0) { + Some(arg) => arg, + None => return, + }; + if arg % 2 == 0 { + let _: Result<(), i64> = RING_BUF.output(&arg, 0); + } +} diff --git a/test/integration-test/src/lib.rs b/test/integration-test/src/lib.rs index f7cc30c6..df1fc87b 100644 --- a/test/integration-test/src/lib.rs +++ b/test/integration-test/src/lib.rs @@ -48,13 +48,14 @@ bpf_file!( REDIRECT => "redirect", RELOCATIONS => "relocations", RING_BUF => "ring_buf", + RING_BUF_PINNED => "ring_buf_pinned", SIMPLE_PROG => "simple_prog", STRNCMP => "strncmp", TCX => "tcx", TEST => "test", TWO_PROGS => "two_progs", - XDP_SEC => "xdp_sec", UPROBE_COOKIE => "uprobe_cookie", + XDP_SEC => "xdp_sec", ); #[cfg(test)] diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index 4089bcbe..5e34580a 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -1,6 +1,8 @@ use std::{ - mem, + collections::VecDeque, + fs, mem, os::fd::AsRawFd as _, + path::Path, sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -13,7 +15,7 @@ use anyhow::Context as _; use assert_matches::assert_matches; use aya::{ Ebpf, EbpfLoader, - maps::{MapData, array::PerCpuArray, ring_buf::RingBuf}, + maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf}, programs::UProbe, }; use aya_obj::generated::BPF_RINGBUF_HDR_SZ; @@ -412,3 +414,114 @@ impl WriterThread { thread.join().unwrap(); } } + +// This test checks for a bug where the cached producer position always started +// at 0 for a newly-loaded ring-buffer map. This assumption is not true for a map +// that is pinned to the bpffs filesystem since the map "remembers" the last +// producer position even if all processes unloaded it. The structure of +// the test is as follows: +// +// Create the pinned ring buffer, write some items to it, and read some of them. +// Leave some in there, so that we can assert that upon re-opening the map, we +// can still read them. 
Then, re-open the map, write some more items and read +// both the old and new items. +#[test] +fn pinned_ring_buf() { + let mut rng = rand::rng(); + let pin_path = Path::new("/sys/fs/bpf/").join(format!("{:x}", rng.random::<u64>())); + fs::create_dir_all(&pin_path).unwrap(); + let n = RING_BUF_MAX_ENTRIES - 1; // avoid thinking about the capacity + let data = std::iter::repeat_with(|| rng.random()) + .take(n) + .collect::<Vec<u64>>(); + + let PinnedRingBufTest { mut ring_buf, _bpf } = PinnedRingBufTest::new(&pin_path); + + let to_write_before_reopen = data.len().min(8); + let mut expected = VecDeque::new(); + for &v in data.iter().take(to_write_before_reopen) { + ring_buf_trigger_ebpf_program(v); + if v % 2 == 0 { + expected.push_back(v); + } + } + let to_read_before_reopen = expected.len() / 2; + for _ in 0..to_read_before_reopen { + let read = ring_buf.next().unwrap(); + let read: [u8; 8] = (*read) + .try_into() + .with_context(|| format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + let exp = expected.pop_front().unwrap(); + assert_eq!(exp, arg); + } + + // Close the old pinned map and re-open it. + drop((_bpf, ring_buf)); + let PinnedRingBufTest { mut ring_buf, _bpf } = PinnedRingBufTest::new(&pin_path); + + // Write some more items to the ring buffer. + for &v in data.iter().skip(to_write_before_reopen) { + ring_buf_trigger_ebpf_program(v); + if v % 2 == 0 { + expected.push_back(v); + } + } + for _ in 0..expected.len() { + let read = ring_buf.next().unwrap(); + let read: [u8; 8] = (*read) + .try_into() + .with_context(|| format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + let exp = expected.pop_front().unwrap(); + assert_eq!(exp, arg); + } + + // Make sure that there is nothing else in the ring_buf. + assert_matches!(ring_buf.next(), None); + + // Clean up the pinned map from the filesystem. 
+ fs::remove_dir_all(pin_path).unwrap(); +} + +struct PinnedRingBufTest { + _bpf: Ebpf, + ring_buf: RingBuf<MapData>, +} + +impl PinnedRingBufTest { + fn new(pin_path: &Path) -> Self { + const RING_BUF_BYTE_SIZE: u32 = + (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32; + + let mut bpf = EbpfLoader::new() + .map_pin_path(pin_path) + .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) + .load(crate::RING_BUF_PINNED) + .unwrap(); + let ring_buf_pin_path = pin_path.join("RING_BUF"); + let ring_buf = MapData::from_pin(ring_buf_pin_path).unwrap(); + let ring_buf = Map::RingBuf(ring_buf); + let ring_buf = RingBuf::try_from(ring_buf).unwrap(); + let prog: &mut UProbe = bpf + .program_mut("ring_buf_test") + .unwrap() + .try_into() + .unwrap(); + prog.load().unwrap(); + prog.attach( + "ring_buf_trigger_ebpf_program", + "/proc/self/exe", + None, + None, + ) + .unwrap(); + + Self { + _bpf: bpf, + ring_buf, + } + } +}