mirror of https://github.com/aya-rs/aya
aya: Implement RingBuf
This implements the userspace binding for RingBuf. Instead of streaming the samples as heap buffers, the process_ring function takes a callback to which we pass the event's byte region, roughly following [libbpf]'s API design. This avoids a copy and allows marking the consumer pointer in a timely manner. [libbpf]: https://github.com/libbpf/libbpf/blob/master/src/ringbuf.c Additionally, integration tests are added to demonstrate the usage of the new APIs and to ensure that they work end-to-end. Co-authored-by: William Findlay <william@williamfindlay.com> Co-authored-by: Tatsuyuki Ishi <ishitatsuyuki@gmail.com>pull/629/head
parent
4af9d1bd3e
commit
e2cf734490
@ -0,0 +1,458 @@
|
||||
//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs.
|
||||
//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF
|
||||
//! programs to userspace.
|
||||
//!
|
||||
//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html
|
||||
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
ffi::{c_int, c_void},
|
||||
fmt::{self, Debug, Formatter},
|
||||
io, mem,
|
||||
ops::Deref,
|
||||
os::fd::{AsFd as _, AsRawFd, BorrowedFd, RawFd},
|
||||
ptr,
|
||||
ptr::NonNull,
|
||||
slice,
|
||||
sync::atomic::{AtomicU32, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use libc::{munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};
|
||||
|
||||
use crate::{
|
||||
generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
|
||||
maps::{MapData, MapError},
|
||||
sys::{mmap, SyscallError},
|
||||
util::page_size,
|
||||
};
|
||||
|
||||
/// A map that can be used to receive events from eBPF programs.
|
||||
///
|
||||
/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
|
||||
/// * It's shared across all CPUs, which allows a strong ordering between events.
|
||||
/// * Data notifications are delivered precisely instead of being sampled for every N events; the
|
||||
/// eBPF program can also control notification delivery if sampling is desired for performance
|
||||
/// reasons. By default, a notification will be sent if the consumer is caught up at the time of
|
||||
/// committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to
|
||||
/// control this behavior.
|
||||
/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly
|
||||
/// written into the ring without copying from a temporary location.
|
||||
/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
|
||||
/// and not the userspace reader. This might require extra code to handle, but allows for more
|
||||
/// flexible schemes to handle dropped samples.
|
||||
///
|
||||
/// To receive events you need to:
|
||||
/// * Construct [`RingBuf`] using [`RingBuf::try_from`].
|
||||
/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`].
|
||||
///
|
||||
/// To receive async notifications of data availability, you may construct an
|
||||
/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness.
|
||||
///
|
||||
/// # Minimum kernel version
|
||||
///
|
||||
/// The minimum kernel version required to use this feature is 5.8.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # struct PollFd<T>(T);
|
||||
/// # fn poll_fd<T>(t: T) -> PollFd<T> { PollFd(t) }
|
||||
/// # impl<T> PollFd<T> {
|
||||
/// # fn readable(&mut self) -> Guard<'_, T> { Guard(self) }
|
||||
/// # }
|
||||
/// # struct Guard<'a, T>(&'a mut PollFd<T>);
|
||||
/// # impl<T> Guard<'_, T> {
|
||||
/// # fn inner_mut(&mut self) -> &mut T {
|
||||
/// # let Guard(PollFd(t)) = self;
|
||||
/// # t
|
||||
/// # }
|
||||
/// # fn clear_ready(&mut self) {}
|
||||
/// # }
|
||||
/// # let bpf = aya::Bpf::load(&[])?;
|
||||
/// use aya::maps::RingBuf;
|
||||
/// use std::convert::TryFrom;
|
||||
///
|
||||
/// let ring_buf = RingBuf::try_from(bpf.map_mut("ARRAY")?)?;
|
||||
/// let poll = poll_fd(ring_buf);
|
||||
/// loop {
|
||||
/// let mut guard = poll.readable()?;
|
||||
/// let ring_buf = guard.inner_mut()
|
||||
/// while let Some(item) = ring_buf.next() {
|
||||
/// println!("Received: {:?}", item);
|
||||
/// }
|
||||
/// guard.clear_ready();
|
||||
/// }
|
||||
/// # Ok::<(), aya::BpfError>(())
|
||||
/// ```
|
||||
///
|
||||
/// # Polling
|
||||
///
|
||||
/// In the example above the implementations of poll(), poll.readable(), guard.inner_mut(), and
|
||||
/// guard.clear_ready() are not given. RingBuf implements the AsRawFd trait, so you can implement
|
||||
/// polling using any crate that can poll file descriptors, like epoll, mio etc. The above example
|
||||
/// API is motivated by that of [`tokio::io::unix::AsyncFd`].
|
||||
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
|
||||
pub struct RingBuf<T> {
|
||||
map: T,
|
||||
consumer: ConsumerPos,
|
||||
producer: ProducerData,
|
||||
}
|
||||
|
||||
impl<T: Borrow<MapData>> RingBuf<T> {
|
||||
pub(crate) fn new(map: T) -> Result<Self, MapError> {
|
||||
let data: &MapData = map.borrow();
|
||||
let page_size = page_size();
|
||||
let map_fd = data.fd().as_fd();
|
||||
let byte_size = data.obj.max_entries();
|
||||
let consumer_metadata = ConsumerMetadata::new(map_fd, 0, page_size)?;
|
||||
let consumer = ConsumerPos::new(consumer_metadata);
|
||||
let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?;
|
||||
Ok(Self {
|
||||
map,
|
||||
consumer,
|
||||
producer,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> RingBuf<T> {
|
||||
/// Try to take a new entry from the ringbuf.
|
||||
///
|
||||
/// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in
|
||||
/// which case the caller may register for availability notifications through `epoll` or other
|
||||
/// APIs. Only one RingBufItem may be outstanding at a time.
|
||||
//
|
||||
// This is not an implementation of `Iterator` because we need to be able to refer to the
|
||||
// lifetime of the iterator in the returned `RingBufItem`. If the Iterator::Item leveraged GATs,
|
||||
// one could imagine an implementation of `Iterator` that would work. GATs are stabilized in
|
||||
// Rust 1.65, but there's not yet a trait that the community seems to have standardized around.
|
||||
#[allow(clippy::should_implement_trait)]
|
||||
pub fn next(&mut self) -> Option<RingBufItem<'_>> {
|
||||
let Self {
|
||||
consumer, producer, ..
|
||||
} = self;
|
||||
producer.next(consumer)
|
||||
}
|
||||
}
|
||||
|
||||
/// Access to the RawFd can be used to construct an AsyncFd for use with epoll.
|
||||
impl<T: Borrow<MapData>> AsRawFd for RingBuf<T> {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
let Self {
|
||||
map,
|
||||
consumer: _,
|
||||
producer: _,
|
||||
} = self;
|
||||
map.borrow().fd().as_fd().as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
/// The current outstanding item read from the ringbuf.
|
||||
pub struct RingBufItem<'a> {
|
||||
data: &'a [u8],
|
||||
consumer: &'a mut ConsumerPos,
|
||||
}
|
||||
|
||||
impl Deref for RingBufItem<'_> {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
let Self { data, .. } = self;
|
||||
data
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RingBufItem<'_> {
|
||||
fn drop(&mut self) {
|
||||
let Self { consumer, data } = self;
|
||||
consumer.consume(data.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for RingBufItem<'_> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
let Self {
|
||||
data,
|
||||
consumer:
|
||||
ConsumerPos {
|
||||
pos,
|
||||
metadata: ConsumerMetadata { mmap: _ },
|
||||
},
|
||||
} = self;
|
||||
// In general Relaxed here is sufficient, for debugging, it certainly is.
|
||||
f.debug_struct("RingBufItem")
|
||||
.field("pos", pos)
|
||||
.field("len", &data.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
struct ConsumerMetadata {
|
||||
mmap: MMap,
|
||||
}
|
||||
|
||||
impl ConsumerMetadata {
|
||||
fn new(fd: BorrowedFd<'_>, offset: usize, page_size: usize) -> Result<Self, MapError> {
|
||||
let mmap = MMap::new(
|
||||
fd,
|
||||
page_size,
|
||||
PROT_READ | PROT_WRITE,
|
||||
MAP_SHARED,
|
||||
offset.try_into().unwrap(),
|
||||
)?;
|
||||
Ok(Self { mmap })
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<AtomicUsize> for ConsumerMetadata {
|
||||
fn as_ref(&self) -> &AtomicUsize {
|
||||
let Self {
|
||||
mmap: MMap { ptr, .. },
|
||||
} = self;
|
||||
unsafe { ptr.cast::<AtomicUsize>().as_ref() }
|
||||
}
|
||||
}
|
||||
|
||||
struct ConsumerPos {
|
||||
pos: usize,
|
||||
metadata: ConsumerMetadata,
|
||||
}
|
||||
|
||||
impl ConsumerPos {
|
||||
fn new(metadata: ConsumerMetadata) -> Self {
|
||||
// Load the initial value of the consumer position. SeqCst is used to be safe given we don't
|
||||
// have any claims about memory synchronization performed by some previous writer.
|
||||
let pos = metadata.as_ref().load(Ordering::SeqCst);
|
||||
Self { pos, metadata }
|
||||
}
|
||||
|
||||
fn consume(&mut self, len: usize) {
|
||||
let Self { pos, metadata } = self;
|
||||
|
||||
// TODO: Use primitive method when https://github.com/rust-lang/rust/issues/88581 is stabilized.
|
||||
fn next_multiple_of(n: usize, multiple: usize) -> usize {
|
||||
match n % multiple {
|
||||
0 => n,
|
||||
rem => n + (multiple - rem),
|
||||
}
|
||||
}
|
||||
*pos += next_multiple_of(usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len, 8);
|
||||
|
||||
// Write operation needs to be properly ordered with respect to the producer committing new
|
||||
// data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer
|
||||
// reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst
|
||||
// here we ensure that either a subsequent read by the consumer to consume messages will see
|
||||
// an available message, or the producer in the kernel will see the updated consumer offset
|
||||
// that is caught up.
|
||||
//
|
||||
// [1]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L487-L488
|
||||
// [2]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L494
|
||||
metadata.as_ref().store(*pos, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
// Userspace view of the producer side of the ring: the producer position and the data pages.
struct ProducerData {
|
||||
// Mapping that starts with the producer page and is followed by the data pages.
mmap: MMap,
|
||||
|
||||
// Offset in the mmap where the data starts.
|
||||
data_offset: usize,
|
||||
|
||||
// A cache of the value of the producer position. It is used to avoid re-reading the producer
|
||||
// position when we know there is more data to consume.
|
||||
pos_cache: usize,
|
||||
|
||||
// A bitmask which truncates u32 values to the domain of valid offsets in the ringbuf.
// It is the ring's byte size minus one.
|
||||
mask: u32,
|
||||
}
|
||||
|
||||
impl ProducerData {
|
||||
fn new(
|
||||
fd: BorrowedFd<'_>,
|
||||
offset: usize,
|
||||
page_size: usize,
|
||||
byte_size: u32,
|
||||
) -> Result<Self, MapError> {
|
||||
// The producer pages have one page of metadata and then the data pages, all mapped
|
||||
// read-only. Note that the length of the mapping includes the data pages twice as the
|
||||
// kernel will map them two time consecutively to avoid special handling of entries that
|
||||
// cross over the end of the ring buffer.
|
||||
//
|
||||
// The kernel diagram below shows the layout of the ring buffer. It references "meta pages",
|
||||
// but we only map exactly one producer meta page read-only. The consumer meta page is mapped
|
||||
// read-write elsewhere, and is taken into consideration via the offset parameter.
|
||||
//
|
||||
// From kernel/bpf/ringbuf.c[0]:
|
||||
//
|
||||
// Each data page is mapped twice to allow "virtual"
|
||||
// continuous read of samples wrapping around the end of ring
|
||||
// buffer area:
|
||||
// ------------------------------------------------------
|
||||
// | meta pages | real data pages | same data pages |
|
||||
// ------------------------------------------------------
|
||||
// | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
|
||||
// ------------------------------------------------------
|
||||
// | | TA DA | TA DA |
|
||||
// ------------------------------------------------------
|
||||
// ^^^^^^^
|
||||
// |
|
||||
// Here, no need to worry about special handling of wrapped-around
|
||||
// data due to double-mapped data pages. This works both in kernel and
|
||||
// when mmap()'ed in user-space, simplifying both kernel and
|
||||
// user-space implementations significantly.
|
||||
//
|
||||
// [0]: https://github.com/torvalds/linux/blob/3f01e9fe/kernel/bpf/ringbuf.c#L108-L124
|
||||
let len = page_size + 2 * usize::try_from(byte_size).unwrap();
|
||||
let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
|
||||
|
||||
// byte_size is required to be a power of two multiple of page_size (which implicitly is a
|
||||
// power of 2), so subtracting one will create a bitmask for values less than byte_size.
|
||||
debug_assert!(byte_size.is_power_of_two());
|
||||
let mask = byte_size - 1;
|
||||
Ok(Self {
|
||||
mmap,
|
||||
data_offset: page_size,
|
||||
pos_cache: 0,
|
||||
mask,
|
||||
})
|
||||
}
|
||||
|
||||
fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> {
|
||||
let Self {
|
||||
ref mmap,
|
||||
data_offset,
|
||||
pos_cache,
|
||||
mask,
|
||||
} = self;
|
||||
let pos = unsafe { mmap.ptr.cast().as_ref() };
|
||||
let mmap_data = mmap.as_ref();
|
||||
let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| {
|
||||
panic!(
|
||||
"offset {} out of bounds, data len {}",
|
||||
data_offset,
|
||||
mmap_data.len()
|
||||
)
|
||||
});
|
||||
while data_available(pos, pos_cache, consumer) {
|
||||
match read_item(data_pages, *mask, consumer) {
|
||||
Item::Busy => return None,
|
||||
Item::Discard { len } => consumer.consume(len),
|
||||
Item::Data(data) => return Some(RingBufItem { data, consumer }),
|
||||
}
|
||||
}
|
||||
return None;
|
||||
|
||||
enum Item<'a> {
|
||||
Busy,
|
||||
Discard { len: usize },
|
||||
Data(&'a [u8]),
|
||||
}
|
||||
|
||||
fn data_available(
|
||||
producer: &AtomicUsize,
|
||||
cache: &mut usize,
|
||||
consumer: &ConsumerPos,
|
||||
) -> bool {
|
||||
let ConsumerPos { pos: consumer, .. } = consumer;
|
||||
if consumer == cache {
|
||||
// This value is written using Release by the kernel [1], and should be read with
|
||||
// Acquire to ensure that the prior writes to the entry header are visible.
|
||||
//
|
||||
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448
|
||||
*cache = producer.load(Ordering::Acquire);
|
||||
}
|
||||
|
||||
// Note that we don't compare the order of the values because the producer position may
|
||||
// overflow u32 and wrap around to 0. Instead we just compare equality and assume that
|
||||
// the consumer position is always logically less than the producer position.
|
||||
//
|
||||
// Note also that the kernel, at the time of writing [1], doesn't seem to handle this
|
||||
// overflow correctly at all, and it's not clear that one can produce events after the
|
||||
// producer position has wrapped around.
|
||||
//
|
||||
// [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440
|
||||
consumer != cache
|
||||
}
|
||||
|
||||
fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> {
|
||||
let ConsumerPos { pos, .. } = pos;
|
||||
let offset = pos & usize::try_from(mask).unwrap();
|
||||
let must_get_data = |offset, len| {
|
||||
data.get(offset..offset + len).unwrap_or_else(|| {
|
||||
panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())
|
||||
})
|
||||
};
|
||||
let header_ptr =
|
||||
must_get_data(offset, mem::size_of::<AtomicU32>()).as_ptr() as *const AtomicU32;
|
||||
// Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This
|
||||
// ensures data written by the producer will be visible.
|
||||
//
|
||||
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488
|
||||
let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
|
||||
if header & BPF_RINGBUF_BUSY_BIT != 0 {
|
||||
Item::Busy
|
||||
} else {
|
||||
let len = usize::try_from(header & mask).unwrap();
|
||||
if header & BPF_RINGBUF_DISCARD_BIT != 0 {
|
||||
Item::Discard { len }
|
||||
} else {
|
||||
let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
|
||||
let data = must_get_data(data_offset, len);
|
||||
Item::Data(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MMap corresponds to a memory-mapped region.
|
||||
//
|
||||
// The data is unmapped in Drop.
|
||||
struct MMap {
|
||||
ptr: NonNull<c_void>,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
impl MMap {
|
||||
fn new(
|
||||
fd: BorrowedFd<'_>,
|
||||
len: usize,
|
||||
prot: c_int,
|
||||
flags: c_int,
|
||||
offset: off_t,
|
||||
) -> Result<Self, MapError> {
|
||||
match unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, offset) } {
|
||||
MAP_FAILED => Err(MapError::SyscallError(SyscallError {
|
||||
call: "mmap",
|
||||
io_error: io::Error::last_os_error(),
|
||||
})),
|
||||
ptr => Ok(Self {
|
||||
ptr: ptr::NonNull::new(ptr).ok_or(
|
||||
// This should never happen, but to be paranoid, and so we never need to talk
|
||||
// about a null pointer, we check it anyway.
|
||||
MapError::SyscallError(SyscallError {
|
||||
call: "mmap",
|
||||
io_error: io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"mmap returned null pointer",
|
||||
),
|
||||
}),
|
||||
)?,
|
||||
len,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for MMap {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
let Self { ptr, len } = self;
|
||||
unsafe { slice::from_raw_parts(ptr.as_ptr().cast(), *len) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MMap {
|
||||
fn drop(&mut self) {
|
||||
let Self { ptr, len } = *self;
|
||||
unsafe { munmap(ptr.as_ptr(), len) };
|
||||
}
|
||||
}
|
@ -0,0 +1,163 @@
|
||||
use core::{
|
||||
cell::UnsafeCell,
|
||||
mem,
|
||||
mem::MaybeUninit,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
|
||||
#[cfg(feature = "const_assert")]
|
||||
use const_assert::{Assert, IsTrue};
|
||||
|
||||
use crate::{
|
||||
bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF},
|
||||
helpers::{
|
||||
bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve,
|
||||
bpf_ringbuf_submit,
|
||||
},
|
||||
maps::PinningType,
|
||||
};
|
||||
|
||||
/// An eBPF-side ring buffer map declaration.
///
/// Wraps the `bpf_map_def` that describes the map; see the constructors on this type for how
/// the definition is filled in.
#[repr(transparent)]
|
||||
pub struct RingBuf {
|
||||
def: UnsafeCell<bpf_map_def>,
|
||||
}
|
||||
|
||||
// NOTE(review): `Sync` is asserted manually because of the `UnsafeCell`. Presumably this is sound
// because the definition is only ever handed to BPF helpers as a raw pointer -- confirm.
unsafe impl Sync for RingBuf {}
|
||||
|
||||
/// A ring buffer entry, returned from [`RingBuf::reserve`].
///
/// The entry dereferences to the reserved, possibly uninitialized, slot.
///
/// You must [`submit`] or [`discard`] this entry before it gets dropped.
///
/// [`submit`]: RingBufEntry::submit
/// [`discard`]: RingBufEntry::discard
#[must_use = "eBPF verifier requires ring buffer entries to be either submitted or discarded"]
pub struct RingBufEntry<T: 'static>(&'static mut MaybeUninit<T>);

impl<T> Deref for RingBufEntry<T> {
    type Target = MaybeUninit<T>;

    /// Shared access to the reserved slot.
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

impl<T> DerefMut for RingBufEntry<T> {
    /// Exclusive access to the reserved slot, e.g. for writing the payload.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}
|
||||
|
||||
impl<T> RingBufEntry<T> {
|
||||
/// Discard this ring buffer entry. The entry will be skipped by the userspace reader.
|
||||
pub fn discard(self, flags: u64) {
|
||||
unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) };
|
||||
}
|
||||
|
||||
/// Commit this ring buffer entry. The entry will be made visible to the userspace reader.
|
||||
pub fn submit(self, flags: u64) {
|
||||
unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) };
|
||||
}
|
||||
}
|
||||
|
||||
impl RingBuf {
|
||||
/// Declare an eBPF ring buffer.
|
||||
///
|
||||
/// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. The
|
||||
/// loading program may coerce the size when loading the map.
|
||||
pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self {
|
||||
Self::new(byte_size, flags, PinningType::None)
|
||||
}
|
||||
|
||||
/// Declare a pinned eBPF ring buffer.
|
||||
///
|
||||
/// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. The
|
||||
/// loading program may coerce the size when loading the map.
|
||||
pub const fn pinned(byte_size: u32, flags: u32) -> Self {
|
||||
Self::new(byte_size, flags, PinningType::ByName)
|
||||
}
|
||||
|
||||
const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self {
|
||||
Self {
|
||||
def: UnsafeCell::new(bpf_map_def {
|
||||
type_: BPF_MAP_TYPE_RINGBUF,
|
||||
key_size: 0,
|
||||
value_size: 0,
|
||||
max_entries: byte_size,
|
||||
map_flags: flags,
|
||||
id: 0,
|
||||
pinning: pinning_type as u32,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Reserve memory in the ring buffer that can fit `T`.
|
||||
///
|
||||
/// Returns `None` if the ring buffer is full.
|
||||
#[cfg(feature = "const_assert")]
|
||||
pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>>
|
||||
where
|
||||
Assert<{ 8 % mem::align_of::<T>() == 0 }>: IsTrue,
|
||||
{
|
||||
self.reserve_impl(flags)
|
||||
}
|
||||
|
||||
/// Reserve memory in the ring buffer that can fit `T`.
|
||||
///
|
||||
/// Returns `None` if the ring buffer is full.
|
||||
///
|
||||
/// The kernel will reserve memory at an 8-bytes aligned boundary, so `mem::align_of<T>()` must
|
||||
/// be equal or smaller than 8. If you use this with a `T` that isn't properly aligned, this
|
||||
/// function will be compiled to a panic; depending on your panic_handler, this may make
|
||||
/// the eBPF program fail to load, or it may make it have undefined behavior.
|
||||
#[cfg(not(feature = "const_assert"))]
|
||||
pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
|
||||
assert_eq!(8 % mem::align_of::<T>(), 0);
|
||||
self.reserve_impl(flags)
|
||||
}
|
||||
|
||||
fn reserve_impl<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
|
||||
let ptr = unsafe {
|
||||
bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::<T>() as _, flags)
|
||||
} as *mut MaybeUninit<T>;
|
||||
unsafe { ptr.as_mut() }.map(|ptr| RingBufEntry(ptr))
|
||||
}
|
||||
|
||||
/// Copy `data` to the ring buffer output.
|
||||
///
|
||||
/// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a
|
||||
/// copy from either a map buffer or the stack.
|
||||
///
|
||||
/// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to
|
||||
/// create in eBPF but still possible, e.g. by slicing an array).
|
||||
///
|
||||
/// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
|
||||
/// alignment requests. If you use this with a `T` that isn't properly aligned, this function will
|
||||
/// be compiled to a panic and silently make your eBPF program fail to load.
|
||||
/// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418).
|
||||
///
|
||||
/// [`reserve`]: RingBuf::reserve
|
||||
/// [`submit`]: RingBufEntry::submit
|
||||
pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
|
||||
assert_eq!(8 % mem::align_of_val(data), 0);
|
||||
let ret = unsafe {
|
||||
bpf_ringbuf_output(
|
||||
self.def.get() as *mut _,
|
||||
data as *const _ as *mut _,
|
||||
mem::size_of_val(data) as _,
|
||||
flags,
|
||||
)
|
||||
};
|
||||
if ret < 0 {
|
||||
Err(ret)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Query various information about the ring buffer.
|
||||
///
|
||||
/// Consult `bpf_ringbuf_query` documentation for a list of allowed flags.
|
||||
pub fn query(&self, flags: u64) -> u64 {
|
||||
unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) }
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
#![no_std]
|
||||
#![no_main]
|
||||
|
||||
use aya_bpf::{
|
||||
macros::{map, uprobe},
|
||||
maps::{PerCpuArray, RingBuf},
|
||||
programs::ProbeContext,
|
||||
};
|
||||
|
||||
// The ring buffer under test. It is declared with a byte size of 0 here.
// NOTE(review): presumably the userspace loader overrides the size before loading -- confirm.
#[map]
|
||||
static RING_BUF: RingBuf = RingBuf::with_byte_size(0, 0);
|
||||
|
||||
// This structure's definition is duplicated in userspace.
|
||||
#[repr(C)]
|
||||
struct Registers {
|
||||
// Incremented when reserving a ring buffer entry fails.
dropped: u64,
|
||||
// Incremented when a reserved entry is discarded because the argument was odd.
rejected: u64,
|
||||
}
|
||||
|
||||
// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs
|
||||
// without needing synchronization. Atomics exist [1], but aren't exposed.
|
||||
//
|
||||
// [1]: https://lwn.net/Articles/838884/
|
||||
#[map]
|
||||
static REGISTERS: PerCpuArray<Registers> = PerCpuArray::with_max_entries(1, 0);
|
||||
|
||||
#[uprobe]
|
||||
pub fn ring_buf_test(ctx: ProbeContext) {
|
||||
let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) {
|
||||
Some(regs) => unsafe { &mut *regs },
|
||||
None => return,
|
||||
};
|
||||
let mut entry = match RING_BUF.reserve::<u64>(0) {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
*dropped += 1;
|
||||
return;
|
||||
}
|
||||
};
|
||||
// Write the first argument to the function back out to RING_BUF if it is even,
|
||||
// otherwise increment the counter in REJECTED. This exercises discarding data.
|
||||
let arg: u64 = match ctx.arg(0) {
|
||||
Some(arg) => arg,
|
||||
None => return,
|
||||
};
|
||||
if arg % 2 == 0 {
|
||||
entry.write(arg);
|
||||
entry.submit(0);
|
||||
} else {
|
||||
*rejected += 1;
|
||||
entry.discard(0);
|
||||
}
|
||||
}
|
||||
|
||||
// Panic handler for the `no_std` build of this program: it never returns and simply spins.
#[cfg(not(test))]
|
||||
#[panic_handler]
|
||||
fn panic(_info: &core::panic::PanicInfo) -> ! {
|
||||
loop {}
|
||||
}
|
@ -0,0 +1,441 @@
|
||||
use std::{
|
||||
mem,
|
||||
os::fd::AsRawFd as _,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
};
|
||||
|
||||
use anyhow::Context as _;
|
||||
use assert_matches::assert_matches;
|
||||
use aya::{
|
||||
maps::{array::PerCpuArray, ring_buf::RingBuf, MapData},
|
||||
programs::UProbe,
|
||||
Bpf, BpfLoader, Pod,
|
||||
};
|
||||
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
|
||||
use rand::Rng as _;
|
||||
use tokio::{
|
||||
io::unix::AsyncFd,
|
||||
time::{sleep, Duration},
|
||||
};
|
||||
|
||||
// This structure's definition is duplicated in the probe.
#[repr(C)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
struct Registers {
    dropped: u64,
    rejected: u64,
}

// Field-wise addition, used to combine the per-CPU counters.
impl std::ops::Add for Registers {
    type Output = Self;

    fn add(self, rhs: Self) -> Self::Output {
        let Self { dropped, rejected } = self;
        Self {
            dropped: dropped + rhs.dropped,
            rejected: rejected + rhs.rejected,
        }
    }
}

// Summing starts from the all-zero default and adds each element in turn.
impl<'a> std::iter::Sum<&'a Registers> for Registers {
    fn sum<I: Iterator<Item = &'a Registers>>(iter: I) -> Self {
        let mut total = Self::default();
        for regs in iter {
            total = total + *regs;
        }
        total
    }
}
|
||||
|
||||
unsafe impl Pod for Registers {}
|
||||
|
||||
struct RingBufTest {
|
||||
_bpf: Bpf,
|
||||
ring_buf: RingBuf<MapData>,
|
||||
regs: PerCpuArray<MapData, Registers>,
|
||||
}
|
||||
|
||||
// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer
|
||||
// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if
|
||||
// that's not the case because the actual size will be rounded up, and fewer entries will be dropped
|
||||
// than expected.
|
||||
const RING_BUF_MAX_ENTRIES: usize = 512;
|
||||
|
||||
impl RingBufTest {
|
||||
fn new() -> Self {
|
||||
const RING_BUF_BYTE_SIZE: u32 =
|
||||
(RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
|
||||
|
||||
// Use the loader API to control the size of the ring_buf.
|
||||
let mut bpf = BpfLoader::new()
|
||||
.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
|
||||
.load(crate::RING_BUF)
|
||||
.unwrap();
|
||||
let ring_buf = bpf.take_map("RING_BUF").unwrap();
|
||||
let ring_buf = RingBuf::try_from(ring_buf).unwrap();
|
||||
let regs = bpf.take_map("REGISTERS").unwrap();
|
||||
let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap();
|
||||
let prog: &mut UProbe = bpf
|
||||
.program_mut("ring_buf_test")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
prog.load().unwrap();
|
||||
prog.attach(
|
||||
Some("ring_buf_trigger_ebpf_program"),
|
||||
0,
|
||||
"/proc/self/exe",
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Self {
|
||||
_bpf: bpf,
|
||||
ring_buf,
|
||||
regs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct WithData(RingBufTest, Vec<u64>);
|
||||
|
||||
impl WithData {
|
||||
fn new(n: usize) -> Self {
|
||||
Self(RingBufTest::new(), {
|
||||
let mut rng = rand::thread_rng();
|
||||
std::iter::repeat_with(|| rng.gen()).take(n).collect()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[test_case::test_case(0; "write zero items")]
|
||||
#[test_case::test_case(1; "write one item")]
|
||||
#[test_case::test_case(RING_BUF_MAX_ENTRIES / 2; "write half the capacity items")]
|
||||
#[test_case::test_case(RING_BUF_MAX_ENTRIES - 1; "write one less than capacity items")]
|
||||
#[test_case::test_case(RING_BUF_MAX_ENTRIES * 8; "write more items than capacity")]
|
||||
fn ring_buf(n: usize) {
|
||||
let WithData(
|
||||
RingBufTest {
|
||||
mut ring_buf,
|
||||
regs,
|
||||
_bpf,
|
||||
},
|
||||
data,
|
||||
) = WithData::new(n);
|
||||
|
||||
// Note that after expected_capacity has been submitted, reserve calls in the probe will fail
|
||||
// and the probe will give up.
|
||||
let expected_capacity = RING_BUF_MAX_ENTRIES - 1;
|
||||
|
||||
// Call the function that the uprobe is attached to with the data.
|
||||
let mut expected = Vec::new();
|
||||
let mut expected_rejected = 0u64;
|
||||
let mut expected_dropped = 0u64;
|
||||
for (i, &v) in data.iter().enumerate() {
|
||||
ring_buf_trigger_ebpf_program(v);
|
||||
if i >= expected_capacity {
|
||||
expected_dropped += 1;
|
||||
} else if v % 2 == 0 {
|
||||
expected.push(v);
|
||||
} else {
|
||||
expected_rejected += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let mut seen = Vec::<u64>::new();
|
||||
while seen.len() < expected.len() {
|
||||
if let Some(read) = ring_buf.next() {
|
||||
let read: [u8; 8] = (*read)
|
||||
.try_into()
|
||||
.with_context(|| format!("data: {:?}", read.len()))
|
||||
.unwrap();
|
||||
let arg = u64::from_ne_bytes(read);
|
||||
assert_eq!(arg % 2, 0, "got {arg} from probe");
|
||||
seen.push(arg);
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that there is nothing else in the ring_buf.
|
||||
assert_matches!(ring_buf.next(), None);
|
||||
|
||||
// Ensure that the data that was read matches what was passed, and the rejected count was set
|
||||
// properly.
|
||||
assert_eq!(seen, expected);
|
||||
let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
|
||||
assert_eq!(dropped, expected_dropped);
|
||||
assert_eq!(rejected, expected_rejected);
|
||||
}
|
||||
|
||||
// The function the uprobe attaches to. It is exported unmangled and never inlined so that the
// symbol exists in the test binary; `black_box` keeps the argument from being optimized away.
#[no_mangle]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) {
    let _ = std::hint::black_box(arg);
}
|
||||
|
||||
// This test differs from the other async test in that it's possible for the producer
|
||||
// to fill the ring_buf. We just ensure that the number of events we see is sane given
|
||||
// what the producer sees, and that the logic does not hang. This exercises interleaving
|
||||
// discards, successful commits, and drops due to the ring_buf being full.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn ring_buf_async_with_drops() {
|
||||
let WithData(
|
||||
RingBufTest {
|
||||
ring_buf,
|
||||
regs,
|
||||
_bpf,
|
||||
},
|
||||
data,
|
||||
) = WithData::new(RING_BUF_MAX_ENTRIES * 8);
|
||||
|
||||
let mut async_fd = AsyncFd::new(ring_buf).unwrap();
|
||||
|
||||
// Spawn the writer which internally will spawn many parallel writers.
|
||||
// Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
|
||||
let mut seen = 0;
|
||||
let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| {
|
||||
while let Some(read) = ring_buf.next() {
|
||||
let read: [u8; 8] = (*read)
|
||||
.try_into()
|
||||
.with_context(|| format!("data: {:?}", read.len()))
|
||||
.unwrap();
|
||||
let arg = u64::from_ne_bytes(read);
|
||||
assert_eq!(arg % 2, 0, "got {arg} from probe");
|
||||
seen += 1;
|
||||
}
|
||||
};
|
||||
use futures::future::{
|
||||
select,
|
||||
Either::{Left, Right},
|
||||
};
|
||||
let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
|
||||
tokio::spawn(async {
|
||||
for value in v {
|
||||
ring_buf_trigger_ebpf_program(value);
|
||||
}
|
||||
})
|
||||
}));
|
||||
let readable = {
|
||||
let mut writer = writer;
|
||||
loop {
|
||||
let readable = Box::pin(async_fd.readable_mut());
|
||||
writer = match select(readable, writer).await {
|
||||
Left((guard, writer)) => {
|
||||
let mut guard = guard.unwrap();
|
||||
process_ring_buf(guard.get_inner_mut());
|
||||
guard.clear_ready();
|
||||
writer
|
||||
}
|
||||
Right((writer, readable)) => {
|
||||
writer.unwrap();
|
||||
break readable;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// If there's more to read, we should receive a readiness notification in a timely manner.
|
||||
// If we don't then, then assert that there's nothing else to read. Note that it's important
|
||||
// to wait some time before attempting to read, otherwise we may catch up with the producer
|
||||
// before epoll has an opportunity to send a notification; our consumer thread can race
|
||||
// with the kernel epoll check.
|
||||
let sleep_fut = sleep(Duration::from_millis(10));
|
||||
tokio::pin!(sleep_fut);
|
||||
match select(sleep_fut, readable).await {
|
||||
Left(((), _)) => {}
|
||||
Right((guard, _)) => {
|
||||
let mut guard = guard.unwrap();
|
||||
process_ring_buf(guard.get_inner_mut());
|
||||
guard.clear_ready();
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure that there is nothing else in the ring_buf.
|
||||
assert_matches!(async_fd.into_inner().next(), None);
|
||||
|
||||
let max_dropped: u64 = u64::try_from(
|
||||
data.len()
|
||||
.checked_sub(RING_BUF_MAX_ENTRIES - 1)
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
.unwrap();
|
||||
let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap();
|
||||
let max_rejected = u64::try_from(data.len()).unwrap() - max_seen;
|
||||
let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
|
||||
let total = u64::try_from(data.len()).unwrap();
|
||||
let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default();
|
||||
let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default();
|
||||
let facts = format!(
|
||||
"seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \
|
||||
max_rejected={max_rejected}, max_dropped={max_dropped}",
|
||||
);
|
||||
assert_eq!(seen + rejected + dropped, total, "{facts}",);
|
||||
assert!(
|
||||
(0u64..=max_dropped).contains(&dropped),
|
||||
"dropped={dropped} not in 0..={max_dropped}; {facts}",
|
||||
);
|
||||
assert!(
|
||||
(min_rejected..=max_rejected).contains(&rejected),
|
||||
"rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}",
|
||||
);
|
||||
assert!(
|
||||
(min_seen..=max_seen).contains(&seen),
|
||||
"seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn ring_buf_async_no_drop() {
|
||||
let WithData(
|
||||
RingBufTest {
|
||||
ring_buf,
|
||||
regs,
|
||||
_bpf,
|
||||
},
|
||||
data,
|
||||
) = WithData::new(RING_BUF_MAX_ENTRIES * 3);
|
||||
|
||||
let writer = {
|
||||
let data = data.to_owned();
|
||||
tokio::spawn(async move {
|
||||
for value in data {
|
||||
// Sleep a tad so we feel confident that the consumer will keep up
|
||||
// and no messages will be dropped.
|
||||
let dur = Duration::from_nanos(rand::thread_rng().gen_range(0..10));
|
||||
sleep(dur).await;
|
||||
ring_buf_trigger_ebpf_program(value);
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
// Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
|
||||
let mut async_fd = AsyncFd::new(ring_buf).unwrap();
|
||||
// Note that unlike in the synchronous case where all of the entries are written before any of
|
||||
// them are read, in this case we expect all of the entries to make their way to userspace
|
||||
// because entries are being consumed as they are produced.
|
||||
let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
|
||||
let expected_len = expected.len();
|
||||
let reader = async move {
|
||||
let mut seen = Vec::with_capacity(expected_len);
|
||||
while seen.len() < expected_len {
|
||||
let mut guard = async_fd.readable_mut().await.unwrap();
|
||||
let ring_buf = guard.get_inner_mut();
|
||||
while let Some(read) = ring_buf.next() {
|
||||
let read: [u8; 8] = (*read)
|
||||
.try_into()
|
||||
.with_context(|| format!("data: {:?}", read.len()))
|
||||
.unwrap();
|
||||
let arg = u64::from_ne_bytes(read);
|
||||
seen.push(arg);
|
||||
}
|
||||
guard.clear_ready();
|
||||
}
|
||||
(seen, async_fd.into_inner())
|
||||
};
|
||||
let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await;
|
||||
writer.unwrap();
|
||||
|
||||
// Make sure that there is nothing else in the ring_buf.
|
||||
assert_matches!(ring_buf.next(), None);
|
||||
|
||||
// Ensure that the data that was read matches what was passed.
|
||||
assert_eq!(&seen, &expected);
|
||||
let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum();
|
||||
assert_eq!(dropped, 0);
|
||||
assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap());
|
||||
}
|
||||
|
||||
// This test reproduces a bug where the ring buffer would not be notified of new entries if the
// state was not properly synchronized between the producer and consumer. This would result in the
// consumer never being woken up and the test hanging.
#[test]
fn ring_buf_epoll_wakeup() {
    let RingBufTest {
        mut ring_buf,
        _bpf,
        regs: _,
    } = RingBufTest::new();

    let epfd = epoll::create(false).unwrap();

    // The use of EPOLLET is intentional. Without it, level-triggering would result in
    // more notifications, and would mask the underlying bug this test reproduced when
    // the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's
    // AsyncFd always uses this flag (as demonstrated in the subsequent test).
    let interest = epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0);
    epoll::ctl(
        epfd,
        epoll::ControlOptions::EPOLL_CTL_ADD,
        ring_buf.as_raw_fd(),
        interest,
    )
    .unwrap();

    // Single-slot buffer for the events reported by `epoll::wait`.
    let mut ready = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1];
    let mut received: u64 = 0;
    let writer = WriterThread::spawn();
    loop {
        if received >= WriterThread::NUM_MESSAGES {
            break;
        }
        // Block without a timeout; if a wakeup is lost the test hangs here.
        epoll::wait(epfd, -1, &mut ready).unwrap();
        // Drain everything that is available before waiting again.
        while let Some(item) = ring_buf.next() {
            assert_eq!(item.len(), 8);
            received += 1;
        }
    }
    writer.join();
}
|
||||
|
||||
// This test is like the above test but uses tokio and AsyncFd instead of raw epoll.
|
||||
#[tokio::test]
|
||||
async fn ring_buf_asyncfd_events() {
|
||||
let RingBufTest {
|
||||
ring_buf,
|
||||
regs: _,
|
||||
_bpf,
|
||||
} = RingBufTest::new();
|
||||
|
||||
let mut async_fd = AsyncFd::new(ring_buf).unwrap();
|
||||
let mut total_events = 0;
|
||||
let writer = WriterThread::spawn();
|
||||
while total_events < WriterThread::NUM_MESSAGES {
|
||||
let mut guard = async_fd.readable_mut().await.unwrap();
|
||||
let rb = guard.get_inner_mut();
|
||||
while let Some(read) = rb.next() {
|
||||
assert_eq!(read.len(), 8);
|
||||
total_events += 1;
|
||||
}
|
||||
guard.clear_ready();
|
||||
}
|
||||
writer.join();
|
||||
}
|
||||
|
||||
// WriterThread triggers the ring_buf write continuously until the join() method is called. It is
|
||||
// used by both the epoll and async fd test that need frequent writes to the ring buffer to trigger
|
||||
// the memory synchronization bug that was fixed.
|
||||
struct WriterThread {
|
||||
thread: thread::JoinHandle<()>,
|
||||
done: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl WriterThread {
|
||||
// When the ring buffer implementation uses Ordering::Relaxed to write the consumer position
|
||||
// rather than Ordering::SeqCst, the test will hang. This number was determined to be large
|
||||
// enough to tickle that bug on a hardware accelerated VM with 2 vCPUs.
|
||||
const NUM_MESSAGES: u64 = 20_000;
|
||||
|
||||
fn spawn() -> Self {
|
||||
let done = Arc::new(AtomicBool::new(false));
|
||||
Self {
|
||||
thread: {
|
||||
let done = done.clone();
|
||||
thread::spawn(move || {
|
||||
while !done.load(Ordering::Relaxed) {
|
||||
// Write 0 which is even and won't be rejected.
|
||||
ring_buf_trigger_ebpf_program(0);
|
||||
}
|
||||
})
|
||||
},
|
||||
done,
|
||||
}
|
||||
}
|
||||
|
||||
fn join(self) {
|
||||
let Self { thread, done } = self;
|
||||
done.store(true, Ordering::Relaxed);
|
||||
thread.join().unwrap();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue