diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs
index c6642411..2cfafd70 100644
--- a/aya/src/maps/ring_buf.rs
+++ b/aya/src/maps/ring_buf.rs
@@ -103,7 +103,7 @@ impl<T: Borrow<MapData>> RingBuf<T> {
         let byte_size = data.obj.max_entries();
         let consumer_metadata = ConsumerMetadata::new(map_fd, 0, page_size)?;
         let consumer = ConsumerPos::new(consumer_metadata);
-        let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?;
+        let producer = ProducerData::new(map_fd, page_size, page_size, byte_size, consumer.pos)?;
         Ok(Self {
             map,
             consumer,
@@ -270,6 +270,7 @@ impl ProducerData {
         offset: usize,
         page_size: usize,
         byte_size: u32,
+        pos_cache: usize,
     ) -> Result<Self, MapError> {
         // The producer pages have one page of metadata and then the data pages, all mapped
         // read-only. Note that the length of the mapping includes the data pages twice as the
@@ -310,7 +311,7 @@
         Ok(Self {
             mmap,
             data_offset: page_size,
-            pos_cache: 0,
+            pos_cache,
             mask,
         })
     }
diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml
index b0573882..a8bba763 100644
--- a/test/integration-ebpf/Cargo.toml
+++ b/test/integration-ebpf/Cargo.toml
@@ -91,3 +91,7 @@ path = "src/xdp_sec.rs"
 [[bin]]
 name = "uprobe_cookie"
 path = "src/uprobe_cookie.rs"
+
+[[bin]]
+name = "ring_buf_pinned"
+path = "src/ring_buf_pinned.rs"
diff --git a/test/integration-ebpf/src/ring_buf_pinned.rs b/test/integration-ebpf/src/ring_buf_pinned.rs
new file mode 100644
index 00000000..769896fa
--- /dev/null
+++ b/test/integration-ebpf/src/ring_buf_pinned.rs
@@ -0,0 +1,49 @@
+#![no_std]
+#![no_main]
+
+use aya_ebpf::{
+    macros::{map, uprobe},
+    maps::{PerCpuArray, RingBuf},
+    programs::ProbeContext,
+};
+use integration_common::ring_buf::Registers;
+#[cfg(not(test))]
+extern crate ebpf_panic;
+
+#[map]
+static RING_BUF: RingBuf = RingBuf::pinned(0, 0);
+
+// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs
+// without needing synchronization. Atomics exist [1], but aren't exposed.
+//
+// [1]: https://lwn.net/Articles/838884/
+#[map]
+static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0);
+
+#[uprobe]
+pub fn ring_buf_test(ctx: ProbeContext) {
+    let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) {
+        Some(regs) => unsafe { &mut *regs },
+        None => return,
+    };
+    let mut entry = match RING_BUF.reserve::<u64>(0) {
+        Some(entry) => entry,
+        None => {
+            *dropped += 1;
+            return;
+        }
+    };
+    // Write the first argument to the function back out to RING_BUF if it is even,
+    // otherwise increment the counter in REJECTED. This exercises discarding data.
+    let arg: u64 = match ctx.arg(0) {
+        Some(arg) => arg,
+        None => return,
+    };
+    if arg % 2 == 0 {
+        entry.write(arg);
+        entry.submit(0);
+    } else {
+        *rejected += 1;
+        entry.discard(0);
+    }
+}
diff --git a/test/integration-test/src/lib.rs b/test/integration-test/src/lib.rs
index f7cc30c6..20529f6c 100644
--- a/test/integration-test/src/lib.rs
+++ b/test/integration-test/src/lib.rs
@@ -55,6 +55,7 @@ bpf_file!(
     TWO_PROGS => "two_progs",
     XDP_SEC => "xdp_sec",
     UPROBE_COOKIE => "uprobe_cookie",
+    RING_BUF_PINNED => "ring_buf_pinned",
 );
 
 #[cfg(test)]
diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs
index 4089bcbe..f47e8e04 100644
--- a/test/integration-test/src/tests/ring_buf.rs
+++ b/test/integration-test/src/tests/ring_buf.rs
@@ -1,4 +1,5 @@
 use std::{
+    fs,
     mem,
     os::fd::AsRawFd as _,
     sync::{
@@ -13,7 +14,7 @@ use anyhow::Context as _;
 use assert_matches::assert_matches;
 use aya::{
     Ebpf, EbpfLoader,
-    maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
+    maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf},
     programs::UProbe,
 };
 use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
@@ -27,11 +28,18 @@ struct RingBufTest {
     regs: PerCpuArray<MapData, Registers>,
 }
 
+struct PinnedRingBufTest {
+    _bpf: Ebpf,
+    ring_buf: RingBuf<MapData>,
+    regs: PerCpuArray<MapData, Registers>,
+}
+
 // Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
 // that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
 // that's not the case because the actual size will be rounded up, and fewer entries will be dropped
 // than expected.
 const RING_BUF_MAX_ENTRIES: usize = 512;
+const RING_BUF_PIN_PATH: &str = "/sys/fs/bpf/RING_BUF";
 
 impl RingBufTest {
     fn new() -> Self {
@@ -69,7 +77,45 @@
     }
 }
 
+impl PinnedRingBufTest {
+    fn new() -> Self {
+        const RING_BUF_BYTE_SIZE: u32 =
+            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
+
+        let mut bpf = EbpfLoader::new()
+            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
+            .load(crate::RING_BUF_PINNED)
+            .unwrap();
+        // We assume the map has been pinned as part of the loading process.
+        let ring_buf = MapData::from_pin(RING_BUF_PIN_PATH).unwrap();
+        let ring_buf = Map::RingBuf(ring_buf);
+        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
+        let regs = bpf.take_map("REGISTERS").unwrap();
+        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
+        let prog: &mut UProbe = bpf
+            .program_mut("ring_buf_test")
+            .unwrap()
+            .try_into()
+            .unwrap();
+        prog.load().unwrap();
+        prog.attach(
+            "ring_buf_trigger_ebpf_program",
+            "/proc/self/exe",
+            None,
+            None,
+        )
+        .unwrap();
+
+        Self {
+            _bpf: bpf,
+            ring_buf,
+            regs,
+        }
+    }
+}
+
 struct WithData(RingBufTest, Vec<u64>);
+struct PinnedWithData(PinnedRingBufTest, Vec<u64>);
 
 impl WithData {
     fn new(n: usize) -> Self {
@@ -80,6 +126,15 @@
     }
 }
 
+impl PinnedWithData {
+    fn new(n: usize) -> Self {
+        Self(PinnedRingBufTest::new(), {
+            let mut rng = rand::rng();
+            std::iter::repeat_with(|| rng.random()).take(n).collect()
+        })
+    }
+}
+
 #[test_case::test_case(0; "write zero items")]
 #[test_case::test_case(1; "write one item")]
 #[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
@@ -138,6 +193,81 @@ fn ring_buf(n: usize) {
     assert_eq!(rejected, expected_rejected);
 }
 
+#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
+// This test checks for a bug where the consumer index of a newly-loaded ring-buffer map was
+// assumed to always start at position 0. That assumption does not hold for a map pinned to the
+// bpffs filesystem, since the map "remembers" the last consumer position even after all
+// processes have closed it.
+fn pinned_ring_buf(n: usize) {
+    let run_test = |mut ring_buf: RingBuf<MapData>,
+                    regs: PerCpuArray<MapData, Registers>,
+                    data: Vec<u64>,
+                    expected_capacity: usize| {
+        let mut expected = Vec::new();
+        let mut expected_rejected = 0u64;
+        let mut expected_dropped = 0u64;
+
+        for (i, &v) in data.iter().enumerate() {
+            ring_buf_trigger_ebpf_program(v);
+            if i >= expected_capacity {
+                expected_dropped += 1;
+            } else if v % 2 == 0 {
+                expected.push(v);
+            } else {
+                expected_rejected += 1;
+            }
+        }
+
+        let mut seen = Vec::<u64>::new();
+        while seen.len() < expected.len() {
+            if let Some(read) = ring_buf.next() {
+                let read: [u8; 8] = (*read)
+                    .try_into()
+                    .with_context(|| format!("data: {:?}", read.len()))
+                    .unwrap();
+                let arg = u64::from_ne_bytes(read);
+                assert_eq!(arg % 2, 0, "got {arg} from probe");
+                seen.push(arg);
+            }
+        }
+
+        assert_matches!(ring_buf.next(), None);
+        assert_eq!(seen, expected);
+
+        let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
+        assert_eq!(dropped, expected_dropped);
+        assert_eq!(rejected, expected_rejected);
+    };
+
+    // Note that after expected_capacity has been submitted, reserve calls in the probe will fail
+    // and the probe will give up.
+    let expected_capacity = RING_BUF_MAX_ENTRIES - 1;
+
+    let PinnedWithData(
+        PinnedRingBufTest {
+            ring_buf,
+            regs,
+            _bpf,
+        },
+        data,
+    ) = PinnedWithData::new(n);
+    run_test(ring_buf, regs, data, expected_capacity);
+
+    // Close the pinned map and re-open it.
+    drop(_bpf);
+    let PinnedWithData(
+        PinnedRingBufTest {
+            ring_buf,
+            regs,
+            _bpf,
+        },
+        data,
+    ) = PinnedWithData::new(n);
+    // Clean up the pinned map from the filesystem.
+    fs::remove_file(RING_BUF_PIN_PATH).unwrap();
+    run_test(ring_buf, regs, data, expected_capacity);
+}
+
 #[unsafe(no_mangle)]
 #[inline(never)]
 pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
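
The scenario the fix addresses can also be reproduced outside the test suite. The following is a minimal, hypothetical sketch (not part of the diff) using only APIs that appear above: MapData::from_pin, Map::RingBuf, RingBuf::try_from, and RingBuf::next. It assumes an eBPF program has already pinned a ring buffer at /sys/fs/bpf/RING_BUF (as ring_buf_pinned.rs does via RingBuf::pinned) and is producing entries; the drain helper and the entry counts are illustrative only.

use aya::maps::{Map, MapData, ring_buf::RingBuf};

// Open the pinned ring buffer and consume everything currently available.
// The consumer position lives in the pinned map, so it persists after this
// RingBuf (and even this process) goes away.
fn drain(path: &str) -> anyhow::Result<usize> {
    let map = MapData::from_pin(path)?;
    let mut ring_buf = RingBuf::try_from(Map::RingBuf(map))?;
    let mut n = 0;
    while let Some(item) = ring_buf.next() {
        // RingBufItem derefs to a byte slice.
        println!("read {} bytes", item.len());
        n += 1;
    }
    Ok(n)
}

fn main() -> anyhow::Result<()> {
    let path = "/sys/fs/bpf/RING_BUF";

    // The first open consumes some entries and advances the persisted
    // consumer position past zero.
    let first = drain(path)?;

    // On the second open, ProducerData must seed its pos_cache from the
    // persisted consumer position (the fix in ring_buf.rs above); before the
    // fix it started at 0, so a reopened RingBuf could misbehave whenever the
    // pinned map's consumer position was non-zero.
    let second = drain(path)?;

    println!("first open: {first} entries, second open: {second} entries");
    Ok(())
}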