diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs
index c6642411..cbaa07b6 100644
--- a/aya/src/maps/ring_buf.rs
+++ b/aya/src/maps/ring_buf.rs
@@ -303,6 +303,9 @@ impl ProducerData {
         let len = page_size + 2 * usize::try_from(byte_size).unwrap();
         let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
 
+        let pos = unsafe { mmap.ptr().cast::<AtomicUsize>().as_ref() };
+        let pos_cache = pos.load(Ordering::Acquire);
+
         // byte_size is required to be a power of two multiple of page_size (which implicitly is a
         // power of 2), so subtracting one will create a bitmask for values less than byte_size.
         debug_assert!(byte_size.is_power_of_two());
@@ -310,7 +313,7 @@ impl ProducerData {
         Ok(Self {
             mmap,
             data_offset: page_size,
-            pos_cache: 0,
+            pos_cache,
             mask,
         })
     }
diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml
index b0573882..a8bba763 100644
--- a/test/integration-ebpf/Cargo.toml
+++ b/test/integration-ebpf/Cargo.toml
@@ -91,3 +91,7 @@ path = "src/xdp_sec.rs"
 [[bin]]
 name = "uprobe_cookie"
 path = "src/uprobe_cookie.rs"
+
+[[bin]]
+name = "ring_buf_pinned"
+path = "src/ring_buf_pinned.rs"
diff --git a/test/integration-ebpf/src/ring_buf_pinned.rs b/test/integration-ebpf/src/ring_buf_pinned.rs
new file mode 100644
index 00000000..769896fa
--- /dev/null
+++ b/test/integration-ebpf/src/ring_buf_pinned.rs
@@ -0,0 +1,49 @@
+#![no_std]
+#![no_main]
+
+use aya_ebpf::{
+    macros::{map, uprobe},
+    maps::{PerCpuArray, RingBuf},
+    programs::ProbeContext,
+};
+use integration_common::ring_buf::Registers;
+#[cfg(not(test))]
+extern crate ebpf_panic;
+
+#[map]
+static RING_BUF: RingBuf = RingBuf::pinned(0, 0);
+
+// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs
+// without needing synchronization. Atomics exist [1], but aren't exposed. 
+//
+// [1]: https://lwn.net/Articles/838884/
+#[map]
+static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0);
+
+#[uprobe]
+pub fn ring_buf_test(ctx: ProbeContext) {
+    let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) {
+        Some(regs) => unsafe { &mut *regs },
+        None => return,
+    };
+    let mut entry = match RING_BUF.reserve::<u64>(0) {
+        Some(entry) => entry,
+        None => {
+            *dropped += 1;
+            return;
+        }
+    };
+    // Write the first argument to the function back out to RING_BUF if it is even,
+    // otherwise increment the counter in REJECTED. This exercises discarding data.
+    let arg: u64 = match ctx.arg(0) {
+        Some(arg) => arg,
+        None => return,
+    };
+    if arg % 2 == 0 {
+        entry.write(arg);
+        entry.submit(0);
+    } else {
+        *rejected += 1;
+        entry.discard(0);
+    }
+}
diff --git a/test/integration-test/src/lib.rs b/test/integration-test/src/lib.rs
index f7cc30c6..20529f6c 100644
--- a/test/integration-test/src/lib.rs
+++ b/test/integration-test/src/lib.rs
@@ -55,6 +55,7 @@ bpf_file!(
     TWO_PROGS => "two_progs",
     XDP_SEC => "xdp_sec",
     UPROBE_COOKIE => "uprobe_cookie",
+    RING_BUF_PINNED => "ring_buf_pinned",
 );
 
 #[cfg(test)]
diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs
index 4089bcbe..d5e99ea4 100644
--- a/test/integration-test/src/tests/ring_buf.rs
+++ b/test/integration-test/src/tests/ring_buf.rs
@@ -1,6 +1,8 @@
 use std::{
-    mem,
+    collections::VecDeque,
+    fs, mem,
     os::fd::AsRawFd as _,
+    path::Path,
     sync::{
         Arc,
         atomic::{AtomicBool, Ordering},
@@ -13,7 +15,7 @@ use anyhow::Context as _;
 use assert_matches::assert_matches;
 use aya::{
     Ebpf, EbpfLoader,
-    maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
+    maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf},
     programs::UProbe,
 };
 use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
@@ -27,6 +29,12 @@ struct RingBufTest {
     regs: PerCpuArray<MapData, Registers>,
 }
 
+struct PinnedRingBufTest {
+    _bpf: Ebpf,
+    ring_buf: RingBuf<MapData>,
+    regs: PerCpuArray<MapData, Registers>,
+}
+
 // Note that it 
is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer // that is exactly a power-of-two multiple of the page size. The synchronous test will fail if // that's not the case because the actual size will be rounded up, and fewer entries will be dropped @@ -43,6 +51,7 @@ impl RingBufTest { .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) .load(crate::RING_BUF) .unwrap(); + let ring_buf = bpf.take_map("RING_BUF").unwrap(); let ring_buf = RingBuf::try_from(ring_buf).unwrap(); let regs = bpf.take_map("REGISTERS").unwrap(); @@ -80,6 +89,15 @@ impl WithData { } } +impl PinnedWithData { + fn new(n: usize, pin_path: &Path) -> Self { + Self(PinnedRingBufTest::new(pin_path), { + let mut rng = rand::rng(); + std::iter::repeat_with(|| rng.random()).take(n).collect() + }) + } +} + #[test_case::test_case(0; "write zero items")] #[test_case::test_case(1; "write one item")] #[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")] @@ -138,6 +156,123 @@ fn ring_buf(n: usize) { assert_eq!(rejected, expected_rejected); } +// This test checks for a bug that the consumer index always started at position +// 0 of a newly-loaded ring-buffer map. This assumption is not true for a map +// that is pinned to the bpffs filesystem since the map "remembers" the last +// consumer index position even if all processes unloaded it. The structure of +// the test is as follows: +// +// Create the pinned ring buffer, write some items to it, and read some of them. +// Leave some in there, so that we can assert that upon re-opening the map, we +// can still read them. Then, re-open the map, write some more items and read +// both the old and new items. 
+#[test]
+fn pinned_ring_buf() {
+    let mut rng = rand::rng();
+    let pin_path = Path::new("/sys/fs/bpf/").join(format!("{:x}", rng.random::<u64>()));
+    fs::create_dir_all(&pin_path).unwrap();
+    let n = RING_BUF_MAX_ENTRIES - 1; // avoid thinking about the capacity
+    let data = std::iter::repeat_with(|| rng.random())
+        .take(n)
+        .collect::<Vec<u64>>();
+
+    let PinnedRingBufTest {
+        mut ring_buf,
+        regs: _,
+        _bpf,
+    } = PinnedRingBufTest::new(&pin_path);
+
+    let to_write_before_reopen = data.len().min(8);
+    let mut expected = VecDeque::new();
+    for &v in data.iter().take(to_write_before_reopen) {
+        ring_buf_trigger_ebpf_program(v);
+        if v % 2 == 0 {
+            expected.push_back(v);
+        }
+    }
+    let to_read_before_reopen = expected.len() / 2;
+    for i in 0..to_read_before_reopen {
+        let read = ring_buf.next().unwrap();
+        let read: [u8; 8] = (*read)
+            .try_into()
+            .with_context(|| format!("data: {:?}", read.len()))
+            .unwrap();
+        let arg = u64::from_ne_bytes(read);
+        let exp = expected.pop_front().unwrap();
+        assert_eq!(exp, arg);
+    }
+
+    // Close the old pinned map and re-open it.
+    drop((_bpf, ring_buf));
+    let PinnedRingBufTest {
+        mut ring_buf,
+        regs: _,
+        _bpf,
+    } = PinnedRingBufTest::new(&pin_path);
+
+    // Write some more items to the ring buffer.
+    for (i, &v) in data.iter().skip(to_write_before_reopen).enumerate() {
+        ring_buf_trigger_ebpf_program(v);
+        if v % 2 == 0 {
+            expected.push_back(v);
+        }
+    }
+    for _ in 0..expected.len() {
+        let read = ring_buf.next().unwrap();
+        let read: [u8; 8] = (*read)
+            .try_into()
+            .with_context(|| format!("data: {:?}", read.len()))
+            .unwrap();
+        let arg = u64::from_ne_bytes(read);
+        let exp = expected.pop_front().unwrap();
+        assert_eq!(exp, arg);
+    }
+
+    // Make sure that there is nothing else in the ring_buf.
+    assert_matches!(ring_buf.next(), None);
+
+    // Clean up the pinned map from the filesystem. 
+    fs::remove_dir_all(pin_path).unwrap();
+}
+
+impl PinnedRingBufTest {
+    fn new(pin_path: &Path) -> Self {
+        const RING_BUF_BYTE_SIZE: u32 =
+            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
+
+        let mut bpf = EbpfLoader::new()
+            .map_pin_path(pin_path)
+            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
+            .load(crate::RING_BUF_PINNED)
+            .unwrap();
+        let ring_buf_pin_path = pin_path.join("RING_BUF");
+        let ring_buf = MapData::from_pin(ring_buf_pin_path).unwrap();
+        let ring_buf = Map::RingBuf(ring_buf);
+        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
+        let regs = bpf.take_map("REGISTERS").unwrap();
+        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
+        let prog: &mut UProbe = bpf
+            .program_mut("ring_buf_test")
+            .unwrap()
+            .try_into()
+            .unwrap();
+        prog.load().unwrap();
+        prog.attach(
+            "ring_buf_trigger_ebpf_program",
+            "/proc/self/exe",
+            None,
+            None,
+        )
+        .unwrap();
+
+        Self {
+            _bpf: bpf,
+            ring_buf,
+            regs,
+        }
+    }
+}
+
 #[unsafe(no_mangle)]
 #[inline(never)]
 pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {