aya: Implement RingBuf

This implements the userspace binding for RingBuf.

Instead of streaming the samples as heap buffers, the process_ring
function takes a callback to which we pass the event's byte region,
roughly following [libbpf]'s API design. This avoids a copy and allows
marking the consumer pointer in a timely manner.

[libbpf]: https://github.com/libbpf/libbpf/blob/master/src/ringbuf.c

Additionally, integration tests are added to demonstrate the usage
of the new APIs and to ensure that they work end-to-end.

Co-authored-by: William Findlay <william@williamfindlay.com>
Co-authored-by: Tatsuyuki Ishi <ishitatsuyuki@gmail.com>
Andrew Werner 2 years ago
parent 9244b768c3
commit e72a32cb0e

@@ -39,7 +39,7 @@ use crate::{
        is_btf_supported, is_btf_type_tag_supported, is_perf_link_supported,
        is_probe_read_kernel_supported, is_prog_name_supported, retry_with_verifier_logs,
    },
-   util::{bytes_of, bytes_of_slice, possible_cpus, POSSIBLE_CPUS},
+   util::{bytes_of, bytes_of_slice, page_size, possible_cpus, POSSIBLE_CPUS},
};

pub(crate) const BPF_OBJ_NAME_LEN: usize = 16;
@@ -382,23 +382,23 @@ impl<'a> BpfLoader<'a> {
            {
                continue;
            }
-           match max_entries.get(name.as_str()) {
-               Some(size) => obj.set_max_entries(*size),
-               None => {
-                   if obj.map_type() == BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32
-                       && obj.max_entries() == 0
-                   {
-                       obj.set_max_entries(
-                           possible_cpus()
-                               .map_err(|error| BpfError::FileError {
-                                   path: PathBuf::from(POSSIBLE_CPUS),
-                                   error,
-                               })?
-                               .len() as u32,
-                       );
-                   }
-               }
-           }
+           let num_cpus = || -> Result<u32, BpfError> {
+               Ok(possible_cpus()
+                   .map_err(|error| BpfError::FileError {
+                       path: PathBuf::from(POSSIBLE_CPUS),
+                       error,
+                   })?
+                   .len() as u32)
+           };
+           let map_type: bpf_map_type = obj.map_type().try_into().map_err(MapError::from)?;
+           if let Some(max_entries) = max_entries_override(
+               map_type,
+               max_entries.get(name.as_str()).copied(),
+               || obj.max_entries(),
+               num_cpus,
+               || page_size() as u32,
+           )? {
+               obj.set_max_entries(max_entries)
+           }
            let mut map = MapData {
                obj,
@@ -637,6 +637,7 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> {
        BPF_MAP_TYPE_PERCPU_HASH => Ok(Map::PerCpuHashMap(map)),
        BPF_MAP_TYPE_LRU_PERCPU_HASH => Ok(Map::PerCpuLruHashMap(map)),
        BPF_MAP_TYPE_PERF_EVENT_ARRAY => Ok(Map::PerfEventArray(map)),
+       BPF_MAP_TYPE_RINGBUF => Ok(Map::RingBuf(map)),
        BPF_MAP_TYPE_SOCKHASH => Ok(Map::SockHash(map)),
        BPF_MAP_TYPE_SOCKMAP => Ok(Map::SockMap(map)),
        BPF_MAP_TYPE_BLOOM_FILTER => Ok(Map::BloomFilter(map)),
@@ -652,6 +653,106 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> {
    Ok((name, map))
}
/// Computes the value which should be used to override the max_entries value of the map
/// based on the user-provided override and the rules for that map type.
fn max_entries_override(
map_type: bpf_map_type,
user_override: Option<u32>,
current_value: impl Fn() -> u32,
num_cpus: impl Fn() -> Result<u32, BpfError>,
page_size: impl Fn() -> u32,
) -> Result<Option<u32>, BpfError> {
let max_entries = || user_override.unwrap_or_else(&current_value);
Ok(match map_type {
BPF_MAP_TYPE_PERF_EVENT_ARRAY if max_entries() == 0 => Some(num_cpus()?),
BPF_MAP_TYPE_RINGBUF => Some(adjust_to_page_size(max_entries(), page_size()))
.filter(|adjusted| *adjusted != max_entries())
.or(user_override),
_ => user_override,
})
}
// Adjusts the byte size of a RingBuf map to match a power-of-two multiple of the page size.
//
// This mirrors the logic used by libbpf.
// See https://github.com/libbpf/libbpf/blob/ec6f716eda43/src/libbpf.c#L2461-L2463
fn adjust_to_page_size(byte_size: u32, page_size: u32) -> u32 {
// If the byte_size is zero, return zero and let the verifier reject the map
// when it is loaded. This is the behavior of libbpf.
if byte_size == 0 {
return 0;
}
// TODO: Replace with primitive method when int_roundings (https://github.com/rust-lang/rust/issues/88581)
// is stabilized.
fn div_ceil(n: u32, rhs: u32) -> u32 {
let d = n / rhs;
let r = n % rhs;
if r > 0 && rhs > 0 {
d + 1
} else {
d
}
}
let pages_needed = div_ceil(byte_size, page_size);
page_size * pages_needed.next_power_of_two()
}
#[cfg(test)]
mod tests {
use crate::generated::bpf_map_type::*;
const PAGE_SIZE: u32 = 4096;
const NUM_CPUS: u32 = 4;
#[test]
fn test_adjust_to_page_size() {
use super::adjust_to_page_size;
[
(0, 0),
(4096, 1),
(4096, 4095),
(4096, 4096),
(8192, 4097),
(8192, 8192),
(16384, 8193),
]
.into_iter()
.for_each(|(exp, input)| assert_eq!(exp, adjust_to_page_size(input, PAGE_SIZE)))
}
#[test]
fn test_max_entries_override() {
use super::max_entries_override;
[
(BPF_MAP_TYPE_RINGBUF, Some(1), 1, Some(PAGE_SIZE)),
(BPF_MAP_TYPE_RINGBUF, None, 1, Some(PAGE_SIZE)),
(BPF_MAP_TYPE_RINGBUF, None, PAGE_SIZE, None),
(BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 1, None),
(BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(42), 1, Some(42)),
(BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(0), 1, Some(NUM_CPUS)),
(BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 0, Some(NUM_CPUS)),
(BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 42, None),
(BPF_MAP_TYPE_ARRAY, None, 1, None),
(BPF_MAP_TYPE_ARRAY, Some(2), 1, Some(2)),
]
.into_iter()
.for_each(|(map_type, user_override, current_value, exp)| {
assert_eq!(
exp,
max_entries_override(
map_type,
user_override,
|| { current_value },
|| Ok(NUM_CPUS),
|| PAGE_SIZE
)
.unwrap()
)
})
}
}
impl<'a> Default for BpfLoader<'a> {
    fn default() -> Self {
        BpfLoader::new()

@@ -70,6 +70,7 @@ pub mod hash_map;
pub mod lpm_trie;
pub mod perf;
pub mod queue;
+pub mod ring_buf;
pub mod sock;
pub mod stack;
pub mod stack_trace;
@@ -83,6 +84,7 @@ pub use lpm_trie::LpmTrie;
pub use perf::AsyncPerfEventArray;
pub use perf::PerfEventArray;
pub use queue::Queue;
+pub use ring_buf::RingBuf;
pub use sock::{SockHash, SockMap};
pub use stack::Stack;
pub use stack_trace::StackTraceMap;
@@ -256,6 +258,8 @@ pub enum Map {
    PerCpuLruHashMap(MapData),
    /// A [`PerfEventArray`] map
    PerfEventArray(MapData),
+   /// A [`RingBuf`] map
+   RingBuf(MapData),
    /// A [`SockMap`] map
    SockMap(MapData),
    /// A [`SockHash`] map
@@ -284,6 +288,7 @@ impl Map {
        Map::PerCpuHashMap(map) => map.obj.map_type(),
        Map::PerCpuLruHashMap(map) => map.obj.map_type(),
        Map::PerfEventArray(map) => map.obj.map_type(),
+       Map::RingBuf(map) => map.obj.map_type(),
        Map::SockHash(map) => map.obj.map_type(),
        Map::SockMap(map) => map.obj.map_type(),
        Map::BloomFilter(map) => map.obj.map_type(),
@@ -345,6 +350,7 @@ impl_try_from_map!(
    SockMap from Map::SockMap,
    PerfEventArray from Map::PerfEventArray,
    StackTraceMap from Map::StackTraceMap,
+   RingBuf from Map::RingBuf,
);

#[cfg(feature = "async")]

@@ -0,0 +1,335 @@
//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs.
//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF
//! programs to userspace.
//!
//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html
use crate::{
generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
maps::{MapData, MapError},
sys::mmap,
};
use bitflags::bitflags;
use libc::{c_int, c_void, munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};
use std::{
io,
ops::Deref,
os::fd::{AsRawFd, RawFd},
ptr,
ptr::NonNull,
sync::atomic::{fence, AtomicU32, Ordering},
};
/// A map that can be used to receive events from eBPF programs.
///
/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways:
/// * It's shared across all CPUs, which allows a strong ordering between events.
/// * Data notifications are delivered precisely instead of being sampled for every N events;
/// the eBPF program can also control notification delivery if sampling is desired for performance
/// reasons. By default, a notification will be sent if the consumer is caught up at the time of
/// committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to
/// control this behavior.
/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly
/// written into the ring without copying from a temporary location.
/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
/// and not the userspace reader. This might require extra code to handle, but allows for more
/// flexible schemes to handle dropped samples.
///
/// To receive events you need to:
/// * Construct [`RingBuf`] using [`RingBuf::try_from`].
/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`].
///
/// To receive async notifications of data availability, clients may construct an
/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 5.8.
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
pub struct RingBuf<T> {
_map: T,
map_fd: i32,
consumer: ConsumerPos,
producer: ProducerData,
}
impl<T: core::borrow::Borrow<MapData>> RingBuf<T> {
pub(crate) fn new(map: T) -> Result<Self, MapError> {
let data: &MapData = map.borrow();
let page_size = crate::util::page_size();
let map_fd = data.fd_or_err().map_err(MapError::from)?;
let byte_size = data.obj.max_entries();
let consumer = ConsumerPos::new(map_fd, page_size)?;
let producer = ProducerData::new(map_fd, page_size, byte_size)?;
Ok(RingBuf {
_map: map,
map_fd,
consumer,
producer,
})
}
}
impl<T> RingBuf<T> {
/// Try to take a new entry from the ringbuf.
///
/// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in
/// which case the caller may register for availability notifications through `epoll` or other
/// APIs. Only one RingBufItem may be outstanding at a time.
//
// This is not an implementation of `Iterator` because we need to be able to refer
// to the lifetime of the iterator in the returned `RingBufItem`. If `Iterator::Item`
// leveraged GATs, one could imagine an implementation of `Iterator` that would work.
// GATs were stabilized in Rust 1.65, but there is not yet a trait that the community
// seems to have standardized around.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<RingBufItem<'_>> {
let Self {
consumer, producer, ..
} = self;
producer.next(consumer)
}
}
/// Access to the RawFd can be used to construct an AsyncFd for use with epoll.
impl<T> AsRawFd for RingBuf<T> {
fn as_raw_fd(&self) -> RawFd {
self.map_fd
}
}
/// The current outstanding item read from the ringbuf.
pub struct RingBufItem<'a> {
data: &'a [u8],
consumer: &'a mut ConsumerPos,
}
impl Deref for RingBufItem<'_> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
let Self { data, .. } = self;
data
}
}
impl Drop for RingBufItem<'_> {
fn drop(&mut self) {
let Self { consumer, data } = self;
consumer.consume(data.len());
}
}
// ConsumerPos corresponds to the consumer metadata page of the RingBuf.
struct ConsumerPos(MMap);
impl ConsumerPos {
fn new(fd: RawFd, page_size: usize) -> Result<Self, MapError> {
Ok(Self(MMap::new(
fd,
page_size,
PROT_READ | PROT_WRITE,
MAP_SHARED,
0,
)?))
}
fn load(&self) -> u32 {
self.get_ref().load(Ordering::Relaxed)
}
fn consume(&mut self, len: usize) -> u32 {
// TODO: Use primitive method when https://github.com/rust-lang/rust/issues/88581 is stabilized.
fn next_multiple_of(n: u32, multiple: u32) -> u32 {
match n % multiple {
0 => n,
rem => n + (multiple - rem),
}
}
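// For example, a 12-byte sample consumes 12 + 8 (BPF_RINGBUF_HDR_SZ) = 20
// bytes, rounded up to 24 so the consumer position stays 8-byte aligned.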
let to_add = next_multiple_of(len as u32 + BPF_RINGBUF_HDR_SZ, 8);
to_add + self.get_ref().fetch_add(to_add, Ordering::Release)
}
fn get_ref(&self) -> &AtomicU32 {
let Self(MMap { ptr, .. }) = self;
unsafe { ptr.cast::<AtomicU32>().as_ref() }
}
}
struct ProducerData {
memmap: MMap,
page_size: usize,
mask: u32,
pos_cache: u32,
}
impl ProducerData {
fn new(fd: RawFd, page_size: usize, byte_size: u32) -> Result<Self, MapError> {
// The producer pages have one page of metadata and then the data pages, all mapped
// read-only. Note that the length of the mapping includes the data pages twice,
// as the kernel maps them two times consecutively to avoid special handling of
// entries that cross over the end of the ring buffer.
//
// From kernel/bpf/ringbuf.c [0]:
//
// Each data page is mapped twice to allow "virtual"
// continuous read of samples wrapping around the end of ring
// buffer area:
// ------------------------------------------------------
// | meta pages | real data pages | same data pages |
// ------------------------------------------------------
// | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
// ------------------------------------------------------
// | | TA DA | TA DA |
// ------------------------------------------------------
// ^^^^^^^
// |
// Here, no need to worry about special handling of wrapped-around
// data due to double-mapped data pages. This works both in kernel and
// when mmap()'ed in user-space, simplifying both kernel and
// user-space implementations significantly.
//
// [0]: https://github.com/torvalds/linux/blob/3f01e9fe/kernel/bpf/ringbuf.c#L108-L124
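// Because of this double mapping, `read_item` below can take one contiguous
// slice of `data` even when a record wraps past the end of the ring.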
let len = page_size + 2 * byte_size as usize;
let memmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, page_size as off_t)?;
Ok(Self {
memmap,
page_size,
pos_cache: 0,
mask: byte_size - 1,
})
}
fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> {
let Self {
ref memmap,
page_size,
pos_cache,
mask,
} = self;
let pos = unsafe { memmap.ptr.cast().as_ref() };
let data: &[u8] = &memmap.as_ref()[*page_size..];
let mut consumer_pos = consumer.load();
while data_available(pos, pos_cache, consumer_pos) {
match read_item(data, *mask, consumer_pos) {
Item::Busy => return None,
Item::Data(data) => return Some(RingBufItem { data, consumer }),
Item::Discard { len } => consumer_pos = consumer.consume(len),
}
}
return None;
bitflags! {
#[derive(Clone, Copy)]
struct Header: u32 {
const BUSY = BPF_RINGBUF_BUSY_BIT;
const DISCARD = BPF_RINGBUF_DISCARD_BIT;
}
}
impl Header {
fn len(self, mask: u32) -> usize {
const LEN_MASK: u32 = !Header::all().bits();
(self.bits() & LEN_MASK & mask) as usize
}
}
enum Item<'a> {
Data(&'a [u8]),
Discard { len: usize },
Busy,
}
fn retry_with_barrier<T: Copy>(f: impl Fn() -> T, should_retry: impl Fn(T) -> bool) -> T {
let val = f();
if !should_retry(val) {
return val;
}
fence(Ordering::SeqCst);
f()
}
fn data_available(producer: &AtomicU32, cache: &mut u32, consumer: u32) -> bool {
debug_assert!(
consumer <= *cache,
"consumer={} > producer={}",
consumer,
*cache
);
if consumer < *cache {
true
} else {
let prev = *cache;
*cache = retry_with_barrier(|| producer.load(Ordering::Acquire), |v| v == prev);
consumer < *cache
}
}
fn read_item(data: &[u8], mask: u32, offset: u32) -> Item {
let offset = offset & mask;
let header_ptr = data[offset as usize..].as_ptr() as *const AtomicU32;
let header_ref = unsafe { &*header_ptr };
let header = retry_with_barrier(
|| Header::from_bits_retain(header_ref.load(Ordering::Acquire)),
|header| header.contains(Header::BUSY),
);
if header.contains(Header::BUSY) {
Item::Busy
} else {
let len = header.len(mask);
if header.contains(Header::DISCARD) {
Item::Discard { len }
} else {
let data_offset = offset as usize + BPF_RINGBUF_HDR_SZ as usize;
Item::Data(&data[data_offset..data_offset + len])
}
}
}
}
}
// MMap corresponds to a memory-mapped region.
// The data is unmapped in Drop.
struct MMap {
ptr: NonNull<c_void>,
len: usize,
}
impl MMap {
fn new(
fd: RawFd,
len: usize,
prot: c_int,
flags: c_int,
offset: off_t,
) -> Result<Self, MapError> {
match unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, offset) } {
MAP_FAILED => Err(MapError::SyscallError {
call: "mmap",
io_error: io::Error::last_os_error(),
}),
// This should never happen, but we check anyway to be paranoid and so the
// rest of the code never has to reason about a null pointer.
res => Ok(Self {
ptr: std::ptr::NonNull::new(res).ok_or(MapError::SyscallError {
call: "mmap",
io_error: io::Error::new(io::ErrorKind::Other, "mmap returned null pointer"),
})?,
len,
}),
}
}
}
impl AsRef<[u8]> for MMap {
fn as_ref(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast(), self.len) }
}
}
impl Drop for MMap {
fn drop(&mut self) {
unsafe {
munmap(self.ptr.as_ptr(), self.len);
}
}
}
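For orientation, a minimal consumption loop using the API above looks roughly like the following sketch (not part of the patch; the map name "EVENTS" and the surrounding, already-loaded `Bpf` value are assumptions):

    use aya::maps::RingBuf;

    // `bpf` is assumed to be an already-loaded `aya::Bpf` that contains a ring
    // buffer map named "EVENTS".
    let mut ring_buf = RingBuf::try_from(bpf.take_map("EVENTS").unwrap()).unwrap();
    while let Some(item) = ring_buf.next() {
        // `item` derefs to the raw bytes of one submitted sample; dropping it
        // advances the consumer position.
        println!("received {} bytes", item.len());
    }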

@@ -11,3 +11,7 @@ aya-bpf-bindings = { path = "../aya-bpf-bindings" }

[build-dependencies]
rustversion = "1.0"
+
+[features]
+default = []
+const_assert = []

@@ -8,6 +8,11 @@
    html_logo_url = "https://aya-rs.dev/assets/images/crabby.svg",
    html_favicon_url = "https://aya-rs.dev/assets/images/crabby.svg"
)]
+#![cfg_attr(
+    feature = "const_assert",
+    allow(incomplete_features),
+    feature(generic_const_exprs)
+)]
#![cfg_attr(unstable, feature(never_type))]
#![cfg_attr(target_arch = "bpf", feature(asm_experimental_arch))]
#![allow(clippy::missing_safety_doc)]

@@ -13,6 +13,7 @@ pub mod per_cpu_array;
pub mod perf;
pub mod program_array;
pub mod queue;
+pub mod ring_buf;
pub mod sock_hash;
pub mod sock_map;
pub mod stack;
@@ -26,6 +27,7 @@ pub use per_cpu_array::PerCpuArray;
pub use perf::{PerfEventArray, PerfEventByteArray};
pub use program_array::ProgramArray;
pub use queue::Queue;
+pub use ring_buf::RingBuf;
pub use sock_hash::SockHash;
pub use sock_map::SockMap;
pub use stack::Stack;

@@ -0,0 +1,194 @@
use core::{
cell::UnsafeCell,
mem,
mem::MaybeUninit,
ops::{Deref, DerefMut},
};
use crate::{
bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF},
helpers::{
bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve,
bpf_ringbuf_submit,
},
maps::PinningType,
};
#[repr(transparent)]
pub struct RingBuf {
def: UnsafeCell<bpf_map_def>,
}
unsafe impl Sync for RingBuf {}
/// A ring buffer entry, returned from [`RingBuf::reserve`].
///
/// You must [`submit`] or [`discard`] this entry before it gets dropped.
///
/// [`submit`]: RingBufEntry::submit
/// [`discard`]: RingBufEntry::discard
#[must_use = "BPF verifier requires ring buffer entries to be either submitted or discarded"]
pub struct RingBufEntry<T: 'static>(&'static mut MaybeUninit<T>);
impl<T> Deref for RingBufEntry<T> {
type Target = MaybeUninit<T>;
fn deref(&self) -> &Self::Target {
self.0
}
}
impl<T> DerefMut for RingBufEntry<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
}
}
impl<T> RingBufEntry<T> {
/// Discard this ring buffer entry. The entry will be skipped by the userspace reader.
pub fn discard(self, flags: u64) {
unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) };
}
/// Commit this ring buffer entry. The entry will be made visible to the userspace reader.
pub fn submit(self, flags: u64) {
unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) };
}
}
impl RingBuf {
/// Declare a BPF ring buffer.
///
/// The Linux kernel requires that `byte_size` be a power-of-two multiple of the page size.
/// The loading program may coerce the size when loading the map.
pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self {
Self::new(byte_size, flags, PinningType::None)
}
/// Declare a pinned BPF ring buffer.
///
/// The Linux kernel requires that `byte_size` be a power-of-two multiple of the page size.
/// The loading program may coerce the size when loading the map.
pub const fn pinned(byte_size: u32, flags: u32) -> Self {
Self::new(byte_size, flags, PinningType::ByName)
}
const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self {
Self {
def: UnsafeCell::new(bpf_map_def {
type_: BPF_MAP_TYPE_RINGBUF,
key_size: 0,
value_size: 0,
max_entries: byte_size,
map_flags: flags,
id: 0,
pinning: pinning_type as u32,
}),
}
}
/// Reserve memory in the ring buffer that can fit `T`.
///
/// Returns `None` if the ring buffer is full.
#[cfg(feature = "const_assert")]
pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>>
where
Assert<{ 8 % core::mem::align_of::<T>() == 0 }>: IsTrue,
{
self.reserve_impl(flags)
}
/// Reserve memory in the ring buffer that can fit `T`.
///
/// Returns `None` if the ring buffer is full.
///
/// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
/// alignment requests. If you use this with a `T` that isn't properly aligned, this function
/// will compile to a panic, which silently makes your eBPF program fail to load.
/// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418).
#[cfg(not(feature = "const_assert"))]
pub fn reserve<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
assert_eq!(8 % core::mem::align_of::<T>(), 0);
self.reserve_impl(flags)
}
fn reserve_impl<T: 'static>(&self, flags: u64) -> Option<RingBufEntry<T>> {
let ptr = unsafe {
bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::<T>() as _, flags)
as *mut MaybeUninit<T>
};
match ptr.is_null() {
true => None,
false => Some(RingBufEntry(unsafe { &mut *ptr })),
}
}
/// Copy `data` to the ring buffer output.
///
/// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a
/// copy from either a map buffer or the stack.
///
/// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to
/// create in eBPF but still possible, e.g. by slicing an array).
///
/// [`reserve`]: RingBuf::reserve
/// [`submit`]: RingBufEntry::submit
#[cfg(feature = "const_assert")]
pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64>
where
Assert<{ 8 % core::mem::align_of::<&T>() == 0 }>: IsTrue,
{
self.output_impl(data, flags)
}
/// Copy `data` to the ring buffer output.
///
/// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a
/// copy from either a map buffer or the stack.
///
/// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to
/// create in eBPF but still possible, e.g. by slicing an array).
///
/// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger
/// alignment requests. If you use this with a `T` that isn't properly aligned, this function
/// will compile to a panic, which silently makes your eBPF program fail to load.
/// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418).
///
/// [`reserve`]: RingBuf::reserve
/// [`submit`]: RingBufEntry::submit
#[cfg(not(feature = "const_assert"))]
pub fn output<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
assert_eq!(8 % core::mem::align_of::<&T>(), 0);
self.output_impl(data, flags)
}
fn output_impl<T: ?Sized>(&self, data: &T, flags: u64) -> Result<(), i64> {
let ret = unsafe {
bpf_ringbuf_output(
self.def.get() as *mut _,
data as *const _ as *mut _,
mem::size_of_val(data) as _,
flags,
)
};
if ret < 0 {
Err(ret)
} else {
Ok(())
}
}
/// Query various information about the ring buffer.
///
/// Consult `bpf_ringbuf_query` documentation for a list of allowed flags.
pub fn query(&self, flags: u64) -> u64 {
unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) }
}
}
#[cfg(feature = "const_assert")]
pub struct Assert<const COND: bool> {}
#[cfg(feature = "const_assert")]
pub trait IsTrue {}
#[cfg(feature = "const_assert")]
impl IsTrue for Assert<true> {}
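The reserve/submit/discard pattern is exercised by the integration test program further below; for completeness, here is a hedged sketch of the copying `output` path (the map name and size are placeholders, not part of the patch):

    use aya_bpf::{macros::map, maps::RingBuf};

    // Placeholder map: 16 pages of 4 KiB, i.e. a power-of-two multiple of the page size.
    #[map]
    static EVENTS: RingBuf = RingBuf::with_byte_size(16 * 4096, 0);

    // Copies the slice into the ring buffer; the helper's error code is returned on failure.
    fn emit(bytes: &[u8]) -> Result<(), i64> {
        EVENTS.output(bytes, 0)
    }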

@@ -34,4 +34,8 @@ path = "src/relocations.rs"

[[bin]]
name = "bpf_probe_read"
path = "src/bpf_probe_read.rs"
+
+[[bin]]
+name = "ring_buf"
+path = "src/ring_buf.rs"

@@ -0,0 +1,56 @@
#![no_std]
#![no_main]
use aya_bpf::{
macros::{map, uprobe},
maps::{Array, RingBuf},
programs::ProbeContext,
};
use core::mem::size_of;
// Make a buffer large enough to hold MAX_ENTRIES entries at the same time.
// This requires taking into consideration the header size.
type Entry = u64;
const MAX_ENTRIES: usize = 1024;
const HDR_SIZE: usize = aya_bpf::bindings::BPF_RINGBUF_HDR_SZ as usize;
// Add 1 because the capacity at any given time is actually one less than
// you might expect: if consumer_pos and producer_pos were equal, the
// buffer would be considered empty. The synchronous test fills the
// buffer, hence this adjustment.
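// With 8-byte entries and the 8-byte ring buffer header, this works out to
// (8 + 8) * 1024 + 1 = 16385 bytes; the loader coerces ring buffer sizes to a
// power-of-two multiple of the page size when the map is created.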
const RING_BUF_SIZE: usize = ((size_of::<Entry>() + HDR_SIZE) * MAX_ENTRIES) + 1;
#[map]
static RING_BUF: RingBuf = RingBuf::with_byte_size(RING_BUF_SIZE as u32, 0);
#[map]
static REJECTED: Array<u32> = Array::with_max_entries(1, 0);
#[uprobe]
pub fn ring_buf_test(ctx: ProbeContext) {
let mut entry = match RING_BUF.reserve::<Entry>(0) {
Some(entry) => entry,
None => return,
};
// Write the first argument to the function back out to RING_BUF if it is even,
// otherwise increment the counter in REJECTED. This exercises discarding data.
let arg: Entry = match ctx.arg(0) {
Some(arg) => arg,
None => return,
};
if arg % 2 == 0 {
entry.write(arg);
entry.submit(0);
} else {
entry.discard(0);
if let Some(v) = REJECTED.get_ptr_mut(0) {
unsafe { *v += 1 }
};
}
}
#[panic_handler]
fn panic(_info: &core::panic::PanicInfo) -> ! {
loop {}
}

@@ -9,9 +9,11 @@ anyhow = "1"
aya = { path = "../../aya" }
aya-log = { path = "../../aya-log" }
aya-obj = { path = "../../aya-obj" }
+futures = "0.3.28"
libc = { version = "0.2.105" }
log = "0.4"
object = { version = "0.31", default-features = false, features = ["std", "read_core", "elf"] }
+rand = { version = "0.8.5" }
rbpf = "0.2.0"
tempfile = "3.3.0"
tokio = { version = "1.24", features = ["rt", "rt-multi-thread", "sync", "time"] }

@@ -0,0 +1,160 @@
use anyhow::Context as _;
use aya::{
include_bytes_aligned,
maps::{array::Array, ring_buf::RingBuf},
programs::UProbe,
Bpf, BpfLoader, Btf,
};
use std::os::fd::AsRawFd as _;
use tokio::{
io::unix::AsyncFd,
time::{sleep, Duration},
};
#[test]
fn ring_buf() {
let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf");
// Add 1 because the capacity at any given time is actually one less than
// you might expect: if consumer_pos and producer_pos were equal, the
// buffer would be considered empty.
let ring_buf_max_entries = RING_BUF_MAX_ENTRIES + 1;
let mut bpf: Bpf = BpfLoader::new()
.btf(Btf::from_sys_fs().ok().as_ref())
.set_max_entries("RING_BUF", ring_buf_max_entries)
.load(bytes)
.unwrap();
let ring_buf = bpf.take_map("RING_BUF").unwrap();
let mut ring_buf = RingBuf::try_from(ring_buf).unwrap();
let rejected = bpf.take_map("REJECTED").unwrap();
let rejected = Array::<_, u32>::try_from(rejected).unwrap();
let prog: &mut UProbe = bpf
.program_mut("ring_buf_test")
.unwrap()
.try_into()
.unwrap();
prog.load().unwrap();
prog.attach(
Some("ring_buf_trigger_ebpf_program"),
0,
"/proc/self/exe",
None,
)
.unwrap();
// Generate some random data.
let data = gen_data();
// Call the function that the uprobe is attached to with randomly generated data.
for val in &data {
ring_buf_trigger_ebpf_program(*val);
}
// Read the data back out of the ring buffer, expect only the even numbers.
let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
let mut seen = Vec::<u64>::new();
while seen.len() < expected.len() {
if let Some(item) = ring_buf.next() {
let item: [u8; 8] = (*item).try_into().unwrap();
let arg = u64::from_ne_bytes(item);
seen.push(arg);
}
}
// Ensure that the data that was read matches what was passed, and the
// rejected count was set properly.
assert_eq!(seen, expected);
let rejected: usize = rejected.get(&0, 0).unwrap().try_into().unwrap();
let expected_rejected = data.len() - expected.len();
assert_eq!(rejected, expected_rejected)
}
#[no_mangle]
#[inline(never)]
pub extern "C" fn ring_buf_trigger_ebpf_program(_arg: u64) {}
const RING_BUF_MAX_ENTRIES: u32 = 1024; // corresponds to probe ringbuf size config
/// Generate a variable length vector of u64s. The number of values is always small enough to fit
/// into the RING_BUF defined in the probe.
pub(crate) fn gen_data() -> Vec<u64> {
const DATA_LEN_RANGE: core::ops::RangeInclusive<usize> = 1..=RING_BUF_MAX_ENTRIES as usize;
use rand::Rng as _;
let mut rng = rand::thread_rng();
let n = rng.gen_range(DATA_LEN_RANGE);
std::iter::repeat_with(|| rng.gen()).take(n).collect()
}
#[tokio::test]
async fn ring_buf_async() {
let bytes = include_bytes_aligned!("../../../target/bpfel-unknown-none/release/ring_buf");
let mut bpf: Bpf = BpfLoader::new()
.btf(Btf::from_sys_fs().ok().as_ref())
.set_max_entries("RING_BUF", RING_BUF_MAX_ENTRIES)
.load(bytes)
.unwrap();
let ring_buf = bpf.take_map("RING_BUF").unwrap();
let mut ring_buf = RingBuf::try_from(ring_buf).unwrap();
let rejected = bpf.take_map("REJECTED").unwrap();
let rejected = Array::<_, u32>::try_from(rejected).unwrap();
let prog: &mut UProbe = bpf
.program_mut("ring_buf_test")
.unwrap()
.try_into()
.unwrap();
prog.load().unwrap();
prog.attach(
Some("ring_buf_trigger_ebpf_program"),
0,
"/proc/self/exe",
None,
)
.unwrap();
// Generate some random data.
let data = gen_data();
let data = &data;
let writer = call_ring_buf_trigger_ebpf_program_over_time(data);
// Construct an AsyncFd from the RingBuf in order to receive readiness notifications.
let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap();
let expected: Vec<u64> = data.iter().cloned().filter(|v| *v % 2 == 0).collect();
let expected_count = expected.len();
let reader = async {
let mut seen = Vec::with_capacity(expected_count);
while seen.len() < expected_count {
// Wait for readiness, then clear the bit before reading so that no notifications
// are missed.
let mut guard = async_fd.readable().await.unwrap();
guard.clear_ready();
while let Some(read) = ring_buf.next() {
let read: [u8; 8] = (*read)
.try_into()
.context(format!("data: {:?}", (&*read).len()))
.unwrap();
let arg = u64::from_ne_bytes(read);
seen.push(arg);
}
}
seen
};
let ((), seen) = futures::future::join(writer, reader).await;
// Ensure that the data that was read matches what was passed.
assert_eq!(&seen, &expected);
let rejected: usize = rejected.get(&0, 0).unwrap().try_into().unwrap();
let expected_rejected = data.len() - expected.len();
assert_eq!(rejected, expected_rejected)
}
async fn call_ring_buf_trigger_ebpf_program_over_time(data: &[u64]) {
use rand::Rng as _;
let mut rng = rand::thread_rng();
let mut random_duration = || {
let micros = rng.gen_range(0..1_000);
Duration::from_micros(micros)
};
for value in data {
sleep(random_duration()).await;
ring_buf_trigger_ebpf_program(*value);
}
}