@ -4,6 +4,10 @@ use std::{
os ::fd ::{ AsRawFd , RawFd } ,
} ;
// See https://doc.rust-lang.org/cargo/reference/features.html#mutually-exclusive-features.
//
// We should eventually split async functionality out into separate crates "aya-async-tokio" and
// "async-async-std". Presently we arbitrarily choose tokio over async-std when both are requested.
#[ cfg(all(not(feature = " async_tokio " ), feature = " async_std " )) ]
use async_io ::Async ;
@ -98,16 +102,17 @@ impl<T: BorrowMut<MapData> + Borrow<MapData>> AsyncPerfEventArray<T> {
index : u32 ,
page_count : Option < usize > ,
) -> Result < AsyncPerfEventArrayBuffer < T > , PerfBufferError > {
let buf = self . perf_map . open ( index , page_count ) ? ;
let Self { perf_map } = self ;
let buf = perf_map . open ( index , page_count ) ? ;
let fd = buf . as_raw_fd ( ) ;
Ok ( AsyncPerfEventArrayBuffer {
buf ,
#[ cfg(feature = " async_tokio " ) ]
async_ fd: AsyncFd ::new ( fd ) ? ,
async_ tokio_ fd: AsyncFd ::new ( fd ) ? ,
#[ cfg(all(not(feature = " async_tokio " ), feature = " async_std " )) ]
async_ fd: Async ::new ( fd ) ? ,
async_ std_ fd: Async ::new ( fd ) ? ,
} )
}
}
@ -131,13 +136,12 @@ pub struct AsyncPerfEventArrayBuffer<T> {
buf : PerfEventArrayBuffer < T > ,
#[ cfg(feature = " async_tokio " ) ]
async_ fd: AsyncFd < RawFd > ,
async_ tokio_ fd: AsyncFd < RawFd > ,
#[ cfg(all(not(feature = " async_tokio " ), feature = " async_std " )) ]
async_ fd: Async < RawFd > ,
async_ std_ fd: Async < RawFd > ,
}
#[ cfg(feature = " async_tokio " ) ]
impl < T : BorrowMut < MapData > + Borrow < MapData > > AsyncPerfEventArrayBuffer < T > {
/// Reads events from the buffer.
///
@ -152,46 +156,30 @@ impl<T: BorrowMut<MapData> + Borrow<MapData>> AsyncPerfEventArrayBuffer<T> {
& mut self ,
buffers : & mut [ BytesMut ] ,
) -> Result < Events , PerfBufferError > {
let Self {
buf ,
#[ cfg(feature = " async_tokio " ) ]
async_tokio_fd ,
#[ cfg(all(not(feature = " async_tokio " ), feature = " async_std " )) ]
async_std_fd ,
} = self ;
loop {
let mut guard = self . async_fd . readable_mut ( ) . await ? ;
#[ cfg(feature = " async_tokio " ) ]
let mut guard = async_tokio_fd . readable_mut ( ) . await ? ;
match self . buf . read_events ( buffers ) {
Ok ( events ) if events . read > 0 | | events . lost > 0 = > return Ok ( events ) ,
Ok ( _ ) = > {
guard . clear_ready ( ) ;
continue ;
}
Err ( e ) = > return Err ( e ) ,
#[ cfg(all(not(feature = " async_tokio " ), feature = " async_std " )) ]
if ! buf . readable ( ) {
async_std_fd . readable ( ) . await ? ;
}
}
}
}
#[ cfg(all(not(feature = " async_tokio " ), feature = " async_std " )) ]
impl < T : BorrowMut < MapData > + Borrow < MapData > > AsyncPerfEventArrayBuffer < T > {
/// Reads events from the buffer.
///
/// This method reads events into the provided slice of buffers, filling
/// each buffer in order stopping when there are no more events to read or
/// all the buffers have been filled.
///
/// Returns the number of events read and the number of events lost. Events
/// are lost when user space doesn't read events fast enough and the ring
/// buffer fills up.
pub async fn read_events (
& mut self ,
buffers : & mut [ BytesMut ] ,
) -> Result < Events , PerfBufferError > {
loop {
if ! self . buf . readable ( ) {
let _ = self . async_fd . readable ( ) . await ? ;
let events = buf . read_events ( buffers ) ? ;
const EMPTY : Events = Events { read : 0 , lost : 0 } ;
if events ! = EMPTY {
break Ok ( events ) ;
}
match self . buf . read_events ( buffers ) {
Ok ( events ) if events . read > 0 | | events . lost > 0 = > return Ok ( events ) ,
Ok ( _ ) = > continue ,
Err ( e ) = > return Err ( e ) ,
}
#[ cfg(feature = " async_tokio " ) ]
guard . clear_ready ( ) ;
}
}
}