maps/ring_buf: fix producer position initialization

The RingBuf caches the last producer position it read so that it doesn't
need to constantly contend on the actual producer cache line when lots of
messages have yet to be consumed. It was bogus to initialize this cache
to 0. This patch initializes it properly and adds testing.

Fixes: #1309
Authored by Ershaad Basheer, committed by Andrew Werner
parent a3aa387a2e
commit ed87ed6852

@@ -303,6 +303,9 @@ impl ProducerData {
        let len = page_size + 2 * usize::try_from(byte_size).unwrap();
        let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
        let pos = unsafe { mmap.ptr().cast::<AtomicUsize>().as_ref() };
        let pos_cache = pos.load(Ordering::Acquire);
        // byte_size is required to be a power of two multiple of page_size (which implicitly is a
        // power of 2), so subtracting one will create a bitmask for values less than byte_size.
        debug_assert!(byte_size.is_power_of_two());
@@ -310,7 +313,7 @@
        Ok(Self {
            mmap,
            data_offset: page_size,
-            pos_cache: 0,
+            pos_cache,
            mask,
        })
    }
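The change above is the entire fix: seed the cache from the shared producer position instead of assuming zero. A minimal sketch of the caching pattern (simplified, with assumed names and a refresh-on-exhaustion condition that may differ from aya's exact internals) shows why a zero seed can wedge a reopened pinned map:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Simplified stand-in for ProducerData: `pos` models the kernel-updated
// producer position in the memory-mapped producer page.
struct Producer<'a> {
    pos: &'a AtomicUsize,
    pos_cache: usize, // last producer position this consumer observed
}

impl Producer<'_> {
    fn has_data(&mut self, consumer_pos: usize) -> bool {
        // Only touch the shared cache line once the cached view is exhausted.
        if consumer_pos == self.pos_cache {
            self.pos_cache = self.pos.load(Ordering::Acquire);
        }
        consumer_pos < self.pos_cache
    }
}

fn main() {
    // A pinned map reopened by a new process: the kernel-side producer is at
    // 32 and the persisted consumer position is 16, so data is pending.
    let shared = AtomicUsize::new(32);
    let consumer_pos = 16;

    // Buggy seed: pos_cache starts at 0, so consumer_pos never equals it,
    // the cache is never refreshed, and no data is ever reported.
    let mut buggy = Producer { pos: &shared, pos_cache: 0 };
    assert!(!buggy.has_data(consumer_pos));

    // Fixed seed: load the real position up front, as the patch now does.
    let mut fixed = Producer { pos: &shared, pos_cache: shared.load(Ordering::Acquire) };
    assert!(fixed.has_data(consumer_pos));
}
```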

@@ -91,3 +91,7 @@ path = "src/xdp_sec.rs"
[[bin]]
name = "uprobe_cookie"
path = "src/uprobe_cookie.rs"

[[bin]]
name = "ring_buf_pinned"
path = "src/ring_buf_pinned.rs"

@@ -0,0 +1,49 @@
#![no_std]
#![no_main]

use aya_ebpf::{
    macros::{map, uprobe},
    maps::{PerCpuArray, RingBuf},
    programs::ProbeContext,
};
use integration_common::ring_buf::Registers;

#[cfg(not(test))]
extern crate ebpf_panic;

#[map]
static RING_BUF: RingBuf = RingBuf::pinned(0, 0);
// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs
// without needing synchronization. Atomics exist [1], but aren't exposed.
//
// [1]: https://lwn.net/Articles/838884/
#[map]
static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0);

#[uprobe]
pub fn ring_buf_test(ctx: ProbeContext) {
    let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) {
        Some(regs) => unsafe { &mut *regs },
        None => return,
    };
    let mut entry = match RING_BUF.reserve::<u64>(0) {
        Some(entry) => entry,
        None => {
            *dropped += 1;
            return;
        }
    };
    // Write the first argument to the function back out to RING_BUF if it is even,
    // otherwise increment the counter in REJECTED. This exercises discarding data.
    let arg: u64 = match ctx.arg(0) {
        Some(arg) => arg,
        None => return,
    };
    if arg % 2 == 0 {
        entry.write(arg);
        entry.submit(0);
    } else {
        *rejected += 1;
        entry.discard(0);
    }
}
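One detail worth noting in the program above: `RingBuf::pinned(0, 0)` declares the map with a byte size of zero, which only works because userspace overrides the size before load. A sketch of the loader side (the pin directory and the 8192-byte size are illustrative; the test harness below does the same thing with its own constants):

```rust
use aya::EbpfLoader;

// Load an object whose RING_BUF map was declared with size 0 on the eBPF
// side; the real byte size must be supplied here, before load.
fn load(obj: &[u8]) -> anyhow::Result<aya::Ebpf> {
    Ok(EbpfLoader::new()
        .map_pin_path("/sys/fs/bpf/example") // directory that receives the pins
        .set_max_entries("RING_BUF", 8192)   // ring buffer byte size
        .load(obj)?)
}
```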

@@ -55,6 +55,7 @@ bpf_file!(
    TWO_PROGS => "two_progs",
    XDP_SEC => "xdp_sec",
    UPROBE_COOKIE => "uprobe_cookie",
    RING_BUF_PINNED => "ring_buf_pinned",
);
#[cfg(test)]

@ -1,6 +1,8 @@
use std::{
mem,
collections::VecDeque,
fs, mem,
os::fd::AsRawFd as _,
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
@ -13,7 +15,7 @@ use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
Ebpf, EbpfLoader,
maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf},
programs::UProbe,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
@ -27,6 +29,12 @@ struct RingBufTest {
regs: PerCpuArray<MapData, Registers>,
}
struct PinnedRingBufTest {
_bpf: Ebpf,
ring_buf: RingBuf<MapData>,
regs: PerCpuArray<MapData, Registers>,
}
// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
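To make the sizing constraint concrete, here is a small standalone check. The 4096-byte page and the entry count of 512 are assumptions for illustration (the test's actual RING_BUF_MAX_ENTRIES is defined outside this diff); BPF_RINGBUF_HDR_SZ is the kernel's 8-byte per-record header:

```rust
use std::mem;

const BPF_RINGBUF_HDR_SZ: usize = 8; // kernel record header, one per entry
const RING_BUF_MAX_ENTRIES: usize = 512; // illustrative value
const PAGE_SIZE: usize = 4096; // illustrative value

fn main() {
    let byte_size = RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ);
    // 512 * (8 + 8) = 8192 = 2 * 4096: an exact power-of-two multiple of the
    // page size, so the kernel accepts it as-is rather than rounding it up.
    assert_eq!(byte_size, 8192);
    assert!(byte_size % PAGE_SIZE == 0 && (byte_size / PAGE_SIZE).is_power_of_two());
    // Because byte_size is a power of two, byte_size - 1 is the wrap-around
    // mask checked by the debug_assert in the first hunk.
    let mask = byte_size - 1;
    assert_eq!(0x2003 & mask, 3); // offsets wrap modulo byte_size
}
```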
@@ -43,6 +51,7 @@ impl RingBufTest {
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF)
            .unwrap();
        let ring_buf = bpf.take_map("RING_BUF").unwrap();
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
@@ -80,6 +89,15 @@ impl WithData {
    }
}

impl PinnedWithData {
    fn new(n: usize, pin_path: &Path) -> Self {
        Self(PinnedRingBufTest::new(pin_path), {
            let mut rng = rand::rng();
            std::iter::repeat_with(|| rng.random()).take(n).collect()
        })
    }
}
#[test_case::test_case(0; "write zero items")]
#[test_case::test_case(1; "write one item")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
@@ -138,6 +156,123 @@ fn ring_buf(n: usize) {
    assert_eq!(rejected, expected_rejected);
}
// This test checks for a bug where the consumer index of a newly-loaded
// ring-buffer map was assumed to always start at position 0. That assumption
// does not hold for a map pinned to the bpffs filesystem, since the map
// "remembers" the last consumer index position even after every process that
// used it has exited. The structure of the test is as follows:
//
// Create the pinned ring buffer, write some items to it, and read some of them.
// Leave some in there, so that we can assert that upon re-opening the map, we
// can still read them. Then, re-open the map, write some more items and read
// both the old and new items.
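For reference, the reopen step that the second half of the test performs boils down to a helper like this (a sketch built from the same aya calls used in PinnedRingBufTest::new below; the map name "RING_BUF" matches the eBPF program above):

```rust
use std::path::Path;

use aya::maps::{Map, MapData, ring_buf::RingBuf};

// Re-open a ring buffer that an earlier process pinned under `pin_dir`. The
// bpffs pin keeps the kernel map object alive, so its producer and consumer
// positions survive the process that created it.
fn reopen_ring_buf(pin_dir: &Path) -> anyhow::Result<RingBuf<MapData>> {
    let data = MapData::from_pin(pin_dir.join("RING_BUF"))?;
    Ok(RingBuf::try_from(Map::RingBuf(data))?)
}
```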
#[test]
fn pinned_ring_buf() {
    let mut rng = rand::rng();
    let pin_path = Path::new("/sys/fs/bpf/").join(format!("{:x}", rng.random::<u64>()));
    fs::create_dir_all(&pin_path).unwrap();
    let n = RING_BUF_MAX_ENTRIES - 1; // avoid thinking about the capacity
    let data = std::iter::repeat_with(|| rng.random())
        .take(n)
        .collect::<Vec<_>>();
    let PinnedRingBufTest {
        mut ring_buf,
        regs: _,
        _bpf,
    } = PinnedRingBufTest::new(&pin_path);
    let to_write_before_reopen = data.len().min(8);
    let mut expected = VecDeque::new();
    for &v in data.iter().take(to_write_before_reopen) {
        ring_buf_trigger_ebpf_program(v);
        if v % 2 == 0 {
            expected.push_back(v);
        }
    }
    let to_read_before_reopen = expected.len() / 2;
    for _ in 0..to_read_before_reopen {
        let read = ring_buf.next().unwrap();
        let read: [u8; 8] = (*read)
            .try_into()
            .with_context(|| format!("data: {:?}", read.len()))
            .unwrap();
        let arg = u64::from_ne_bytes(read);
        let exp = expected.pop_front().unwrap();
        assert_eq!(exp, arg);
    }
    // Close the old pinned map and re-open it.
    drop((_bpf, ring_buf));
    let PinnedRingBufTest {
        mut ring_buf,
        regs: _,
        _bpf,
    } = PinnedRingBufTest::new(&pin_path);
    // Write some more items to the ring buffer.
    for &v in data.iter().skip(to_write_before_reopen) {
        ring_buf_trigger_ebpf_program(v);
        if v % 2 == 0 {
            expected.push_back(v);
        }
    }
    for _ in 0..expected.len() {
        let read = ring_buf.next().unwrap();
        let read: [u8; 8] = (*read)
            .try_into()
            .with_context(|| format!("data: {:?}", read.len()))
            .unwrap();
        let arg = u64::from_ne_bytes(read);
        let exp = expected.pop_front().unwrap();
        assert_eq!(exp, arg);
    }
    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);
    // Clean up the pinned map from the filesystem.
    fs::remove_dir_all(pin_path).unwrap();
}
impl PinnedRingBufTest {
    fn new(pin_path: &Path) -> Self {
        const RING_BUF_BYTE_SIZE: u32 =
            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
        let mut bpf = EbpfLoader::new()
            .map_pin_path(pin_path)
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF_PINNED)
            .unwrap();
        let ring_buf_pin_path = pin_path.join("RING_BUF");
        let ring_buf = MapData::from_pin(ring_buf_pin_path).unwrap();
        let ring_buf = Map::RingBuf(ring_buf);
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
        let prog: &mut UProbe = bpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
        prog.attach(
            "ring_buf_trigger_ebpf_program",
            "/proc/self/exe",
            None,
            None,
        )
        .unwrap();
        Self {
            _bpf: bpf,
            ring_buf,
            regs,
        }
    }
}
#[unsafe(no_mangle)]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
    // The body only needs to keep `arg` observable; black_box prevents the
    // call from being optimized away so the attached uprobe fires.
    std::hint::black_box(arg);
}