fix consumer index handling for pinned ringbuf map

Newly loaded ring buffer maps assumed that the consumer index was at
position 0. This is not always true for pinned maps, since they are
persistent and therefore remember the consumer index position from the
last time they were read.

Fixes: #1309
reviewable/pr1318/r1
Ershaad Basheer 1 month ago
parent a3aa387a2e
commit d8bd45b84c

@ -103,7 +103,7 @@ impl<T: Borrow<MapData>> RingBuf<T> {
let byte_size = data.obj.max_entries();
let consumer_metadata = ConsumerMetadata::new(map_fd, 0, page_size)?;
let consumer = ConsumerPos::new(consumer_metadata);
let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?;
let producer = ProducerData::new(map_fd, page_size, page_size, byte_size, consumer.pos)?;
Ok(Self {
map,
consumer,
@ -270,6 +270,7 @@ impl ProducerData {
offset: usize,
page_size: usize,
byte_size: u32,
pos_cache: usize,
) -> Result<Self, MapError> {
// The producer pages have one page of metadata and then the data pages, all mapped
// read-only. Note that the length of the mapping includes the data pages twice as the
@ -310,7 +311,7 @@ impl ProducerData {
Ok(Self {
mmap,
data_offset: page_size,
pos_cache: 0,
pos_cache,
mask,
})
}

@ -91,3 +91,7 @@ path = "src/xdp_sec.rs"
[[bin]]
name = "uprobe_cookie"
path = "src/uprobe_cookie.rs"
[[bin]]
name = "ring_buf_pinned"
path = "src/ring_buf_pinned.rs"

@ -0,0 +1,49 @@
#![no_std]
#![no_main]
use aya_ebpf::{
macros::{map, uprobe},
maps::{PerCpuArray, RingBuf},
programs::ProbeContext,
};
use integration_common::ring_buf::Registers;
#[cfg(not(test))]
extern crate ebpf_panic;
// Ring buffer pinned to bpffs so that its state — including the consumer index — survives
// across program loads. The byte size is 0 here because the userspace loader overrides it
// at load time via `set_max_entries("RING_BUF", ...)`.
#[map]
static RING_BUF: RingBuf = RingBuf::pinned(0, 0);
// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs
// without needing synchronization. Atomics exist [1], but aren't exposed.
//
// [1]: https://lwn.net/Articles/838884/
#[map]
static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0);
#[uprobe]
pub fn ring_buf_test(ctx: ProbeContext) {
    // Per-CPU counters: `dropped` counts failed reservations (ring full), `rejected`
    // counts odd arguments that are deliberately discarded.
    let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) {
        Some(regs) => unsafe { &mut *regs },
        None => return,
    };
    let mut entry = match RING_BUF.reserve::<u64>(0) {
        Some(entry) => entry,
        None => {
            *dropped += 1;
            return;
        }
    };
    // Write the first argument to the function back out to RING_BUF if it is even,
    // otherwise increment the counter in REJECTED. This exercises discarding data.
    let arg: u64 = match ctx.arg(0) {
        Some(arg) => arg,
        None => {
            // Don't leak the reservation: a record that is reserved but never
            // submitted or discarded would stall the consumer behind it.
            entry.discard(0);
            return;
        }
    };
    if arg % 2 == 0 {
        entry.write(arg);
        entry.submit(0);
    } else {
        *rejected += 1;
        entry.discard(0);
    }
}

@ -55,6 +55,7 @@ bpf_file!(
TWO_PROGS => "two_progs",
XDP_SEC => "xdp_sec",
UPROBE_COOKIE => "uprobe_cookie",
RING_BUF_PINNED => "ring_buf_pinned",
);
#[cfg(test)]

@ -1,4 +1,5 @@
use std::{
fs,
mem,
os::fd::AsRawFd as _,
sync::{
@ -13,7 +14,7 @@ use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
Ebpf, EbpfLoader,
maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf},
programs::UProbe,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
@ -27,11 +28,18 @@ struct RingBufTest {
regs: PerCpuArray<MapData, Registers>,
}
// Test harness for the pinned-map variant: unlike RingBufTest, the ring buffer is opened
// from its bpffs pin path rather than taken from the Ebpf object.
struct PinnedRingBufTest {
    // Held so the loaded programs and maps stay alive for the duration of the test.
    _bpf: Ebpf,
    // Opened via MapData::from_pin(RING_BUF_PIN_PATH), not via take_map.
    ring_buf: RingBuf<MapData>,
    // Per-CPU dropped/rejected counters written by the eBPF program.
    regs: PerCpuArray<MapData, Registers>,
}
// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
// than expected.
const RING_BUF_MAX_ENTRIES: usize = 512;
// bpffs path at which the eBPF side pins RING_BUF; the pinned test re-opens the map from here.
const RING_BUF_PIN_PATH: &str = "/sys/fs/bpf/RING_BUF";
impl RingBufTest {
fn new() -> Self {
@ -69,7 +77,45 @@ impl RingBufTest {
}
}
impl PinnedRingBufTest {
    // Loads the eBPF object that declares RING_BUF as pinned, opens the ring buffer from
    // its pin path, takes the REGISTERS map, and attaches the uprobe to the trigger
    // function exported by this test binary. Panics on any setup failure.
    fn new() -> Self {
        // Size the map to hold exactly RING_BUF_MAX_ENTRIES u64 records, accounting for
        // the per-record BPF ring buffer header.
        const RING_BUF_BYTE_SIZE: u32 =
            (RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
        let mut bpf = EbpfLoader::new()
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF_PINNED)
            .unwrap();
        // We assume the map has been pinned as part of the loading process
        let ring_buf = MapData::from_pin(RING_BUF_PIN_PATH).unwrap();
        let ring_buf = Map::RingBuf(ring_buf);
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
        let prog: &mut UProbe = bpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
        // Attach to the exported no-op trigger symbol in this very process.
        prog.attach(
            "ring_buf_trigger_ebpf_program",
            "/proc/self/exe",
            None,
            None,
        )
        .unwrap();
        Self {
            _bpf: bpf,
            ring_buf,
            regs,
        }
    }
}
// Pairs a harness with the payload values the test feeds to the probe.
struct WithData(RingBufTest, Vec<u64>);
// Same pairing for the pinned-map variant of the harness.
struct PinnedWithData(PinnedRingBufTest, Vec<u64>);
impl WithData {
fn new(n: usize) -> Self {
@ -80,6 +126,15 @@ impl WithData {
}
}
impl PinnedWithData {
    // Creates a fresh pinned-map harness together with `n` random u64 payload values.
    fn new(n: usize) -> Self {
        Self(PinnedRingBufTest::new(), {
            let mut rng = rand::rng();
            std::iter::repeat_with(|| rng.random()).take(n).collect()
        })
    }
}
#[test_case::test_case(0; "write zero items")]
#[test_case::test_case(1; "write one item")]
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
@ -138,6 +193,81 @@ fn ring_buf(n: usize) {
assert_eq!(rejected, expected_rejected);
}
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
// This test checks for a bug where the consumer index of a newly-loaded ring-buffer map was
// assumed to start at position 0. That assumption does not hold for a map pinned to the bpffs
// filesystem, since the map "remembers" the last consumer index position even after all
// processes have unloaded it.
fn pinned_ring_buf(n: usize) {
    // Drives the probe once per payload value, then drains the ring buffer and checks the
    // observed even values and the dropped/rejected counters against expectations.
    let run_test = |mut ring_buf: RingBuf<MapData>,
                    regs: PerCpuArray<MapData, Registers>,
                    data: Vec<u64>,
                    expected_capacity: usize| {
        let mut expected = Vec::new();
        let mut expected_rejected = 0u64;
        let mut expected_dropped = 0u64;
        for (i, &v) in data.iter().enumerate() {
            ring_buf_trigger_ebpf_program(v);
            if i >= expected_capacity {
                expected_dropped += 1;
            } else if v % 2 == 0 {
                expected.push(v);
            } else {
                expected_rejected += 1;
            }
        }
        let mut seen = Vec::<u64>::new();
        while seen.len() < expected.len() {
            if let Some(read) = ring_buf.next() {
                // Each record is a single native-endian u64 written by the probe.
                let read: [u8; 8] = (*read)
                    .try_into()
                    .with_context(|| format!("data: {:?}", read.len()))
                    .unwrap();
                let arg = u64::from_ne_bytes(read);
                assert_eq!(arg % 2, 0, "got {arg} from probe");
                seen.push(arg);
            }
        }
        // The buffer must be fully drained once all expected values were seen.
        assert_matches!(ring_buf.next(), None);
        assert_eq!(seen, expected);
        // Sum the per-CPU counters into a single Registers value.
        let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
        assert_eq!(dropped, expected_dropped);
        assert_eq!(rejected, expected_rejected);
    };
    // Note that after expected_capacity has been submitted, reserve calls in the probe will fail
    // and the probe will give up.
    let expected_capacity = RING_BUF_MAX_ENTRIES - 1;
    let PinnedWithData(
        PinnedRingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = PinnedWithData::new(n);
    run_test(ring_buf, regs, data, expected_capacity);
    // Close pinned map and re-open. The second load sees the consumer index left behind by
    // the first run, which is exactly what exercises the fix under test.
    drop(_bpf);
    let PinnedWithData(
        PinnedRingBufTest {
            ring_buf,
            regs,
            _bpf,
        },
        data,
    ) = PinnedWithData::new(n);
    // Clean up the pinned map from the filesystem. Removing the pin before run_test is
    // safe: the map fd opened from the pin stays valid after the path is gone.
    let _ = fs::remove_file(RING_BUF_PIN_PATH).unwrap();
    run_test(ring_buf, regs, data, expected_capacity);
}
#[unsafe(no_mangle)]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {

Loading…
Cancel
Save