aya/maps/ring_buf: fix producer position initialization

The RingBuf caches the last producer position it read so that it doesn't
need to constantly contend on the actual producer cache line when many
messages have yet to be consumed. It was incorrect to initialize this cache
to 0. This patch initializes it properly and adds a regression test.

Fixes: #1309
reviewable/pr1371/r1
Andrew Werner 3 weeks ago committed by ajwerner
parent 5802dc7a23
commit 17171647f7

@ -303,6 +303,10 @@ impl ProducerData {
let len = page_size + 2 * usize::try_from(byte_size).unwrap();
let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
// The producer position may be non-zero if the map is being loaded from a pin, or the map
// has been used previously; load the initial value of the producer position from the mmap.
let pos_cache = load_producer_pos(&mmap);
// byte_size is required to be a power of two multiple of page_size (which implicitly is a
// power of 2), so subtracting one will create a bitmask for values less than byte_size.
debug_assert!(byte_size.is_power_of_two());
@ -310,7 +314,7 @@ impl ProducerData {
Ok(Self {
mmap,
data_offset: page_size,
pos_cache: 0,
pos_cache,
mask,
})
}
@ -322,7 +326,6 @@ impl ProducerData {
ref mut pos_cache,
ref mut mask,
} = self;
let pos = unsafe { mmap.ptr().cast().as_ref() };
let mmap_data = mmap.as_ref();
let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| {
panic!(
@ -331,7 +334,7 @@ impl ProducerData {
mmap_data.len()
)
});
while data_available(pos, pos_cache, consumer) {
while data_available(mmap, pos_cache, consumer) {
match read_item(data_pages, *mask, consumer) {
Item::Busy => return None,
Item::Discard { len } => consumer.consume(len),
@ -347,17 +350,15 @@ impl ProducerData {
}
fn data_available(
producer: &AtomicUsize,
cache: &mut usize,
producer: &MMap,
producer_cache: &mut usize,
consumer: &ConsumerPos,
) -> bool {
let ConsumerPos { pos: consumer, .. } = consumer;
if consumer == cache {
// This value is written using Release by the kernel [1], and should be read with
// Acquire to ensure that the prior writes to the entry header are visible.
//
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448
*cache = producer.load(Ordering::Acquire);
// Refresh the producer position cache if it appears that the consumer is caught up
// with the producer position.
if consumer == producer_cache {
*producer_cache = load_producer_pos(producer);
}
// Note that we don't compare the order of the values because the producer position may
@ -369,7 +370,7 @@ impl ProducerData {
// producer position has wrapped around.
//
// [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440
consumer != cache
consumer != producer_cache
}
fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> {
@ -403,3 +404,12 @@ impl ProducerData {
}
}
}
// Loads the current producer position from the ring buffer's shared-memory mmap.
//
// Used both to initialize the producer-position cache when the map is opened (the
// position may be non-zero if the map was pinned or previously used) and to refresh
// the cache when the consumer appears to have caught up.
fn load_producer_pos(producer: &MMap) -> usize {
// This value is written using Release by the kernel [1], and should be read with
// Acquire to ensure that the prior writes to the entry header are visible.
//
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448
// SAFETY: presumably the mmap's base pointer addresses the kernel's producer
// position word, valid and suitably aligned for AtomicUsize for the lifetime of
// the mmap — NOTE(review): confirm against the layout set up in ProducerData::new.
unsafe { producer.ptr().cast::<AtomicUsize>().as_ref() }.load(Ordering::Acquire)
}

@ -1,6 +1,7 @@
use std::{
mem,
os::fd::AsRawFd as _,
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
@ -19,6 +20,7 @@ use aya::{
use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
use integration_common::ring_buf::Registers;
use rand::Rng as _;
use scopeguard::defer;
use tokio::io::{Interest, unix::AsyncFd};
struct RingBufTest {
@ -35,14 +37,25 @@ const RING_BUF_MAX_ENTRIES: usize = 512;
impl RingBufTest {
// Constructs the default test fixture with no customization of either the
// `EbpfLoader` or the loaded `Ebpf` object; see `new_with_mutators` for the hooks.
fn new() -> Self {
Self::new_with_mutators(|_loader| {}, |_bpf| {})
}
// Allows the test to mutate the EbpfLoader before it loads the object file from disk, and to
// mutate the loaded Ebpf object after it has been loaded from disk but before it is loaded
// into the kernel.
fn new_with_mutators<'loader>(
loader_fn: impl FnOnce(&mut EbpfLoader<'loader>),
bpf_fn: impl FnOnce(&mut Ebpf),
) -> Self {
const RING_BUF_BYTE_SIZE: u32 =
(RING_BUF_MAX_ENTRIES * (mem::size_of::<u64>() + BPF_RINGBUF_HDR_SZ as usize)) as u32;
// Use the loader API to control the size of the ring_buf.
let mut bpf = EbpfLoader::new()
.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE)
.load(crate::RING_BUF)
.unwrap();
let mut loader = EbpfLoader::new();
loader.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE);
loader_fn(&mut loader);
let mut bpf = loader.load(crate::RING_BUF).unwrap();
bpf_fn(&mut bpf);
let ring_buf = bpf.take_map("RING_BUF").unwrap();
let ring_buf = RingBuf::try_from(ring_buf).unwrap();
let regs = bpf.take_map("REGISTERS").unwrap();
@ -412,3 +425,70 @@ impl WriterThread {
thread.join().unwrap();
}
}
// This tests that a ring buffer can be pinned and then re-opened and attached to a subsequent
// program. It ensures that the producer position is properly synchronized between the two
// programs, and that no unread data is lost.
#[tokio::test(flavor = "multi_thread")]
#[test_log::test]
async fn ring_buf_pinned() {
// Random suffix so concurrent test runs don't collide on the same bpffs path.
let pin_path =
Path::new("/sys/fs/bpf/").join(format!("ring_buf_{}", rand::rng().random::<u64>()));
let RingBufTest {
mut ring_buf,
regs: _,
_bpf,
} = RingBufTest::new_with_mutators(
|_loader| {},
|bpf| {
// Pin the map right after load so it can be re-opened by the second fixture.
let ring_buf = bpf.map_mut("RING_BUF").unwrap();
ring_buf.pin(&pin_path).unwrap();
},
);
// Remove the pin from bpffs even if an assertion below fails.
defer! { std::fs::remove_file(&pin_path).unwrap() }
// Write a few items to the ring buffer.
let to_write_before_reopen = [2, 4, 6, 8];
for v in to_write_before_reopen {
ring_buf_trigger_ebpf_program(v);
}
// Consume only the first half now; the second half is left unread across the
// reopen, which is what exercises the producer-position cache initialization.
let (to_read_before_reopen, to_read_after_reopen) = to_write_before_reopen.split_at(2);
for v in to_read_before_reopen {
let item = ring_buf.next().unwrap();
let item: [u8; 8] = item.as_ref().try_into().unwrap();
assert_eq!(item, v.to_ne_bytes());
}
// Drop the consumer before the owning Ebpf object, then drop the Ebpf object so the
// map is only reachable through the pin when the second fixture opens it.
drop(ring_buf);
drop(_bpf);
// Reopen the ring buffer using the pinned map.
let RingBufTest {
mut ring_buf,
regs: _,
_bpf,
} = RingBufTest::new_with_mutators(
|loader| {
loader.map_pin_path("RING_BUF", &pin_path);
},
|_bpf| {},
);
let to_write_after_reopen = [10, 12];
// Write some more data.
for v in to_write_after_reopen {
ring_buf_trigger_ebpf_program(v);
}
// Read both the data that was written before the ring buffer was reopened and the data that
// was written after it was reopened.
for v in to_read_after_reopen
.iter()
.chain(to_write_after_reopen.iter())
{
let item = ring_buf.next().unwrap();
let item: [u8; 8] = item.as_ref().try_into().unwrap();
assert_eq!(item, v.to_ne_bytes());
}
// Make sure there is nothing else in the ring buffer.
assert_matches!(ring_buf.next(), None);
}

Loading…
Cancel
Save