From d8bd45b84cc840f006bde06f381dce5095f7062b Mon Sep 17 00:00:00 2001 From: Ershaad Basheer Date: Mon, 11 Aug 2025 15:02:38 -0700 Subject: [PATCH] fix consumer index handling for pinned ringbuf map The code assumed that the consumer index of a newly loaded ring buffer map was at position 0. This is not always true for pinned maps, since they are persistent and therefore remember the consumer index position from the last time that they were read from. Fixes: #1309 --- aya/src/maps/ring_buf.rs | 5 +- test/integration-ebpf/Cargo.toml | 4 + test/integration-ebpf/src/ring_buf_pinned.rs | 49 +++++++ test/integration-test/src/lib.rs | 1 + test/integration-test/src/tests/ring_buf.rs | 132 ++++++++++++++++++- 5 files changed, 188 insertions(+), 3 deletions(-) create mode 100644 test/integration-ebpf/src/ring_buf_pinned.rs diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index c6642411..2cfafd70 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -103,7 +103,7 @@ impl<T: Borrow<MapData>> RingBuf<T> { let byte_size = data.obj.max_entries(); let consumer_metadata = ConsumerMetadata::new(map_fd, 0, page_size)?; let consumer = ConsumerPos::new(consumer_metadata); - let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?; + let producer = ProducerData::new(map_fd, page_size, page_size, byte_size, consumer.pos)?; Ok(Self { map, consumer, @@ -270,6 +270,7 @@ impl ProducerData { offset: usize, page_size: usize, byte_size: u32, + pos_cache: usize, ) -> Result<Self, MapError> { // The producer pages have one page of metadata and then the data pages, all mapped // read-only. 
Note that the length of the mapping includes the data pages twice as the @@ -310,7 +311,7 @@ impl ProducerData { Ok(Self { mmap, data_offset: page_size, - pos_cache: 0, + pos_cache, mask, }) } diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml index b0573882..a8bba763 100644 --- a/test/integration-ebpf/Cargo.toml +++ b/test/integration-ebpf/Cargo.toml @@ -91,3 +91,7 @@ path = "src/xdp_sec.rs" [[bin]] name = "uprobe_cookie" path = "src/uprobe_cookie.rs" + +[[bin]] +name = "ring_buf_pinned" +path = "src/ring_buf_pinned.rs" diff --git a/test/integration-ebpf/src/ring_buf_pinned.rs b/test/integration-ebpf/src/ring_buf_pinned.rs new file mode 100644 index 00000000..769896fa --- /dev/null +++ b/test/integration-ebpf/src/ring_buf_pinned.rs @@ -0,0 +1,49 @@ +#![no_std] +#![no_main] + +use aya_ebpf::{ + macros::{map, uprobe}, + maps::{PerCpuArray, RingBuf}, + programs::ProbeContext, +}; +use integration_common::ring_buf::Registers; +#[cfg(not(test))] +extern crate ebpf_panic; + +#[map] +static RING_BUF: RingBuf = RingBuf::pinned(0, 0); + +// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs +// without needing synchronization. Atomics exist [1], but aren't exposed. +// +// [1]: https://lwn.net/Articles/838884/ +#[map] +static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0); + +#[uprobe] +pub fn ring_buf_test(ctx: ProbeContext) { + let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) { + Some(regs) => unsafe { &mut *regs }, + None => return, + }; + let mut entry = match RING_BUF.reserve::<u64>(0) { + Some(entry) => entry, + None => { + *dropped += 1; + return; + } + }; + // Write the first argument to the function back out to RING_BUF if it is even, + // otherwise increment the `rejected` counter in REGISTERS. This exercises discarding data. 
+ let arg: u64 = match ctx.arg(0) { + Some(arg) => arg, + None => return, + }; + if arg % 2 == 0 { + entry.write(arg); + entry.submit(0); + } else { + *rejected += 1; + entry.discard(0); + } +} diff --git a/test/integration-test/src/lib.rs b/test/integration-test/src/lib.rs index f7cc30c6..20529f6c 100644 --- a/test/integration-test/src/lib.rs +++ b/test/integration-test/src/lib.rs @@ -55,6 +55,7 @@ bpf_file!( TWO_PROGS => "two_progs", XDP_SEC => "xdp_sec", UPROBE_COOKIE => "uprobe_cookie", + RING_BUF_PINNED => "ring_buf_pinned", ); #[cfg(test)] diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index 4089bcbe..f47e8e04 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -1,4 +1,5 @@ use std::{ + fs, mem, os::fd::AsRawFd as _, sync::{ @@ -13,7 +14,7 @@ use anyhow::Context as _; use assert_matches::assert_matches; use aya::{ Ebpf, EbpfLoader, - maps::{MapData, array::PerCpuArray, ring_buf::RingBuf}, + maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf}, programs::UProbe, }; use aya_obj::generated::BPF_RINGBUF_HDR_SZ; @@ -27,11 +28,18 @@ struct RingBufTest { regs: PerCpuArray<MapData, Registers>, } +struct PinnedRingBufTest { + _bpf: Ebpf, + ring_buf: RingBuf<MapData>, + regs: PerCpuArray<MapData, Registers>, +} + // Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer // that is exactly a power-of-two multiple of the page size. The synchronous test will fail if // that's not the case because the actual size will be rounded up, and fewer entries will be dropped // than expected. 
const RING_BUF_MAX_ENTRIES: usize = 512; +const RING_BUF_PIN_PATH: &str = "/sys/fs/bpf/RING_BUF"; impl RingBufTest { fn new() -> Self { @@ -69,7 +77,45 @@ impl RingBufTest { } } +impl PinnedRingBufTest { + fn new() -> Self { + const RING_BUF_BYTE_SIZE: u32 = + (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32; + + let mut bpf = EbpfLoader::new() + .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) + .load(crate::RING_BUF_PINNED) + .unwrap(); + // We assume the map has been pinned as part of the loading process + let ring_buf = MapData::from_pin(RING_BUF_PIN_PATH).unwrap(); + let ring_buf = Map::RingBuf(ring_buf); + let ring_buf = RingBuf::try_from(ring_buf).unwrap(); + let regs = bpf.take_map("REGISTERS").unwrap(); + let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap(); + let prog: &mut UProbe = bpf + .program_mut("ring_buf_test") + .unwrap() + .try_into() + .unwrap(); + prog.load().unwrap(); + prog.attach( + "ring_buf_trigger_ebpf_program", + "/proc/self/exe", + None, + None, + ) + .unwrap(); + + Self { + _bpf: bpf, + ring_buf, + regs, + } + } +} + struct WithData(RingBufTest, Vec<u64>); +struct PinnedWithData(PinnedRingBufTest, Vec<u64>); impl WithData { fn new(n: usize) -> Self { @@ -80,6 +126,15 @@ impl WithData { } } +impl PinnedWithData { + fn new(n: usize) -> Self { + Self(PinnedRingBufTest::new(), { + let mut rng = rand::rng(); + std::iter::repeat_with(|| rng.random()).take(n).collect() + }) + } +} + #[test_case::test_case(0; "write zero items")] #[test_case::test_case(1; "write one item")] #[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")] @@ -138,6 +193,81 @@ fn ring_buf(n: usize) { assert_eq!(rejected, expected_rejected); } +#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")] +// This test checks for a bug where the consumer index was assumed to always start at position 0 +// of a newly-loaded ring-buffer map. 
This assumption is not true for a map that is pinned to the bpffs +// filesystem, since the map "remembers" the last consumer index position even if all processes +// unloaded it. +fn pinned_ring_buf(n: usize) { + let run_test = |mut ring_buf: RingBuf<MapData>, regs: PerCpuArray<MapData, Registers>, data: Vec<u64>, expected_capacity: usize| { + let mut expected = Vec::new(); + let mut expected_rejected = 0u64; + let mut expected_dropped = 0u64; + + for (i, &v) in data.iter().enumerate() { + ring_buf_trigger_ebpf_program(v); + if i >= expected_capacity { + expected_dropped += 1; + } else if v % 2 == 0 { + expected.push(v); + } else { + expected_rejected += 1; + } + } + + let mut seen = Vec::<u64>::new(); + while seen.len() < expected.len() { + if let Some(read) = ring_buf.next() { + let read: [u8; 8] = (*read) + .try_into() + .with_context(|| format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + assert_eq!(arg % 2, 0, "got {arg} from probe"); + seen.push(arg); + } + } + + assert_matches!(ring_buf.next(), None); + assert_eq!(seen, expected); + + let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum(); + assert_eq!(dropped, expected_dropped); + assert_eq!(rejected, expected_rejected); + }; + + // Note that after expected_capacity has been submitted, reserve calls in the probe will fail + // and the probe will give up. 
+ let expected_capacity = RING_BUF_MAX_ENTRIES - 1; + + let PinnedWithData( + PinnedRingBufTest { + ring_buf, + regs, + _bpf, + }, + data, + ) = PinnedWithData::new(n); + + run_test(ring_buf, regs, data, expected_capacity); + + // Close pinned map and re-open + drop(_bpf); + + let PinnedWithData( + PinnedRingBufTest { + ring_buf, + regs, + _bpf, + }, + data, + ) = PinnedWithData::new(n); + // Clean up the pinned map from the filesystem + let _ = fs::remove_file(RING_BUF_PIN_PATH).unwrap(); + + run_test(ring_buf, regs, data, expected_capacity); +} + #[unsafe(no_mangle)] #[inline(never)] pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {