aya/maps/ring_buf: fix producer position initialization

The RingBuf caches the last producer position it read so that it doesn't
need to constantly contend on the producer's cache line while many
messages are still unconsumed. Initializing this cache to 0 was bogus: a
pinned map can already have a non-zero producer position when re-opened.
This patch initializes the cache from the mapped producer position and
adds testing.

Fixes: #1309
pull/1318/head
Ershaad Basheer 1 month ago committed by Andrew Werner
parent a3aa387a2e
commit fa1fa41733

@ -303,6 +303,9 @@ impl ProducerData {
let len = page_size + 2 * usize::try_from(byte_size).unwrap();
let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
let pos = unsafe { mmap.ptr().cast::<AtomicUsize>().as_ref() };
let pos_cache = pos.load(Ordering::Acquire);
// byte_size is required to be a power of two multiple of page_size (which implicitly is a
// power of 2), so subtracting one will create a bitmask for values less than byte_size.
debug_assert!(byte_size.is_power_of_two());
@ -310,7 +313,7 @@ impl ProducerData {
Ok(Self {
mmap,
data_offset: page_size,
pos_cache: 0,
pos_cache,
mask,
})
}

@ -91,3 +91,7 @@ path = "src/xdp_sec.rs"
[[bin]]
name = "uprobe_cookie"
path = "src/uprobe_cookie.rs"
[[bin]]
name = "ring_buf_pinned"
path = "src/ring_buf_pinned.rs"

@ -0,0 +1,26 @@
#![no_std]
#![no_main]
use aya_ebpf::{
macros::{map, uprobe},
maps::RingBuf,
programs::ProbeContext,
};
#[cfg(not(test))]
extern crate ebpf_panic;
// Ring buffer map created with `RingBuf::pinned` so it survives on bpffs after
// the creating process exits; the userspace test re-opens it from the pin path
// (it joins "RING_BUF" onto the loader's map_pin_path).
// NOTE(review): the byte size of 0 here is presumably a placeholder that the
// userspace side overrides at load time via `set_max_entries("RING_BUF", ...)`;
// the second argument looks like map flags — confirm against aya-ebpf docs.
#[map]
static RING_BUF: RingBuf = RingBuf::pinned(0, 0);
#[uprobe]
pub fn ring_buf_test(ctx: ProbeContext) {
    // Write the first argument of the probed function out to RING_BUF if it is
    // even; odd arguments are simply ignored (nothing is written and no other
    // state is updated). The userspace test relies on this even-only filter.
    let arg: u64 = match ctx.arg(0) {
        Some(arg) => arg,
        // No argument available — nothing to emit.
        None => return,
    };
    if arg % 2 == 0 {
        // The result is deliberately ignored: if the ring buffer is full the
        // write is dropped and the test accounts for capacity accordingly.
        let _: Result<(), i64> = RING_BUF.output(&arg, 0);
    }
}

@ -48,13 +48,14 @@ bpf_file!(
REDIRECT => "redirect",
RELOCATIONS => "relocations",
RING_BUF => "ring_buf",
RING_BUF_PINNED => "ring_buf_pinned",
SIMPLE_PROG => "simple_prog",
STRNCMP => "strncmp",
TCX => "tcx",
TEST => "test",
TWO_PROGS => "two_progs",
XDP_SEC => "xdp_sec",
UPROBE_COOKIE => "uprobe_cookie",
XDP_SEC => "xdp_sec",
);
#[cfg(test)]

@ -1,6 +1,8 @@
use std::{
mem,
collections::VecDeque,
fs, mem,
os::fd::AsRawFd as _,
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
@ -13,7 +15,7 @@ use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
Ebpf, EbpfLoader,
maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf},
programs::UProbe,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
@ -412,3 +414,114 @@ impl WriterThread {
thread.join().unwrap();
}
}
// This test checks for a bug that the consumer index always started at position
// 0 of a newly-loaded ring-buffer map. This assumption is not true for a map
// that is pinned to the bpffs filesystem since the map "remembers" the last
// consumer index position even if all processes unloaded it. The structure of
// the test is as follows:
//
// Create the pinned ring buffer, write some items to it, and read some of them.
// Leave some in there, so that we can assert that upon re-opening the map, we
// can still read them. Then, re-open the map, write some more items and read
// both the old and new items.
#[test]
fn pinned_ring_buf() {
let mut rng = rand::rng();
let pin_path = Path::new("/sys/fs/bpf/").join(format!("{:x}", rng.random::<u64>()));
fs::create_dir_all(&pin_path).unwrap();
let n = RING_BUF_MAX_ENTRIES - 1; // avoid thinking about the capacity
let data = std::iter::repeat_with(|| rng.random())
.take(n)
.collect::<Vec<_>>();
let PinnedRingBufTest { mut ring_buf, _bpf } = PinnedRingBufTest::new(&pin_path);
let to_write_before_reopen = data.len().min(8);
let mut expected = VecDeque::new();
for &v in data.iter().take(to_write_before_reopen) {
ring_buf_trigger_ebpf_program(v);
if v % 2 == 0 {
expected.push_back(v);
}
}
let to_read_before_reopen = expected.len() / 2;
for _ in 0..to_read_before_reopen {
let read = ring_buf.next().unwrap();
let read: [u8; 8] = (*read)
.try_into()
.with_context(|| format!("data: {:?}", read.len()))
.unwrap();
let arg = u64::from_ne_bytes(read);
let exp = expected.pop_front().unwrap();
assert_eq!(exp, arg);
}
// Close the old pinned map and re-open it.
drop((_bpf, ring_buf));
let PinnedRingBufTest { mut ring_buf, _bpf } = PinnedRingBufTest::new(&pin_path);
// Write some more items to the ring buffer.
for &v in data.iter().skip(to_write_before_reopen) {
ring_buf_trigger_ebpf_program(v);
if v % 2 == 0 {
expected.push_back(v);
}
}
for _ in 0..expected.len() {
let read = ring_buf.next().unwrap();
let read: [u8; 8] = (*read)
.try_into()
.with_context(|| format!("data: {:?}", read.len()))
.unwrap();
let arg = u64::from_ne_bytes(read);
let exp = expected.pop_front().unwrap();
assert_eq!(exp, arg);
}
// Make sure that there is nothing else in the ring_buf.
assert_matches!(ring_buf.next(), None);
// Clean up the pinned map from the filesystem.
fs::remove_dir_all(pin_path).unwrap();
}
/// Bundles a loaded eBPF object together with a `RingBuf` opened from the
/// map's bpffs pin, so both stay alive for the duration of one test phase.
struct PinnedRingBufTest {
    // Held only to keep the loaded programs and maps alive; never read.
    _bpf: Ebpf,
    ring_buf: RingBuf<MapData>,
}
impl PinnedRingBufTest {
    /// Loads the `ring_buf_pinned` eBPF object with its maps pinned under
    /// `pin_path`, opens the pinned RING_BUF map back from the filesystem,
    /// and attaches the uprobe that feeds the ring buffer.
    fn new(pin_path: &Path) -> Self {
        // Sized to hold RING_BUF_MAX_ENTRIES u64 payloads, each preceded by
        // the per-record ring-buffer header.
        const RING_BUF_BYTE_SIZE: u32 =
            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
        let mut ebpf = EbpfLoader::new()
            .map_pin_path(pin_path)
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF_PINNED)
            .unwrap();
        // Open the map through its bpffs pin rather than through the freshly
        // loaded object — this is the path the test exercises.
        let pinned = MapData::from_pin(pin_path.join("RING_BUF")).unwrap();
        let ring_buf = RingBuf::try_from(Map::RingBuf(pinned)).unwrap();
        let prog: &mut UProbe = ebpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
        prog.attach(
            "ring_buf_trigger_ebpf_program",
            "/proc/self/exe",
            None,
            None,
        )
        .unwrap();
        Self {
            _bpf: ebpf,
            ring_buf,
        }
    }
}

Loading…
Cancel
Save