mirror of https://github.com/aya-rs/aya
Merge pull request #629 from ajwerner/ringbuf
commit
62849944f2
@ -0,0 +1,458 @@
|
|||||||
|
//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs.
|
||||||
|
//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF
|
||||||
|
//! programs to userspace.
|
||||||
|
//!
|
||||||
|
//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
borrow::Borrow,
|
||||||
|
ffi::{c_int, c_void},
|
||||||
|
fmt::{self, Debug, Formatter},
|
||||||
|
io, mem,
|
||||||
|
ops::Deref,
|
||||||
|
os::fd::{AsFd as _, AsRawFd, BorrowedFd, RawFd},
|
||||||
|
ptr,
|
||||||
|
ptr::NonNull,
|
||||||
|
slice,
|
||||||
|
sync::atomic::{AtomicU32, AtomicUsize, Ordering},
|
||||||
|
};
|
||||||
|
|
||||||
|
use libc::{munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
|
||||||
|
maps::{MapData, MapError},
|
||||||
|
sys::{mmap, SyscallError},
|
||||||
|
util::page_size,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A map that can be used to receive events from eBPF programs.
|
||||||
|
///
|
||||||
|
/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
|
||||||
|
/// * It's shared across all CPUs, which allows a strong ordering between events.
|
||||||
|
/// * Data notifications are delivered precisely instead of being sampled for every N events; the
|
||||||
|
/// eBPF program can also control notification delivery if sampling is desired for performance
|
||||||
|
/// reasons. By default, a notification will be sent if the consumer is caught up at the time of
|
||||||
|
/// committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to
|
||||||
|
/// control this behavior.
|
||||||
|
/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly
|
||||||
|
/// written into the ring without copying from a temporary location.
|
||||||
|
/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
|
||||||
|
/// and not the userspace reader. This might require extra code to handle, but allows for more
|
||||||
|
/// flexible schemes to handle dropped samples.
|
||||||
|
///
|
||||||
|
/// To receive events you need to:
|
||||||
|
/// * Construct [`RingBuf`] using [`RingBuf::try_from`].
|
||||||
|
/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`].
|
||||||
|
///
|
||||||
|
/// To receive async notifications of data availability, you may construct an
|
||||||
|
/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness.
|
||||||
|
///
|
||||||
|
/// # Minimum kernel version
|
||||||
|
///
|
||||||
|
/// The minimum kernel version required to use this feature is 5.8.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// # struct PollFd<T>(T);
|
||||||
|
/// # fn poll_fd<T>(t: T) -> PollFd<T> { PollFd(t) }
|
||||||
|
/// # impl<T> PollFd<T> {
|
||||||
|
/// # fn readable(&mut self) -> Guard<'_, T> { Guard(self) }
|
||||||
|
/// # }
|
||||||
|
/// # struct Guard<'a, T>(&'a mut PollFd<T>);
|
||||||
|
/// # impl<T> Guard<'_, T> {
|
||||||
|
/// # fn inner_mut(&mut self) -> &mut T {
|
||||||
|
/// # let Guard(PollFd(t)) = self;
|
||||||
|
/// # t
|
||||||
|
/// # }
|
||||||
|
/// # fn clear_ready(&mut self) {}
|
||||||
|
/// # }
|
||||||
|
/// # let bpf = aya::Bpf::load(&[])?;
|
||||||
|
/// use aya::maps::RingBuf;
|
||||||
|
/// use std::convert::TryFrom;
|
||||||
|
///
|
||||||
|
/// let ring_buf = RingBuf::try_from(bpf.map_mut("ARRAY")?)?;
|
||||||
|
/// let poll = poll_fd(ring_buf);
|
||||||
|
/// loop {
|
||||||
|
/// let mut guard = poll.readable()?;
|
||||||
|
/// let ring_buf = guard.inner_mut()
|
||||||
|
/// while let Some(item) = ring_buf.next() {
|
||||||
|
/// println!("Received: {:?}", item);
|
||||||
|
/// }
|
||||||
|
/// guard.clear_ready();
|
||||||
|
/// }
|
||||||
|
/// # Ok::<(), aya::BpfError>(())
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// # Polling
|
||||||
|
///
|
||||||
|
/// In the example above the implementations of poll(), poll.readable(), guard.inner_mut(), and
|
||||||
|
/// guard.clear_ready() are not given. RingBuf implements the AsRawFd trait, so you can implement
|
||||||
|
/// polling using any crate that can poll file descriptors, like epoll, mio etc. The above example
|
||||||
|
/// API is motivated by that of [`tokio::io::unix::AsyncFd`].
|
||||||
|
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
|
||||||
|
pub struct RingBuf<T> {
|
||||||
|
map: T,
|
||||||
|
consumer: ConsumerPos,
|
||||||
|
producer: ProducerData,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Borrow<MapData>> RingBuf<T> {
|
||||||
|
pub(crate) fn new(map: T) -> Result<Self, MapError> {
|
||||||
|
let data: &MapData = map.borrow();
|
||||||
|
let page_size = page_size();
|
||||||
|
let map_fd = data.fd().as_fd();
|
||||||
|
let byte_size = data.obj.max_entries();
|
||||||
|
let consumer_metadata = ConsumerMetadata::new(map_fd, 0, page_size)?;
|
||||||
|
let consumer = ConsumerPos::new(consumer_metadata);
|
||||||
|
let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?;
|
||||||
|
Ok(Self {
|
||||||
|
map,
|
||||||
|
consumer,
|
||||||
|
producer,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> RingBuf<T> {
|
||||||
|
/// Try to take a new entry from the ringbuf.
|
||||||
|
///
|
||||||
|
/// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in
|
||||||
|
/// which case the caller may register for availability notifications through `epoll` or other
|
||||||
|
/// APIs. Only one RingBufItem may be outstanding at a time.
|
||||||
|
//
|
||||||
|
// This is not an implementation of `Iterator` because we need to be able to refer to the
|
||||||
|
// lifetime of the iterator in the returned `RingBufItem`. If the Iterator::Item leveraged GATs,
|
||||||
|
// one could imagine an implementation of `Iterator` that would work. GATs are stabilized in
|
||||||
|
// Rust 1.65, but there's not yet a trait that the community seems to have standardized around.
|
||||||
|
#[allow(clippy::should_implement_trait)]
|
||||||
|
pub fn next(&mut self) -> Option<RingBufItem<'_>> {
|
||||||
|
let Self {
|
||||||
|
consumer, producer, ..
|
||||||
|
} = self;
|
||||||
|
producer.next(consumer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Access to the RawFd can be used to construct an AsyncFd for use with epoll.
|
||||||
|
impl<T: Borrow<MapData>> AsRawFd for RingBuf<T> {
|
||||||
|
fn as_raw_fd(&self) -> RawFd {
|
||||||
|
let Self {
|
||||||
|
map,
|
||||||
|
consumer: _,
|
||||||
|
producer: _,
|
||||||
|
} = self;
|
||||||
|
map.borrow().fd().as_fd().as_raw_fd()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The current outstanding item read from the ringbuf.
|
||||||
|
pub struct RingBufItem<'a> {
|
||||||
|
data: &'a [u8],
|
||||||
|
consumer: &'a mut ConsumerPos,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for RingBufItem<'_> {
|
||||||
|
type Target = [u8];
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
let Self { data, .. } = self;
|
||||||
|
data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for RingBufItem<'_> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let Self { consumer, data } = self;
|
||||||
|
consumer.consume(data.len())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debug for RingBufItem<'_> {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||||
|
let Self {
|
||||||
|
data,
|
||||||
|
consumer:
|
||||||
|
ConsumerPos {
|
||||||
|
pos,
|
||||||
|
metadata: ConsumerMetadata { mmap: _ },
|
||||||
|
},
|
||||||
|
} = self;
|
||||||
|
// In general Relaxed here is sufficient, for debugging, it certainly is.
|
||||||
|
f.debug_struct("RingBufItem")
|
||||||
|
.field("pos", pos)
|
||||||
|
.field("len", &data.len())
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ConsumerMetadata {
|
||||||
|
mmap: MMap,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConsumerMetadata {
|
||||||
|
fn new(fd: BorrowedFd<'_>, offset: usize, page_size: usize) -> Result<Self, MapError> {
|
||||||
|
let mmap = MMap::new(
|
||||||
|
fd,
|
||||||
|
page_size,
|
||||||
|
PROT_READ | PROT_WRITE,
|
||||||
|
MAP_SHARED,
|
||||||
|
offset.try_into().unwrap(),
|
||||||
|
)?;
|
||||||
|
Ok(Self { mmap })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<AtomicUsize> for ConsumerMetadata {
|
||||||
|
fn as_ref(&self) -> &AtomicUsize {
|
||||||
|
let Self {
|
||||||
|
mmap: MMap { ptr, .. },
|
||||||
|
} = self;
|
||||||
|
unsafe { ptr.cast::<AtomicUsize>().as_ref() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ConsumerPos {
|
||||||
|
pos: usize,
|
||||||
|
metadata: ConsumerMetadata,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConsumerPos {
|
||||||
|
fn new(metadata: ConsumerMetadata) -> Self {
|
||||||
|
// Load the initial value of the consumer position. SeqCst is used to be safe given we don't
|
||||||
|
// have any claims about memory synchronization performed by some previous writer.
|
||||||
|
let pos = metadata.as_ref().load(Ordering::SeqCst);
|
||||||
|
Self { pos, metadata }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume(&mut self, len: usize) {
|
||||||
|
let Self { pos, metadata } = self;
|
||||||
|
|
||||||
|
// TODO: Use primitive method when https://github.com/rust-lang/rust/issues/88581 is stabilized.
|
||||||
|
fn next_multiple_of(n: usize, multiple: usize) -> usize {
|
||||||
|
match n % multiple {
|
||||||
|
0 => n,
|
||||||
|
rem => n + (multiple - rem),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*pos += next_multiple_of(usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len, 8);
|
||||||
|
|
||||||
|
// Write operation needs to be properly ordered with respect to the producer committing new
|
||||||
|
// data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer
|
||||||
|
// reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst
|
||||||
|
// here we ensure that either a subsequent read by the consumer to consume messages will see
|
||||||
|
// an available message, or the producer in the kernel will see the updated consumer offset
|
||||||
|
// that is caught up.
|
||||||
|
//
|
||||||
|
// [1]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L487-L488
|
||||||
|
// [2]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L494
|
||||||
|
metadata.as_ref().store(*pos, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ProducerData {
|
||||||
|
mmap: MMap,
|
||||||
|
|
||||||
|
// Offset in the mmap where the data starts.
|
||||||
|
data_offset: usize,
|
||||||
|
|
||||||
|
// A cache of the value of the producer position. It is used to avoid re-reading the producer
|
||||||
|
// position when we know there is more data to consume.
|
||||||
|
pos_cache: usize,
|
||||||
|
|
||||||
|
// A bitmask which truncates u32 values to the domain of valid offsets in the ringbuf.
|
||||||
|
mask: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProducerData {
|
||||||
|
fn new(
|
||||||
|
fd: BorrowedFd<'_>,
|
||||||
|
offset: usize,
|
||||||
|
page_size: usize,
|
||||||
|
byte_size: u32,
|
||||||
|
) -> Result<Self, MapError> {
|
||||||
|
// The producer pages have one page of metadata and then the data pages, all mapped
|
||||||
|
// read-only. Note that the length of the mapping includes the data pages twice as the
|
||||||
|
// kernel will map them two time consecutively to avoid special handling of entries that
|
||||||
|
// cross over the end of the ring buffer.
|
||||||
|
//
|
||||||
|
// The kernel diagram below shows the layout of the ring buffer. It references "meta pages",
|
||||||
|
// but we only map exactly one producer meta page read-only. The consumer meta page is mapped
|
||||||
|
// read-write elsewhere, and is taken into consideration via the offset parameter.
|
||||||
|
//
|
||||||
|
// From kernel/bpf/ringbuf.c[0]:
|
||||||
|
//
|
||||||
|
// Each data page is mapped twice to allow "virtual"
|
||||||
|
// continuous read of samples wrapping around the end of ring
|
||||||
|
// buffer area:
|
||||||
|
// ------------------------------------------------------
|
||||||
|
// | meta pages | real data pages | same data pages |
|
||||||
|
// ------------------------------------------------------
|
||||||
|
// | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
|
||||||
|
// ------------------------------------------------------
|
||||||
|
// | | TA DA | TA DA |
|
||||||
|
// ------------------------------------------------------
|
||||||
|
// ^^^^^^^
|
||||||
|
// |
|
||||||
|
// Here, no need to worry about special handling of wrapped-around
|
||||||
|
// data due to double-mapped data pages. This works both in kernel and
|
||||||
|
// when mmap()'ed in user-space, simplifying both kernel and
|
||||||
|
// user-space implementations significantly.
|
||||||
|
//
|
||||||
|
// [0]: https://github.com/torvalds/linux/blob/3f01e9fe/kernel/bpf/ringbuf.c#L108-L124
|
||||||
|
let len = page_size + 2 * usize::try_from(byte_size).unwrap();
|
||||||
|
let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
|
||||||
|
|
||||||
|
// byte_size is required to be a power of two multiple of page_size (which implicitly is a
|
||||||
|
// power of 2), so subtracting one will create a bitmask for values less than byte_size.
|
||||||
|
debug_assert!(byte_size.is_power_of_two());
|
||||||
|
let mask = byte_size - 1;
|
||||||
|
Ok(Self {
|
||||||
|
mmap,
|
||||||
|
data_offset: page_size,
|
||||||
|
pos_cache: 0,
|
||||||
|
mask,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> {
|
||||||
|
let Self {
|
||||||
|
ref mmap,
|
||||||
|
data_offset,
|
||||||
|
pos_cache,
|
||||||
|
mask,
|
||||||
|
} = self;
|
||||||
|
let pos = unsafe { mmap.ptr.cast().as_ref() };
|
||||||
|
let mmap_data = mmap.as_ref();
|
||||||
|
let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| {
|
||||||
|
panic!(
|
||||||
|
"offset {} out of bounds, data len {}",
|
||||||
|
data_offset,
|
||||||
|
mmap_data.len()
|
||||||
|
)
|
||||||
|
});
|
||||||
|
while data_available(pos, pos_cache, consumer) {
|
||||||
|
match read_item(data_pages, *mask, consumer) {
|
||||||
|
Item::Busy => return None,
|
||||||
|
Item::Discard { len } => consumer.consume(len),
|
||||||
|
Item::Data(data) => return Some(RingBufItem { data, consumer }),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return None;
|
||||||
|
|
||||||
|
enum Item<'a> {
|
||||||
|
Busy,
|
||||||
|
Discard { len: usize },
|
||||||
|
Data(&'a [u8]),
|
||||||
|
}
|
||||||
|
|
||||||
|
fn data_available(
|
||||||
|
producer: &AtomicUsize,
|
||||||
|
cache: &mut usize,
|
||||||
|
consumer: &ConsumerPos,
|
||||||
|
) -> bool {
|
||||||
|
let ConsumerPos { pos: consumer, .. } = consumer;
|
||||||
|
if consumer == cache {
|
||||||
|
// This value is written using Release by the kernel [1], and should be read with
|
||||||
|
// Acquire to ensure that the prior writes to the entry header are visible.
|
||||||
|
//
|
||||||
|
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448
|
||||||
|
*cache = producer.load(Ordering::Acquire);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note that we don't compare the order of the values because the producer position may
|
||||||
|
// overflow u32 and wrap around to 0. Instead we just compare equality and assume that
|
||||||
|
// the consumer position is always logically less than the producer position.
|
||||||
|
//
|
||||||
|
// Note also that the kernel, at the time of writing [1], doesn't seem to handle this
|
||||||
|
// overflow correctly at all, and it's not clear that one can produce events after the
|
||||||
|
// producer position has wrapped around.
|
||||||
|
//
|
||||||
|
// [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440
|
||||||
|
consumer != cache
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> {
|
||||||
|
let ConsumerPos { pos, .. } = pos;
|
||||||
|
let offset = pos & usize::try_from(mask).unwrap();
|
||||||
|
let must_get_data = |offset, len| {
|
||||||
|
data.get(offset..offset + len).unwrap_or_else(|| {
|
||||||
|
panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())
|
||||||
|
})
|
||||||
|
};
|
||||||
|
let header_ptr =
|
||||||
|
must_get_data(offset, mem::size_of::<AtomicU32>()).as_ptr() as *const AtomicU32;
|
||||||
|
// Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This
|
||||||
|
// ensures data written by the producer will be visible.
|
||||||
|
//
|
||||||
|
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488
|
||||||
|
let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
|
||||||
|
if header & BPF_RINGBUF_BUSY_BIT != 0 {
|
||||||
|
Item::Busy
|
||||||
|
} else {
|
||||||
|
let len = usize::try_from(header & mask).unwrap();
|
||||||
|
if header & BPF_RINGBUF_DISCARD_BIT != 0 {
|
||||||
|
Item::Discard { len }
|
||||||
|
} else {
|
||||||
|
let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
|
||||||
|
let data = must_get_data(data_offset, len);
|
||||||
|
Item::Data(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MMap corresponds to a memory-mapped region.
|
||||||
|
//
|
||||||
|
// The data is unmapped in Drop.
|
||||||
|
struct MMap {
|
||||||
|
ptr: NonNull<c_void>,
|
||||||
|
len: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MMap {
|
||||||
|
fn new(
|
||||||
|
fd: BorrowedFd<'_>,
|
||||||
|
len: usize,
|
||||||
|
prot: c_int,
|
||||||
|
flags: c_int,
|
||||||
|
offset: off_t,
|
||||||
|
) -> Result<Self, MapError> {
|
||||||
|
match unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, offset) } {
|
||||||
|
MAP_FAILED => Err(MapError::SyscallError(SyscallError {
|
||||||
|
call: "mmap",
|
||||||
|
io_error: io::Error::last_os_error(),
|
||||||
|
})),
|
||||||
|
ptr => Ok(Self {
|
||||||
|
ptr: ptr::NonNull::new(ptr).ok_or(
|
||||||
|
// This should never happen, but to be paranoid, and so we never need to talk
|
||||||
|
// about a null pointer, we check it anyway.
|
||||||
|
MapError::SyscallError(SyscallError {
|
||||||
|
call: "mmap",
|
||||||
|
io_error: io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
"mmap returned null pointer",
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
)?,
|
||||||
|
len,
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<[u8]> for MMap {
|
||||||
|
fn as_ref(&self) -> &[u8] {
|
||||||
|
let Self { ptr, len } = self;
|
||||||
|
unsafe { slice::from_raw_parts(ptr.as_ptr().cast(), *len) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for MMap {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let Self { ptr, len } = *self;
|
||||||
|
unsafe { munmap(ptr.as_ptr(), len) };
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,163 @@
|
|||||||
|
use core::{
|
||||||
|
cell::UnsafeCell,
|
||||||
|
mem,
|
||||||
|
mem::MaybeUninit,
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "const_assert")]
|
||||||
|
use const_assert::{Assert, IsTrue};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF},
|
||||||
|
helpers::{
|
||||||
|
bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve,
|
||||||
|
bpf_ringbuf_submit,
|
||||||
|
},
|
||||||
|
maps::PinningType,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[repr(transparent)]
|
||||||
|
pub struct RingBuf {
|
||||||
|
def: UnsafeCell<bpf_map_def>,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Sync for RingBuf {}
|
||||||
|
|
||||||
|
/// A ring buffer entry, returned from [`RingBuf::reserve`].
|
||||||
|
///
|
||||||
|
/// You must [`submit`] or [`discard`] this entry before it gets dropped.
|
||||||
|
///
|
||||||
|
/// [`submit`]: RingBufEntry::submit
|
||||||
|
/// [`discard`]: RingBufEntry::discard
|
||||||
|
#[must_use = "eBPF verifier requires ring buffer entries to be either submitted or discarded"]
|
||||||
|
pub struct RingBufEntry<T: 'static>(&'static mut MaybeUninit<T>);
|
||||||
|
|
||||||
|
impl<T> Deref for RingBufEntry<T> {
|
||||||
|
type Target = MaybeUninit<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for RingBufEntry<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> RingBufEntry<T> {
|
||||||
|
/// Discard this ring buffer entry. The entry will be skipped by the userspace reader.
|
||||||
|
pub fn discard(self, flags: u64) {
|
||||||
|
unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) };
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Commit this ring buffer entry. The entry will be made visible to the userspace reader.
|
||||||
|
pub fn submit(self, flags: u64) {
|
||||||
|
unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RingBuf {
|
||||||
|
/// Declare an eBPF ring buffer.
|
||||||
|
///
|
||||||
|
/// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. The
|
||||||
|
/// loading program may coerce the size when loading the map.
|
||||||
|
pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self {
|
||||||
|
Self::new(byte_size, flags, PinningType::None)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Declare a pinned eBPF ring buffer.
|
||||||
|
///
|
||||||
|
/// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. The
|
||||||
|
/// loading program may coerce the size when loading the map.
|
||||||
|
pub const fn pinned(byte_size: u32, flags: u32) -> Self {
|
||||||
|
Self::new(byte_size, flags, PinningType::ByName)
|
||||||
|
}
|
||||||
|
|
||||||
|
const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self {
|
||||||
|
Self {
|
||||||
|
def: UnsafeCell::new(bpf_map_def {
|
||||||
|
type_: BPF_MAP_TYPE_RINGBUF,
|
||||||
|
key_size: 0,
|
||||||
|
value_size: 0,
|
||||||
|
max_entries: byte_size,
|
||||||
|
map_flags: flags,
|
||||||
|
id: 0,
|
||||||
|
pinning: pinning_type as u32,
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reserve memory in the ring buffer that can fit `T`.
|
||||||
|
///
|
||||||
|
/// Returns `None` if the ring buffer is full.
|
||||||
|
#[cfg(feature = "const_assert")]
|
||||||
|
pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>>
|
||||||
|
where
|
||||||
|
Assert<{ 8 % mem::align_of::<T>() == 0 }>: IsTrue,
|
||||||
|
{
|
||||||
|
self.reserve_impl(flags)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reserve memory in the ring buffer that can fit `T`.
|
||||||
|
///
|
||||||
|
/// Returns `None` if the ring buffer is full.
|
||||||
|
///
|
||||||
|
/// The kernel will reserve memory at an 8-bytes aligned boundary, so `mem::align_of<T>()` must
|
||||||
|
/// be equal or smaller than 8. If you use this with a `T` that isn't properly aligned, this
|
||||||
|
/// function will be compiled to a panic; depending on your panic_handler, this may make
|
||||||
|
/// the eBPF program fail to load, or it may make it have undefined behavior.
|
||||||
|
#[cfg(not(feature = "const_assert"))]
|
||||||
|
pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
|
||||||
|
assert_eq!(8 % mem::align_of::<T>(), 0);
|
||||||
|
self.reserve_impl(flags)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reserve_impl<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
|
||||||
|
let ptr = unsafe {
|
||||||
|
bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::<T>() as _, flags)
|
||||||
|
} as *mut MaybeUninit<T>;
|
||||||
|
unsafe { ptr.as_mut() }.map(|ptr| RingBufEntry(ptr))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Copy `data` to the ring buffer output.
|
||||||
|
///
|
||||||
|
/// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a
|
||||||
|
/// copy from either a map buffer or the stack.
|
||||||
|
///
|
||||||
|
/// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to
|
||||||
|
/// create in eBPF but still possible, e.g. by slicing an array).
|
||||||
|
///
|
||||||
|
/// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
|
||||||
|
/// alignment requests. If you use this with a `T` that isn't properly aligned, this function will
|
||||||
|
/// be compiled to a panic and silently make your eBPF program fail to load.
|
||||||
|
/// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418).
|
||||||
|
///
|
||||||
|
/// [`reserve`]: RingBuf::reserve
|
||||||
|
/// [`submit`]: RingBufEntry::submit
|
||||||
|
pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
|
||||||
|
assert_eq!(8 % mem::align_of_val(data), 0);
|
||||||
|
let ret = unsafe {
|
||||||
|
bpf_ringbuf_output(
|
||||||
|
self.def.get() as *mut _,
|
||||||
|
data as *const _ as *mut _,
|
||||||
|
mem::size_of_val(data) as _,
|
||||||
|
flags,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
if ret < 0 {
|
||||||
|
Err(ret)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Query various information about the ring buffer.
|
||||||
|
///
|
||||||
|
/// Consult `bpf_ringbuf_query` documentation for a list of allowed flags.
|
||||||
|
pub fn query(&self, flags: u64) -> u64 {
|
||||||
|
unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) }
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,59 @@
|
|||||||
|
#![no_std]
|
||||||
|
#![no_main]
|
||||||
|
|
||||||
|
use aya_bpf::{
|
||||||
|
macros::{map, uprobe},
|
||||||
|
maps::{PerCpuArray, RingBuf},
|
||||||
|
programs::ProbeContext,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[map]
|
||||||
|
static RING_BUF: RingBuf = RingBuf::with_byte_size(0, 0);
|
||||||
|
|
||||||
|
// This structure's definition is duplicated in userspace.
|
||||||
|
#[repr(C)]
|
||||||
|
struct Registers {
|
||||||
|
dropped: u64,
|
||||||
|
rejected: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs
|
||||||
|
// without needing synchronization. Atomics exist [1], but aren't exposed.
|
||||||
|
//
|
||||||
|
// [1]: https://lwn.net/Articles/838884/
|
||||||
|
#[map]
|
||||||
|
static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0);
|
||||||
|
|
||||||
|
#[uprobe]
|
||||||
|
pub fn ring_buf_test(ctx: ProbeContext) {
|
||||||
|
let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) {
|
||||||
|
Some(regs) => unsafe { &mut *regs },
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
let mut entry = match RING_BUF.reserve::<u64>(0) {
|
||||||
|
Some(entry) => entry,
|
||||||
|
None => {
|
||||||
|
*dropped += 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Write the first argument to the function back out to RING_BUF if it is even,
|
||||||
|
// otherwise increment the counter in REJECTED. This exercises discarding data.
|
||||||
|
let arg: u64 = match ctx.arg(0) {
|
||||||
|
Some(arg) => arg,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
if arg % 2 == 0 {
|
||||||
|
entry.write(arg);
|
||||||
|
entry.submit(0);
|
||||||
|
} else {
|
||||||
|
*rejected += 1;
|
||||||
|
entry.discard(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(test))]
|
||||||
|
#[panic_handler]
|
||||||
|
fn panic(_info: &core::panic::PanicInfo) -> ! {
|
||||||
|
loop {}
|
||||||
|
}
|
@ -0,0 +1,441 @@
|
|||||||
|
use std::{
|
||||||
|
mem,
|
||||||
|
os::fd::AsRawFd as _,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
thread,
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::Context as _;
|
||||||
|
use assert_matches::assert_matches;
|
||||||
|
use aya::{
|
||||||
|
maps::{array::PerCpuArray, ring_buf::RingBuf, MapData},
|
||||||
|
programs::UProbe,
|
||||||
|
Bpf, BpfLoader, Pod,
|
||||||
|
};
|
||||||
|
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
|
||||||
|
use rand::Rng as _;
|
||||||
|
use tokio::{
|
||||||
|
io::unix::AsyncFd,
|
||||||
|
time::{sleep, Duration},
|
||||||
|
};
|
||||||
|
|
||||||
|
// This structure's definition is duplicated in the probe.
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
|
||||||
|
struct Registers {
|
||||||
|
dropped: u64,
|
||||||
|
rejected: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::Add for Registers {
|
||||||
|
type Output = Self;
|
||||||
|
fn add(self, rhs: Self) -> Self::Output {
|
||||||
|
Self {
|
||||||
|
dropped: self.dropped + rhs.dropped,
|
||||||
|
rejected: self.rejected + rhs.rejected,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> std::iter::Sum<&'a Registers> for Registers {
|
||||||
|
fn sum<I: Iterator<Item = &'a Registers>>(iter: I) -> Self {
|
||||||
|
iter.fold(Default::default(), |a, b| a + *b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Pod for Registers {}
|
||||||
|
|
||||||
|
struct RingBufTest {
|
||||||
|
_bpf: Bpf,
|
||||||
|
ring_buf: RingBuf<MapData>,
|
||||||
|
regs: PerCpuArray<MapData, Registers>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
|
||||||
|
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
|
||||||
|
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
|
||||||
|
// than expected.
|
||||||
|
const RING_BUF_MAX_ENTRIES: usize = 512;
|
||||||
|
|
||||||
|
impl RingBufTest {
|
||||||
|
fn new() -> Self {
|
||||||
|
const RING_BUF_BYTE_SIZE: u32 =
|
||||||
|
(RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
|
||||||
|
|
||||||
|
// Use the loader API to control the size of the ring_buf.
|
||||||
|
let mut bpf = BpfLoader::new()
|
||||||
|
.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
|
||||||
|
.load(crate::RING_BUF)
|
||||||
|
.unwrap();
|
||||||
|
let ring_buf = bpf.take_map("RING_BUF").unwrap();
|
||||||
|
let ring_buf = RingBuf::try_from(ring_buf).unwrap();
|
||||||
|
let regs = bpf.take_map("REGISTERS").unwrap();
|
||||||
|
let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
|
||||||
|
let prog: &mut UProbe = bpf
|
||||||
|
.program_mut("ring_buf_test")
|
||||||
|
.unwrap()
|
||||||
|
.try_into()
|
||||||
|
.unwrap();
|
||||||
|
prog.load().unwrap();
|
||||||
|
prog.attach(
|
||||||
|
Some("ring_buf_trigger_ebpf_program"),
|
||||||
|
0,
|
||||||
|
"/proc/self/exe",
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
_bpf: bpf,
|
||||||
|
ring_buf,
|
||||||
|
regs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct WithData(RingBufTest, Vec<u64>);
|
||||||
|
|
||||||
|
impl WithData {
|
||||||
|
fn new(n: usize) -> Self {
|
||||||
|
Self(RingBufTest::new(), {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
std::iter::repeat_with(|| rng.gen()).take(n).collect()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test_case::test_case(0; "write zero items")]
|
||||||
|
#[test_case::test_case(1; "write one item")]
|
||||||
|
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
|
||||||
|
#[test_case::test_case(RING_BUF_MAX_ENTRIES - 1; "write one less than capacity items")]
|
||||||
|
#[test_case::test_case(RING_BUF_MAX_ENTRIES * 8; "write more items than capacity")]
|
||||||
|
fn ring_buf(n: usize) {
|
||||||
|
let WithData(
|
||||||
|
RingBufTest {
|
||||||
|
mut ring_buf,
|
||||||
|
regs,
|
||||||
|
_bpf,
|
||||||
|
},
|
||||||
|
data,
|
||||||
|
) = WithData::new(n);
|
||||||
|
|
||||||
|
// Note that after expected_capacity has been submitted, reserve calls in the probe will fail
|
||||||
|
// and the probe will give up.
|
||||||
|
let expected_capacity = RING_BUF_MAX_ENTRIES - 1;
|
||||||
|
|
||||||
|
// Call the function that the uprobe is attached to with the data.
|
||||||
|
let mut expected = Vec::new();
|
||||||
|
let mut expected_rejected = 0u64;
|
||||||
|
let mut expected_dropped = 0u64;
|
||||||
|
for (i, &v) in data.iter().enumerate() {
|
||||||
|
ring_buf_trigger_ebpf_program(v);
|
||||||
|
if i >= expected_capacity {
|
||||||
|
expected_dropped += 1;
|
||||||
|
} else if v % 2 == 0 {
|
||||||
|
expected.push(v);
|
||||||
|
} else {
|
||||||
|
expected_rejected += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut seen = Vec::<u64>::new();
|
||||||
|
while seen.len() < expected.len() {
|
||||||
|
if let Some(read) = ring_buf.next() {
|
||||||
|
let read: [u8; 8] = (*read)
|
||||||
|
.try_into()
|
||||||
|
.with_context(|| format!("data: {:?}", read.len()))
|
||||||
|
.unwrap();
|
||||||
|
let arg = u64::from_ne_bytes(read);
|
||||||
|
assert_eq!(arg % 2, 0, "got {arg} from probe");
|
||||||
|
seen.push(arg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that there is nothing else in the ring_buf.
|
||||||
|
assert_matches!(ring_buf.next(), None);
|
||||||
|
|
||||||
|
// Ensure that the data that was read matches what was passed, and the rejected count was set
|
||||||
|
// properly.
|
||||||
|
assert_eq!(seen, expected);
|
||||||
|
let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
|
||||||
|
assert_eq!(dropped, expected_dropped);
|
||||||
|
assert_eq!(rejected, expected_rejected);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
#[inline(never)]
|
||||||
|
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
|
||||||
|
std::hint::black_box(arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This test differs from the other async test in that it's possible for the producer
|
||||||
|
// to fill the ring_buf. We just ensure that the number of events we see is sane given
|
||||||
|
// what the producer sees, and that the logic does not hang. This exercises interleaving
|
||||||
|
// discards, successful commits, and drops due to the ring_buf being full.
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn ring_buf_async_with_drops() {
|
||||||
|
let WithData(
|
||||||
|
RingBufTest {
|
||||||
|
ring_buf,
|
||||||
|
regs,
|
||||||
|
_bpf,
|
||||||
|
},
|
||||||
|
data,
|
||||||
|
) = WithData::new(RING_BUF_MAX_ENTRIES * 8);
|
||||||
|
|
||||||
|
let mut async_fd = AsyncFd::new(ring_buf).unwrap();
|
||||||
|
|
||||||
|
// Spawn the writer which internally will spawn many parallel writers.
|
||||||
|
// Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
|
||||||
|
let mut seen = 0;
|
||||||
|
let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| {
|
||||||
|
while let Some(read) = ring_buf.next() {
|
||||||
|
let read: [u8; 8] = (*read)
|
||||||
|
.try_into()
|
||||||
|
.with_context(|| format!("data: {:?}", read.len()))
|
||||||
|
.unwrap();
|
||||||
|
let arg = u64::from_ne_bytes(read);
|
||||||
|
assert_eq!(arg % 2, 0, "got {arg} from probe");
|
||||||
|
seen += 1;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
use futures::future::{
|
||||||
|
select,
|
||||||
|
Either::{Left, Right},
|
||||||
|
};
|
||||||
|
let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
|
||||||
|
tokio::spawn(async {
|
||||||
|
for value in v {
|
||||||
|
ring_buf_trigger_ebpf_program(value);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}));
|
||||||
|
let readable = {
|
||||||
|
let mut writer = writer;
|
||||||
|
loop {
|
||||||
|
let readable = Box::pin(async_fd.readable_mut());
|
||||||
|
writer = match select(readable, writer).await {
|
||||||
|
Left((guard, writer)) => {
|
||||||
|
let mut guard = guard.unwrap();
|
||||||
|
process_ring_buf(guard.get_inner_mut());
|
||||||
|
guard.clear_ready();
|
||||||
|
writer
|
||||||
|
}
|
||||||
|
Right((writer, readable)) => {
|
||||||
|
writer.unwrap();
|
||||||
|
break readable;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// If there's more to read, we should receive a readiness notification in a timely manner.
|
||||||
|
// If we don't then, then assert that there's nothing else to read. Note that it's important
|
||||||
|
// to wait some time before attempting to read, otherwise we may catch up with the producer
|
||||||
|
// before epoll has an opportunity to send a notification; our consumer thread can race
|
||||||
|
// with the kernel epoll check.
|
||||||
|
let sleep_fut = sleep(Duration::from_millis(10));
|
||||||
|
tokio::pin!(sleep_fut);
|
||||||
|
match select(sleep_fut, readable).await {
|
||||||
|
Left(((), _)) => {}
|
||||||
|
Right((guard, _)) => {
|
||||||
|
let mut guard = guard.unwrap();
|
||||||
|
process_ring_buf(guard.get_inner_mut());
|
||||||
|
guard.clear_ready();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that there is nothing else in the ring_buf.
|
||||||
|
assert_matches!(async_fd.into_inner().next(), None);
|
||||||
|
|
||||||
|
let max_dropped: u64 = u64::try_from(
|
||||||
|
data.len()
|
||||||
|
.checked_sub(RING_BUF_MAX_ENTRIES - 1)
|
||||||
|
.unwrap_or_default(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
|
||||||
|
let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
|
||||||
|
let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
|
||||||
|
let total = u64::try_from(data.len()).unwrap();
|
||||||
|
let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
|
||||||
|
let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
|
||||||
|
let facts = format!(
|
||||||
|
"seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
|
||||||
|
max_rejected={max_rejected}, max_dropped={max_dropped}",
|
||||||
|
);
|
||||||
|
assert_eq!(seen + rejected + dropped, total, "{facts}",);
|
||||||
|
assert!(
|
||||||
|
(0u64..=max_dropped).contains(&dropped),
|
||||||
|
"dropped={dropped} not in 0..={max_dropped}; {facts}",
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
(min_rejected..=max_rejected).contains(&rejected),
|
||||||
|
"rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
(min_seen..=max_seen).contains(&seen),
|
||||||
|
"seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn ring_buf_async_no_drop() {
|
||||||
|
let WithData(
|
||||||
|
RingBufTest {
|
||||||
|
ring_buf,
|
||||||
|
regs,
|
||||||
|
_bpf,
|
||||||
|
},
|
||||||
|
data,
|
||||||
|
) = WithData::new(RING_BUF_MAX_ENTRIES * 3);
|
||||||
|
|
||||||
|
let writer = {
|
||||||
|
let data = data.to_owned();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
for value in data {
|
||||||
|
// Sleep a tad so we feel confident that the consumer will keep up
|
||||||
|
// and no messages will be dropped.
|
||||||
|
let dur = Duration::from_nanos(rand::thread_rng().gen_range(0..10));
|
||||||
|
sleep(dur).await;
|
||||||
|
ring_buf_trigger_ebpf_program(value);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
// Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
|
||||||
|
let mut async_fd = AsyncFd::new(ring_buf).unwrap();
|
||||||
|
// Note that unlike in the synchronous case where all of the entries are written before any of
|
||||||
|
// them are read, in this case we expect all of the entries to make their way to userspace
|
||||||
|
// because entries are being consumed as they are produced.
|
||||||
|
let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
|
||||||
|
let expected_len = expected.len();
|
||||||
|
let reader = async move {
|
||||||
|
let mut seen = Vec::with_capacity(expected_len);
|
||||||
|
while seen.len() < expected_len {
|
||||||
|
let mut guard = async_fd.readable_mut().await.unwrap();
|
||||||
|
let ring_buf = guard.get_inner_mut();
|
||||||
|
while let Some(read) = ring_buf.next() {
|
||||||
|
let read: [u8; 8] = (*read)
|
||||||
|
.try_into()
|
||||||
|
.with_context(|| format!("data: {:?}", read.len()))
|
||||||
|
.unwrap();
|
||||||
|
let arg = u64::from_ne_bytes(read);
|
||||||
|
seen.push(arg);
|
||||||
|
}
|
||||||
|
guard.clear_ready();
|
||||||
|
}
|
||||||
|
(seen, async_fd.into_inner())
|
||||||
|
};
|
||||||
|
let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await;
|
||||||
|
writer.unwrap();
|
||||||
|
|
||||||
|
// Make sure that there is nothing else in the ring_buf.
|
||||||
|
assert_matches!(ring_buf.next(), None);
|
||||||
|
|
||||||
|
// Ensure that the data that was read matches what was passed.
|
||||||
|
assert_eq!(&seen, &expected);
|
||||||
|
let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
|
||||||
|
assert_eq!(dropped, 0);
|
||||||
|
assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
// This test reproduces a bug where the ring buffer would not be notified of new entries if the
|
||||||
|
// state was not properly synchronized between the producer and consumer. This would result in the
|
||||||
|
// consumer never being woken up and the test hanging.
|
||||||
|
#[test]
|
||||||
|
fn ring_buf_epoll_wakeup() {
|
||||||
|
let RingBufTest {
|
||||||
|
mut ring_buf,
|
||||||
|
_bpf,
|
||||||
|
regs: _,
|
||||||
|
} = RingBufTest::new();
|
||||||
|
|
||||||
|
let epoll_fd = epoll::create(false).unwrap();
|
||||||
|
epoll::ctl(
|
||||||
|
epoll_fd,
|
||||||
|
epoll::ControlOptions::EPOLL_CTL_ADD,
|
||||||
|
ring_buf.as_raw_fd(),
|
||||||
|
// The use of EPOLLET is intentional. Without it, level-triggering would result in
|
||||||
|
// more notifications, and would mask the underlying bug this test reproduced when
|
||||||
|
// the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's
|
||||||
|
// AsyncFd always uses this flag (as demonstrated in the subsequent test).
|
||||||
|
epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
|
||||||
|
let mut total_events: u64 = 0;
|
||||||
|
let writer = WriterThread::spawn();
|
||||||
|
while total_events < WriterThread::NUM_MESSAGES {
|
||||||
|
epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap();
|
||||||
|
while let Some(read) = ring_buf.next() {
|
||||||
|
assert_eq!(read.len(), 8);
|
||||||
|
total_events += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writer.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ring_buf_asyncfd_events() {
|
||||||
|
let RingBufTest {
|
||||||
|
ring_buf,
|
||||||
|
regs: _,
|
||||||
|
_bpf,
|
||||||
|
} = RingBufTest::new();
|
||||||
|
|
||||||
|
let mut async_fd = AsyncFd::new(ring_buf).unwrap();
|
||||||
|
let mut total_events = 0;
|
||||||
|
let writer = WriterThread::spawn();
|
||||||
|
while total_events < WriterThread::NUM_MESSAGES {
|
||||||
|
let mut guard = async_fd.readable_mut().await.unwrap();
|
||||||
|
let rb = guard.get_inner_mut();
|
||||||
|
while let Some(read) = rb.next() {
|
||||||
|
assert_eq!(read.len(), 8);
|
||||||
|
total_events += 1;
|
||||||
|
}
|
||||||
|
guard.clear_ready();
|
||||||
|
}
|
||||||
|
writer.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriterThread triggers the ring_buf write continuously until the join() method is called. It is
|
||||||
|
// used by both the epoll and async fd test that need frequent writes to the ring buffer to trigger
|
||||||
|
// the memory synchronization bug that was fixed.
|
||||||
|
struct WriterThread {
|
||||||
|
thread: thread::JoinHandle<()>,
|
||||||
|
done: Arc<AtomicBool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WriterThread {
|
||||||
|
// When the ring buffer implementation uses Ordering::Relaxed to write the consumer position
|
||||||
|
// rather than Ordering::SeqCst, the test will hang. This number was determined to be large
|
||||||
|
// enough to tickle that bug on a hardware accelerated VM with 2 vCPUs.
|
||||||
|
const NUM_MESSAGES: u64 = 20_000;
|
||||||
|
|
||||||
|
fn spawn() -> Self {
|
||||||
|
let done = Arc::new(AtomicBool::new(false));
|
||||||
|
Self {
|
||||||
|
thread: {
|
||||||
|
let done = done.clone();
|
||||||
|
thread::spawn(move || {
|
||||||
|
while !done.load(Ordering::Relaxed) {
|
||||||
|
// Write 0 which is even and won't be rejected.
|
||||||
|
ring_buf_trigger_ebpf_program(0);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
},
|
||||||
|
done,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn join(self) {
|
||||||
|
let Self { thread, done } = self;
|
||||||
|
done.store(true, Ordering::Relaxed);
|
||||||
|
thread.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue