mirror of https://github.com/aya-rs/aya
aya: Implement RingBuf
This implements the userspace binding for RingBuf. Instead of streaming the samples as heap buffers, the process_ring function takes a callback to which we pass the event's byte region, roughly following [libbpf]'s API design. This avoids a copy and allows marking the consumer pointer in a timely manner.

[libbpf]: https://github.com/libbpf/libbpf/blob/master/src/ringbuf.c

Additionally, integration tests are added to demonstrate the usage of the new APIs and to ensure that they work end-to-end.

Co-authored-by: William Findlay <william@williamfindlay.com>
Co-authored-by: Andrew Werner <awerner32@gmail.com>
parent
7ef7291e96
commit
687498a4c5
@ -0,0 +1,301 @@
//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs.
//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF
//! programs to userspace.
//!
//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html

use std::{
    io,
    ops::Deref,
    os::fd::{AsRawFd, RawFd},
    ptr,
    sync::atomic::{fence, AtomicU32, AtomicUsize, Ordering},
};

use libc::{munmap, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};

use crate::{
    generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
    maps::{MapData, MapError},
    sys::mmap,
};

/// A map that can be used to receive events from eBPF programs.
///
/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
/// * It's shared across all CPUs, which allows a strong ordering between events. It also makes
///   buffer creation easier.
/// * Data notifications are delivered for every event instead of being sampled every N events;
///   the eBPF program can also control notification delivery if sampling is desired for
///   performance reasons.
/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly
///   written into the ring without copying from a temporary location.
/// * Dropped sample notifications go to the eBPF program as the return value of
///   `reserve`/`output`, not to the userspace reader. This might require extra code to handle,
///   but allows for more flexible schemes to handle dropped samples.
///
/// To receive events you need to:
/// * call [`RingBuf::try_from`]
/// * poll the returned [`RingBuf`] to be notified when events are inserted in the buffer
/// * call [`RingBuf::next`] to read the events
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 5.8.
///
/// # Examples
///
/// See the integration tests for usage examples. The ring_buf test demonstrates busy
/// waiting to read from the RingBuf, and the ring_buf_async test demonstrates how to
/// utilize the AsyncFd to receive notifications.
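///
/// For illustration, a minimal busy-waiting read loop could look like the sketch below; the
/// map name `"EVENTS"` and the surrounding setup are assumptions made for the example, not
/// part of this API:
///
/// ```no_run
/// # let mut bpf = aya::Bpf::load(&[]).unwrap();
/// use aya::maps::RingBuf;
///
/// let mut ring_buf = RingBuf::try_from(bpf.take_map("EVENTS").unwrap()).unwrap();
/// loop {
///     while let Some(item) = ring_buf.next() {
///         // `item` derefs to the bytes of one submitted event; dropping it advances the
///         // consumer position so the kernel can reuse the space.
///         println!("received {} bytes", item.len());
///     }
///     // `next` returned `None`, so the buffer is currently empty; a real consumer would
///     // now block on the map fd (e.g. with epoll or AsyncFd) instead of spinning.
/// }
/// ```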
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
pub struct RingBuf<T> {
    _map: T,
    map_fd: i32,
    data_ptr: *const u8,
    consumer_pos_ptr: *const AtomicUsize,
    producer_pos_ptr: *const AtomicUsize,
    // A copy of `*producer_pos_ptr` to reduce cache line contention.
    // Might be stale, and should be refreshed once the consumer position has caught up.
    producer_pos_cache: usize,
    page_size: usize,
    mask: usize,
}

impl<T: core::borrow::Borrow<MapData>> RingBuf<T> {
    pub(crate) fn new(map: T) -> Result<Self, MapError> {
        let data: &MapData = map.borrow();

        // Determine page_size, map_fd, and set mask to map size - 1
        let page_size = crate::util::page_size();
        let map_fd = data.fd_or_err().map_err(MapError::from)?;
        let mask = (data.obj.max_entries() - 1) as usize;

        // Map writable consumer page
        let consumer_page = unsafe {
            mmap(
                ptr::null_mut(),
                page_size,
                PROT_READ | PROT_WRITE,
                MAP_SHARED,
                map_fd,
                0,
            )
        };
        if consumer_page == MAP_FAILED {
            return Err(MapError::SyscallError {
                call: "mmap".to_string(),
                io_error: io::Error::last_os_error(),
            });
        }

        // From kernel/bpf/ringbuf.c:
        // Each data page is mapped twice to allow "virtual"
        // continuous read of samples wrapping around the end of ring
        // buffer area:
        // ------------------------------------------------------
        // | meta pages |  real data pages  |  same data pages  |
        // ------------------------------------------------------
        // |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
        // ------------------------------------------------------
        // |            | TA             DA | TA             DA |
        // ------------------------------------------------------
        //                                        ^^^^^^^
        //                                           |
        // Here, no need to worry about special handling of wrapped-around
        // data due to double-mapped data pages. This works both in kernel and
        // when mmap()'ed in user-space, simplifying both kernel and
        // user-space implementations significantly.
        let producer_pages = unsafe {
            mmap(
                ptr::null_mut(),
                page_size + 2 * (mask + 1),
                PROT_READ,
                MAP_SHARED,
                map_fd,
                page_size as _,
            )
        };
        if producer_pages == MAP_FAILED {
            return Err(MapError::SyscallError {
                call: "mmap".to_string(),
                io_error: io::Error::last_os_error(),
            });
        }

        Ok(RingBuf {
            _map: map,
            map_fd,
            data_ptr: unsafe { (producer_pages as *mut u8).add(page_size) },
            consumer_pos_ptr: consumer_page as *mut _,
            producer_pos_ptr: producer_pages as *mut _,
            producer_pos_cache: 0,
            page_size,
            mask,
        })
    }
}

impl<T> RingBuf<T> {
    /// Try to take a new entry from the ringbuf.
    ///
    /// Returns `Some(item)` if the ringbuf is not empty.
    /// Returns `None` if the ringbuf is empty, in which case the caller may register for
    /// availability notifications through `epoll` or other APIs.
    // This is a streaming iterator which is not viable without GATs (stabilized in 1.65).
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<RingBufItem<T>> {
        // If `cb()` is true, do a memory barrier and test again whether it's really true.
        // Returns true if both tests return true.
        fn confirm_with_mb(mut cb: impl FnMut() -> bool) -> bool {
            cb() && {
                fence(Ordering::SeqCst);
                cb()
            }
        }

        loop {
            // Consumer pos is written by *us*. This means that we'll load the same value
            // regardless of the `Ordering`.
            let consumer_pos = unsafe { (*self.consumer_pos_ptr).load(Ordering::Relaxed) };

            #[allow(clippy::blocks_in_if_conditions)] // Meaning is clearer this way
            // Have we caught up?
            if consumer_pos == self.producer_pos_cache {
                // Cache might be stale, so test again. First, test without a costly memory
                // barrier. If that says we have caught up, do a memory barrier to ensure the
                // previous write is visible and test again.
                //
                // The memory barrier is necessary before committing to sleep due to a possible
                // race condition: the kernel writes producer index n+2 and sees consumer index
                // n, while we write consumer index n+1 and see producer index n+1. If we then
                // sleep, we'll never be woken up because the kernel thinks we haven't caught up.
                if confirm_with_mb(|| {
                    self.producer_pos_cache =
                        unsafe { (*self.producer_pos_ptr).load(Ordering::Acquire) };
                    consumer_pos == self.producer_pos_cache
                }) {
                    return None;
                }
            }

            let sample_head = unsafe { self.data_ptr.add(consumer_pos & self.mask) };
            let mut len_and_flags = 0; // Dummy value

            // For the same reasons as above, re-test with a memory barrier before committing
            // to sleep.
            #[allow(clippy::blocks_in_if_conditions)]
            if confirm_with_mb(|| {
                len_and_flags =
                    unsafe { (*(sample_head as *mut AtomicU32)).load(Ordering::Acquire) };
                (len_and_flags & BPF_RINGBUF_BUSY_BIT) != 0
            }) {
                return None;
            } else if (len_and_flags & BPF_RINGBUF_DISCARD_BIT) != 0 {
                self.consume();
            } else {
                break;
            }
        }

        Some(RingBufItem(self))
    }

    fn consume(&mut self) {
        let consumer_pos = unsafe { (*self.consumer_pos_ptr).load(Ordering::Relaxed) };
        let sample_head = unsafe { self.data_ptr.add(consumer_pos & self.mask) };
        let len_and_flags = unsafe { (*(sample_head as *mut AtomicU32)).load(Ordering::Relaxed) };
        assert_eq!(
            (len_and_flags & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)),
            0
        );

        let new_consumer_pos = consumer_pos + roundup_len(len_and_flags) as usize;
        unsafe {
            (*self.consumer_pos_ptr).store(new_consumer_pos, Ordering::Release);
        }
    }
}

impl<T> Drop for RingBuf<T> {
    fn drop(&mut self) {
        if !self.consumer_pos_ptr.is_null() {
            // SAFETY: `consumer_pos_ptr` is not null and the consumer page was mapped with
            // size `self.page_size`
            unsafe { munmap(self.consumer_pos_ptr as *mut _, self.page_size) };
        }

        if !self.producer_pos_ptr.is_null() {
            // SAFETY: `producer_pos_ptr` is not null and the producer pages were mapped with
            // size `self.page_size + 2 * (self.mask + 1)`
            unsafe {
                munmap(
                    self.producer_pos_ptr as *mut _,
                    self.page_size + 2 * (self.mask + 1),
                )
            };
        }
    }
}

impl<T> AsRawFd for RingBuf<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.map_fd
    }
}

/// A ringbuf item. When this item is dropped, the consumer index in the ringbuf will be updated.
pub struct RingBufItem<'a, T>(&'a mut RingBuf<T>);

impl<'a, T> Deref for RingBufItem<'a, T> {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        let consumer_pos = unsafe { (*self.0.consumer_pos_ptr).load(Ordering::Relaxed) };
        let sample_head = unsafe { self.0.data_ptr.add(consumer_pos & self.0.mask) };
        let len_and_flags = unsafe { (*(sample_head as *mut AtomicU32)).load(Ordering::Relaxed) };
        assert_eq!(
            (len_and_flags & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)),
            0
        );

        // Coerce the sample into a &[u8]
        let sample_ptr = unsafe { sample_head.add(BPF_RINGBUF_HDR_SZ as usize) };
        unsafe { std::slice::from_raw_parts(sample_ptr, len_and_flags as usize) }
    }
}

impl<'a, T> Drop for RingBufItem<'a, T> {
    fn drop(&mut self) {
        self.0.consume();
    }
}

/// Round up `len` to the nearest 8 byte alignment, adding BPF_RINGBUF_HDR_SZ and
/// clearing out the upper two bits of `len`.
fn roundup_len(mut len: u32) -> u32 {
    const LEN_MASK: u32 = !(BPF_RINGBUF_DISCARD_BIT | BPF_RINGBUF_BUSY_BIT);
    // clear out the upper two bits (busy and discard)
    len &= LEN_MASK;
    // add the size of the header prefix
    len += BPF_RINGBUF_HDR_SZ;
    // round up to the next multiple of 8
    (len + 7) & !7
}

#[cfg(test)]
mod tests {
    use super::{roundup_len, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ};

    #[test]
    fn test_roundup_len() {
        // should always round up to the nearest 8 byte alignment + BPF_RINGBUF_HDR_SZ
        assert_eq!(roundup_len(0), BPF_RINGBUF_HDR_SZ);
        assert_eq!(roundup_len(1), BPF_RINGBUF_HDR_SZ + 8);
        assert_eq!(roundup_len(8), BPF_RINGBUF_HDR_SZ + 8);
        assert_eq!(roundup_len(9), BPF_RINGBUF_HDR_SZ + 16);
        // should discard the upper two bits of len
        assert_eq!(
            roundup_len(0 | (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)),
            BPF_RINGBUF_HDR_SZ
        );
    }
}
@ -0,0 +1,159 @@
use core::{
    cell::UnsafeCell,
    mem,
    mem::MaybeUninit,
    ops::{Deref, DerefMut},
};

use crate::{
    bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF},
    helpers::{
        bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve,
        bpf_ringbuf_submit,
    },
    maps::PinningType,
};

#[repr(transparent)]
pub struct RingBuf {
    def: UnsafeCell<bpf_map_def>,
}

unsafe impl Sync for RingBuf {}

/// A ring buffer entry, returned from [`RingBuf::reserve`].
///
/// You must [`submit`] or [`discard`] this entry before it gets dropped.
///
/// [`submit`]: RingBufEntry::submit
/// [`discard`]: RingBufEntry::discard
#[must_use = "BPF verifier requires ring buffer entries to be either submitted or discarded"]
pub struct RingBufEntry<T: 'static>(&'static mut MaybeUninit<T>);

impl<T> Deref for RingBufEntry<T> {
    type Target = MaybeUninit<T>;

    fn deref(&self) -> &Self::Target {
        self.0
    }
}

impl<T> DerefMut for RingBufEntry<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0
    }
}

impl<T> RingBufEntry<T> {
    /// Discard this ring buffer entry. The entry will be skipped by the userspace reader.
    pub fn discard(self, flags: u64) {
        unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) };
    }

    /// Commit this ring buffer entry. The entry will be made visible to the userspace reader.
    pub fn submit(self, flags: u64) {
        unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) };
    }
}

impl RingBuf {
    /// Declare a BPF ring buffer.
    ///
    /// `byte_size` should be a power-of-2 multiple of the page size. If it is not, it will
    /// be coerced to the next largest valid size when the program is loaded.
    pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self {
        Self::new(byte_size, flags, PinningType::None)
    }

    /// Declare a pinned BPF ring buffer.
    ///
    /// `byte_size` should be a power-of-2 multiple of the page size. If it is not, it will
    /// be coerced to the next largest valid size when the program is loaded.
    pub const fn pinned(byte_size: u32, flags: u32) -> Self {
        Self::new(byte_size, flags, PinningType::ByName)
    }

    const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self {
        Self {
            def: UnsafeCell::new(bpf_map_def {
                type_: BPF_MAP_TYPE_RINGBUF,
                key_size: 0,
                value_size: 0,
                max_entries: byte_size,
                map_flags: flags,
                id: 0,
                pinning: pinning_type as u32,
            }),
        }
    }

    /// Reserve memory in the ring buffer that can fit `T`.
    ///
    /// Returns `None` if the ring buffer is full, or a reference to the allocated memory if the
    /// allocation succeeds.
    ///
    /// If the return value is not `None`, you must commit or discard the reserved entry through
    /// a call to [`RingBufEntry::submit`] or [`RingBufEntry::discard`].
    ///
    /// `T` must be aligned to 1, 2, 4 or 8 bytes; it's not possible to fulfill larger alignment
    /// requests. If you use this with a `T` that isn't properly aligned, this function will
    /// be compiled to a panic and silently make your eBPF program fail to load.
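    ///
    /// For illustration, a minimal sketch of the reserve/write/submit pattern (the map name
    /// `EVENTS` and the `u64` payload are assumptions made for the example):
    ///
    /// ```ignore
    /// #[map]
    /// static EVENTS: RingBuf = RingBuf::with_byte_size(4096, 0);
    ///
    /// fn emit(value: u64) {
    ///     if let Some(mut entry) = EVENTS.reserve::<u64>(0) {
    ///         entry.write(value);
    ///         entry.submit(0);
    ///     }
    ///     // If `reserve` returned `None`, the buffer was full and this event is dropped.
    /// }
    /// ```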
    pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
        // The reserved pointer may be null, which we handle with an Option.
        // We also need to ensure that the returned pointer is of a properly sized allocation
        // and satisfies T's alignment requirements.
        // Finally, cast it to a MaybeUninit, as creating a reference to uninitialized memory
        // is UB.

        // ringbuf allocations are aligned to 8 bytes (hardcoded in kernel code).
        assert!(8 % mem::align_of::<T>() == 0);

        let ptr = unsafe {
            bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::<T>() as _, flags)
                as *mut MaybeUninit<T>
        };
        match ptr.is_null() {
            true => None,
            false => Some(RingBufEntry(unsafe { &mut *ptr })),
        }
    }

    /// Copy `data` to the ring buffer output.
    ///
    /// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to
    /// save a redundant allocation on, and a copy from, the stack.
    ///
    /// Unlike [`reserve`], this function can handle dynamically sized types (which are hard to
    /// create in eBPF but still possible, e.g. by slicing an array).
    ///
    /// `T` must be aligned to 1, 2, 4 or 8 bytes; it's not possible to fulfill larger alignment
    /// requests. If you use this with a `T` that isn't properly aligned, this function will
    /// be compiled to a panic and silently make your eBPF program fail to load.
    ///
    /// [`reserve`]: RingBuf::reserve
    /// [`submit`]: RingBufEntry::submit
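    ///
    /// A sketch, reusing the assumed `EVENTS` map from the example above (the payload is also
    /// an assumption):
    ///
    /// ```ignore
    /// let payload: [u8; 4] = [1, 2, 3, 4];
    /// // Copies the whole slice into the ring buffer; the error code is ignored here.
    /// let _ = EVENTS.output(&payload[..], 0);
    /// ```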
    pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
        // See `reserve` for alignment requirements.
        assert!(8 % mem::align_of_val(data) == 0);

        let ret = unsafe {
            bpf_ringbuf_output(
                self.def.get() as *mut _,
                data as *const _ as *mut _,
                mem::size_of_val(data) as _,
                flags,
            )
        };
        if ret < 0 {
            Err(ret)
        } else {
            Ok(())
        }
    }

    /// Query various information about the ring buffer.
    ///
    /// Consult the `bpf_ringbuf_query` documentation for a list of allowed flags.
    pub fn query(&self, flags: u64) -> u64 {
        unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) }
    }
}
@ -0,0 +1,40 @@
#![no_std]
#![no_main]

use aya_bpf::{
    macros::{map, uprobe},
    maps::RingBuf,
    programs::ProbeContext,
};
use core::mem::size_of;

// Make a buffer large enough to hold MAX_ENTRIES entries at the same time.
// This requires taking the per-sample header size into consideration.
type Entry = u64;
const MAX_ENTRIES: usize = 1024;
const HDR_SIZE: usize = aya_bpf::bindings::BPF_RINGBUF_HDR_SZ as usize;

// Add 1 because the capacity is actually one less than you might expect:
// consumer_pos and producer_pos being equal means that the buffer is empty.
const RING_BUF_SIZE: usize = ((size_of::<Entry>() + HDR_SIZE) * MAX_ENTRIES) + 1;

#[map]
static RING_BUF: RingBuf = RingBuf::with_byte_size(RING_BUF_SIZE as u32, 0);

#[uprobe]
pub fn ring_buf_test(ctx: ProbeContext) {
    // Write the first argument to the function back out to RING_BUF.
    let Some(arg): Option<Entry> = ctx.arg(0) else {
        return;
    };
    if let Some(mut entry) = RING_BUF.reserve::<Entry>(0) {
        entry.write(arg);
        entry.submit(0);
    }
}

#[panic_handler]
fn panic(_info: &core::panic::PanicInfo) -> ! {
    loop {}
}
@ -0,0 +1,56 @@
use aya::{include_bytes_aligned, maps::ring_buf::RingBuf, programs::UProbe, Bpf};

#[test]
fn ring_buf() {
    let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf");
    let mut bpf = Bpf::load(bytes).unwrap();
    let ring_buf = bpf.take_map("RING_BUF").unwrap();
    let mut ring_buf = RingBuf::try_from(ring_buf).unwrap();

    let prog: &mut UProbe = bpf
        .program_mut("ring_buf_test")
        .unwrap()
        .try_into()
        .unwrap();
    prog.load().unwrap();
    prog.attach(
        Some("ring_buf_trigger_ebpf_program"),
        0,
        "/proc/self/exe",
        None,
    )
    .unwrap();

    // Generate some random data.
    let data = gen_data();

    // Call the function that the uprobe is attached to with randomly generated data.
    for val in &data {
        ring_buf_trigger_ebpf_program(*val);
    }
    // Read the data back out of the ring buffer.
    let mut seen = Vec::<u64>::new();
    while seen.len() < data.len() {
        if let Some(item) = ring_buf.next() {
            let item: [u8; 8] = (*item).try_into().unwrap();
            let arg = u64::from_ne_bytes(item);
            seen.push(arg);
        }
    }
    // Ensure that the data that was read matches what was passed.
    assert_eq!(seen, data);
}

#[no_mangle]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(_arg: u64) {}

/// Generate a variable-length vector of u64s. The number of values is always small enough to fit
/// into the RING_BUF defined in the probe.
pub(crate) fn gen_data() -> Vec<u64> {
    const DATA_LEN_RANGE: core::ops::RangeInclusive<usize> = 1..=1024;
    use rand::Rng as _;
    let mut rng = rand::thread_rng();
    let n = rng.gen_range(DATA_LEN_RANGE);
    std::iter::repeat_with(|| rng.gen()).take(n).collect()
}
@ -0,0 +1,72 @@
use std::os::fd::AsRawFd as _;

use aya::maps::RingBuf;

mod ring_buf;
use aya::{include_bytes_aligned, programs::UProbe, Bpf};
use ring_buf::{gen_data, ring_buf_trigger_ebpf_program};
use tokio::{
    io::unix::AsyncFd,
    task::spawn,
    time::{sleep, Duration},
};

#[tokio::test]
async fn ring_buf_async() {
    let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf");
    let mut bpf = Bpf::load(bytes).unwrap();
    let ring_buf = bpf.take_map("RING_BUF").unwrap();
    let mut ring_buf = RingBuf::try_from(ring_buf).unwrap();

    let prog: &mut UProbe = bpf
        .program_mut("ring_buf_test")
        .unwrap()
        .try_into()
        .unwrap();
    prog.load().unwrap();
    prog.attach(
        Some("ring_buf_trigger_ebpf_program"),
        0,
        "/proc/self/exe",
        None,
    )
    .unwrap();

    // Generate some random data.
    let data = gen_data();
    let write_handle = spawn(call_ring_buf_trigger_ebpf_program_over_time(data.clone()));

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap();
    let seen = {
        let mut seen = Vec::with_capacity(data.len());
        while seen.len() < data.len() {
            // Wait for readiness, then clear the bit before reading so that no notifications
            // are missed.
            async_fd.readable().await.unwrap().clear_ready();
            while let Some(data) = ring_buf.next() {
                let data: [u8; 8] = (*data).try_into().unwrap();
                let arg = u64::from_ne_bytes(data);
                seen.push(arg);
            }
        }
        seen
    };

    // Ensure that the data that was read matches what was passed.
    assert_eq!(seen, data);
    write_handle.await.unwrap();
}

async fn call_ring_buf_trigger_ebpf_program_over_time(data: Vec<u64>) {
    let random_duration = || {
        use rand::Rng as _;
        let mut rng = rand::thread_rng();
        let micros = rng.gen_range(0..1_000);
        Duration::from_micros(micros)
    };
    for value in data {
        sleep(random_duration()).await;
        ring_buf_trigger_ebpf_program(value);
    }
}