@ -1,6 +1,7 @@
use std ::{
use std ::{
mem ,
mem ,
os ::fd ::AsRawFd as _ ,
os ::fd ::AsRawFd as _ ,
path ::Path ,
sync ::{
sync ::{
Arc ,
Arc ,
atomic ::{ AtomicBool , Ordering } ,
atomic ::{ AtomicBool , Ordering } ,
@ -13,12 +14,13 @@ use anyhow::Context as _;
use assert_matches ::assert_matches ;
use assert_matches ::assert_matches ;
use aya ::{
use aya ::{
Ebpf , EbpfLoader ,
Ebpf , EbpfLoader ,
maps ::{ MapData , array ::PerCpuArray , ring_buf ::RingBuf } ,
maps ::{ MapData , array ::PerCpuArray , ring_buf ::RingBuf } ,
programs ::UProbe ,
programs ::UProbe ,
} ;
} ;
use aya_obj ::generated ::BPF_RINGBUF_HDR_SZ ;
use aya_obj ::generated ::BPF_RINGBUF_HDR_SZ ;
use integration_common ::ring_buf ::Registers ;
use integration_common ::ring_buf ::Registers ;
use rand ::Rng as _ ;
use rand ::Rng as _ ;
use scopeguard ::defer ;
use tokio ::io ::{ Interest , unix ::AsyncFd } ;
use tokio ::io ::{ Interest , unix ::AsyncFd } ;
struct RingBufTest {
struct RingBufTest {
@ -35,14 +37,25 @@ const RING_BUF_MAX_ENTRIES: usize = 512;
impl RingBufTest {
impl RingBufTest {
fn new ( ) -> Self {
fn new ( ) -> Self {
Self ::new_with_mutators ( | _loader | { } , | _bpf | { } )
}
// Allows the test to mutate the EbpfLoader before it loads the object file from disk, and to
// mutate the loaded Ebpf object after it has been loaded from disk but before it is loaded
// into the kernel.
fn new_with_mutators (
loader_fn : impl FnOnce ( & mut EbpfLoader ) ,
bpf_fn : impl FnOnce ( & mut Ebpf ) ,
) -> Self {
const RING_BUF_BYTE_SIZE : u32 =
const RING_BUF_BYTE_SIZE : u32 =
( RING_BUF_MAX_ENTRIES * ( mem ::size_of ::< u64 > ( ) + BPF_RINGBUF_HDR_SZ as usize ) ) as u32 ;
( RING_BUF_MAX_ENTRIES * ( mem ::size_of ::< u64 > ( ) + BPF_RINGBUF_HDR_SZ as usize ) ) as u32 ;
// Use the loader API to control the size of the ring_buf.
// Use the loader API to control the size of the ring_buf.
let mut bpf = EbpfLoader ::new ( )
let mut loader = EbpfLoader ::new ( ) ;
. set_max_entries ( "RING_BUF" , RING_BUF_BYTE_SIZE )
loader . set_max_entries ( "RING_BUF" , RING_BUF_BYTE_SIZE ) ;
. load ( crate ::RING_BUF )
loader_fn ( & mut loader ) ;
. unwrap ( ) ;
let mut bpf = loader . load ( crate ::RING_BUF ) . unwrap ( ) ;
bpf_fn ( & mut bpf ) ;
let ring_buf = bpf . take_map ( "RING_BUF" ) . unwrap ( ) ;
let ring_buf = bpf . take_map ( "RING_BUF" ) . unwrap ( ) ;
let ring_buf = RingBuf ::try_from ( ring_buf ) . unwrap ( ) ;
let ring_buf = RingBuf ::try_from ( ring_buf ) . unwrap ( ) ;
let regs = bpf . take_map ( "REGISTERS" ) . unwrap ( ) ;
let regs = bpf . take_map ( "REGISTERS" ) . unwrap ( ) ;
@ -412,3 +425,78 @@ impl WriterThread {
thread . join ( ) . unwrap ( ) ;
thread . join ( ) . unwrap ( ) ;
}
}
}
}
// This tests that a ring buffer can be pinned and then re-opened and attached to a subsequent
// program. It ensures that the producer position is properly synchronized between the two
// programs, and that no unread data is lost.
#[ tokio::test(flavor = " multi_thread " ) ]
#[ test_log::test ]
async fn ring_buf_pinned ( ) {
let pin_path =
Path ::new ( "/sys/fs/bpf/" ) . join ( format! ( "ring_buf_{}" , rand ::rng ( ) . random ::< u64 > ( ) ) ) ;
let pin_path = & pin_path ;
defer ! {
match std ::fs ::remove_file ( & pin_path ) {
Ok ( _ ) = > ( ) ,
Err ( e ) = > match e . kind ( ) {
std ::io ::ErrorKind ::NotFound = > { }
_ = > panic! ( "failed to remove pin path {}: {e}" , pin_path . display ( ) ) ,
} ,
}
}
let RingBufTest {
mut ring_buf ,
regs : _ ,
_bpf ,
} = RingBufTest ::new_with_mutators (
| _loader | { } ,
move | bpf | {
let ring_buf = bpf . map_mut ( "RING_BUF" ) . unwrap ( ) ;
ring_buf . pin ( & pin_path ) . unwrap ( ) ;
} ,
) ;
// Write a few items to the ring buffer.
let to_write_before_reopen = [ 2 , 4 , 6 , 8 ] ;
for v in to_write_before_reopen {
ring_buf_trigger_ebpf_program ( v ) ;
}
let ( to_read_before_reopen , to_read_after_reopen ) = to_write_before_reopen . split_at ( 2 ) ;
for v in to_read_before_reopen {
let item = ring_buf . next ( ) . unwrap ( ) ;
let item : [ u8 ; 8 ] = item . as_ref ( ) . try_into ( ) . unwrap ( ) ;
assert_eq! ( item , v . to_ne_bytes ( ) ) ;
}
drop ( ring_buf ) ;
drop ( _bpf ) ;
// Reopen the ring buffer using the pinned map.
let RingBufTest {
mut ring_buf ,
regs : _ ,
_bpf ,
} = RingBufTest ::new_with_mutators (
| loader | {
loader . set_map_pin_path ( "RING_BUF" , pin_path . to_owned ( ) ) ;
} ,
| _bpf | { } ,
) ;
let to_write_after_reopen = [ 10 , 12 ] ;
// Write some more data.
for v in to_write_after_reopen {
ring_buf_trigger_ebpf_program ( v ) ;
}
// Read both the data that was written before the ring buffer was reopened and the data that
// was written after it was reopened.
for v in to_read_after_reopen
. iter ( )
. chain ( to_write_after_reopen . iter ( ) )
{
let item = ring_buf . next ( ) . unwrap ( ) ;
let item : [ u8 ; 8 ] = item . as_ref ( ) . try_into ( ) . unwrap ( ) ;
assert_eq! ( item , v . to_ne_bytes ( ) ) ;
}
assert_matches ! ( ring_buf . next ( ) , None ) ;
}