From 687498a4c5f443f7bff3a64bdb81adc8f40a0df9 Mon Sep 17 00:00:00 2001 From: Tatsuyuki Ishi Date: Thu, 21 Oct 2021 17:52:01 -0400 Subject: [PATCH] aya: Implement RingBuf This implements the userspace binding for RingBuf. Instead of streaming the samples as heap buffers, the process_ring function takes a callback to which we pass the event's byte region, roughly following [libbpf]'s API design. This avoids a copy and allows marking the consumer pointer in a timely manner. [libbpf]: https://github.com/libbpf/libbpf/blob/master/src/ringbuf.c Additionally, integration tests are added to demonstrate the usage of the new APIs and to ensure that they work end-to-end. Co-authored-by: William Findlay Co-authored-by: Andrew Werner --- aya/src/bpf.rs | 156 +++++++-- aya/src/maps/mod.rs | 6 + aya/src/maps/ring_buf.rs | 301 ++++++++++++++++++ bpf/aya-bpf/src/maps/mod.rs | 2 + bpf/aya-bpf/src/maps/ring_buf.rs | 159 +++++++++ test/integration-ebpf/Cargo.toml | 6 +- test/integration-ebpf/src/ring_buf.rs | 40 +++ test/integration-test/Cargo.toml | 1 + test/integration-test/tests/ring_buf.rs | 56 ++++ test/integration-test/tests/ring_buf_async.rs | 72 +++++ 10 files changed, 780 insertions(+), 19 deletions(-) create mode 100644 aya/src/maps/ring_buf.rs create mode 100644 bpf/aya-bpf/src/maps/ring_buf.rs create mode 100644 test/integration-ebpf/src/ring_buf.rs create mode 100644 test/integration-test/tests/ring_buf.rs create mode 100644 test/integration-test/tests/ring_buf_async.rs diff --git a/aya/src/bpf.rs b/aya/src/bpf.rs index 3c2aa495..52670af2 100644 --- a/aya/src/bpf.rs +++ b/aya/src/bpf.rs @@ -10,6 +10,7 @@ use std::{ use aya_obj::{ btf::{BtfFeatures, BtfRelocationError}, generated::{BPF_F_SLEEPABLE, BPF_F_XDP_HAS_FRAGS}, + maps::InvalidMapTypeError, relocation::BpfRelocationError, BpfSectionKind, Features, }; @@ -39,7 +40,7 @@ use crate::{ is_btf_supported, is_btf_type_tag_supported, is_perf_link_supported, is_probe_read_kernel_supported, is_prog_name_supported, retry_with_verifier_logs, }, - util::{bytes_of, bytes_of_slice, possible_cpus, VerifierLog, POSSIBLE_CPUS}, + util::{bytes_of, bytes_of_slice, page_size, possible_cpus, VerifierLog, POSSIBLE_CPUS}, }; pub(crate) const BPF_OBJ_NAME_LEN: usize = 16; @@ -374,23 +375,18 @@ impl<'a> BpfLoader<'a> { { continue; } - - match self.max_entries.get(name.as_str()) { - Some(size) => obj.set_max_entries(*size), - None => { - if obj.map_type() == BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32 - && obj.max_entries() == 0 - { - obj.set_max_entries( - possible_cpus() - .map_err(|error| BpfError::FileError { - path: PathBuf::from(POSSIBLE_CPUS), - error, - })? - .len() as u32, - ); - } - } + let map_type: bpf_map_type = + obj.map_type() + .try_into() + .map_err(|InvalidMapTypeError { map_type }| { + BpfError::MapError(MapError::InvalidMapType { map_type }) + })?; + if let Some(max_entries) = RuntimeSystemInfo::max_entries_override( + map_type, + self.max_entries.get(name.as_str()).cloned(), + || obj.max_entries(), + )? 
{ + obj.set_max_entries(max_entries) } let mut map = MapData { obj, @@ -633,6 +629,7 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> { BPF_MAP_TYPE_PERCPU_HASH => Ok(Map::PerCpuHashMap(map)), BPF_MAP_TYPE_LRU_PERCPU_HASH => Ok(Map::PerCpuLruHashMap(map)), BPF_MAP_TYPE_PERF_EVENT_ARRAY => Ok(Map::PerfEventArray(map)), + BPF_MAP_TYPE_RINGBUF => Ok(Map::RingBuf(map)), BPF_MAP_TYPE_SOCKHASH => Ok(Map::SockHash(map)), BPF_MAP_TYPE_SOCKMAP => Ok(Map::SockMap(map)), BPF_MAP_TYPE_BLOOM_FILTER => Ok(Map::BloomFilter(map)), @@ -648,6 +645,129 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> { Ok((name, map)) } +// Used to compute runtime map properties based on the properties of the current system. +trait SystemInfo { + fn page_size() -> u32; + fn num_cpus() -> Result; + + /// Computes the value which should be used to override the max_entries value of the map + /// based on the user-provided override and the rules for that map type. + fn max_entries_override( + map_type: bpf_map_type, + user_override: Option, + current_value: F, + ) -> Result, BpfError> + where + F: Fn() -> u32, + { + let max_entries = || user_override.unwrap_or_else(¤t_value); + Ok(match map_type { + BPF_MAP_TYPE_PERF_EVENT_ARRAY if max_entries() == 0 => Some(Self::num_cpus()?), + BPF_MAP_TYPE_RINGBUF => Some(Self::adjust_to_page_size(max_entries())) + .filter(|adjusted| *adjusted != max_entries()) + .or(user_override), + _ => user_override, + }) + } + + // Adjusts the byte size of a RingBuf map to match a power-of-two multiple of the page size. + // + // This mirrors the logic used by libbpf. + // See https://github.com/libbpf/libbpf/blob/ec6f716eda43fd0f4b865ddcebe0ce8cb56bf445/src/libbpf.c#L2461-L2463 + fn adjust_to_page_size(byte_size: u32) -> u32 { + // If the byte_size is zero, return zero and let the verifier reject the map + // when it is loaded. This is the behavior of libbpf. + if byte_size == 0 { + return 0; + } + let page_size = Self::page_size(); + let quotient = byte_size / page_size; + let remainder = byte_size % page_size; + let pages_needed = match remainder { + 0 if quotient.is_power_of_two() => return byte_size, + 0 => quotient, + _ => quotient + 1, + }; + page_size * pages_needed.next_power_of_two() + } +} + +struct RuntimeSystemInfo; + +impl SystemInfo for RuntimeSystemInfo { + fn page_size() -> u32 { + page_size() as u32 + } + + fn num_cpus() -> Result { + Ok(possible_cpus() + .map_err(|error| BpfError::FileError { + path: PathBuf::from(POSSIBLE_CPUS), + error, + })? 
+ .len() as u32) + } +} + +#[cfg(test)] +mod tests { + + use super::SystemInfo; + use crate::generated::bpf_map_type::*; + + struct TestSystemInfo {} + + const PAGE_SIZE: u32 = 4096; + const NUM_CPUS: u32 = 4; + impl SystemInfo for TestSystemInfo { + fn page_size() -> u32 { + PAGE_SIZE + } + fn num_cpus() -> Result { + Ok(NUM_CPUS) + } + } + + #[test] + fn test_adjust_to_page_size() { + [ + (0, 0), + (4096, 1), + (4096, 4095), + (4096, 4096), + (8192, 4097), + (8192, 8192), + (16384, 8193), + ] + .into_iter() + .for_each(|(exp, input)| assert_eq!(exp, TestSystemInfo::adjust_to_page_size(input))) + } + + #[test] + fn test_max_entries_override() { + [ + (BPF_MAP_TYPE_RINGBUF, Some(1), 1, Some(PAGE_SIZE)), + (BPF_MAP_TYPE_RINGBUF, None, 1, Some(PAGE_SIZE)), + (BPF_MAP_TYPE_RINGBUF, None, PAGE_SIZE, None), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 1, None), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(42), 1, Some(42)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(0), 1, Some(NUM_CPUS)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 0, Some(NUM_CPUS)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 42, None), + (BPF_MAP_TYPE_ARRAY, None, 1, None), + (BPF_MAP_TYPE_ARRAY, Some(2), 1, Some(2)), + ] + .into_iter() + .for_each(|(map_type, user_override, current_value, exp)| { + assert_eq!( + exp, + TestSystemInfo::max_entries_override(map_type, user_override, || { current_value }) + .unwrap() + ) + }) + } +} + impl<'a> Default for BpfLoader<'a> { fn default() -> Self { BpfLoader::new() diff --git a/aya/src/maps/mod.rs b/aya/src/maps/mod.rs index 3f20c503..feb8b67b 100644 --- a/aya/src/maps/mod.rs +++ b/aya/src/maps/mod.rs @@ -68,6 +68,7 @@ pub mod hash_map; pub mod lpm_trie; pub mod perf; pub mod queue; +pub mod ring_buf; pub mod sock; pub mod stack; pub mod stack_trace; @@ -81,6 +82,7 @@ pub use lpm_trie::LpmTrie; pub use perf::AsyncPerfEventArray; pub use perf::PerfEventArray; pub use queue::Queue; +pub use ring_buf::RingBuf; pub use sock::{SockHash, SockMap}; pub use stack::Stack; pub use stack_trace::StackTraceMap; @@ -247,6 +249,8 @@ pub enum Map { PerCpuLruHashMap(MapData), /// A [`PerfEventArray`] map PerfEventArray(MapData), + /// A [`RingBuf`] map + RingBuf(MapData), /// A [`SockMap`] map SockMap(MapData), /// A [`SockHash`] map @@ -275,6 +279,7 @@ impl Map { Map::PerCpuHashMap(map) => map.obj.map_type(), Map::PerCpuLruHashMap(map) => map.obj.map_type(), Map::PerfEventArray(map) => map.obj.map_type(), + Map::RingBuf(map) => map.obj.map_type(), Map::SockHash(map) => map.obj.map_type(), Map::SockMap(map) => map.obj.map_type(), Map::BloomFilter(map) => map.obj.map_type(), @@ -336,6 +341,7 @@ impl_try_from_map!( SockMap from Map::SockMap, PerfEventArray from Map::PerfEventArray, StackTraceMap from Map::StackTraceMap, + RingBuf from Map::RingBuf, ); #[cfg(feature = "async")] diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs new file mode 100644 index 00000000..523a8b6a --- /dev/null +++ b/aya/src/maps/ring_buf.rs @@ -0,0 +1,301 @@ +//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs. +//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF +//! programs to userspace. +//! +//! 
[ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html
+
+use std::{
+    io,
+    ops::Deref,
+    os::fd::{AsRawFd, RawFd},
+    ptr,
+    sync::atomic::{fence, AtomicU32, AtomicUsize, Ordering},
+};
+
+use libc::{munmap, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};
+
+use crate::{
+    generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
+    maps::{MapData, MapError},
+    sys::mmap,
+};
+
+/// A map that can be used to receive events from eBPF programs.
+///
+/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
+/// * It's shared across all CPUs, which allows a strong ordering between events. It also makes
+///   buffer creation easier.
+/// * Data notifications are delivered for every event instead of being sampled once every N events;
+///   the eBPF program can also control notification delivery if sampling is desired for performance reasons.
+/// * On the eBPF side, it supports the reserve-commit pattern where the event can be directly
+///   written into the ring without copying from a temporary location.
+/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
+///   and not to the userspace reader. This might require extra code to handle, but allows for more
+///   flexible schemes to handle dropped samples.
+///
+/// To receive events you need to:
+/// * call [`RingBuf::try_from`]
+/// * poll the returned [`RingBuf`] to be notified when events are inserted in the buffer
+/// * call [`RingBuf::next`] to read the events
+///
+/// # Minimum kernel version
+///
+/// The minimum kernel version required to use this feature is 5.8.
+///
+/// # Examples
+///
+/// See the integration tests for usage examples. The ring_buf test demonstrates busy
+/// waiting to read from the RingBuf, and the ring_buf_async test demonstrates how to
+/// use the AsyncFd to receive notifications.
+#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
+pub struct RingBuf<T> {
+    _map: T,
+    map_fd: i32,
+    data_ptr: *const u8,
+    consumer_pos_ptr: *const AtomicUsize,
+    producer_pos_ptr: *const AtomicUsize,
+    // A copy of `*producer_pos_ptr` to reduce cache line contention.
+    // Might be stale, and should be refreshed once the consumer position has caught up.
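+    // `next` refreshes it with an `Acquire` load of `*producer_pos_ptr` only after observing that
+    // the consumer position has reached the currently cached value.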
+ producer_pos_cache: usize, + page_size: usize, + mask: usize, +} + +impl> RingBuf { + pub(crate) fn new(map: T) -> Result { + let data: &MapData = map.borrow(); + + // Determine page_size, map_fd, and set mask to map size - 1 + let page_size = crate::util::page_size(); + let map_fd = data.fd_or_err().map_err(MapError::from)?; + let mask = (data.obj.max_entries() - 1) as usize; + + // Map writable consumer page + let consumer_page = unsafe { + mmap( + ptr::null_mut(), + page_size, + PROT_READ | PROT_WRITE, + MAP_SHARED, + map_fd, + 0, + ) + }; + if consumer_page == MAP_FAILED { + return Err(MapError::SyscallError { + call: "mmap".to_string(), + io_error: io::Error::last_os_error(), + }); + } + + // From kernel/bpf/ringbuf.c: + // Each data page is mapped twice to allow "virtual" + // continuous read of samples wrapping around the end of ring + // buffer area: + // ------------------------------------------------------ + // | meta pages | real data pages | same data pages | + // ------------------------------------------------------ + // | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 | + // ------------------------------------------------------ + // | | TA DA | TA DA | + // ------------------------------------------------------ + // ^^^^^^^ + // | + // Here, no need to worry about special handling of wrapped-around + // data due to double-mapped data pages. This works both in kernel and + // when mmap()'ed in user-space, simplifying both kernel and + // user-space implementations significantly. + let producer_pages = unsafe { + mmap( + ptr::null_mut(), + page_size + 2 * (mask + 1), + PROT_READ, + MAP_SHARED, + map_fd, + page_size as _, + ) + }; + if producer_pages == MAP_FAILED { + return Err(MapError::SyscallError { + call: "mmap".to_string(), + io_error: io::Error::last_os_error(), + }); + } + + Ok(RingBuf { + _map: map, + map_fd, + data_ptr: unsafe { (producer_pages as *mut u8).add(page_size) }, + consumer_pos_ptr: consumer_page as *mut _, + producer_pos_ptr: producer_pages as *mut _, + producer_pos_cache: 0, + page_size, + mask, + }) + } +} + +impl RingBuf { + /// Try to take a new entry from the ringbuf. + /// + /// Returns `Some(item)` if the ringbuf is not empty. + /// Returns `None` if the ringbuf is empty, in which case the caller may register for + /// availability notifications through `epoll` or other APIs. + // This is a streaming iterator which is not viable without GATs (stabilized in 1.65). + #[allow(clippy::should_implement_trait)] + pub fn next(&mut self) -> Option> { + // If `cb()` is true, do a memory barrier and test again if it's really true. + // Returns true if both tests returns true. + fn confirm_with_mb(mut cb: impl FnMut() -> bool) -> bool { + cb() && { + fence(Ordering::SeqCst); + cb() + } + } + + loop { + // Consumer pos is written by *us*. This means that we'll load the same value regardless + // of the `Ordering`. + let consumer_pos = unsafe { (*self.consumer_pos_ptr).load(Ordering::Relaxed) }; + + #[allow(clippy::blocks_in_if_conditions)] // Meaning is clearer this way + // Have we caught up? + if consumer_pos == self.producer_pos_cache { + // Cache might be stale, so test again. First, test without a costly memory barrier. + // If that says we have caught up, do a memory barrier to ensure the previous write + // is visible and test again. + // + // The memory barrier is necessary before committing to sleep due to possible race + // condition: when the kernel writes n+2, see the consumer index n, while we write + // n+1 and see the producer index n+1. 
If we then sleep, we'll never be woken up
+                // because the kernel thinks we haven't caught up.
+                if confirm_with_mb(|| {
+                    self.producer_pos_cache =
+                        unsafe { (*self.producer_pos_ptr).load(Ordering::Acquire) };
+                    consumer_pos == self.producer_pos_cache
+                }) {
+                    return None;
+                }
+            }
+
+            let sample_head = unsafe { self.data_ptr.add(consumer_pos & self.mask) };
+            let mut len_and_flags = 0; // Dummy value
+
+            // For the same reasons as above, re-test with a memory barrier before committing to sleep.
+            #[allow(clippy::blocks_in_if_conditions)]
+            if confirm_with_mb(|| {
+                len_and_flags =
+                    unsafe { (*(sample_head as *mut AtomicU32)).load(Ordering::Acquire) };
+                (len_and_flags & BPF_RINGBUF_BUSY_BIT) != 0
+            }) {
+                return None;
+            } else if (len_and_flags & BPF_RINGBUF_DISCARD_BIT) != 0 {
+                self.consume();
+            } else {
+                break;
+            }
+        }
+
+        Some(RingBufItem(self))
+    }
+
+    fn consume(&mut self) {
+        let consumer_pos = unsafe { (*self.consumer_pos_ptr).load(Ordering::Relaxed) };
+        let sample_head = unsafe { self.data_ptr.add(consumer_pos & self.mask) };
+        let len_and_flags = unsafe { (*(sample_head as *mut AtomicU32)).load(Ordering::Relaxed) };
+        assert_eq!(
+            (len_and_flags & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)),
+            0
+        );
+
+        let new_consumer_pos = consumer_pos + roundup_len(len_and_flags) as usize;
+        unsafe {
+            (*self.consumer_pos_ptr).store(new_consumer_pos, Ordering::Release);
+        }
+    }
+}
+
+impl<T> Drop for RingBuf<T> {
+    fn drop(&mut self) {
+        if !self.consumer_pos_ptr.is_null() {
+            // SAFETY: `consumer_pos` is not null and consumer page is not null and
+            // consumer page was mapped with size `self.page_size`
+            unsafe { munmap(self.consumer_pos_ptr as *mut _, self.page_size) };
+        }
+
+        if !self.producer_pos_ptr.is_null() {
+            // SAFETY: `producer_pos` is not null and producer pages were mapped with size
+            // `self.page_size + 2 * (self.mask + 1)`
+            unsafe {
+                munmap(
+                    self.producer_pos_ptr as *mut _,
+                    self.page_size + 2 * (self.mask + 1),
+                )
+            };
+        }
+    }
+}
+
+impl<T> AsRawFd for RingBuf<T> {
+    fn as_raw_fd(&self) -> RawFd {
+        self.map_fd
+    }
+}
+
+/// A ringbuf item. When this item is dropped, the consumer index in the ringbuf will be updated.
+pub struct RingBufItem<'a, T>(&'a mut RingBuf<T>);
+
+impl<'a, T> Deref for RingBufItem<'a, T> {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        let consumer_pos = unsafe { (*self.0.consumer_pos_ptr).load(Ordering::Relaxed) };
+        let sample_head = unsafe { self.0.data_ptr.add(consumer_pos & self.0.mask) };
+        let len_and_flags = unsafe { (*(sample_head as *mut AtomicU32)).load(Ordering::Relaxed) };
+        assert_eq!(
+            (len_and_flags & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)),
+            0
+        );
+
+        // Coerce the sample into a &[u8]
+        let sample_ptr = unsafe { sample_head.add(BPF_RINGBUF_HDR_SZ as usize) };
+        unsafe { std::slice::from_raw_parts(sample_ptr, len_and_flags as usize) }
+    }
+}
+
+impl<'a, T> Drop for RingBufItem<'a, T> {
+    fn drop(&mut self) {
+        self.0.consume();
+    }
+}
+
+/// Round up a `len` to the nearest 8-byte alignment, adding BPF_RINGBUF_HDR_SZ and
+/// clearing out the upper two bits of `len`.
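+///
+/// For example, assuming `BPF_RINGBUF_HDR_SZ` is 8: a 5-byte sample keeps its length after the
+/// busy/discard bits are cleared, gains the 8-byte header (13), and is rounded up to the next
+/// multiple of 8, giving 16.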
+fn roundup_len(mut len: u32) -> u32 { + const LEN_MASK: u32 = !(BPF_RINGBUF_DISCARD_BIT | BPF_RINGBUF_BUSY_BIT); + // clear out the upper two bits (busy and discard) + len &= LEN_MASK; + // add the size of the header prefix + len += BPF_RINGBUF_HDR_SZ; + // round to up to next multiple of 8 + (len + 7) & !7 +} + +#[cfg(test)] +mod tests { + use super::{roundup_len, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ}; + + #[test] + fn test_roundup_len() { + // should always round up to nearest 8 byte alignment + BPF_RINGBUF_HDR_SZ + assert_eq!(roundup_len(0), BPF_RINGBUF_HDR_SZ); + assert_eq!(roundup_len(1), BPF_RINGBUF_HDR_SZ + 8); + assert_eq!(roundup_len(8), BPF_RINGBUF_HDR_SZ + 8); + assert_eq!(roundup_len(9), BPF_RINGBUF_HDR_SZ + 16); + // should discard the upper two bits of len + assert_eq!( + roundup_len(0 | (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)), + BPF_RINGBUF_HDR_SZ + ); + } +} diff --git a/bpf/aya-bpf/src/maps/mod.rs b/bpf/aya-bpf/src/maps/mod.rs index 8fa375dd..f117a777 100644 --- a/bpf/aya-bpf/src/maps/mod.rs +++ b/bpf/aya-bpf/src/maps/mod.rs @@ -13,6 +13,7 @@ pub mod per_cpu_array; pub mod perf; pub mod program_array; pub mod queue; +pub mod ring_buf; pub mod sock_hash; pub mod sock_map; pub mod stack; @@ -26,6 +27,7 @@ pub use per_cpu_array::PerCpuArray; pub use perf::{PerfEventArray, PerfEventByteArray}; pub use program_array::ProgramArray; pub use queue::Queue; +pub use ring_buf::RingBuf; pub use sock_hash::SockHash; pub use sock_map::SockMap; pub use stack::Stack; diff --git a/bpf/aya-bpf/src/maps/ring_buf.rs b/bpf/aya-bpf/src/maps/ring_buf.rs new file mode 100644 index 00000000..f38eb362 --- /dev/null +++ b/bpf/aya-bpf/src/maps/ring_buf.rs @@ -0,0 +1,159 @@ +use core::{ + cell::UnsafeCell, + mem, + mem::MaybeUninit, + ops::{Deref, DerefMut}, +}; + +use crate::{ + bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF}, + helpers::{ + bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve, + bpf_ringbuf_submit, + }, + maps::PinningType, +}; + +#[repr(transparent)] +pub struct RingBuf { + def: UnsafeCell, +} + +unsafe impl Sync for RingBuf {} + +/// A ring buffer entry, returned from [`RingBuf::reserve`]. +/// +/// You must [`submit`] or [`discard`] this entry before this gets dropped. +/// +/// [`submit`]: RingBufEntry::submit +/// [`discard`]: RingBufEntry::discard +#[must_use = "BPF verifier requires ring buffer entries to be either submitted or discarded"] +pub struct RingBufEntry(&'static mut MaybeUninit); + +impl Deref for RingBufEntry { + type Target = MaybeUninit; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl DerefMut for RingBufEntry { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + } +} + +impl RingBufEntry { + /// Discard this ring buffer entry. The entry will be skipped by the userspace reader. + pub fn discard(self, flags: u64) { + unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) }; + } + + /// Commit this ring buffer entry. The entry will be made visible to the userspace reader. + pub fn submit(self, flags: u64) { + unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) }; + } +} + +impl RingBuf { + /// Declare a BPF ring buffer. + /// + /// `byte_size` should be a power-of-2 multiple of the page size. If it is not, it will + /// be coerced to the next largest valid size when the program is loaded.. 
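+    ///
+    /// # Example
+    ///
+    /// A minimal sketch; the map name and size are illustrative, mirroring the integration test
+    /// added in this change:
+    ///
+    /// ```ignore
+    /// use aya_bpf::{macros::map, maps::RingBuf};
+    ///
+    /// // 16 pages worth of events. A size that is not a power-of-2 multiple of the page size
+    /// // is rounded up when the program is loaded.
+    /// #[map]
+    /// static EVENTS: RingBuf = RingBuf::with_byte_size(4096 * 16, 0);
+    /// ```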
+ pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self { + Self::new(byte_size, flags, PinningType::None) + } + + /// Declare a pinned BPF ring buffer. + /// + /// `byte_size` should be a power-of-2 multiple of the page size. If it is not, it will + /// be coerced to the next largest valid size when the program is loaded.. + pub const fn pinned(byte_size: u32, flags: u32) -> Self { + Self::new(byte_size, flags, PinningType::ByName) + } + + const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self { + Self { + def: UnsafeCell::new(bpf_map_def { + type_: BPF_MAP_TYPE_RINGBUF, + key_size: 0, + value_size: 0, + max_entries: byte_size, + map_flags: flags, + id: 0, + pinning: pinning_type as u32, + }), + } + } + + /// Reserve memory in the ring buffer that can fit `T`. + /// + /// Returns `None` if the ring buffer is full, or a reference to the allocated memory if the + /// allocation succeeds. + /// + /// If the return value is not None, you must commit or discard the reserved entry through a + /// call to [`RingBufEntry::submit`] or [`RingBufEntry::discard`]. + /// + /// `T` must be aligned to 1, 2, 4 or 8 bytes; it's not possible to fulfill larger alignment + /// requests. If you use this with a `T` that isn't properly aligned, this function will + /// be compiled to a panic and silently make your eBPF program fail to load. + pub fn reserve(&self, flags: u64) -> Option> { + // The reserved pointer may be null, which we handle with an Option. + // We also need to ensure that the returned pointer is of a proper sized allocation and + // satisfies T's alignment requirements. + // Finally, cast it to an MaybeUninit as creating a reference to uninitialized memory is UB. + + // ringbuf allocations are aligned to 8 bytes (hardcoded in kernel code). + assert!(8 % mem::align_of::() == 0); + + let ptr = unsafe { + bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::() as _, flags) + as *mut MaybeUninit + }; + match ptr.is_null() { + true => None, + false => Some(RingBufEntry(unsafe { &mut *ptr })), + } + } + + /// Copy `data` to the ring buffer output. + /// + /// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a + /// redundant allocation on and a copy from the stack. + /// + /// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to + /// create in eBPF but still possible, e.g. by slicing an array). + /// + /// `T` must be aligned to 1, 2, 4 or 8 bytes; it's not possible to fulfill larger alignment + /// requests. If you use this with a `T` that isn't properly aligned, this function will + /// be compiled to a panic and silently make your eBPF program fail to load. + /// + /// [`reserve`]: RingBuf::reserve + /// [`submit`]: RingBufEntry::submit + pub fn output(&self, data: &T, flags: u64) -> Result<(), i64> { + // See `reserve` for alignment requirements. + assert!(8 % mem::align_of_val(data) == 0); + + let ret = unsafe { + bpf_ringbuf_output( + self.def.get() as *mut _, + data as *const _ as *mut _, + mem::size_of_val(data) as _, + flags, + ) + }; + if ret < 0 { + Err(ret) + } else { + Ok(()) + } + } + + /// Query various information about the ring buffer. + /// + /// Consult `bpf_ringbuf_query` documentation for a list of allowed flags. 
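+    ///
+    /// As of Linux 5.8 the supported flags are `BPF_RB_AVAIL_DATA` (amount of unconsumed data),
+    /// `BPF_RB_RING_SIZE`, `BPF_RB_CONS_POS` and `BPF_RB_PROD_POS`.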
+ pub fn query(&self, flags: u64) -> u64 { + unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) } + } +} diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml index 73edf5fd..1ef5645d 100644 --- a/test/integration-ebpf/Cargo.toml +++ b/test/integration-ebpf/Cargo.toml @@ -34,4 +34,8 @@ path = "src/relocations.rs" [[bin]] name = "bpf_probe_read" -path = "src/bpf_probe_read.rs" \ No newline at end of file +path = "src/bpf_probe_read.rs" + +[[bin]] +name = "ring_buf" +path = "src/ring_buf.rs" diff --git a/test/integration-ebpf/src/ring_buf.rs b/test/integration-ebpf/src/ring_buf.rs new file mode 100644 index 00000000..3692c4ac --- /dev/null +++ b/test/integration-ebpf/src/ring_buf.rs @@ -0,0 +1,40 @@ +#![no_std] +#![no_main] + +use aya_bpf::{ + macros::{map, uprobe}, + maps::RingBuf, + programs::ProbeContext, +}; +use core::mem::size_of; + +// Make a buffer large enough to hold MAX_ENTRIES entries at the same time. +// This requires taking into consideration the header size. +type Entry = u64; +const MAX_ENTRIES: usize = 1024; +const HDR_SIZE: usize = aya_bpf::bindings::BPF_RINGBUF_HDR_SZ as usize; + +// Add 1 because the capacity is actually one less than you might think +// because the consumer_pos and producer_pos being equal would mean that +// the buffer is empty. +const RING_BUF_SIZE: usize = ((size_of::() + HDR_SIZE) * MAX_ENTRIES) + 1; + +#[map] +static RING_BUF: RingBuf = RingBuf::with_byte_size(RING_BUF_SIZE as u32, 0); + +#[uprobe] +pub fn ring_buf_test(ctx: ProbeContext) { + // Write the first argument to the function back out to RING_BUF. + let Some(arg): Option = ctx.arg(0) else { + return; + }; + if let Some(mut entry) = RING_BUF.reserve::(0) { + entry.write(arg); + entry.submit(0); + } +} + +#[panic_handler] +fn panic(_info: &core::panic::PanicInfo) -> ! { + loop {} +} diff --git a/test/integration-test/Cargo.toml b/test/integration-test/Cargo.toml index 07eaa927..0458ebfd 100644 --- a/test/integration-test/Cargo.toml +++ b/test/integration-test/Cargo.toml @@ -16,3 +16,4 @@ rbpf = "0.2.0" regex = "1" tempfile = "3.3.0" tokio = { version = "1.24", features = ["rt", "rt-multi-thread", "sync", "time"] } +rand = { version = "0.8.5" } diff --git a/test/integration-test/tests/ring_buf.rs b/test/integration-test/tests/ring_buf.rs new file mode 100644 index 00000000..5ce8c59f --- /dev/null +++ b/test/integration-test/tests/ring_buf.rs @@ -0,0 +1,56 @@ +use aya::{include_bytes_aligned, maps::ring_buf::RingBuf, programs::UProbe, Bpf}; + +#[test] +fn ring_buf() { + let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf"); + let mut bpf = Bpf::load(bytes).unwrap(); + let ring_buf = bpf.take_map("RING_BUF").unwrap(); + let mut ring_buf = RingBuf::try_from(ring_buf).unwrap(); + + let prog: &mut UProbe = bpf + .program_mut("ring_buf_test") + .unwrap() + .try_into() + .unwrap(); + prog.load().unwrap(); + prog.attach( + Some("ring_buf_trigger_ebpf_program"), + 0, + "/proc/self/exe", + None, + ) + .unwrap(); + + // Generate some random data. + let data = gen_data(); + + // Call the function that the uprobe is attached to with randomly generated data. + for val in &data { + ring_buf_trigger_ebpf_program(*val); + } + // Read the data back out of the ring buffer. 
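+    // This is a busy-wait loop: `next` returns `None` whenever the buffer is empty, so keep
+    // polling until all of the expected values have been seen. The ring_buf_async test shows how
+    // to wait for readiness notifications via AsyncFd instead.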
+ let mut seen = Vec::::new(); + while seen.len() < data.len() { + if let Some(item) = ring_buf.next() { + let item: [u8; 8] = (*item).try_into().unwrap(); + let arg = u64::from_ne_bytes(item); + seen.push(arg); + } + } + // Ensure that the data that was read matches what was passed. + assert_eq!(seen, data); +} + +#[no_mangle] +#[inline(never)] +pub extern "C" fn ring_buf_trigger_ebpf_program(_arg: u64) {} + +/// Generate a variable length vector of u64s. The number of values is always small enough to fit +/// into the RING_BUF defined in the probe. +pub(crate) fn gen_data() -> Vec { + const DATA_LEN_RANGE: core::ops::RangeInclusive = 1..=1024; + use rand::Rng as _; + let mut rng = rand::thread_rng(); + let n = rng.gen_range(DATA_LEN_RANGE); + std::iter::repeat_with(|| rng.gen()).take(n).collect() +} diff --git a/test/integration-test/tests/ring_buf_async.rs b/test/integration-test/tests/ring_buf_async.rs new file mode 100644 index 00000000..42388f3b --- /dev/null +++ b/test/integration-test/tests/ring_buf_async.rs @@ -0,0 +1,72 @@ +use std::os::fd::AsRawFd as _; + +use aya::maps::RingBuf; + +mod ring_buf; +use aya::{include_bytes_aligned, programs::UProbe, Bpf}; +use ring_buf::{gen_data, ring_buf_trigger_ebpf_program}; +use tokio::{ + io::unix::AsyncFd, + task::spawn, + time::{sleep, Duration}, +}; + +#[tokio::test] +async fn ring_buf_async() { + let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf"); + let mut bpf = Bpf::load(bytes).unwrap(); + let ring_buf = bpf.take_map("RING_BUF").unwrap(); + let mut ring_buf = RingBuf::try_from(ring_buf).unwrap(); + + let prog: &mut UProbe = bpf + .program_mut("ring_buf_test") + .unwrap() + .try_into() + .unwrap(); + prog.load().unwrap(); + prog.attach( + Some("ring_buf_trigger_ebpf_program"), + 0, + "/proc/self/exe", + None, + ) + .unwrap(); + + // Generate some random data. + let data = gen_data(); + let write_handle = spawn(call_ring_buf_trigger_ebpf_program_over_time(data.clone())); + + // Construct an AsyncFd from the RingBuf in order to receive readiness notifications. + let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap(); + let seen = { + let mut seen = Vec::with_capacity(data.len()); + while seen.len() < data.len() { + // Wait for readiness, then clear the bit before reading so that no notifications + // are missed. + async_fd.readable().await.unwrap().clear_ready(); + while let Some(data) = ring_buf.next() { + let data: [u8; 8] = (*data).try_into().unwrap(); + let arg = u64::from_ne_bytes(data); + seen.push(arg); + } + } + seen + }; + + // Ensure that the data that was read matches what was passed. + assert_eq!(seen, data); + write_handle.await.unwrap(); +} + +async fn call_ring_buf_trigger_ebpf_program_over_time(data: Vec) { + let random_duration = || { + use rand::Rng as _; + let mut rng = rand::thread_rng(); + let micros = rng.gen_range(0..1_000); + Duration::from_micros(micros) + }; + for value in data { + sleep(random_duration()).await; + ring_buf_trigger_ebpf_program(value); + } +}