diff --git a/aya/src/bpf.rs b/aya/src/bpf.rs
index 336b908b..b744723a 100644
--- a/aya/src/bpf.rs
+++ b/aya/src/bpf.rs
@@ -10,6 +10,7 @@ use std::{
 use aya_obj::{
     btf::{BtfFeatures, BtfRelocationError},
     generated::{BPF_F_SLEEPABLE, BPF_F_XDP_HAS_FRAGS},
+    maps::InvalidMapTypeError,
     relocation::BpfRelocationError,
     BpfSectionKind, Features,
 };
@@ -39,7 +40,7 @@ use crate::{
         is_btf_supported, is_btf_type_tag_supported, is_perf_link_supported,
         is_probe_read_kernel_supported, is_prog_name_supported, retry_with_verifier_logs,
     },
-    util::{bytes_of, bytes_of_slice, possible_cpus, VerifierLog, POSSIBLE_CPUS},
+    util::{bytes_of, bytes_of_slice, page_size, possible_cpus, VerifierLog, POSSIBLE_CPUS},
 };

 pub(crate) const BPF_OBJ_NAME_LEN: usize = 16;
@@ -374,23 +375,24 @@ impl<'a> BpfLoader<'a> {
             {
                 continue;
             }
-
-            match self.max_entries.get(name.as_str()) {
-                Some(size) => obj.set_max_entries(*size),
-                None => {
-                    if obj.map_type() == BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32
-                        && obj.max_entries() == 0
-                    {
-                        obj.set_max_entries(
-                            possible_cpus()
-                                .map_err(|error| BpfError::FileError {
-                                    path: PathBuf::from(POSSIBLE_CPUS),
-                                    error,
-                                })?
-                                .len() as u32,
-                        );
-                    }
-                }
+            let num_cpus = || -> Result<u32, BpfError> {
+                Ok(possible_cpus()
+                    .map_err(|error| BpfError::FileError {
+                        path: PathBuf::from(POSSIBLE_CPUS),
+                        error,
+                    })?
+                    .len() as u32)
+            };
+            let map_type: bpf_map_type =
+                obj.map_type().try_into().map_err(Into::<MapError>::into)?;
+            if let Some(max_entries) = max_entries_override(
+                map_type,
+                self.max_entries.get(name.as_str()).cloned(),
+                || obj.max_entries(),
+                num_cpus,
+                || page_size() as u32,
+            )? {
+                obj.set_max_entries(max_entries)
             }
             let mut map = MapData {
                 obj,
@@ -629,6 +631,7 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> {
         BPF_MAP_TYPE_PERCPU_HASH => Ok(Map::PerCpuHashMap(map)),
         BPF_MAP_TYPE_LRU_PERCPU_HASH => Ok(Map::PerCpuLruHashMap(map)),
         BPF_MAP_TYPE_PERF_EVENT_ARRAY => Ok(Map::PerfEventArray(map)),
+        BPF_MAP_TYPE_RINGBUF => Ok(Map::RingBuf(map)),
         BPF_MAP_TYPE_SOCKHASH => Ok(Map::SockHash(map)),
         BPF_MAP_TYPE_SOCKMAP => Ok(Map::SockMap(map)),
         BPF_MAP_TYPE_BLOOM_FILTER => Ok(Map::BloomFilter(map)),
@@ -644,6 +647,100 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> {
     Ok((name, map))
 }

+/// Computes the value which should be used to override the max_entries value of the map
+/// based on the user-provided override and the rules for that map type.
+fn max_entries_override(
+    map_type: bpf_map_type,
+    user_override: Option<u32>,
+    current_value: impl Fn() -> u32,
+    num_cpus: impl Fn() -> Result<u32, BpfError>,
+    page_size: impl Fn() -> u32,
+) -> Result<Option<u32>, BpfError> {
+    let max_entries = || user_override.unwrap_or_else(&current_value);
+    Ok(match map_type {
+        BPF_MAP_TYPE_PERF_EVENT_ARRAY if max_entries() == 0 => Some(num_cpus()?),
+        BPF_MAP_TYPE_RINGBUF => Some(adjust_to_page_size(max_entries(), page_size()))
+            .filter(|adjusted| *adjusted != max_entries())
+            .or(user_override),
+        _ => user_override,
+    })
+}
+
+// Adjusts the byte size of a RingBuf map to match a power-of-two multiple of the page size.
+//
+// This mirrors the logic used by libbpf.
+// See https://github.com/libbpf/libbpf/blob/ec6f716eda43fd0f4b865ddcebe0ce8cb56bf445/src/libbpf.c#L2461-L2463
+fn adjust_to_page_size(byte_size: u32, page_size: u32) -> u32 {
+    // If the byte_size is zero, return zero and let the verifier reject the map
+    // when it is loaded. This is the behavior of libbpf.
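+    //
+    // For example, with a 4096-byte page size, a requested byte_size of 4097
+    // needs two pages and becomes 8192, while 8193 needs three pages and is
+    // rounded up to four pages, i.e. 16384 (see the tests below).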
+    if byte_size == 0 {
+        return 0;
+    }
+    let pages_needed = (byte_size + page_size - 1) / page_size;
+    page_size
+        * if pages_needed.is_power_of_two() {
+            pages_needed
+        } else {
+            pages_needed.next_power_of_two()
+        }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use crate::generated::bpf_map_type::*;
+
+    const PAGE_SIZE: u32 = 4096;
+    const NUM_CPUS: u32 = 4;
+
+    #[test]
+    fn test_adjust_to_page_size() {
+        use super::adjust_to_page_size;
+        [
+            (0, 0),
+            (4096, 1),
+            (4096, 4095),
+            (4096, 4096),
+            (8192, 4097),
+            (8192, 8192),
+            (16384, 8193),
+        ]
+        .into_iter()
+        .for_each(|(exp, input)| assert_eq!(exp, adjust_to_page_size(input, PAGE_SIZE)))
+    }
+
+    #[test]
+    fn test_max_entries_override() {
+        use super::max_entries_override;
+        [
+            (BPF_MAP_TYPE_RINGBUF, Some(1), 1, Some(PAGE_SIZE)),
+            (BPF_MAP_TYPE_RINGBUF, None, 1, Some(PAGE_SIZE)),
+            (BPF_MAP_TYPE_RINGBUF, None, PAGE_SIZE, None),
+            (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 1, None),
+            (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(42), 1, Some(42)),
+            (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(0), 1, Some(NUM_CPUS)),
+            (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 0, Some(NUM_CPUS)),
+            (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 42, None),
+            (BPF_MAP_TYPE_ARRAY, None, 1, None),
+            (BPF_MAP_TYPE_ARRAY, Some(2), 1, Some(2)),
+        ]
+        .into_iter()
+        .for_each(|(map_type, user_override, current_value, exp)| {
+            assert_eq!(
+                exp,
+                max_entries_override(
+                    map_type,
+                    user_override,
+                    || { current_value },
+                    || Ok(NUM_CPUS),
+                    || PAGE_SIZE
+                )
+                .unwrap()
+            )
+        })
+    }
+}
+
 impl<'a> Default for BpfLoader<'a> {
     fn default() -> Self {
         BpfLoader::new()
diff --git a/aya/src/maps/mod.rs b/aya/src/maps/mod.rs
index 485c5cce..11711008 100644
--- a/aya/src/maps/mod.rs
+++ b/aya/src/maps/mod.rs
@@ -69,6 +69,7 @@ pub mod hash_map;
 pub mod lpm_trie;
 pub mod perf;
 pub mod queue;
+pub mod ring_buf;
 pub mod sock;
 pub mod stack;
 pub mod stack_trace;
@@ -82,6 +83,7 @@ pub use lpm_trie::LpmTrie;
 pub use perf::AsyncPerfEventArray;
 pub use perf::PerfEventArray;
 pub use queue::Queue;
+pub use ring_buf::RingBuf;
 pub use sock::{SockHash, SockMap};
 pub use stack::Stack;
 pub use stack_trace::StackTraceMap;
@@ -255,6 +257,8 @@ pub enum Map {
     PerCpuLruHashMap(MapData),
     /// A [`PerfEventArray`] map
     PerfEventArray(MapData),
+    /// A [`RingBuf`] map
+    RingBuf(MapData),
     /// A [`SockMap`] map
     SockMap(MapData),
     /// A [`SockHash`] map
@@ -283,6 +287,7 @@ impl Map {
             Map::PerCpuHashMap(map) => map.obj.map_type(),
             Map::PerCpuLruHashMap(map) => map.obj.map_type(),
             Map::PerfEventArray(map) => map.obj.map_type(),
+            Map::RingBuf(map) => map.obj.map_type(),
             Map::SockHash(map) => map.obj.map_type(),
             Map::SockMap(map) => map.obj.map_type(),
             Map::BloomFilter(map) => map.obj.map_type(),
@@ -344,6 +349,7 @@ impl_try_from_map!(
     SockMap from Map::SockMap,
     PerfEventArray from Map::PerfEventArray,
     StackTraceMap from Map::StackTraceMap,
+    RingBuf from Map::RingBuf,
 );

 #[cfg(feature = "async")]
diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs
new file mode 100644
index 00000000..4ae6dbad
--- /dev/null
+++ b/aya/src/maps/ring_buf.rs
@@ -0,0 +1,419 @@
+//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs.
+//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF
+//! programs to userspace.
+//!
+//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html
+
+use crate::{
+    generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
+    maps::{MapData, MapError},
+    sys::mmap,
+};
+use bitflags::bitflags;
+use libc::{c_void, munmap, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};
+use std::{
+    borrow::Borrow,
+    io,
+    ops::Deref,
+    os::fd::{AsRawFd, RawFd},
+    ptr,
+    ptr::NonNull,
+    sync::atomic::{fence, AtomicU32, Ordering},
+};
+
+/// A map that can be used to receive events from eBPF programs.
+///
+/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
+/// * It's shared across all CPUs, which allows a strong ordering between events.
+/// * Data notifications are delivered more precisely instead of being sampled for every N events;
+///   the eBPF program can also control notification delivery if sampling is desired for performance
+///   reasons. By default, a notification will be sent if the consumer is caught up at the time of
+///   committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to
+///   control this behavior.
+/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly
+///   written into the ring without copying from a temporary location.
+/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
+///   and not the userspace reader. This might require extra code to handle, but allows for more
+///   flexible schemes to handle dropped samples.
+///
+/// To receive events you need to:
+/// * Construct [`RingBuf`] using [`RingBuf::try_from`] on a [`MapData`] or something else which
+///   implements `core::borrow::Borrow<MapData>`.
+/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`].
+///
+/// To receive async notifications of data availability, clients may construct an
+/// AsyncFd from the [`RingBuf`]'s file descriptor and poll it for readiness.
+///
+/// # Minimum kernel version
+///
+/// The minimum kernel version required to use this feature is 5.8.
+///
+#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
+pub struct RingBuf<T> {
+    _map: T,
+    map_fd: i32,
+    page_size: usize,
+    consumer: ConsumerMeta,
+    producer: ProducerMeta,
+    data: DataPages,
+}
+
+impl<T: Borrow<MapData>> RingBuf<T> {
+    pub(crate) fn new(map: T) -> Result<Self, MapError> {
+        let data: &MapData = map.borrow();
+
+        let page_size = crate::util::page_size();
+        let map_fd = data.fd_or_err().map_err(MapError::from)?;
+        let byte_size = data.obj.max_entries();
+
+        let mmap = |len, prot, offset| {
+            let res = unsafe { mmap(ptr::null_mut(), len, prot, MAP_SHARED, map_fd, offset) };
+            match res {
+                MAP_FAILED => Err(MapError::SyscallError {
+                    call: "mmap",
+                    io_error: io::Error::last_os_error(),
+                }),
+                // This should never happen, but to be paranoid, and so we never
+                // need to talk about a null pointer, we check it anyway.
+                _ => std::ptr::NonNull::new(res).ok_or(MapError::SyscallError {
+                    call: "mmap",
+                    io_error: io::Error::new(io::ErrorKind::Other, "mmap returned null pointer"),
+                }),
+            }
+        };
+
+        // The consumer metadata page is mapped once, read-write.
+        // The producer pages have one page of metadata and then the data
+        // pages are mapped twice, read-only.
+        // From kernel/bpf/ringbuf.c[0]:
+        //
+        // Each data page is mapped twice to allow "virtual"
+        // continuous read of samples wrapping around the end of ring
+        // buffer area:
+        // ------------------------------------------------------
+        // | meta pages |  real data pages  |  same data pages  |
+        // ------------------------------------------------------
+        // |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
+        // ------------------------------------------------------
+        // |            | TA             DA | TA             DA |
+        // ------------------------------------------------------
+        //                               ^^^^^^^
+        //                                  |
+        // Here, no need to worry about special handling of wrapped-around
+        // data due to double-mapped data pages. This works both in kernel and
+        // when mmap()'ed in user-space, simplifying both kernel and
+        // user-space implementations significantly.
+        //
+        // [0]: https://github.com/torvalds/linux/blob/3f01e9fed8454dcd89727016c3e5b2fbb8f8e50c/kernel/bpf/ringbuf.c#L108-L124
+        let consumer_page = mmap(page_size, PROT_READ | PROT_WRITE, 0)?;
+        let producer_pages_len = page_size + 2 * (byte_size as usize);
+        let producer_pages = mmap(producer_pages_len, PROT_READ, page_size as i64)?;
+        let data_pages = unsafe {
+            // Safe because we know page_size is properly aligned and producer_pages is NonNull.
+            NonNull::new_unchecked((producer_pages.as_ptr() as usize + page_size) as *mut c_void)
+        };
+        Ok(RingBuf {
+            _map: map,
+            map_fd,
+            consumer: ConsumerMeta::new(consumer_page),
+            producer: ProducerMeta::new(producer_pages),
+            data: DataPages::new(data_pages, byte_size - 1),
+            page_size,
+        })
+    }
+}
+
+impl<T> RingBuf<T> {
+    /// Try to take a new entry from the ringbuf.
+    ///
+    /// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in
+    /// which case the caller may register for availability notifications through `epoll` or other
+    /// APIs. Only one RingBufItem may be outstanding at a time.
+    //
+    // This is not an implementation of `Iterator` because we need to be able to refer
+    // to the lifetime of the iterator in the returned `RingBufItem`. If `Iterator::Item`
+    // leveraged GATs, one could imagine an implementation of `Iterator` that would work.
+    // GATs were stabilized in Rust 1.65, but there's not yet a trait that the community
+    // seems to have standardized around.
+    #[allow(clippy::should_implement_trait)]
+    pub fn next(&mut self) -> Option<RingBufItem<'_, T>> {
+        let Self {
+            _map: _,
+            map_fd: _,
+            page_size: _,
+            consumer,
+            producer,
+            data,
+        } = self;
+
+        loop {
+            let consumer_pos = consumer.load();
+            if producer.caught_up_to(consumer_pos) {
+                return None;
+            }
+            let header = data.load_header(consumer_pos);
+            match header.state() {
+                HeaderState::Ready => return Some(RingBufItem(self)),
+                HeaderState::Busy => return None,
+                HeaderState::Discard => consumer.consume(header),
+            }
+        }
+    }
+
+    fn consume(&mut self) {
+        let Self { consumer, data, .. } = self;
+        consumer.consume(data.load_header(consumer.load()))
+    }
+}
+
+impl<T> Drop for RingBuf<T> {
+    fn drop(&mut self) {
+        let &mut Self {
+            consumer: ConsumerMeta {
+                ptr: consumer_pos_ptr,
+            },
+            producer:
+                ProducerMeta {
+                    ptr: producer_pos_ptr,
+                    ..
+                },
+            page_size,
+            data: DataPages { offset_mask, .. },
+            ..
+        } = self;
+
+        let consumer_len = page_size;
+        unsafe { munmap(consumer_pos_ptr.as_ptr() as *mut _, consumer_len) };
+        let byte_size = (offset_mask + 1) as usize;
+        let producer_len = page_size + 2 * byte_size;
+        unsafe { munmap(producer_pos_ptr.as_ptr() as *mut _, producer_len) };
+    }
+}
+
+/// Access to the RawFd can be used to construct an AsyncFd for use with epoll.
+impl<T> AsRawFd for RingBuf<T> {
+    fn as_raw_fd(&self) -> RawFd {
+        self.map_fd
+    }
+}
+
+/// The current outstanding item read from the ringbuf.
+pub struct RingBufItem<'a, T>(&'a mut RingBuf<T>);
+
+impl<'a, T> Deref for RingBufItem<'a, T> {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        let Self(RingBuf { data, consumer, .. }) = self;
+        data.load_slice(consumer.load())
+    }
+}
+
+impl<'a, T> Drop for RingBufItem<'a, T> {
+    fn drop(&mut self) {
+        let Self(rb) = self;
+        rb.consume();
+    }
+}
+
+struct ConsumerMeta {
+    ptr: NonNull<AtomicU32>,
+}
+
+impl ConsumerMeta {
+    fn new(ptr: NonNull<c_void>) -> Self {
+        Self { ptr: ptr.cast() }
+    }
+
+    fn load(&self) -> u32 {
+        let Self { ptr } = self;
+
+        // Consumer pos is written by *us*. This means that we'll load the same value regardless
+        // of the `Ordering`.
+        unsafe { ptr.as_ref() }.load(Ordering::Relaxed)
+    }
+
+    fn consume(&mut self, header: Header) {
+        let Self { ptr } = self;
+        unsafe { ptr.as_ref() }.fetch_add(roundup_len(header.len()), Ordering::Release);
+    }
+}
+
+bitflags! {
+    #[derive(Debug, Clone, Copy)]
+    struct HeaderFlags: u32 {
+        const BUSY = BPF_RINGBUF_BUSY_BIT;
+        const DISCARD = BPF_RINGBUF_DISCARD_BIT;
+    }
+}
+
+#[derive(Clone, Copy)]
+struct Header(HeaderFlags);
+
+impl Header {
+    fn len(self) -> u32 {
+        let Self(flags) = self;
+        flags.difference(HeaderFlags::all()).bits()
+    }
+
+    fn state(self) -> HeaderState {
+        let Self(flags) = self;
+        if flags.contains(HeaderFlags::BUSY) {
+            HeaderState::Busy
+        } else if flags.contains(HeaderFlags::DISCARD) {
+            HeaderState::Discard
+        } else {
+            HeaderState::Ready
+        }
+    }
+}
+
+/// Abstracts the possible states of a ringbuf entry header.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum HeaderState {
+    /// Data is currently being written.
+    Busy,
+    /// The entry has been discarded.
+    Discard,
+    /// The entry is ready to be read.
+    Ready,
+}
+
+/// Represents the pointer to the producer metadata page.
+pub(super) struct ProducerMeta {
+    ptr: std::ptr::NonNull<AtomicU32>,
+
+    // In common scenarios, the producer position advances more than one message
+    // by the time the consumer is notified. As a performance optimization, cache
+    // the position when reading it to minimize the contention on the producer metadata
+    // cache line.
+    cache: u32,
+}
+
+impl ProducerMeta {
+    fn new(ptr: std::ptr::NonNull<c_void>) -> Self {
+        Self {
+            cache: 0,
+            ptr: ptr.cast(),
+        }
+    }
+
+    fn caught_up_to(&mut self, pos: u32) -> bool {
+        if self.cached_caught_up_to(pos) {
+            self.refresh_cache();
+            self.cached_caught_up_to(pos)
+        } else {
+            false
+        }
+    }
+
+    fn cached_caught_up_to(&self, pos: u32) -> bool {
+        let Self { cache, ptr: _ } = self;
+        debug_assert!(pos <= *cache, "pos: {}, cache: {}", pos, cache);
+        pos == *cache
+    }
+
+    fn refresh_cache(&mut self) {
+        let Self { cache, ptr } = self;
+        let prev = *cache;
+        let load = || unsafe { ptr.as_ref() }.load(Ordering::Acquire);
+        let should_retry = |v| v == prev;
+        *cache = retry_with_barrier(load, should_retry)
+    }
+}
+
+struct DataPages {
+    ptr: NonNull<u8>,
+
+    // Used to mask the value of the consumer offset to an offset in the
+    // ringbuf data pages.
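+    //
+    // Because the kernel requires the ring's data size to be a power of two,
+    // masking the position with `size - 1` is equivalent to reducing it modulo
+    // the size of the data pages.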
+    offset_mask: u32,
+
+    // Used to mask the ringbuf message header.
+    //
+    // Note: it's unclear whether this masking is necessary, but libbpf takes
+    // care to always apply the offset mask to the length it reads out of
+    // message headers, so we will too. The kernel contract is unclear about
+    // what can possibly appear in the bits which are not in use as flags today
+    // and are not covered by the mask for the length of an entry. To avoid
+    // needing to plumb the mask around, we just mask out the bits we don't care
+    // about when we read the header using this mask.
+    header_mask: HeaderFlags,
+}
+
+impl DataPages {
+    fn new(ptr: NonNull<c_void>, mask: u32) -> Self {
+        Self {
+            ptr: ptr.cast(),
+            offset_mask: mask,
+            header_mask: HeaderFlags::all() | HeaderFlags::from_bits_retain(mask),
+        }
+    }
+
+    fn load_header(&self, offset: u32) -> Header {
+        self.read_header_from_ptr(self.header_ptr(offset))
+    }
+
+    fn load_slice(&self, offset: u32) -> &[u8] {
+        let header_ptr = self.header_ptr(offset);
+        let data_ptr = (header_ptr as usize + BPF_RINGBUF_HDR_SZ as usize) as *const _;
+        let len = self.read_header_from_ptr(header_ptr).len() as usize;
+        unsafe { core::slice::from_raw_parts(data_ptr, len) }
+    }
+
+    fn header_ptr(&self, offset: u32) -> *const AtomicU32 {
+        let Self {
+            ptr,
+            offset_mask,
+            header_mask: _,
+        } = self;
+        let offset = (offset & *offset_mask) as usize;
+        unsafe { ptr.as_ptr().add(offset) as *const AtomicU32 }
+    }
+
+    fn read_header_from_ptr(&self, header_ptr: *const AtomicU32) -> Header {
+        let Self { header_mask, .. } = self;
+        let load =
+            || HeaderFlags::from_bits_retain(unsafe { (*header_ptr).load(Ordering::Acquire) });
+        let should_retry = |v: HeaderFlags| v.contains(HeaderFlags::BUSY);
+        Header(*header_mask & retry_with_barrier(load, should_retry))
+    }
+}
+
+fn retry_with_barrier<T: Copy>(f: impl Fn() -> T, should_retry: impl Fn(T) -> bool) -> T {
+    let val = f();
+    if !should_retry(val) {
+        return val;
+    }
+    fence(Ordering::SeqCst);
+    f()
+}
+
+/// Round up `len` to the nearest 8-byte alignment, adding `BPF_RINGBUF_HDR_SZ` and
+/// clearing out the upper two bits of `len`.
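+///
+/// For example, `roundup_len(1)` through `roundup_len(8)` all evaluate to
+/// `BPF_RINGBUF_HDR_SZ + 8` (see the tests below).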
+fn roundup_len(mut len: u32) -> u32 {
+    const LEN_MASK: u32 = !(BPF_RINGBUF_DISCARD_BIT | BPF_RINGBUF_BUSY_BIT);
+    // clear out the upper two bits (busy and discard)
+    len &= LEN_MASK;
+    // add the size of the header prefix
+    len += BPF_RINGBUF_HDR_SZ;
+    // round up to the next multiple of 8
+    (len + 7) & !7
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{roundup_len, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ};
+
+    #[test]
+    fn test_roundup_len() {
+        // should always round up to nearest 8 byte alignment + BPF_RINGBUF_HDR_SZ
+        assert_eq!(roundup_len(0), BPF_RINGBUF_HDR_SZ);
+        assert_eq!(roundup_len(1), BPF_RINGBUF_HDR_SZ + 8);
+        assert_eq!(roundup_len(8), BPF_RINGBUF_HDR_SZ + 8);
+        assert_eq!(roundup_len(9), BPF_RINGBUF_HDR_SZ + 16);
+        // should discard the upper two bits of len
+        assert_eq!(
+            roundup_len(0 | (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)),
+            BPF_RINGBUF_HDR_SZ
+        );
+    }
+}
diff --git a/bpf/aya-bpf/Cargo.toml b/bpf/aya-bpf/Cargo.toml
index 3e6b390d..3395b2fb 100644
--- a/bpf/aya-bpf/Cargo.toml
+++ b/bpf/aya-bpf/Cargo.toml
@@ -11,3 +11,7 @@ aya-bpf-bindings = { path = "../aya-bpf-bindings" }

 [build-dependencies]
 rustversion = "1.0"
+
+[features]
+default = []
+const_assert = []
diff --git a/bpf/aya-bpf/src/lib.rs b/bpf/aya-bpf/src/lib.rs
index c0edd49a..633211ab 100644
--- a/bpf/aya-bpf/src/lib.rs
+++ b/bpf/aya-bpf/src/lib.rs
@@ -8,6 +8,11 @@ html_logo_url = "https://aya-rs.dev/assets/images/crabby.svg",
     html_favicon_url = "https://aya-rs.dev/assets/images/crabby.svg"
 )]
+#![cfg_attr(
+    feature = "const_assert",
+    allow(incomplete_features),
+    feature(generic_const_exprs)
+)]
 #![cfg_attr(unstable, feature(never_type))]
 #![cfg_attr(target_arch = "bpf", feature(asm_experimental_arch))]
 #![allow(clippy::missing_safety_doc)]
diff --git a/bpf/aya-bpf/src/maps/mod.rs b/bpf/aya-bpf/src/maps/mod.rs
index 8fa375dd..f117a777 100644
--- a/bpf/aya-bpf/src/maps/mod.rs
+++ b/bpf/aya-bpf/src/maps/mod.rs
@@ -13,6 +13,7 @@ pub mod per_cpu_array;
 pub mod perf;
 pub mod program_array;
 pub mod queue;
+pub mod ring_buf;
 pub mod sock_hash;
 pub mod sock_map;
 pub mod stack;
@@ -26,6 +27,7 @@ pub use per_cpu_array::PerCpuArray;
 pub use perf::{PerfEventArray, PerfEventByteArray};
 pub use program_array::ProgramArray;
 pub use queue::Queue;
+pub use ring_buf::RingBuf;
 pub use sock_hash::SockHash;
 pub use sock_map::SockMap;
 pub use stack::Stack;
diff --git a/bpf/aya-bpf/src/maps/ring_buf.rs b/bpf/aya-bpf/src/maps/ring_buf.rs
new file mode 100644
index 00000000..de49ab21
--- /dev/null
+++ b/bpf/aya-bpf/src/maps/ring_buf.rs
@@ -0,0 +1,198 @@
+use core::{
+    cell::UnsafeCell,
+    mem,
+    mem::MaybeUninit,
+    ops::{Deref, DerefMut},
+};
+
+use crate::{
+    bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF},
+    helpers::{
+        bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve,
+        bpf_ringbuf_submit,
+    },
+    maps::PinningType,
+};
+
+#[repr(transparent)]
+pub struct RingBuf {
+    def: UnsafeCell<bpf_map_def>,
+}
+
+unsafe impl Sync for RingBuf {}
+
+/// A ring buffer entry, returned from [`RingBuf::reserve`].
+///
+/// You must [`submit`] or [`discard`] this entry before it gets dropped.
+///
+/// [`submit`]: RingBufEntry::submit
+/// [`discard`]: RingBufEntry::discard
+#[must_use = "BPF verifier requires ring buffer entries to be either submitted or discarded"]
+pub struct RingBufEntry<T: 'static>(&'static mut MaybeUninit<T>);
+
+impl<T> Deref for RingBufEntry<T> {
+    type Target = MaybeUninit<T>;
+
+    fn deref(&self) -> &Self::Target {
+        self.0
+    }
+}
+
+impl<T> DerefMut for RingBufEntry<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.0
+    }
+}
+
+impl<T> RingBufEntry<T> {
+    /// Discard this ring buffer entry. The entry will be skipped by the userspace reader.
+    pub fn discard(self, flags: u64) {
+        unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) };
+    }
+
+    /// Commit this ring buffer entry. The entry will be made visible to the userspace reader.
+    pub fn submit(self, flags: u64) {
+        unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) };
+    }
+}
+
+impl RingBuf {
+    /// Declare a BPF ring buffer.
+    ///
+    /// If `byte_size` is not a power-of-2 multiple of the page size, libbpf and aya will
+    /// coerce it to the next largest valid size when the program is loaded. This is a
+    /// requirement of the underlying kernel ring buffer implementation.
+    pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self {
+        Self::new(byte_size, flags, PinningType::None)
+    }
+
+    /// Declare a pinned BPF ring buffer.
+    ///
+    /// If `byte_size` is not a power-of-2 multiple of the page size, libbpf and aya will
+    /// coerce it to the next largest valid size when the program is loaded. This is a
+    /// requirement of the underlying kernel ring buffer implementation.
+    pub const fn pinned(byte_size: u32, flags: u32) -> Self {
+        Self::new(byte_size, flags, PinningType::ByName)
+    }
+
+    const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self {
+        Self {
+            def: UnsafeCell::new(bpf_map_def {
+                type_: BPF_MAP_TYPE_RINGBUF,
+                key_size: 0,
+                value_size: 0,
+                max_entries: byte_size,
+                map_flags: flags,
+                id: 0,
+                pinning: pinning_type as u32,
+            }),
+        }
+    }
+
+    /// Reserve memory in the ring buffer that can fit `T`.
+    ///
+    /// Returns `None` if the ring buffer is full.
+    #[cfg(feature = "const_assert")]
+    pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>>
+    where
+        Assert<{ 8 % core::mem::align_of::<T>() == 0 }>: IsTrue,
+    {
+        self.reserve_impl(flags)
+    }
+
+    /// Reserve memory in the ring buffer that can fit `T`.
+    ///
+    /// Returns `None` if the ring buffer is full.
+    ///
+    /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
+    /// alignment requests. If you use this with a `T` that isn't properly aligned, this function
+    /// will be compiled to a panic and silently make your eBPF program fail to load.
+    #[cfg(not(feature = "const_assert"))]
+    pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
+        assert!(8 % core::mem::align_of::<T>() == 0);
+        self.reserve_impl(flags)
+    }
+
+    fn reserve_impl<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
+        let ptr = unsafe {
+            bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::<T>() as _, flags)
+                as *mut MaybeUninit<T>
+        };
+        match ptr.is_null() {
+            true => None,
+            false => Some(RingBufEntry(unsafe { &mut *ptr })),
+        }
+    }
+
+    /// Copy `data` to the ring buffer output.
+    ///
+    /// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a
+    /// copy from either a map buffer or the stack.
+    ///
+    /// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to
+    /// create in eBPF but still possible, e.g. by slicing an array).
+    ///
+    /// [`reserve`]: RingBuf::reserve
+    /// [`submit`]: RingBufEntry::submit
+    #[cfg(feature = "const_assert")]
+    pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64>
+    where
+        Assert<{ 8 % core::mem::align_of::<&T>() == 0 }>: IsTrue,
+    {
+        assert!(8 % core::mem::align_of::<&T>() == 0);
+        self.output_impl(data, flags)
+    }
+
+    /// Copy `data` to the ring buffer output.
+    ///
+    /// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a
+    /// copy from either a map buffer or the stack.
+    ///
+    /// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to
+    /// create in eBPF but still possible, e.g. by slicing an array).
+    ///
+    /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
+    /// alignment requests. If you use this with a `T` that isn't properly aligned, this function
+    /// will be compiled to a panic and silently make your eBPF program fail to load.
+    ///
+    /// [`reserve`]: RingBuf::reserve
+    /// [`submit`]: RingBufEntry::submit
+    #[cfg(not(feature = "const_assert"))]
+    pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
+        assert!(8 % core::mem::align_of::<&T>() == 0);
+        self.output_impl(data, flags)
+    }
+
+    fn output_impl<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
+        // See `reserve` for alignment requirements.
+        assert!(8 % mem::align_of_val(data) == 0);
+
+        let ret = unsafe {
+            bpf_ringbuf_output(
+                self.def.get() as *mut _,
+                data as *const _ as *mut _,
+                mem::size_of_val(data) as _,
+                flags,
+            )
+        };
+        if ret < 0 {
+            Err(ret)
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Query various information about the ring buffer.
+    ///
+    /// Consult `bpf_ringbuf_query` documentation for a list of allowed flags.
+    pub fn query(&self, flags: u64) -> u64 {
+        unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) }
+    }
+}
+
+#[cfg(feature = "const_assert")]
+pub struct Assert<const COND: bool> {}
+#[cfg(feature = "const_assert")]
+pub trait IsTrue {}
+#[cfg(feature = "const_assert")]
+impl IsTrue for Assert<true> {}
diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml
index 73edf5fd..1ef5645d 100644
--- a/test/integration-ebpf/Cargo.toml
+++ b/test/integration-ebpf/Cargo.toml
@@ -34,4 +34,8 @@ path = "src/relocations.rs"

 [[bin]]
 name = "bpf_probe_read"
-path = "src/bpf_probe_read.rs"
\ No newline at end of file
+path = "src/bpf_probe_read.rs"
+
+[[bin]]
+name = "ring_buf"
+path = "src/ring_buf.rs"
diff --git a/test/integration-ebpf/src/ring_buf.rs b/test/integration-ebpf/src/ring_buf.rs
new file mode 100644
index 00000000..a9fa7f19
--- /dev/null
+++ b/test/integration-ebpf/src/ring_buf.rs
@@ -0,0 +1,42 @@
+#![no_std]
+#![no_main]
+
+use aya_bpf::{
+    macros::{map, uprobe},
+    maps::RingBuf,
+    programs::ProbeContext,
+};
+use core::mem::size_of;
+
+// Make a buffer large enough to hold MAX_ENTRIES entries at the same time.
+// This requires taking into consideration the header size.
+type Entry = u64;
+const MAX_ENTRIES: usize = 1024;
+const HDR_SIZE: usize = aya_bpf::bindings::BPF_RINGBUF_HDR_SZ as usize;
+
+// Add 1 because the capacity at any given time is actually one less than
+// you might think, since the consumer_pos and producer_pos being equal
+// would mean that the buffer is empty. The synchronous test fills the
+// buffer, hence this logic.
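+// (With 8-byte entries and an 8-byte ring buffer header, this works out to
+// 1024 * 16 + 1 bytes, which the loader then rounds up to a power-of-two
+// multiple of the page size via aya's adjust_to_page_size.)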
+const RING_BUF_SIZE: usize = ((size_of::<Entry>() + HDR_SIZE) * MAX_ENTRIES) + 1;
+
+#[map]
+static RING_BUF: RingBuf = RingBuf::with_byte_size(RING_BUF_SIZE as u32, 0);
+
+#[uprobe]
+pub fn ring_buf_test(ctx: ProbeContext) {
+    // Write the first argument to the function back out to RING_BUF.
+    let arg: Entry = match ctx.arg(0) {
+        Some(arg) => arg,
+        None => return,
+    };
+    if let Some(mut entry) = RING_BUF.reserve::<Entry>(0) {
+        entry.write(arg);
+        entry.submit(0);
+    }
+}
+
+#[panic_handler]
+fn panic(_info: &core::panic::PanicInfo) -> ! {
+    loop {}
+}
diff --git a/test/integration-test/Cargo.toml b/test/integration-test/Cargo.toml
index 07eaa927..19b41e8d 100644
--- a/test/integration-test/Cargo.toml
+++ b/test/integration-test/Cargo.toml
@@ -12,6 +12,7 @@ aya-obj = { path = "../../aya-obj" }
 libc = { version = "0.2.105" }
 log = "0.4"
 object = { version = "0.31", default-features = false, features = ["std", "read_core", "elf"] }
+rand = { version = "0.8.5" }
 rbpf = "0.2.0"
 regex = "1"
 tempfile = "3.3.0"
diff --git a/test/integration-test/tests/ring_buf.rs b/test/integration-test/tests/ring_buf.rs
new file mode 100644
index 00000000..1b5d8437
--- /dev/null
+++ b/test/integration-test/tests/ring_buf.rs
@@ -0,0 +1,126 @@
+use anyhow::Context as _;
+use aya::{include_bytes_aligned, maps::ring_buf::RingBuf, programs::UProbe, Bpf};
+use std::os::fd::AsRawFd as _;
+use tokio::{
+    io::unix::AsyncFd,
+    task::spawn,
+    time::{sleep, Duration},
+};
+
+#[test]
+fn ring_buf() {
+    let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf");
+    let mut bpf = Bpf::load(bytes).unwrap();
+    let ring_buf = bpf.take_map("RING_BUF").unwrap();
+    let mut ring_buf = RingBuf::try_from(ring_buf).unwrap();
+
+    let prog: &mut UProbe = bpf
+        .program_mut("ring_buf_test")
+        .unwrap()
+        .try_into()
+        .unwrap();
+    prog.load().unwrap();
+    prog.attach(
+        Some("ring_buf_trigger_ebpf_program"),
+        0,
+        "/proc/self/exe",
+        None,
+    )
+    .unwrap();
+
+    // Generate some random data.
+    let data = gen_data();
+
+    // Call the function that the uprobe is attached to with randomly generated data.
+    for val in &data {
+        ring_buf_trigger_ebpf_program(*val);
+    }
+    // Read the data back out of the ring buffer.
+    let mut seen = Vec::<u64>::new();
+    while seen.len() < data.len() {
+        if let Some(item) = ring_buf.next() {
+            let item: [u8; 8] = (*item).try_into().unwrap();
+            let arg = u64::from_ne_bytes(item);
+            seen.push(arg);
+        }
+    }
+    // Ensure that the data that was read matches what was passed.
+    assert_eq!(seen, data);
+}
+
+#[no_mangle]
+#[inline(never)]
+pub extern "C" fn ring_buf_trigger_ebpf_program(_arg: u64) {}
+
+/// Generate a variable length vector of u64s. The number of values is always small enough to fit
+/// into the RING_BUF defined in the probe.
+pub(crate) fn gen_data() -> Vec<u64> {
+    const DATA_LEN_RANGE: core::ops::RangeInclusive<usize> = 1..=1024;
+    use rand::Rng as _;
+    let mut rng = rand::thread_rng();
+    let n = rng.gen_range(DATA_LEN_RANGE);
+    std::iter::repeat_with(|| rng.gen()).take(n).collect()
+}
+
+#[tokio::test]
+async fn ring_buf_async() {
+    let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf");
+    let mut bpf = Bpf::load(bytes).unwrap();
+    let ring_buf = bpf.take_map("RING_BUF").unwrap();
+    let mut ring_buf = RingBuf::try_from(ring_buf).unwrap();
+
+    let prog: &mut UProbe = bpf
+        .program_mut("ring_buf_test")
+        .unwrap()
+        .try_into()
+        .unwrap();
+    prog.load().unwrap();
+    prog.attach(
+        Some("ring_buf_trigger_ebpf_program"),
+        0,
+        "/proc/self/exe",
+        None,
+    )
+    .unwrap();
+
+    // Generate some random data.
+    let data = gen_data();
+    let write_handle = spawn(call_ring_buf_trigger_ebpf_program_over_time(data.clone()));
+
+    // Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
+    let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap();
+    let seen = {
+        let mut seen = Vec::with_capacity(data.len());
+        while seen.len() < data.len() {
+            // Wait for readiness, then clear the bit before reading so that no notifications
+            // are missed.
+            async_fd.readable().await.unwrap().clear_ready();
+            while let Some(data) = ring_buf.next() {
+                let data: [u8; 8] = (*data)
+                    .try_into()
+                    .context(format!("data: {:?}", (&*data).len()))
+                    .unwrap();
+                let arg = u64::from_ne_bytes(data);
+                seen.push(arg);
+            }
+        }
+        seen
+    };
+
+    // Ensure that the data that was read matches what was passed.
+    assert_eq!(seen, data);
+    write_handle.await.unwrap();
+}
+
+async fn call_ring_buf_trigger_ebpf_program_over_time(data: Vec<u64>) {
+    let random_duration = || {
+        use rand::Rng as _;
+        let mut rng = rand::thread_rng();
+        let micros = rng.gen_range(0..1_000);
+        Duration::from_micros(micros)
+    };
+    for value in data {
+        sleep(random_duration()).await;
+        ring_buf_trigger_ebpf_program(value);
+    }
+}
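A minimal sketch of how user-space code might consume the new map, modeled on the integration test above. The object path and the "RING_BUF" name are placeholders that must match your build output and the `#[map]` static declared in the eBPF program:

use aya::{include_bytes_aligned, maps::ring_buf::RingBuf, Bpf};

fn main() -> Result<(), anyhow::Error> {
    // Load the compiled eBPF object (placeholder path).
    let mut bpf = Bpf::load(include_bytes_aligned!("path/to/ring_buf_object"))?;
    // The name must match the ring buffer map declared in the eBPF program.
    let map = bpf.take_map("RING_BUF").expect("RING_BUF map not found");
    let mut ring_buf = RingBuf::try_from(map)?;
    // ... load and attach the programs that produce into the ring buffer ...
    // Drain committed entries; each item dereferences to a byte slice.
    while let Some(item) = ring_buf.next() {
        println!("received {} bytes", item.len());
    }
    Ok(())
}

For readiness-driven consumption, wrap `ring_buf.as_raw_fd()` in `tokio::io::unix::AsyncFd` exactly as the `ring_buf_async` test does above.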