diff --git a/aya/src/bpf.rs b/aya/src/bpf.rs index 430da43b..db8b8897 100644 --- a/aya/src/bpf.rs +++ b/aya/src/bpf.rs @@ -39,7 +39,7 @@ use crate::{ is_btf_supported, is_btf_type_tag_supported, is_perf_link_supported, is_probe_read_kernel_supported, is_prog_name_supported, retry_with_verifier_logs, }, - util::{bytes_of, bytes_of_slice, possible_cpus, POSSIBLE_CPUS}, + util::{bytes_of, bytes_of_slice, page_size, possible_cpus, POSSIBLE_CPUS}, }; pub(crate) const BPF_OBJ_NAME_LEN: usize = 16; @@ -451,23 +451,23 @@ impl<'a> BpfLoader<'a> { { continue; } - - match max_entries.get(name.as_str()) { - Some(size) => obj.set_max_entries(*size), - None => { - if obj.map_type() == BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32 - && obj.max_entries() == 0 - { - obj.set_max_entries( - possible_cpus() - .map_err(|error| BpfError::FileError { - path: PathBuf::from(POSSIBLE_CPUS), - error, - })? - .len() as u32, - ); - } - } + let num_cpus = || -> Result<u32, BpfError> { + Ok(possible_cpus() + .map_err(|error| BpfError::FileError { + path: PathBuf::from(POSSIBLE_CPUS), + error, + })? + .len() as u32) + }; + let map_type: bpf_map_type = obj.map_type().try_into().map_err(MapError::from)?; + if let Some(max_entries) = max_entries_override( + map_type, + max_entries.get(name.as_str()).copied(), + || obj.max_entries(), + num_cpus, + || page_size() as u32, + )? { + obj.set_max_entries(max_entries) } let mut map = MapData { obj, @@ -715,6 +715,7 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> { BPF_MAP_TYPE_PERCPU_HASH => Map::PerCpuHashMap(map), BPF_MAP_TYPE_LRU_PERCPU_HASH => Map::PerCpuLruHashMap(map), BPF_MAP_TYPE_PERF_EVENT_ARRAY => Map::PerfEventArray(map), + BPF_MAP_TYPE_RINGBUF => Map::RingBuf(map), BPF_MAP_TYPE_SOCKHASH => Map::SockHash(map), BPF_MAP_TYPE_SOCKMAP => Map::SockMap(map), BPF_MAP_TYPE_BLOOM_FILTER => Map::BloomFilter(map), @@ -731,6 +732,106 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> { Ok((name, map)) } +/// Computes the value which should be used to override the max_entries value of the map +/// based on the user-provided override and the rules for that map type. +fn max_entries_override( + map_type: bpf_map_type, + user_override: Option<u32>, + current_value: impl Fn() -> u32, + num_cpus: impl Fn() -> Result<u32, BpfError>, + page_size: impl Fn() -> u32, +) -> Result<Option<u32>, BpfError> { + let max_entries = || user_override.unwrap_or_else(¤t_value); + Ok(match map_type { + BPF_MAP_TYPE_PERF_EVENT_ARRAY if max_entries() == 0 => Some(num_cpus()?), + BPF_MAP_TYPE_RINGBUF => Some(adjust_to_page_size(max_entries(), page_size())) + .filter(|adjusted| *adjusted != max_entries()) + .or(user_override), + _ => user_override, + }) +} + +// Adjusts the byte size of a RingBuf map to match a power-of-two multiple of the page size. +// +// This mirrors the logic used by libbpf. +// See https://github.com/libbpf/libbpf/blob/ec6f716eda43/src/libbpf.c#L2461-L2463 +fn adjust_to_page_size(byte_size: u32, page_size: u32) -> u32 { + // If the byte_size is zero, return zero and let the verifier reject the map + // when it is loaded. This is the behavior of libbpf. + if byte_size == 0 { + return 0; + } + // TODO: Replace with primitive method when int_roundings (https://github.com/rust-lang/rust/issues/88581) + // is stabilized. 
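+    // For example, with a 4096-byte page: any requested size in 1..=4096 becomes 4096, sizes in
+    // 4097..=8192 become 8192, and 8193 (three pages) rounds up to four pages, i.e. 16384.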
+ fn div_ceil(n: u32, rhs: u32) -> u32 { + let d = n / rhs; + let r = n % rhs; + if r > 0 && rhs > 0 { + d + 1 + } else { + d + } + } + let pages_needed = div_ceil(byte_size, page_size); + page_size * pages_needed.next_power_of_two() +} + +#[cfg(test)] +mod tests { + + use crate::generated::bpf_map_type::*; + + const PAGE_SIZE: u32 = 4096; + const NUM_CPUS: u32 = 4; + + #[test] + fn test_adjust_to_page_size() { + use super::adjust_to_page_size; + [ + (0, 0), + (4096, 1), + (4096, 4095), + (4096, 4096), + (8192, 4097), + (8192, 8192), + (16384, 8193), + ] + .into_iter() + .for_each(|(exp, input)| assert_eq!(exp, adjust_to_page_size(input, PAGE_SIZE))) + } + + #[test] + fn test_max_entries_override() { + use super::max_entries_override; + [ + (BPF_MAP_TYPE_RINGBUF, Some(1), 1, Some(PAGE_SIZE)), + (BPF_MAP_TYPE_RINGBUF, None, 1, Some(PAGE_SIZE)), + (BPF_MAP_TYPE_RINGBUF, None, PAGE_SIZE, None), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 1, None), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(42), 1, Some(42)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(0), 1, Some(NUM_CPUS)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 0, Some(NUM_CPUS)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 42, None), + (BPF_MAP_TYPE_ARRAY, None, 1, None), + (BPF_MAP_TYPE_ARRAY, Some(2), 1, Some(2)), + ] + .into_iter() + .for_each(|(map_type, user_override, current_value, exp)| { + assert_eq!( + exp, + max_entries_override( + map_type, + user_override, + || { current_value }, + || Ok(NUM_CPUS), + || PAGE_SIZE + ) + .unwrap() + ) + }) + } +} + impl<'a> Default for BpfLoader<'a> { fn default() -> Self { BpfLoader::new() diff --git a/aya/src/maps/mod.rs b/aya/src/maps/mod.rs index 07d56ab0..2fdc9289 100644 --- a/aya/src/maps/mod.rs +++ b/aya/src/maps/mod.rs @@ -70,6 +70,7 @@ pub mod hash_map; pub mod lpm_trie; pub mod perf; pub mod queue; +pub mod ring_buf; pub mod sock; pub mod stack; pub mod stack_trace; @@ -83,6 +84,7 @@ pub use lpm_trie::LpmTrie; pub use perf::AsyncPerfEventArray; pub use perf::PerfEventArray; pub use queue::Queue; +pub use ring_buf::RingBuf; pub use sock::{SockHash, SockMap}; pub use stack::Stack; pub use stack_trace::StackTraceMap; @@ -194,7 +196,7 @@ pub enum MapError { } // Note that this is not just derived using #[from] because InvalidMapTypeError cannot implement -// Error due the the fact that aya-obj is nostd and error_in_core is not stabilized +// Error due the the fact that aya-obj is no_std and error_in_core is not stabilized // (https://github.com/rust-lang/rust/issues/103765). 
impl From<InvalidMapTypeError> for MapError { fn from(e: InvalidMapTypeError) -> Self { @@ -266,6 +268,8 @@ pub enum Map { PerCpuLruHashMap(MapData), /// A [`PerfEventArray`] map PerfEventArray(MapData), + /// A [`RingBuf`] map + RingBuf(MapData), /// A [`SockMap`] map SockMap(MapData), /// A [`SockHash`] map @@ -296,6 +300,7 @@ impl Map { Map::PerCpuHashMap(map) => map.obj.map_type(), Map::PerCpuLruHashMap(map) => map.obj.map_type(), Map::PerfEventArray(map) => map.obj.map_type(), + Map::RingBuf(map) => map.obj.map_type(), Map::SockHash(map) => map.obj.map_type(), Map::SockMap(map) => map.obj.map_type(), Map::BloomFilter(map) => map.obj.map_type(), @@ -358,6 +363,7 @@ impl_try_from_map!( SockMap from Map::SockMap, PerfEventArray from Map::PerfEventArray, StackTraceMap from Map::StackTraceMap, + RingBuf from Map::RingBuf, ); #[cfg(any(feature = "async_tokio", feature = "async_std"))] diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs new file mode 100644 index 00000000..16c485f3 --- /dev/null +++ b/aya/src/maps/ring_buf.rs @@ -0,0 +1,374 @@ +//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs. +//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF +//! programs to userspace. +//! +//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html + +use crate::{ + generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ}, + maps::{MapData, MapError}, + sys::mmap, +}; +use libc::{c_int, c_void, munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE}; +use std::{ + fmt::Debug, + io, + ops::Deref, + os::fd::{AsRawFd, RawFd}, + ptr, + ptr::NonNull, + sync::atomic::{AtomicU32, AtomicUsize, Ordering}, +}; + +/// A map that can be used to receive events from eBPF programs. +/// +/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways: +/// * It's shared across all CPUs, which allows a strong ordering between events. +/// * Data notifications are delivered precisely instead of being sampled for every N events; the +/// eBPF program can also control notification delivery if sampling is desired for performance +/// reasons. By default, a notification will be sent if the consumer is caught up at the time of +/// committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to +/// control this behavior. +/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly +/// written into the ring without copying from a temporary location. +/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`, +/// and not the userspace reader. This might require extra code to handle, but allows for more +/// flexible schemes to handle dropped samples. +/// +/// To receive events you need to: +/// * Construct [`RingBuf`] using [`RingBuf::try_from`]. +/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`]. +/// +/// To receive async notifications of data availability, you may construct an +/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness. +/// +/// # Minimum kernel version +/// +/// The minimum kernel version required to use this feature is 5.8. 
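+///
+/// # Examples
+///
+/// A minimal sketch of draining the ring buffer synchronously; the object file path and the map
+/// name `"EVENTS"` are placeholders and must match your own eBPF program.
+///
+/// ```no_run
+/// use aya::{maps::RingBuf, Bpf};
+///
+/// let mut bpf = Bpf::load_file("prog.o").unwrap();
+/// let mut ring_buf = RingBuf::try_from(bpf.take_map("EVENTS").unwrap()).unwrap();
+/// // Drain whatever is currently available; `next` returns `None` once the buffer is empty.
+/// while let Some(item) = ring_buf.next() {
+///     println!("received {} bytes", item.len());
+/// }
+/// ```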
+#[doc(alias = "BPF_MAP_TYPE_RINGBUF")] +pub struct RingBuf<T> { + _map: T, + map_fd: i32, + consumer: ConsumerPos, + producer: ProducerData, +} + +impl<T: core::borrow::Borrow<MapData>> RingBuf<T> { + pub(crate) fn new(map: T) -> Result<Self, MapError> { + let data: &MapData = map.borrow(); + let page_size = crate::util::page_size(); + let map_fd = data.fd_or_err().map_err(MapError::from)?; + let byte_size = data.obj.max_entries(); + let consumer = ConsumerPos::new(map_fd, 0, page_size)?; + let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?; + Ok(RingBuf { + _map: map, + map_fd, + consumer, + producer, + }) + } +} + +impl<T> RingBuf<T> { + /// Try to take a new entry from the ringbuf. + /// + /// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in + /// which case the caller may register for availability notifications through `epoll` or other + /// APIs. Only one RingBufItem may be outstanding at a time. + // + // This is not an implementation of `Iterator` because we need to be able to refer to the + // lifetime of the iterator in the returned `RingBufItem`. If the Iterator::Item leveraged GATs, + // one could imagine an implementation of `Iterator` that would work. GATs are stabilized in + // Rust 1.65, but there's not yet a trait that the community seems to have standardized around. + #[allow(clippy::should_implement_trait)] + pub fn next(&mut self) -> Option<RingBufItem<'_>> { + let Self { + consumer, producer, .. + } = self; + producer.next(consumer) + } +} + +/// Access to the RawFd can be used to construct an AsyncFd for use with epoll. +impl<T> AsRawFd for RingBuf<T> { + fn as_raw_fd(&self) -> RawFd { + self.map_fd + } +} + +/// The current outstanding item read from the ringbuf. +pub struct RingBufItem<'a> { + data: &'a [u8], + consumer: &'a mut ConsumerPos, +} + +impl Deref for RingBufItem<'_> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + let Self { data, .. } = self; + data + } +} + +impl Drop for RingBufItem<'_> { + fn drop(&mut self) { + let Self { consumer, data } = self; + consumer.consume(data.len()); + } +} + +impl Debug for RingBufItem<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { data, consumer } = self; + // In general Relaxed here is sufficient, for debugging, it certainly is. + let offset = consumer.as_ref().load(Ordering::Relaxed); + f.debug_struct("RingBufItem") + .field("offset", &offset) + .field("len", &data.len()) + .finish() + } +} + +struct ConsumerPos(MMap); + +impl ConsumerPos { + fn new(fd: RawFd, offset: usize, page_size: usize) -> Result<Self, MapError> { + Ok(Self(MMap::new( + fd, + page_size, + PROT_READ | PROT_WRITE, + MAP_SHARED, + offset.try_into().unwrap(), + )?)) + } + + // Write operation needs to be properly ordered with respect to the producer committing new + // data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer + // reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst + // here we ensure that either a subsequent read by the consumer to consume messages will see + // an available message, or the producer in the kernel will see the updated consumer offset + // that is caught up. 
+ // + // [1]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L487-L488 + // [2]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L494 + const WRITE_ORDERING: Ordering = Ordering::SeqCst; + + fn consume(&mut self, len: usize) -> usize { + self.as_ref() + .fetch_add(Self::compute_increment(len), Self::WRITE_ORDERING) + } + + fn set_offset(&mut self, prev_offset: usize, len: usize) -> usize { + let offset = prev_offset + Self::compute_increment(len); + self.as_ref().store(offset, Self::WRITE_ORDERING); + offset + } + + fn compute_increment(len: usize) -> usize { + // TODO: Use primitive method when https://github.com/rust-lang/rust/issues/88581 is stabilized. + fn next_multiple_of(n: usize, multiple: usize) -> usize { + match n % multiple { + 0 => n, + rem => n + (multiple - rem), + } + } + next_multiple_of(len + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(), 8) + } +} + +impl AsRef<AtomicUsize> for ConsumerPos { + fn as_ref(&self) -> &AtomicUsize { + let Self(MMap { ptr, .. }) = self; + unsafe { ptr.cast::<AtomicUsize>().as_ref() } + } +} + +struct ProducerData { + mmap: MMap, + + // Offset in the mmap where the data starts. + data_offset: usize, + + // A cache of the value of the producer position. It is used to avoid re-reading the producer + // position when we know there is more data to consume. + pos_cache: usize, + + // A bitmask which truncates u32 values to the domain of valid offsets in the ringbuf. + mask: u32, +} + +impl ProducerData { + fn new(fd: RawFd, offset: usize, page_size: usize, byte_size: u32) -> Result<Self, MapError> { + // The producer pages have one page of metadata and then the data pages, all mapped + // read-only. Note that the length of the mapping includes the data pages twice as the + // kernel will map them two time consecutively to avoid special handling of entries that + // cross over the end of the ring buffer. + // + // The kernel diagram below shows the layout of the ring buffer. It references "meta pages", + // but we only map exactly one producer meta page read-only. The consumer meta page is mapped + // read-write elsewhere, and is taken into consideration via the offset parameter. + // + // From kernel/bpf/ringbuf.c[0]: + // + // Each data page is mapped twice to allow "virtual" + // continuous read of samples wrapping around the end of ring + // buffer area: + // ------------------------------------------------------ + // | meta pages | real data pages | same data pages | + // ------------------------------------------------------ + // | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 | + // ------------------------------------------------------ + // | | TA DA | TA DA | + // ------------------------------------------------------ + // ^^^^^^^ + // | + // Here, no need to worry about special handling of wrapped-around + // data due to double-mapped data pages. This works both in kernel and + // when mmap()'ed in user-space, simplifying both kernel and + // user-space implementations significantly. + // + // [0]: https://github.com/torvalds/linux/blob/3f01e9fe/kernel/bpf/ringbuf.c#L108-L124 + let len = page_size + 2 * usize::try_from(byte_size).unwrap(); + let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?; + + // byte_size is required to be a power of two multiple of page_size (which implicitly is a + // power of 2), so subtracting one will create a bitmask for values less than byte_size. 
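+        // For example, a 16384-byte (four page) ring gives mask = 0x3fff, which reduces the
+        // monotonically increasing consumer offset modulo the ring size when indexing into data.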
+ debug_assert!(byte_size.is_power_of_two()); + let mask = byte_size - 1; + Ok(Self { + mmap, + data_offset: page_size, + pos_cache: 0, + mask, + }) + } + + fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> { + let Self { + ref mmap, + data_offset, + pos_cache, + mask, + } = self; + let pos = unsafe { mmap.ptr.cast().as_ref() }; + let data: &[u8] = &mmap.as_ref()[*data_offset..]; + + // Load our consumer position. Only this program writes this position, and this object is not + // Send, so relaxed is sufficient. + let mut consumer_pos = consumer.as_ref().load(Ordering::Relaxed); + while data_available(pos, pos_cache, consumer_pos) { + match read_item(data, *mask, consumer_pos) { + Item::Busy => return None, + Item::Discard { len } => consumer_pos = consumer.set_offset(consumer_pos, len), + Item::Data(data) => return Some(RingBufItem { data, consumer }), + } + } + return None; + + enum Item<'a> { + Busy, + Discard { len: usize }, + Data(&'a [u8]), + } + + fn data_available(producer: &AtomicUsize, cache: &mut usize, consumer: usize) -> bool { + if consumer == *cache { + // This value is written using Release by the kernel [1], and should be read with + // Acquire to ensure that the prior writes to the entry header are visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 + *cache = producer.load(Ordering::Acquire); + } + + // Note that we don't compare the order of the values because the producer position may + // overflow u32 and wrap around to 0. Instead we just compare equality and assume that + // the consumer position is always logically less than the producer position. + // + // Note also that the kernel, at the time of writing [1], doesn't seem to handle this + // overflow correctly at all, and it's not clear that one can produce events after the + // producer position has wrapped around. + // + // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 + consumer != *cache + } + + fn read_item(data: &[u8], mask: u32, offset: usize) -> Item { + let offset = offset & usize::try_from(mask).unwrap(); + let header_ptr = data + .get(offset..offset + core::mem::size_of::<AtomicU32>()) + .expect("offset out of bounds") + .as_ptr() as *const AtomicU32; + // Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This + // ensures data written by the producer will be visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488 + let header = unsafe { &*header_ptr }.load(Ordering::Acquire); + if header & BPF_RINGBUF_BUSY_BIT != 0 { + Item::Busy + } else { + let len = usize::try_from(header & mask).unwrap(); + if header & BPF_RINGBUF_DISCARD_BIT != 0 { + Item::Discard { len } + } else { + let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(); + Item::Data(&data[data_offset..data_offset + len]) + } + } + } + } +} + +// MMap corresponds to a memory-mapped region. +// +// The data is unmapped in Drop. 
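+//
+// It backs both the single read-write consumer metadata page and the larger read-only producer
+// mapping (one metadata page followed by the double-mapped data pages).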
+struct MMap { + ptr: NonNull<c_void>, + len: usize, +} + +impl MMap { + fn new( + fd: RawFd, + len: usize, + prot: c_int, + flags: c_int, + offset: off_t, + ) -> Result<Self, MapError> { + match unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, offset) } { + MAP_FAILED => Err(MapError::SyscallError { + call: "mmap", + io_error: io::Error::last_os_error(), + }), + ptr => Ok(Self { + ptr: std::ptr::NonNull::new(ptr).ok_or( + // This should never happen, but to be paranoid, and so we never need to talk + // about a null pointer, we check it anyway. + MapError::SyscallError { + call: "mmap", + io_error: io::Error::new( + io::ErrorKind::Other, + "mmap returned null pointer", + ), + }, + )?, + len, + }), + } + } +} + +impl AsRef<[u8]> for MMap { + fn as_ref(&self) -> &[u8] { + let Self { ptr, len } = self; + unsafe { std::slice::from_raw_parts(ptr.as_ptr().cast(), *len) } + } +} + +impl Drop for MMap { + fn drop(&mut self) { + let Self { ptr, len } = *self; + unsafe { munmap(ptr.as_ptr(), len) }; + } +} diff --git a/bpf/aya-bpf/Cargo.toml b/bpf/aya-bpf/Cargo.toml index 3e6b390d..3395b2fb 100644 --- a/bpf/aya-bpf/Cargo.toml +++ b/bpf/aya-bpf/Cargo.toml @@ -11,3 +11,7 @@ aya-bpf-bindings = { path = "../aya-bpf-bindings" } [build-dependencies] rustversion = "1.0" + +[features] +default = [] +const_assert = [] diff --git a/bpf/aya-bpf/src/lib.rs b/bpf/aya-bpf/src/lib.rs index 517e23fa..82c17edd 100644 --- a/bpf/aya-bpf/src/lib.rs +++ b/bpf/aya-bpf/src/lib.rs @@ -8,6 +8,11 @@ html_logo_url = "https://aya-rs.dev/assets/images/crabby.svg", html_favicon_url = "https://aya-rs.dev/assets/images/crabby.svg" )] +#![cfg_attr( + feature = "const_assert", + allow(incomplete_features), + feature(generic_const_exprs) +)] #![cfg_attr(unstable, feature(never_type))] #![cfg_attr(target_arch = "bpf", feature(asm_experimental_arch))] #![allow(clippy::missing_safety_doc)] diff --git a/bpf/aya-bpf/src/maps/mod.rs b/bpf/aya-bpf/src/maps/mod.rs index 8fa375dd..f117a777 100644 --- a/bpf/aya-bpf/src/maps/mod.rs +++ b/bpf/aya-bpf/src/maps/mod.rs @@ -13,6 +13,7 @@ pub mod per_cpu_array; pub mod perf; pub mod program_array; pub mod queue; +pub mod ring_buf; pub mod sock_hash; pub mod sock_map; pub mod stack; @@ -26,6 +27,7 @@ pub use per_cpu_array::PerCpuArray; pub use perf::{PerfEventArray, PerfEventByteArray}; pub use program_array::ProgramArray; pub use queue::Queue; +pub use ring_buf::RingBuf; pub use sock_hash::SockHash; pub use sock_map::SockMap; pub use stack::Stack; diff --git a/bpf/aya-bpf/src/maps/ring_buf.rs b/bpf/aya-bpf/src/maps/ring_buf.rs new file mode 100644 index 00000000..09f7dd36 --- /dev/null +++ b/bpf/aya-bpf/src/maps/ring_buf.rs @@ -0,0 +1,167 @@ +use core::{ + cell::UnsafeCell, + mem, + mem::MaybeUninit, + ops::{Deref, DerefMut}, +}; + +use crate::{ + bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF}, + helpers::{ + bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve, + bpf_ringbuf_submit, + }, + maps::PinningType, +}; + +#[repr(transparent)] +pub struct RingBuf { + def: UnsafeCell<bpf_map_def>, +} + +unsafe impl Sync for RingBuf {} + +/// A ring buffer entry, returned from [`RingBuf::reserve`]. +/// +/// You must [`submit`] or [`discard`] this entry before it gets dropped. 
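+///
+/// A typical pattern, as a sketch (`EVENTS` stands in for your own `RingBuf` map):
+///
+/// ```ignore
+/// if let Some(mut entry) = EVENTS.reserve::<u64>(0) {
+///     entry.write(42);
+///     entry.submit(0);
+/// }
+/// ```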
+/// +/// [`submit`]: RingBufEntry::submit +/// [`discard`]: RingBufEntry::discard +#[must_use = "BPF verifier requires ring buffer entries to be either submitted or discarded"] +pub struct RingBufEntry<T: 'static>(&'static mut MaybeUninit<T>); + +impl<T> Deref for RingBufEntry<T> { + type Target = MaybeUninit<T>; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl<T> DerefMut for RingBufEntry<T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + } +} + +impl<T> RingBufEntry<T> { + /// Discard this ring buffer entry. The entry will be skipped by the userspace reader. + pub fn discard(self, flags: u64) { + unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) }; + } + + /// Commit this ring buffer entry. The entry will be made visible to the userspace reader. + pub fn submit(self, flags: u64) { + unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) }; + } +} + +impl RingBuf { + /// Declare a BPF ring buffer. + /// + /// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. + /// The loading program may coerce the size when loading the map. + pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self { + Self::new(byte_size, flags, PinningType::None) + } + + /// Declare a pinned BPF ring buffer. + /// + /// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. + /// The loading program may coerce the size when loading the map. + pub const fn pinned(byte_size: u32, flags: u32) -> Self { + Self::new(byte_size, flags, PinningType::ByName) + } + + const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self { + Self { + def: UnsafeCell::new(bpf_map_def { + type_: BPF_MAP_TYPE_RINGBUF, + key_size: 0, + value_size: 0, + max_entries: byte_size, + map_flags: flags, + id: 0, + pinning: pinning_type as u32, + }), + } + } + + /// Reserve memory in the ring buffer that can fit `T`. + /// + /// Returns `None` if the ring buffer is full. + #[cfg(feature = "const_assert")] + pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> + where + Assert<{ 8 % core::mem::align_of::<T>() == 0 }>: IsTrue, + { + self.reserve_impl(flags) + } + + /// Reserve memory in the ring buffer that can fit `T`. + /// + /// Returns `None` if the ring buffer is full. + /// + /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger + /// alignment requests. If you use this with a `T` that isn't properly aligned, this function will + /// be compiled to a panic and silently make your eBPF program fail to load. + /// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418). + #[cfg(not(feature = "const_assert"))] + pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> { + assert_eq!(8 % core::mem::align_of::<T>(), 0); + self.reserve_impl(flags) + } + + fn reserve_impl<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> { + let ptr = unsafe { + bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::<T>() as _, flags) + } as *mut MaybeUninit<T>; + unsafe { ptr.as_mut() }.map(|ptr| RingBufEntry(ptr)) + } + + /// Copy `data` to the ring buffer output. + /// + /// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a + /// copy from either a map buffer or the stack. + /// + /// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to + /// create in eBPF but still possible, e.g. by slicing an array). 
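+///
+/// For example (a sketch; `EVENTS` stands in for your own `RingBuf` map):
+///
+/// ```ignore
+/// let buf = [0u8; 16];
+/// let n = 8; // imagine only part of the buffer holds valid data
+/// EVENTS.output(&buf[..n], 0).ok();
+/// ```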
+ /// + /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger + /// alignment requests. If you use this with a `T` that isn't properly aligned, this function will + /// be compiled to a panic and silently make your eBPF program fail to load. + /// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418). + /// + /// [`reserve`]: RingBuf::reserve + /// [`submit`]: RingBufEntry::submit + pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> { + assert_eq!(8 % core::mem::align_of_val(data), 0); + let ret = unsafe { + bpf_ringbuf_output( + self.def.get() as *mut _, + data as *const _ as *mut _, + mem::size_of_val(data) as _, + flags, + ) + }; + if ret < 0 { + Err(ret) + } else { + Ok(()) + } + } + + /// Query various information about the ring buffer. + /// + /// Consult `bpf_ringbuf_query` documentation for a list of allowed flags. + pub fn query(&self, flags: u64) -> u64 { + unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) } + } +} + +#[cfg(feature = "const_assert")] +pub struct Assert<const COND: bool> {} +#[cfg(feature = "const_assert")] +pub trait IsTrue {} +#[cfg(feature = "const_assert")] +impl IsTrue for Assert<true> {} diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml index ff35ddb7..1ef5645d 100644 --- a/test/integration-ebpf/Cargo.toml +++ b/test/integration-ebpf/Cargo.toml @@ -35,3 +35,7 @@ path = "src/relocations.rs" [[bin]] name = "bpf_probe_read" path = "src/bpf_probe_read.rs" + +[[bin]] +name = "ring_buf" +path = "src/ring_buf.rs" diff --git a/test/integration-ebpf/src/ring_buf.rs b/test/integration-ebpf/src/ring_buf.rs new file mode 100644 index 00000000..5f96fc10 --- /dev/null +++ b/test/integration-ebpf/src/ring_buf.rs @@ -0,0 +1,55 @@ +#![no_std] +#![no_main] + +use aya_bpf::{ + macros::{map, uprobe}, + maps::{Array, RingBuf}, + programs::ProbeContext, +}; + +#[map] +static RING_BUF: RingBuf = RingBuf::with_byte_size(0, 0); + +// This structure's definition is duplicated in userspace. +#[repr(C)] +struct Registers { + dropped: u64, + rejected: u64, +} + +#[map] +static REGISTERS: Array<Registers> = Array::with_max_entries(1, 0); + +#[uprobe] +pub fn ring_buf_test(ctx: ProbeContext) { + let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) { + Some(regs) => unsafe { &mut *regs }, + None => return, + }; + let mut entry = match RING_BUF.reserve::<u64>(0) { + Some(entry) => entry, + None => { + *dropped += 1; + return; + } + }; + // Write the first argument to the function back out to RING_BUF if it is even, + // otherwise increment the counter in REJECTED. This exercises discarding data. + let arg: u64 = match ctx.arg(0) { + Some(arg) => arg, + None => return, + }; + if arg % 2 == 0 { + entry.write(arg); + entry.submit(0); + } else { + *rejected += 1; + entry.discard(0); + } +} + +#[cfg(not(test))] +#[panic_handler] +fn panic(_info: &core::panic::PanicInfo) -> ! 
{ + loop {} +} diff --git a/test/integration-test/Cargo.toml b/test/integration-test/Cargo.toml index 8fb11bda..fc16293c 100644 --- a/test/integration-test/Cargo.toml +++ b/test/integration-test/Cargo.toml @@ -9,18 +9,22 @@ anyhow = "1" aya = { path = "../../aya" } aya-log = { path = "../../aya-log" } aya-obj = { path = "../../aya-obj" } +epoll = "4.3.3" +futures = "0.3.28" libc = { version = "0.2.105" } log = "0.4" -matches = "0.1.8" +matches = "0.1.10" object = { version = "0.31", default-features = false, features = [ "elf", "read_core", "std", ] } +rand = { version = "0.8.5" } rbpf = "0.2.0" tokio = { version = "1.24", default-features = false, features = [ "macros", "time", + "rt-multi-thread", ] } [build-dependencies] diff --git a/test/integration-test/bpf/ring_buf_sched_tracepoint.bpf.c b/test/integration-test/bpf/ring_buf_sched_tracepoint.bpf.c new file mode 100644 index 00000000..bfd38639 --- /dev/null +++ b/test/integration-test/bpf/ring_buf_sched_tracepoint.bpf.c @@ -0,0 +1,23 @@ +#include <linux/bpf.h> +#include <bpf/bpf_helpers.h> + +char _license[] SEC("license") = "GPL"; + +struct +{ + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 256 * 1024 /* 256 KB */); +} rb SEC(".maps"); + +// This probe writes a zero to rb every time the sched_switch tracepoint is hit. +// +// TODO(https://github.com/aya-rs/aya/issues/375): This should be called something like +// "tracepoint/sched/sched_switch" but there's a bug related to loading tracepoints with such names. +// Fix that and rename this. +SEC("tracepoint") +int bpf_prog(struct switch_args* ctx) +{ + unsigned long long e = 0; + bpf_ringbuf_output(&rb, &e, sizeof(e), 0); + return 0; +} diff --git a/test/integration-test/build.rs b/test/integration-test/build.rs index 3d9fa08e..2b7310ef 100644 --- a/test/integration-test/build.rs +++ b/test/integration-test/build.rs @@ -42,6 +42,10 @@ fn main() { const C_BPF_PROBES: &[(&str, &str)] = &[ ("ext.bpf.c", "ext.bpf.o"), ("main.bpf.c", "main.bpf.o"), + ( + "ring_buf_sched_tracepoint.bpf.c", + "ring_buf_sched_tracepoint.bpf.o", + ), ("multimap-btf.bpf.c", "multimap-btf.bpf.o"), ("text_64_64_reloc.c", "text_64_64_reloc.o"), ]; diff --git a/test/integration-test/src/lib.rs b/test/integration-test/src/lib.rs index 7d571104..d058319a 100644 --- a/test/integration-test/src/lib.rs +++ b/test/integration-test/src/lib.rs @@ -15,6 +15,9 @@ pub const TEST: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/test") pub const RELOCATIONS: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/relocations")); pub const BPF_PROBE_READ: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/bpf_probe_read")); +pub const RING_BUF: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/ring_buf")); +pub const RING_BUF_SCHED_TRACEPOINT: &[u8] = + include_bytes_aligned!(concat!(env!("OUT_DIR"), "/ring_buf_sched_tracepoint.bpf.o")); #[cfg(test)] mod tests; diff --git a/test/integration-test/src/tests.rs b/test/integration-test/src/tests.rs index dd8565b0..9843b0ad 100644 --- a/test/integration-test/src/tests.rs +++ b/test/integration-test/src/tests.rs @@ -5,4 +5,5 @@ mod load; mod log; mod rbpf; mod relocations; +mod ring_buf; mod smoke; diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs new file mode 100644 index 00000000..7825d9c1 --- /dev/null +++ b/test/integration-test/src/tests/ring_buf.rs @@ -0,0 +1,338 @@ +use anyhow::Context as _; +use aya::{ + maps::{array::Array, ring_buf::RingBuf, MapData}, + programs::{TracePoint, UProbe}, 
+ Bpf, BpfLoader, Btf, Pod, +}; +use aya_obj::generated::BPF_RINGBUF_HDR_SZ; +use core::panic; +use futures::{select_biased, FutureExt as _}; +use matches::assert_matches; +use rand::Rng as _; +use std::os::fd::AsRawFd as _; +use tokio::{ + io::unix::AsyncFd, + time::{sleep, Duration}, +}; + +/// Generate a variable length vector of u64s. +struct RingBufTest { + _bpf: Bpf, + ring_buf: RingBuf<MapData>, + regs: Array<MapData, Registers>, + data: Vec<u64>, +} + +// This structure's definition is duplicated in the probe. +#[repr(C)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct Registers { + dropped: u64, + rejected: u64, +} + +unsafe impl Pod for Registers {} + +// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer +// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if +// that's not the case because the actual size will be rounded up, and fewer entries will be dropped +// than expected. +const RING_BUF_MAX_ENTRIES: usize = 512; + +impl RingBufTest { + fn new() -> Self { + const RING_BUF_BYTE_SIZE: u32 = (RING_BUF_MAX_ENTRIES + * (core::mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) + as u32; + + // Use the loader API to control the size of the ring_buf. + let mut bpf = BpfLoader::new() + .btf(Some(Btf::from_sys_fs().unwrap()).as_ref()) + .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) + .load(crate::RING_BUF) + .unwrap(); + let ring_buf = bpf.take_map("RING_BUF").unwrap(); + let ring_buf = RingBuf::try_from(ring_buf).unwrap(); + let regs = bpf.take_map("REGISTERS").unwrap(); + let regs = Array::<_, Registers>::try_from(regs).unwrap(); + let prog: &mut UProbe = bpf + .program_mut("ring_buf_test") + .unwrap() + .try_into() + .unwrap(); + prog.load().unwrap(); + prog.attach( + Some("ring_buf_trigger_ebpf_program"), + 0, + "/proc/self/exe", + None, + ) + .unwrap(); + + let data = { + let mut rng = rand::thread_rng(); + // Generate more entries than there is space so we can test dropping entries. + let n = rng.gen_range(1..=RING_BUF_MAX_ENTRIES * 2); + std::iter::repeat_with(|| rng.gen()).take(n).collect() + }; + + Self { + _bpf: bpf, + ring_buf, + regs, + data, + } + } +} + +#[test] +fn ring_buf() { + let RingBufTest { + ring_buf, + ref regs, + ref data, + .. + } = &mut RingBufTest::new(); + // Note that after expected_capacity has been submitted, reserve calls in the probe will fail + // and the probe will give up. + let expected_capacity = RING_BUF_MAX_ENTRIES - 1; + + // Call the function that the uprobe is attached to with randomly generated data. + let mut expected = Vec::new(); + let mut expected_rejected = 0u64; + let mut expected_dropped = 0u64; + for (i, &v) in data.iter().enumerate() { + ring_buf_trigger_ebpf_program(v); + if i >= expected_capacity { + expected_dropped += 1; + } else if v % 2 == 0 { + expected.push(v); + } else { + expected_rejected += 1; + } + } + + let mut seen = Vec::<u64>::new(); + while seen.len() < expected.len() { + if let Some(item) = ring_buf.next() { + let item: [u8; 8] = (*item).try_into().unwrap(); + let arg = u64::from_ne_bytes(item); + seen.push(arg); + } + } + + // Make sure that there is nothing else in the ring_buf. + assert_matches!(ring_buf.next(), None); + + // Ensure that the data that was read matches what was passed, and the rejected count was set + // properly. 
+ assert_eq!(seen, expected); + assert_eq!( + regs.get(&0, 0).unwrap(), + Registers { + dropped: expected_dropped, + rejected: expected_rejected, + } + ); +} + +#[no_mangle] +#[inline(never)] +pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) { + std::hint::black_box(arg); +} + +// This test differs from the other async test in that it's possible for the producer +// to fill the ring_buf. We just ensure that the number of events we see is sane given +// what the producer sees, and that the logic does not hang. This exercises interleaving +// discards, successful commits, and drops due to the ring_buf being full. +#[tokio::test(flavor = "multi_thread")] +async fn ring_buf_async_with_drops() { + let RingBufTest { + ring_buf, + ref regs, + ref data, + _bpf, + } = &mut RingBufTest::new(); + + let mut writer = futures::future::try_join_all(data.chunks(64).map(|v| { + let writer_chunk = Vec::from(v); + tokio::spawn(async move { + for value in writer_chunk { + ring_buf_trigger_ebpf_program(value); + } + }) + })) + .fuse(); + + // Spwan the writer which internally will spawn many parallel writers. + // Construct an AsyncFd from the RingBuf in order to receive readiness notifications. + let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap(); + let mut seen = 0; + loop { + let readable = async_fd.readable().fuse(); + tokio::pin!(readable); + select_biased! { + guard = readable => { + let mut got = 0; + while let Some(read) = ring_buf.next() { + got += 1; + let read: [u8; 8] = (*read) + .try_into() + .context(format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + assert_eq!(arg % 2, 0, "got {arg} from probe"); + } + seen += got; + guard.unwrap().clear_ready(); + } + writer = writer => { + writer.unwrap() ; + break; + }, + }; + } + let max_dropped: u64 = data + .len() + .checked_sub(RING_BUF_MAX_ENTRIES - 1) + .unwrap_or_default() + .try_into() + .unwrap(); + let max_seen: u64 = data + .iter() + .filter(|v| *v % 2 == 0) + .count() + .try_into() + .unwrap(); + let max_rejected = u64::try_from(data.len()).unwrap() - max_seen; + let Registers { dropped, rejected } = regs.get(&0, 0).unwrap(); + assert_in(dropped, 0u64..=max_dropped); + assert_in(rejected, rejected - dropped..=max_rejected); + assert_in(seen, max_seen - dropped..=max_seen); + + fn assert_in(val: u64, range: impl core::ops::RangeBounds<u64> + core::fmt::Debug) { + assert!(range.contains(&val), "{val} not in {range:?}"); + } +} + +#[tokio::test] +async fn ring_buf_async_no_drop() { + let RingBufTest { + ring_buf, + ref regs, + ref data, + _bpf, + } = &mut RingBufTest::new(); + + let writer = async move { + let mut rng = rand::thread_rng(); + for &value in data { + // Sleep a tad so we feel confident that the consumer will keep up + // and no messages will be dropped. + sleep(Duration::from_nanos(rng.gen_range(0..1000))).await; + ring_buf_trigger_ebpf_program(value); + } + }; + + // Construct an AsyncFd from the RingBuf in order to receive readiness notifications. + let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap(); + // Note that unlike in the synchronous case where all of the entries are written before any of + // them are read, in this case we expect all of the entries to make their way to userspace + // because entries are being consumed as they are produced. 
+ let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect(); + let expected_len = expected.len(); + let reader = async { + let mut seen = Vec::with_capacity(expected_len); + while seen.len() < expected_len { + let mut guard = async_fd.readable().await.unwrap(); + while let Some(read) = ring_buf.next() { + let read: [u8; 8] = (*read) + .try_into() + .context(format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + seen.push(arg); + } + guard.clear_ready(); + } + seen + }; + let ((), seen) = futures::future::join(writer, reader).await; + + // Make sure that there is nothing else in the ring_buf. + assert_matches!(ring_buf.next(), None); + + // Ensure that the data that was read matches what was passed. + assert_eq!(&seen, &expected); + assert_eq!( + regs.get(&0, 0).unwrap(), + Registers { + dropped: 0, + rejected: (data.len() - expected.len()).try_into().unwrap(), + } + ); +} + +// This test reproduces a bug where the ring buffer would not be notified of new entries if the +// state was not properly synchronized between the producer and consumer. This would result in the +// consumer never being woken up and the test hanging. +#[test] +fn ring_buf_epoll_wakeup() { + let mut bpf = Bpf::load(crate::RING_BUF_SCHED_TRACEPOINT).unwrap(); + let rb = bpf.take_map("rb").unwrap(); + let mut rb = RingBuf::try_from(rb).unwrap(); + let prog: &mut TracePoint = bpf.program_mut("tracepoint").unwrap().try_into().unwrap(); + prog.load().unwrap(); + prog.attach("sched", "sched_switch").unwrap(); + + let epoll_fd = epoll::create(false).unwrap(); + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_ADD, + rb.as_raw_fd(), + // The use of EPOLLET is intentional. Without it, level-triggering would result in + // more notifications, and would mask the underlying bug this test reproduced when + // the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's + // AsyncFd always uses this flag (as demonstrated in the subsequent test). + epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0), + ) + .unwrap(); + let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1]; + let mut total_events = 0; + while total_events < 1_000_000 { + epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap(); + let mut events_after_wake = 0; + while let Some(read) = rb.next() { + assert_eq!(read.len(), 8); + events_after_wake += 1; + total_events += 1; + } + assert_ne!(events_after_wake, 0); + } +} + +// This test is like the above test but uses tokio and AsyncFd instead of raw epoll. +#[tokio::test] +async fn ring_buf_asyncfd_events() { + let mut bpf = Bpf::load(crate::RING_BUF_SCHED_TRACEPOINT).unwrap(); + let rb = bpf.take_map("rb").unwrap(); + let mut rb = RingBuf::try_from(rb).unwrap(); + let prog: &mut TracePoint = bpf.program_mut("tracepoint").unwrap().try_into().unwrap(); + prog.load().unwrap(); + prog.attach("sched", "sched_switch").unwrap(); + + let async_fd = AsyncFd::new(rb.as_raw_fd()).unwrap(); + let mut total_events = 0; + while total_events < 1_000_000 { + let mut guard = async_fd.readable().await.unwrap(); + let mut events_after_wake = 0; + while let Some(read) = rb.next() { + assert_eq!(read.len(), 8); + events_after_wake += 1; + total_events += 1; + } + guard.clear_ready(); + assert_ne!(events_after_wake, 0); + } +}