mirror of https://github.com/aya-rs/aya
aya: Implement RingBuf
This implements the userspace binding for RingBuf. Instead of streaming the samples as heap buffers, the process_ring function takes a callback to which we pass the event's byte region, roughly following [libbpf]'s API design. This avoids a copy and allows marking the consumer pointer in a timely manner.

[libbpf]: https://github.com/libbpf/libbpf/blob/master/src/ringbuf.c

Additionally, integration tests are added to demonstrate the usage of the new APIs and to ensure that they work end-to-end.

Co-authored-by: William Findlay <william@williamfindlay.com>
Co-authored-by: Tatsuyuki Ishi <ishitatsuyuki@gmail.com>
parent 613927a391
commit 038c630ec1
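For orientation, the sketch below shows how the userspace API introduced by this commit is driven, pieced together from the doc comments and integration tests in the diff. The map name `RING_BUF` mirrors the test program further down; the `anyhow`-based error handling is illustrative rather than part of the change.

use aya::{maps::ring_buf::RingBuf, Bpf};

fn drain_ring(bpf: &mut Bpf) -> anyhow::Result<()> {
    // Take ownership of the map declared by the eBPF program and wrap it in the
    // new userspace binding.
    let mut ring_buf = RingBuf::try_from(bpf.take_map("RING_BUF").unwrap())?;

    // `next` returns `None` once the ring is drained. Each `RingBufItem` derefs
    // to the event's byte region; the consumer position is advanced when the
    // item is dropped, so no copy into a heap buffer is needed.
    while let Some(item) = ring_buf.next() {
        println!("event: {} bytes", item.len());
    }
    Ok(())
}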
New file — userspace RingBuf map binding (aya):
@@ -0,0 +1,385 @@
//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs.
//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF
//! programs to userspace.
//!
//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html

use crate::{
    generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
    maps::{MapData, MapError},
    sys::mmap,
};
use libc::{c_int, c_void, munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};
use std::{
    fmt::Debug,
    io,
    ops::Deref,
    os::fd::{AsRawFd, BorrowedFd, RawFd},
    ptr,
    ptr::NonNull,
    sync::atomic::{AtomicU32, AtomicUsize, Ordering},
};

/// A map that can be used to receive events from eBPF programs.
///
/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
/// * It's shared across all CPUs, which allows a strong ordering between events.
/// * Data notifications are delivered precisely instead of being sampled for every N events; the
///   eBPF program can also control notification delivery if sampling is desired for performance
///   reasons. By default, a notification will be sent if the consumer is caught up at the time of
///   committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to
///   control this behavior.
/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly
///   written into the ring without copying from a temporary location.
/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
///   and not the userspace reader. This might require extra code to handle, but allows for more
///   flexible schemes to handle dropped samples.
///
/// To receive events you need to:
/// * Construct [`RingBuf`] using [`RingBuf::try_from`].
/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`].
///
/// To receive async notifications of data availability, you may construct an
/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 5.8.
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
pub struct RingBuf<T> {
    _map: T,
    map_fd: i32,
    consumer: ConsumerPos,
    producer: ProducerData,
}

impl<T: core::borrow::Borrow<MapData>> RingBuf<T> {
    pub(crate) fn new(map: T) -> Result<Self, MapError> {
        let data: &MapData = map.borrow();
        let page_size = crate::util::page_size();
        let map_fd = data.fd;
        let byte_size = data.obj.max_entries();
        let consumer = ConsumerPos::new(map_fd, 0, page_size)?;
        let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?;
        Ok(Self {
            _map: map,
            map_fd,
            consumer,
            producer,
        })
    }
}

impl<T> RingBuf<T> {
    /// Try to take a new entry from the ringbuf.
    ///
    /// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in
    /// which case the caller may register for availability notifications through `epoll` or other
    /// APIs. Only one RingBufItem may be outstanding at a time.
    //
    // This is not an implementation of `Iterator` because we need to be able to refer to the
    // lifetime of the iterator in the returned `RingBufItem`. If the Iterator::Item leveraged GATs,
    // one could imagine an implementation of `Iterator` that would work. GATs are stabilized in
    // Rust 1.65, but there's not yet a trait that the community seems to have standardized around.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<RingBufItem<'_>> {
        let Self {
            consumer, producer, ..
        } = self;
        producer.next(consumer)
    }
}

/// Access to the RawFd can be used to construct an AsyncFd for use with epoll.
impl<T> AsRawFd for RingBuf<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.map_fd
    }
}

/// The current outstanding item read from the ringbuf.
pub struct RingBufItem<'a> {
    data: &'a [u8],
    consumer: &'a mut ConsumerPos,
}

impl Deref for RingBufItem<'_> {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        let Self { data, .. } = self;
        data
    }
}

impl Drop for RingBufItem<'_> {
    fn drop(&mut self) {
        let Self { consumer, data } = self;
        consumer.consume(data.len());
    }
}

impl Debug for RingBufItem<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { data, consumer } = self;
        // In general Relaxed here is sufficient; for debugging, it certainly is.
        let offset = consumer.as_ref().load(Ordering::Relaxed);
        f.debug_struct("RingBufItem")
            .field("offset", &offset)
            .field("len", &data.len())
            .finish()
    }
}

struct ConsumerPos(MMap);

impl ConsumerPos {
    fn new(fd: RawFd, offset: usize, page_size: usize) -> Result<Self, MapError> {
        Ok(Self(MMap::new(
            fd,
            page_size,
            PROT_READ | PROT_WRITE,
            MAP_SHARED,
            offset.try_into().unwrap(),
        )?))
    }

    // Write operations need to be properly ordered with respect to the producer committing new
    // data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer
    // reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst
    // here we ensure that either a subsequent read by the consumer to consume messages will see
    // an available message, or the producer in the kernel will see the updated consumer offset
    // that is caught up.
    //
    // [1]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L487-L488
    // [2]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L494
    const WRITE_ORDERING: Ordering = Ordering::SeqCst;

    fn consume(&mut self, len: usize) -> usize {
        self.as_ref()
            .fetch_add(Self::compute_increment(len), Self::WRITE_ORDERING)
    }

    fn set_offset(&mut self, prev_offset: usize, len: usize) -> usize {
        let offset = prev_offset + Self::compute_increment(len);
        self.as_ref().store(offset, Self::WRITE_ORDERING);
        offset
    }

    fn compute_increment(len: usize) -> usize {
        // TODO: Use the primitive method when https://github.com/rust-lang/rust/issues/88581 is stabilized.
        fn next_multiple_of(n: usize, multiple: usize) -> usize {
            match n % multiple {
                0 => n,
                rem => n + (multiple - rem),
            }
        }
        next_multiple_of(len + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(), 8)
    }
}

impl AsRef<AtomicUsize> for ConsumerPos {
    fn as_ref(&self) -> &AtomicUsize {
        let Self(MMap { ptr, .. }) = self;
        unsafe { ptr.cast::<AtomicUsize>().as_ref() }
    }
}

struct ProducerData {
    mmap: MMap,

    // Offset in the mmap where the data starts.
    data_offset: usize,

    // A cache of the value of the producer position. It is used to avoid re-reading the producer
    // position when we know there is more data to consume.
    pos_cache: usize,

    // A bitmask which truncates u32 values to the domain of valid offsets in the ringbuf.
    mask: u32,
}

impl ProducerData {
    fn new(fd: RawFd, offset: usize, page_size: usize, byte_size: u32) -> Result<Self, MapError> {
        // The producer pages have one page of metadata and then the data pages, all mapped
        // read-only. Note that the length of the mapping includes the data pages twice as the
        // kernel will map them two times consecutively to avoid special handling of entries that
        // cross over the end of the ring buffer.
        //
        // The kernel diagram below shows the layout of the ring buffer. It references "meta pages",
        // but we only map exactly one producer meta page read-only. The consumer meta page is mapped
        // read-write elsewhere, and is taken into consideration via the offset parameter.
        //
        // From kernel/bpf/ringbuf.c[0]:
        //
        // Each data page is mapped twice to allow "virtual"
        // continuous read of samples wrapping around the end of ring
        // buffer area:
        // ------------------------------------------------------
        // | meta pages |  real data pages  |  same data pages  |
        // ------------------------------------------------------
        // |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
        // ------------------------------------------------------
        // |            | TA             DA | TA             DA |
        // ------------------------------------------------------
        //                               ^^^^^^^
        //                                  |
        // Here, no need to worry about special handling of wrapped-around
        // data due to double-mapped data pages. This works both in kernel and
        // when mmap()'ed in user-space, simplifying both kernel and
        // user-space implementations significantly.
        //
        // [0]: https://github.com/torvalds/linux/blob/3f01e9fe/kernel/bpf/ringbuf.c#L108-L124
        let len = page_size + 2 * usize::try_from(byte_size).unwrap();
        let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;

        // byte_size is required to be a power of two multiple of page_size (which implicitly is a
        // power of 2), so subtracting one will create a bitmask for values less than byte_size.
        debug_assert!(byte_size.is_power_of_two());
        let mask = byte_size - 1;
        Ok(Self {
            mmap,
            data_offset: page_size,
            pos_cache: 0,
            mask,
        })
    }

    fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> {
        let Self {
            ref mmap,
            data_offset,
            pos_cache,
            mask,
        } = self;
        let pos = unsafe { mmap.ptr.cast().as_ref() };
        let data: &[u8] = &mmap.as_ref()[*data_offset..];

        // Load our consumer position. Only this program writes this position, and this object is not
        // Send, so relaxed is sufficient.
        let mut consumer_pos = consumer.as_ref().load(Ordering::Relaxed);
        while data_available(pos, pos_cache, consumer_pos) {
            match read_item(data, *mask, consumer_pos) {
                Item::Busy => return None,
                Item::Discard { len } => consumer_pos = consumer.set_offset(consumer_pos, len),
                Item::Data(data) => return Some(RingBufItem { data, consumer }),
            }
        }
        return None;

        enum Item<'a> {
            Busy,
            Discard { len: usize },
            Data(&'a [u8]),
        }

        fn data_available(producer: &AtomicUsize, cache: &mut usize, consumer: usize) -> bool {
            if consumer == *cache {
                // This value is written using Release by the kernel [1], and should be read with
                // Acquire to ensure that the prior writes to the entry header are visible.
                //
                // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448
                *cache = producer.load(Ordering::Acquire);
            }

            // Note that we don't compare the order of the values because the producer position may
            // overflow u32 and wrap around to 0. Instead we just compare equality and assume that
            // the consumer position is always logically less than the producer position.
            //
            // Note also that the kernel, at the time of writing [1], doesn't seem to handle this
            // overflow correctly at all, and it's not clear that one can produce events after the
            // producer position has wrapped around.
            //
            // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440
            consumer != *cache
        }

        fn read_item(data: &[u8], mask: u32, offset: usize) -> Item<'_> {
            let offset = offset & usize::try_from(mask).unwrap();
            let header_ptr = data
                .get(offset..offset + core::mem::size_of::<AtomicU32>())
                .expect("offset out of bounds")
                .as_ptr() as *const AtomicU32;
            // Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This
            // ensures data written by the producer will be visible.
            //
            // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488
            let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
            if header & BPF_RINGBUF_BUSY_BIT != 0 {
                Item::Busy
            } else {
                let len = usize::try_from(header & mask).unwrap();
                if header & BPF_RINGBUF_DISCARD_BIT != 0 {
                    Item::Discard { len }
                } else {
                    let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
                    Item::Data(&data[data_offset..data_offset + len])
                }
            }
        }
    }
}

// MMap corresponds to a memory-mapped region.
//
// The data is unmapped in Drop.
struct MMap {
    ptr: NonNull<c_void>,
    len: usize,
}

impl MMap {
    fn new(
        fd: RawFd,
        len: usize,
        prot: c_int,
        flags: c_int,
        offset: off_t,
    ) -> Result<Self, MapError> {
        match unsafe {
            mmap(
                ptr::null_mut(),
                len,
                prot,
                flags,
                BorrowedFd::borrow_raw(fd),
                offset,
            )
        } {
            MAP_FAILED => Err(Into::into(crate::sys::SyscallError {
                call: "mmap",
                io_error: io::Error::last_os_error(),
            })),
            ptr => Ok(Self {
                ptr: std::ptr::NonNull::new(ptr)
                    .ok_or(
                        // This should never happen, but to be paranoid, and so we never need to talk
                        // about a null pointer, we check it anyway.
                        crate::sys::SyscallError {
                            call: "mmap",
                            io_error: io::Error::new(
                                io::ErrorKind::Other,
                                "mmap returned null pointer",
                            ),
                        },
                    )
                    .map_err(MapError::from)?,
                len,
            }),
        }
    }
}

impl AsRef<[u8]> for MMap {
    fn as_ref(&self) -> &[u8] {
        let Self { ptr, len } = self;
        unsafe { std::slice::from_raw_parts(ptr.as_ptr().cast(), *len) }
    }
}

impl Drop for MMap {
    fn drop(&mut self) {
        let Self { ptr, len } = *self;
        unsafe { munmap(ptr.as_ptr(), len) };
    }
}
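The doc comment on `RingBuf` above suggests pairing the map's file descriptor with `tokio::io::unix::AsyncFd` for async readiness. A condensed sketch of that pattern, adapted from the `ring_buf_async_no_drop` integration test later in this commit (the function name and error handling are illustrative):

use aya::maps::{ring_buf::RingBuf, MapData};
use std::os::fd::AsRawFd as _;
use tokio::io::unix::AsyncFd;

async fn drain_async(mut ring_buf: RingBuf<MapData>) -> std::io::Result<()> {
    // Register the map's fd with the reactor; readiness fires when the producer
    // commits data while the consumer is caught up.
    let async_fd = AsyncFd::new(ring_buf.as_raw_fd())?;
    loop {
        let mut guard = async_fd.readable().await?;
        // Drain everything currently available...
        while let Some(item) = ring_buf.next() {
            println!("event: {} bytes", item.len());
        }
        // ...then clear readiness so the next wakeup comes from the kernel.
        guard.clear_ready();
    }
}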
New file — eBPF-side RingBuf map (aya-bpf):
@@ -0,0 +1,163 @@
use core::{
    cell::UnsafeCell,
    mem,
    mem::MaybeUninit,
    ops::{Deref, DerefMut},
};

#[cfg(feature = "const_assert")]
use const_assert::{Assert, IsTrue};

use crate::{
    bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF},
    helpers::{
        bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve,
        bpf_ringbuf_submit,
    },
    maps::PinningType,
};

#[repr(transparent)]
pub struct RingBuf {
    def: UnsafeCell<bpf_map_def>,
}

unsafe impl Sync for RingBuf {}

/// A ring buffer entry, returned from [`RingBuf::reserve`].
///
/// You must [`submit`] or [`discard`] this entry before it gets dropped.
///
/// [`submit`]: RingBufEntry::submit
/// [`discard`]: RingBufEntry::discard
#[must_use = "BPF verifier requires ring buffer entries to be either submitted or discarded"]
pub struct RingBufEntry<T: 'static>(&'static mut MaybeUninit<T>);

impl<T> Deref for RingBufEntry<T> {
    type Target = MaybeUninit<T>;

    fn deref(&self) -> &Self::Target {
        self.0
    }
}

impl<T> DerefMut for RingBufEntry<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0
    }
}

impl<T> RingBufEntry<T> {
    /// Discard this ring buffer entry. The entry will be skipped by the userspace reader.
    pub fn discard(self, flags: u64) {
        unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) };
    }

    /// Commit this ring buffer entry. The entry will be made visible to the userspace reader.
    pub fn submit(self, flags: u64) {
        unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) };
    }
}

impl RingBuf {
    /// Declare a BPF ring buffer.
    ///
    /// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size.
    /// The loading program may coerce the size when loading the map.
    pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self {
        Self::new(byte_size, flags, PinningType::None)
    }

    /// Declare a pinned BPF ring buffer.
    ///
    /// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size.
    /// The loading program may coerce the size when loading the map.
    pub const fn pinned(byte_size: u32, flags: u32) -> Self {
        Self::new(byte_size, flags, PinningType::ByName)
    }

    const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self {
        Self {
            def: UnsafeCell::new(bpf_map_def {
                type_: BPF_MAP_TYPE_RINGBUF,
                key_size: 0,
                value_size: 0,
                max_entries: byte_size,
                map_flags: flags,
                id: 0,
                pinning: pinning_type as u32,
            }),
        }
    }

    /// Reserve memory in the ring buffer that can fit `T`.
    ///
    /// Returns `None` if the ring buffer is full.
    #[cfg(feature = "const_assert")]
    pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>>
    where
        Assert<{ 8 % core::mem::align_of::<T>() == 0 }>: IsTrue,
    {
        self.reserve_impl(flags)
    }

    /// Reserve memory in the ring buffer that can fit `T`.
    ///
    /// Returns `None` if the ring buffer is full.
    ///
    /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
    /// alignment requests. If you use this with a `T` that isn't properly aligned, this function will
    /// be compiled to a panic and silently make your eBPF program fail to load.
    /// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418).
    #[cfg(not(feature = "const_assert"))]
    pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
        assert_eq!(8 % core::mem::align_of::<T>(), 0);
        self.reserve_impl(flags)
    }

    fn reserve_impl<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
        let ptr = unsafe {
            bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::<T>() as _, flags)
        } as *mut MaybeUninit<T>;
        unsafe { ptr.as_mut() }.map(|ptr| RingBufEntry(ptr))
    }

    /// Copy `data` to the ring buffer output.
    ///
    /// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a
    /// copy from either a map buffer or the stack.
    ///
    /// Unlike [`reserve`], this function can handle dynamically sized types (which are hard to
    /// create in eBPF but still possible, e.g. by slicing an array).
    ///
    /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
    /// alignment requests. If you use this with a `T` that isn't properly aligned, this function will
    /// be compiled to a panic and silently make your eBPF program fail to load.
    /// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418).
    ///
    /// [`reserve`]: RingBuf::reserve
    /// [`submit`]: RingBufEntry::submit
    pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
        assert_eq!(8 % core::mem::align_of_val(data), 0);
        let ret = unsafe {
            bpf_ringbuf_output(
                self.def.get() as *mut _,
                data as *const _ as *mut _,
                mem::size_of_val(data) as _,
                flags,
            )
        };
        if ret < 0 {
            Err(ret)
        } else {
            Ok(())
        }
    }

    /// Query various information about the ring buffer.
    ///
    /// Consult `bpf_ringbuf_query` documentation for a list of allowed flags.
    pub fn query(&self, flags: u64) -> u64 {
        unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) }
    }
}
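The `reserve`/`submit`/`discard` path is exercised by the Rust test program in the next file; for completeness, here is a hedged sketch of the copy-based `output` path, written as a fragment of a `no_std` eBPF crate like the test program that follows. The map name, byte size, and probe are illustrative, not part of this diff.

use aya_bpf::{
    macros::{map, uprobe},
    maps::RingBuf,
    programs::ProbeContext,
};

#[map]
static EVENTS: RingBuf = RingBuf::with_byte_size(4096 * 4, 0); // 4 pages, a power-of-2 multiple of the page size

#[uprobe]
pub fn emit_event(_ctx: ProbeContext) {
    let value: u64 = 42;
    // `output` copies `value` into the ring buffer in one call; an `Err` return
    // means the sample was dropped (for example because the ring is full), which
    // a real program might record in a per-CPU counter as the test probe does.
    let _ = EVENTS.output(&value, 0);
}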
New file — integration-test eBPF program exercising reserve/submit/discard:
@@ -0,0 +1,59 @@
#![no_std]
#![no_main]

use aya_bpf::{
    macros::{map, uprobe},
    maps::{PerCpuArray, RingBuf},
    programs::ProbeContext,
};

#[map]
static RING_BUF: RingBuf = RingBuf::with_byte_size(0, 0);

// This structure's definition is duplicated in userspace.
#[repr(C)]
struct Registers {
    dropped: u64,
    rejected: u64,
}

// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs
// without needing synchronization. Atomics exist [1], but aren't exposed.
//
// [1]: https://lwn.net/Articles/838884/
#[map]
static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0);

#[uprobe]
pub fn ring_buf_test(ctx: ProbeContext) {
    let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) {
        Some(regs) => unsafe { &mut *regs },
        None => return,
    };
    let mut entry = match RING_BUF.reserve::<u64>(0) {
        Some(entry) => entry,
        None => {
            *dropped += 1;
            return;
        }
    };
    // Write the first argument to the function back out to RING_BUF if it is even,
    // otherwise increment the `rejected` counter in REGISTERS. This exercises discarding data.
    let arg: u64 = match ctx.arg(0) {
        Some(arg) => arg,
        None => return,
    };
    if arg % 2 == 0 {
        entry.write(arg);
        entry.submit(0);
    } else {
        *rejected += 1;
        entry.discard(0);
    }
}

#[cfg(not(test))]
#[panic_handler]
fn panic(_info: &core::panic::PanicInfo) -> ! {
    loop {}
}
New file — integration-test eBPF program (C) writing to the ring from the sched_switch tracepoint:
@@ -0,0 +1,19 @@
#include <vmlinux.h>
#include <bpf/bpf_helpers.h>

char _license[] SEC("license") = "GPL";

struct
{
    __uint(type, BPF_MAP_TYPE_RINGBUF);
    __uint(max_entries, 256 * 1024 /* 256 KB */);
} rb SEC(".maps");

// This probe writes a zero to rb every time the sched_switch tracepoint is hit.
SEC("tracepoint/sched/sched_switch")
int sched_switch(struct switch_args* ctx)
{
    unsigned long long e = 0;
    bpf_ringbuf_output(&rb, &e, sizeof(e), 0);
    return 0;
}
New file — userspace integration tests:
@@ -0,0 +1,396 @@
use anyhow::Context as _;
use assert_matches::assert_matches;
use aya::{
    maps::{array::PerCpuArray, ring_buf::RingBuf, MapData},
    programs::{TracePoint, UProbe},
    Bpf, BpfLoader, Btf, Pod,
};
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
use core::panic;
use rand::Rng as _;
use std::os::fd::AsRawFd as _;
use tokio::{
    io::unix::AsyncFd,
    time::{sleep, Duration},
};

/// Generate a variable length vector of u64s.
struct RingBufTest {
    _bpf: Bpf,
    ring_buf: RingBuf<MapData>,
    regs: PerCpuArray<MapData, Registers>,
    data: Vec<u64>,
}

// This structure's definition is duplicated in the probe.
#[repr(C)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
struct Registers {
    dropped: u64,
    rejected: u64,
}

impl core::ops::Add for Registers {
    type Output = Self;
    fn add(self, rhs: Self) -> Self::Output {
        Self {
            dropped: self.dropped + rhs.dropped,
            rejected: self.rejected + rhs.rejected,
        }
    }
}

impl<'a> std::iter::Sum<&'a Registers> for Registers {
    fn sum<I: Iterator<Item = &'a Registers>>(iter: I) -> Self {
        iter.fold(Default::default(), |a, b| a + *b)
    }
}

unsafe impl Pod for Registers {}

// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
// than expected.
const RING_BUF_MAX_ENTRIES: usize = 512;

impl RingBufTest {
    fn new() -> Self {
        const RING_BUF_BYTE_SIZE: u32 = (RING_BUF_MAX_ENTRIES
            * (core::mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize))
            as u32;

        // Use the loader API to control the size of the ring_buf.
        let mut bpf = BpfLoader::new()
            .btf(Some(Btf::from_sys_fs().unwrap()).as_ref())
            .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
            .load(crate::RING_BUF)
            .unwrap();
        let ring_buf = bpf.take_map("RING_BUF").unwrap();
        let ring_buf = RingBuf::try_from(ring_buf).unwrap();
        let regs = bpf.take_map("REGISTERS").unwrap();
        let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
        let prog: &mut UProbe = bpf
            .program_mut("ring_buf_test")
            .unwrap()
            .try_into()
            .unwrap();
        prog.load().unwrap();
        prog.attach(
            Some("ring_buf_trigger_ebpf_program"),
            0,
            "/proc/self/exe",
            None,
        )
        .unwrap();

        let data = {
            let mut rng = rand::thread_rng();
            // Generate more entries than there is space so we can test dropping entries.
            let n = rng.gen_range(1..=RING_BUF_MAX_ENTRIES * 2);
            std::iter::repeat_with(|| rng.gen()).take(n).collect()
        };

        Self {
            _bpf: bpf,
            ring_buf,
            regs,
            data,
        }
    }
}

#[test]
fn ring_buf() {
    let RingBufTest {
        ring_buf,
        ref regs,
        ref data,
        ..
    } = &mut RingBufTest::new();

    // Note that after expected_capacity has been submitted, reserve calls in the probe will fail
    // and the probe will give up.
    let expected_capacity = RING_BUF_MAX_ENTRIES - 1;

    // Call the function that the uprobe is attached to with randomly generated data.
    let mut expected = Vec::new();
    let mut expected_rejected = 0u64;
    let mut expected_dropped = 0u64;
    for (i, &v) in data.iter().enumerate() {
        ring_buf_trigger_ebpf_program(v);
        if i >= expected_capacity {
            expected_dropped += 1;
        } else if v % 2 == 0 {
            expected.push(v);
        } else {
            expected_rejected += 1;
        }
    }

    let mut seen = Vec::<u64>::new();
    while seen.len() < expected.len() {
        if let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen.push(arg);
        }
    }

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed, and the rejected count was set
    // properly.
    assert_eq!(seen, expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, expected_dropped);
    assert_eq!(rejected, expected_rejected);
}

#[no_mangle]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
    std::hint::black_box(arg);
}

// This test differs from the other async test in that it's possible for the producer
// to fill the ring_buf. We just ensure that the number of events we see is sane given
// what the producer sees, and that the logic does not hang. This exercises interleaving
// discards, successful commits, and drops due to the ring_buf being full.
#[tokio::test(flavor = "multi_thread")]
async fn ring_buf_async_with_drops() {
    let RingBufTest {
        ring_buf,
        ref regs,
        ref data,
        _bpf,
    } = &mut RingBufTest::new();

    let raw_fd = ring_buf.as_raw_fd();
    let async_fd = AsyncFd::with_interest(raw_fd, tokio::io::Interest::READABLE).unwrap();

    // Spawn the writer which internally will spawn many parallel writers.
    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let mut seen = 0;
    let mut process_ring_buf = || {
        while let Some(read) = ring_buf.next() {
            let read: [u8; 8] = (*read)
                .try_into()
                .with_context(|| format!("data: {:?}", read.len()))
                .unwrap();
            let arg = u64::from_ne_bytes(read);
            assert_eq!(arg % 2, 0, "got {arg} from probe");
            seen += 1;
        }
    };
    use futures::future::{
        select,
        Either::{Left, Right},
    };
    let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
        tokio::spawn(async {
            for value in v {
                ring_buf_trigger_ebpf_program(value);
            }
        })
    }));
    let readable = {
        let mut writer = writer;
        loop {
            let readable = Box::pin(async_fd.readable());
            writer = match select(readable, writer).await {
                Left((guard, writer)) => {
                    let mut guard = guard.unwrap();
                    process_ring_buf();
                    guard.clear_ready();
                    writer
                }
                Right((writer, readable)) => {
                    writer.unwrap();
                    break readable;
                }
            }
        }
    };

    // If there's more to read, we should receive a readiness notification in a timely manner.
    // If we don't, then assert that there's nothing else to read. Note that it's important
    // to wait some time before attempting to read, otherwise we may catch up with the producer
    // before epoll has an opportunity to send a notification; our consumer thread can race
    // with the kernel epoll check.
    let sleep_fut = sleep(Duration::from_millis(10));
    tokio::pin!(sleep_fut);
    match select(sleep_fut, readable).await {
        Left(((), _)) => {
            assert_matches!(ring_buf.next(), None);
        }
        Right((guard, _)) => {
            process_ring_buf();
            guard.unwrap().clear_ready();
        }
    }

    let max_dropped: u64 = u64::try_from(
        data.len()
            .checked_sub(RING_BUF_MAX_ENTRIES - 1)
            .unwrap_or_default(),
    )
    .unwrap();
    let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
    let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    let total = u64::try_from(data.len()).unwrap();
    let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
    let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
    let facts = format!(
        "seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
         max_rejected={max_rejected}, max_dropped={max_dropped}",
    );
    assert_eq!(seen + rejected + dropped, total, "{facts}",);
    assert!(
        (0u64..=max_dropped).contains(&dropped),
        "dropped={dropped} not in 0..={max_dropped}; {facts}",
    );
    assert!(
        (min_rejected..=max_rejected).contains(&rejected),
        "rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
    );
    assert!(
        (min_seen..=max_seen).contains(&seen),
        "seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
    );
}

#[tokio::test(flavor = "multi_thread")]
async fn ring_buf_async_no_drop() {
    let RingBufTest {
        ring_buf,
        ref regs,
        ref data,
        _bpf,
    } = &mut RingBufTest::new();

    let writer = {
        let data = data.to_owned();
        tokio::spawn(async move {
            for value in data {
                // Sleep a tad so we feel confident that the consumer will keep up
                // and no messages will be dropped.
                let dur = Duration::from_nanos(rand::thread_rng().gen_range(0..10));
                sleep(dur).await;
                ring_buf_trigger_ebpf_program(value);
            }
        })
    };

    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
    let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap();
    // Note that unlike in the synchronous case where all of the entries are written before any of
    // them are read, in this case we expect all of the entries to make their way to userspace
    // because entries are being consumed as they are produced.
    let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
    let expected_len = expected.len();
    let reader = async {
        let mut seen = Vec::with_capacity(expected_len);
        while seen.len() < expected_len {
            let mut guard = async_fd.readable().await.unwrap();
            while let Some(read) = ring_buf.next() {
                let read: [u8; 8] = (*read)
                    .try_into()
                    .with_context(|| format!("data: {:?}", read.len()))
                    .unwrap();
                let arg = u64::from_ne_bytes(read);
                seen.push(arg);
            }
            guard.clear_ready();
        }
        seen
    };
    let (writer, seen) = futures::future::join(writer, reader).await;
    writer.unwrap();

    // Make sure that there is nothing else in the ring_buf.
    assert_matches!(ring_buf.next(), None);

    // Ensure that the data that was read matches what was passed.
    assert_eq!(&seen, &expected);
    let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
    assert_eq!(dropped, 0);
    assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
}

// This can take a long time on a VM with a single CPU so we reduce the number of events
// when built with musl, which is a proxy for the VM configuration.
#[cfg(target_env = "musl")]
const SCHED_EVENTS: u32 = 10_000;

#[cfg(not(target_env = "musl"))]
const SCHED_EVENTS: u32 = 1_000_000;

// This test reproduces a bug where the ring buffer would not be notified of new entries if the
// state was not properly synchronized between the producer and consumer. This would result in the
// consumer never being woken up and the test hanging.
#[test]
fn ring_buf_epoll_wakeup() {
    let mut bpf = Bpf::load(crate::RING_BUF_SCHED_TRACEPOINT).unwrap();
    let rb = bpf.take_map("rb").unwrap();
    let mut rb = RingBuf::try_from(rb).unwrap();
    let prog: &mut TracePoint = bpf.program_mut("sched_switch").unwrap().try_into().unwrap();
    prog.load().unwrap();
    prog.attach("sched", "sched_switch").unwrap();

    let epoll_fd = epoll::create(false).unwrap();
    epoll::ctl(
        epoll_fd,
        epoll::ControlOptions::EPOLL_CTL_ADD,
        rb.as_raw_fd(),
        // The use of EPOLLET is intentional. Without it, level-triggering would result in
        // more notifications, and would mask the underlying bug this test reproduced when
        // the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's
        // AsyncFd always uses this flag (as demonstrated in the subsequent test).
        epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0),
    )
    .unwrap();
    let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
    let mut total_events = 0;
    while total_events < SCHED_EVENTS {
        epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap();
        let mut events_after_wake = 0;
        while let Some(read) = rb.next() {
            assert_eq!(read.len(), 8);
            events_after_wake += 1;
            total_events += 1;
        }
        assert_ne!(events_after_wake, 0);
    }
}

// This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
#[tokio::test]
async fn ring_buf_asyncfd_events() {
    let mut bpf = Bpf::load(crate::RING_BUF_SCHED_TRACEPOINT).unwrap();
    let rb = bpf.take_map("rb").unwrap();
    let mut rb = RingBuf::try_from(rb).unwrap();
    let prog: &mut TracePoint = bpf.program_mut("sched_switch").unwrap().try_into().unwrap();
    prog.load().unwrap();
    prog.attach("sched", "sched_switch").unwrap();

    let async_fd = AsyncFd::new(rb.as_raw_fd()).unwrap();
    let mut total_events = 0;
    while total_events < SCHED_EVENTS {
        let mut guard = async_fd.readable().await.unwrap();
        let mut events_after_wake = 0;
        while let Some(read) = rb.next() {
            assert_eq!(read.len(), 8);
            events_after_wake += 1;
            total_events += 1;
        }
        guard.clear_ready();
        assert_ne!(events_after_wake, 0);
    }
}