diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index c93a7118..4b08eb11 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -303,6 +303,10 @@ impl ProducerData { let len = page_size + 2 * usize::try_from(byte_size).unwrap(); let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?; + // The producer position may be non-zero if the map is being loaded from a pin, or the map + // has been used previously; load the initial value of the producer position from the mmap. + let pos_cache = load_producer_pos(&mmap); + // byte_size is required to be a power of two multiple of page_size (which implicitly is a // power of 2), so subtracting one will create a bitmask for values less than byte_size. debug_assert!(byte_size.is_power_of_two()); @@ -310,7 +314,7 @@ impl ProducerData { Ok(Self { mmap, data_offset: page_size, - pos_cache: 0, + pos_cache, mask, }) } @@ -322,7 +326,6 @@ impl ProducerData { ref mut pos_cache, ref mut mask, } = self; - let pos = unsafe { mmap.ptr().cast().as_ref() }; let mmap_data = mmap.as_ref(); let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| { panic!( @@ -331,7 +334,7 @@ impl ProducerData { mmap_data.len() ) }); - while data_available(pos, pos_cache, consumer) { + while data_available(mmap, pos_cache, consumer) { match read_item(data_pages, *mask, consumer) { Item::Busy => return None, Item::Discard { len } => consumer.consume(len), @@ -347,17 +350,15 @@ impl ProducerData { } fn data_available( - producer: &AtomicUsize, - cache: &mut usize, + producer: &MMap, + producer_cache: &mut usize, consumer: &ConsumerPos, ) -> bool { let ConsumerPos { pos: consumer, .. } = consumer; - if consumer == cache { - // This value is written using Release by the kernel [1], and should be read with - // Acquire to ensure that the prior writes to the entry header are visible. - // - // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 - *cache = producer.load(Ordering::Acquire); + // Refresh the producer position cache if it appears that the consumer is caught up + // with the producer position. + if consumer == producer_cache { + *producer_cache = load_producer_pos(producer); } // Note that we don't compare the order of the values because the producer position may @@ -369,7 +370,7 @@ impl ProducerData { // producer position has wrapped around. // // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 - consumer != cache + consumer != producer_cache } fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> { @@ -403,3 +404,12 @@ impl ProducerData { } } } + +// Loads the producer position from the shared memory mmap. +fn load_producer_pos(producer: &MMap) -> usize { + // This value is written using Release by the kernel [1], and should be read with + // Acquire to ensure that the prior writes to the entry header are visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 + unsafe { producer.ptr().cast::().as_ref() }.load(Ordering::Acquire) +} diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index 2c56f736..f91b8698 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -1,6 +1,7 @@ use std::{ mem, os::fd::AsRawFd as _, + path::Path, sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -19,6 +20,7 @@ use aya::{ use aya_obj::generated::BPF_RINGBUF_HDR_SZ; use integration_common::ring_buf::Registers; use rand::Rng as _; +use scopeguard::defer; use tokio::io::{Interest, unix::AsyncFd}; struct RingBufTest { @@ -35,14 +37,25 @@ const RING_BUF_MAX_ENTRIES: usize = 512; impl RingBufTest { fn new() -> Self { + Self::new_with_mutators(|_loader| {}, |_bpf| {}) + } + + // Allows the test to mutate the EbpfLoader before it loads the object file from disk, and to + // mutate the loaded Ebpf object after it has been loaded from disk but before it is loaded + // into the kernel. + fn new_with_mutators<'loader>( + loader_fn: impl FnOnce(&mut EbpfLoader<'loader>), + bpf_fn: impl FnOnce(&mut Ebpf), + ) -> Self { const RING_BUF_BYTE_SIZE: u32 = (RING_BUF_MAX_ENTRIES * (mem::size_of::() + BPF_RINGBUF_HDR_SZ as usize)) as u32; // Use the loader API to control the size of the ring_buf. - let mut bpf = EbpfLoader::new() - .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) - .load(crate::RING_BUF) - .unwrap(); + let mut loader = EbpfLoader::new(); + loader.set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE); + loader_fn(&mut loader); + let mut bpf = loader.load(crate::RING_BUF).unwrap(); + bpf_fn(&mut bpf); let ring_buf = bpf.take_map("RING_BUF").unwrap(); let ring_buf = RingBuf::try_from(ring_buf).unwrap(); let regs = bpf.take_map("REGISTERS").unwrap(); @@ -412,3 +425,70 @@ impl WriterThread { thread.join().unwrap(); } } + +// This tests that a ring buffer can be pinned and then re-opened and attached to a subsequent +// program. It ensures that the producer position is properly synchronized between the two +// programs, and that no unread data is lost. +#[tokio::test(flavor = "multi_thread")] +#[test_log::test] +async fn ring_buf_pinned() { + let pin_path = + Path::new("/sys/fs/bpf/").join(format!("ring_buf_{}", rand::rng().random::())); + + let RingBufTest { + mut ring_buf, + regs: _, + _bpf, + } = RingBufTest::new_with_mutators( + |_loader| {}, + |bpf| { + let ring_buf = bpf.map_mut("RING_BUF").unwrap(); + ring_buf.pin(&pin_path).unwrap(); + }, + ); + defer! { std::fs::remove_file(&pin_path).unwrap() } + + // Write a few items to the ring buffer. + let to_write_before_reopen = [2, 4, 6, 8]; + for v in to_write_before_reopen { + ring_buf_trigger_ebpf_program(v); + } + let (to_read_before_reopen, to_read_after_reopen) = to_write_before_reopen.split_at(2); + for v in to_read_before_reopen { + let item = ring_buf.next().unwrap(); + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + } + drop(ring_buf); + drop(_bpf); + + // Reopen the ring buffer using the pinned map. + let RingBufTest { + mut ring_buf, + regs: _, + _bpf, + } = RingBufTest::new_with_mutators( + |loader| { + loader.map_pin_path("RING_BUF", &pin_path); + }, + |_bpf| {}, + ); + let to_write_after_reopen = [10, 12]; + + // Write some more data. + for v in to_write_after_reopen { + ring_buf_trigger_ebpf_program(v); + } + // Read both the data that was written before the ring buffer was reopened and the data that + // was written after it was reopened. + for v in to_read_after_reopen + .iter() + .chain(to_write_after_reopen.iter()) + { + let item = ring_buf.next().unwrap(); + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + } + // Make sure there is nothing else in the ring buffer. + assert_matches!(ring_buf.next(), None); +}