pull/1318/merge
lblebasheer 20 hours ago committed by GitHub
commit eef91ef334
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -303,6 +303,9 @@ impl ProducerData {
let len = page_size + 2 * usize::try_from(byte_size).unwrap();
let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
let pos = unsafe { mmap.ptr().cast::<AtomicUsize>().as_ref() };
let pos_cache = pos.load(Ordering::Acquire);
// byte_size is required to be a power of two multiple of page_size (which implicitly is a
// power of 2), so subtracting one will create a bitmask for values less than byte_size.
debug_assert!(byte_size.is_power_of_two());
@ -310,7 +313,7 @@ impl ProducerData {
Ok(Self {
mmap,
data_offset: page_size,
pos_cache: 0,
pos_cache,
mask,
})
}

@ -95,3 +95,7 @@ path = "src/xdp_sec.rs"
[[bin]]
name = "uprobe_cookie"
path = "src/uprobe_cookie.rs"
[[bin]]
name = "ring_buf_pinned"
path = "src/ring_buf_pinned.rs"

@ -0,0 +1,26 @@
#![no_std]
#![no_main]
use aya_ebpf::{
macros::{map, uprobe},
maps::RingBuf,
programs::ProbeContext,
};
#[cfg(not(test))]
extern crate ebpf_panic;
#[map]
static RING_BUF: RingBuf = RingBuf::pinned(0, 0);
#[uprobe]
pub fn ring_buf_test(ctx: ProbeContext) {
// Write the first argument to the function back out to RING_BUF if it is even,
// otherwise increment the counter in REJECTED. This exercises discarding data.
let arg: u64 = match ctx.arg(0) {
Some(arg) => arg,
None => return,
};
if arg % 2 == 0 {
let _: Result<(), i64> = RING_BUF.output(&arg, 0);
}
}

@ -49,13 +49,14 @@ bpf_file!(
REDIRECT => "redirect",
RELOCATIONS => "relocations",
RING_BUF => "ring_buf",
RING_BUF_PINNED => "ring_buf_pinned",
SIMPLE_PROG => "simple_prog",
STRNCMP => "strncmp",
TCX => "tcx",
TEST => "test",
TWO_PROGS => "two_progs",
XDP_SEC => "xdp_sec",
UPROBE_COOKIE => "uprobe_cookie",
XDP_SEC => "xdp_sec",
);
#[cfg(test)]

@ -1,6 +1,8 @@
use std::{
mem,
collections::VecDeque,
fs, mem,
os::fd::AsRawFd as _,
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
@ -13,7 +15,7 @@ use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
Ebpf, EbpfLoader,
maps::{MapData, array::PerCpuArray, ring_buf::RingBuf},
maps::{Map, MapData, array::PerCpuArray, ring_buf::RingBuf},
programs::UProbe,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
@ -412,3 +414,114 @@ impl WriterThread {
thread.join().unwrap();
}
}
// This test checks for a bug that the consumer index always started at position
// 0 of a newly-loaded ring-buffer map. This assumption is not true for a map
// that is pinned to the bpffs filesystem since the map "remembers" the last
// consumer index position even if all processes unloaded it. The structure of
// the test is as follows:
//
// Create the pinned ring buffer, write some items to it, and read some of them.
// Leave some in there, so that we can assert that upon re-opening the map, we
// can still read them. Then, re-open the map, write some more items and read
// both the old and new items.
#[test]
fn pinned_ring_buf() {
let mut rng = rand::rng();
let pin_path = Path::new("/sys/fs/bpf/").join(format!("{:x}", rng.random::<u64>()));
fs::create_dir_all(&pin_path).unwrap();
let n = RING_BUF_MAX_ENTRIES - 1; // avoid thinking about the capacity
let data = std::iter::repeat_with(|| rng.random())
.take(n)
.collect::<Vec<_>>();
let PinnedRingBufTest { mut ring_buf, _bpf } = PinnedRingBufTest::new(&pin_path);
let to_write_before_reopen = data.len().min(8);
let mut expected = VecDeque::new();
for &v in data.iter().take(to_write_before_reopen) {
ring_buf_trigger_ebpf_program(v);
if v % 2 == 0 {
expected.push_back(v);
}
}
let to_read_before_reopen = expected.len() / 2;
for _ in 0..to_read_before_reopen {
let read = ring_buf.next().unwrap();
let read: [u8; 8] = (*read)
.try_into()
.with_context(|| format!("data: {:?}", read.len()))
.unwrap();
let arg = u64::from_ne_bytes(read);
let exp = expected.pop_front().unwrap();
assert_eq!(exp, arg);
}
// Close the old pinned map and re-open it.
drop((_bpf, ring_buf));
let PinnedRingBufTest { mut ring_buf, _bpf } = PinnedRingBufTest::new(&pin_path);
// Write some more items to the ring buffer.
for &v in data.iter().skip(to_write_before_reopen) {
ring_buf_trigger_ebpf_program(v);
if v % 2 == 0 {
expected.push_back(v);
}
}
for _ in 0..expected.len() {
let read = ring_buf.next().unwrap();
let read: [u8; 8] = (*read)
.try_into()
.with_context(|| format!("data: {:?}", read.len()))
.unwrap();
let arg = u64::from_ne_bytes(read);
let exp = expected.pop_front().unwrap();
assert_eq!(exp, arg);
}
// Make sure that there is nothing else in the ring_buf.
assert_matches!(ring_buf.next(), None);
// Clean up the pinned map from the filesystem.
fs::remove_dir_all(pin_path).unwrap();
}
struct PinnedRingBufTest {
_bpf: Ebpf,
ring_buf: RingBuf<MapData>,
}
impl PinnedRingBufTest {
fn new(pin_path: &Path) -> Self {
const RING_BUF_BYTE_SIZE: u32 =
(RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
let mut bpf = EbpfLoader::new()
.map_pin_path(pin_path)
.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
.load(crate::RING_BUF_PINNED)
.unwrap();
let ring_buf_pin_path = pin_path.join("RING_BUF");
let ring_buf = MapData::from_pin(ring_buf_pin_path).unwrap();
let ring_buf = Map::RingBuf(ring_buf);
let ring_buf = RingBuf::try_from(ring_buf).unwrap();
let prog: &mut UProbe = bpf
.program_mut("ring_buf_test")
.unwrap()
.try_into()
.unwrap();
prog.load().unwrap();
prog.attach(
"ring_buf_trigger_ebpf_program",
"/proc/self/exe",
None,
None,
)
.unwrap();
Self {
_bpf: bpf,
ring_buf,
}
}
}

Loading…
Cancel
Save