|
|
@ -4,6 +4,10 @@ use std::{
|
|
|
|
os::fd::{AsRawFd, RawFd},
|
|
|
|
os::fd::{AsRawFd, RawFd},
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// See https://doc.rust-lang.org/cargo/reference/features.html#mutually-exclusive-features.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// We should eventually split async functionality out into separate crates "aya-async-tokio" and
|
|
|
|
|
|
|
|
// "async-async-std". Presently we arbitrarily choose tokio over async-std when both are requested.
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
use async_io::Async;
|
|
|
|
use async_io::Async;
|
|
|
|
|
|
|
|
|
|
|
@ -98,16 +102,17 @@ impl<T: BorrowMut<MapData> + Borrow<MapData>> AsyncPerfEventArray<T> {
|
|
|
|
index: u32,
|
|
|
|
index: u32,
|
|
|
|
page_count: Option<usize>,
|
|
|
|
page_count: Option<usize>,
|
|
|
|
) -> Result<AsyncPerfEventArrayBuffer<T>, PerfBufferError> {
|
|
|
|
) -> Result<AsyncPerfEventArrayBuffer<T>, PerfBufferError> {
|
|
|
|
let buf = self.perf_map.open(index, page_count)?;
|
|
|
|
let Self { perf_map } = self;
|
|
|
|
|
|
|
|
let buf = perf_map.open(index, page_count)?;
|
|
|
|
let fd = buf.as_raw_fd();
|
|
|
|
let fd = buf.as_raw_fd();
|
|
|
|
Ok(AsyncPerfEventArrayBuffer {
|
|
|
|
Ok(AsyncPerfEventArrayBuffer {
|
|
|
|
buf,
|
|
|
|
buf,
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
async_fd: AsyncFd::new(fd)?,
|
|
|
|
async_tokio_fd: AsyncFd::new(fd)?,
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
async_fd: Async::new(fd)?,
|
|
|
|
async_std_fd: Async::new(fd)?,
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -131,13 +136,12 @@ pub struct AsyncPerfEventArrayBuffer<T> {
|
|
|
|
buf: PerfEventArrayBuffer<T>,
|
|
|
|
buf: PerfEventArrayBuffer<T>,
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
async_fd: AsyncFd<RawFd>,
|
|
|
|
async_tokio_fd: AsyncFd<RawFd>,
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
async_fd: Async<RawFd>,
|
|
|
|
async_std_fd: Async<RawFd>,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
|
|
|
|
impl<T: BorrowMut<MapData> + Borrow<MapData>> AsyncPerfEventArrayBuffer<T> {
|
|
|
|
impl<T: BorrowMut<MapData> + Borrow<MapData>> AsyncPerfEventArrayBuffer<T> {
|
|
|
|
/// Reads events from the buffer.
|
|
|
|
/// Reads events from the buffer.
|
|
|
|
///
|
|
|
|
///
|
|
|
@ -152,46 +156,30 @@ impl<T: BorrowMut<MapData> + Borrow<MapData>> AsyncPerfEventArrayBuffer<T> {
|
|
|
|
&mut self,
|
|
|
|
&mut self,
|
|
|
|
buffers: &mut [BytesMut],
|
|
|
|
buffers: &mut [BytesMut],
|
|
|
|
) -> Result<Events, PerfBufferError> {
|
|
|
|
) -> Result<Events, PerfBufferError> {
|
|
|
|
|
|
|
|
let Self {
|
|
|
|
|
|
|
|
buf,
|
|
|
|
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
|
|
|
|
async_tokio_fd,
|
|
|
|
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
|
|
|
|
async_std_fd,
|
|
|
|
|
|
|
|
} = self;
|
|
|
|
loop {
|
|
|
|
loop {
|
|
|
|
let mut guard = self.async_fd.readable_mut().await?;
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
|
|
|
|
let mut guard = async_tokio_fd.readable_mut().await?;
|
|
|
|
|
|
|
|
|
|
|
|
match self.buf.read_events(buffers) {
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
Ok(events) if events.read > 0 || events.lost > 0 => return Ok(events),
|
|
|
|
if !buf.readable() {
|
|
|
|
Ok(_) => {
|
|
|
|
async_std_fd.readable().await?;
|
|
|
|
guard.clear_ready();
|
|
|
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
Err(e) => return Err(e),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
let events = buf.read_events(buffers)?;
|
|
|
|
impl<T: BorrowMut<MapData> + Borrow<MapData>> AsyncPerfEventArrayBuffer<T> {
|
|
|
|
const EMPTY: Events = Events { read: 0, lost: 0 };
|
|
|
|
/// Reads events from the buffer.
|
|
|
|
if events != EMPTY {
|
|
|
|
///
|
|
|
|
break Ok(events);
|
|
|
|
/// This method reads events into the provided slice of buffers, filling
|
|
|
|
|
|
|
|
/// each buffer in order stopping when there are no more events to read or
|
|
|
|
|
|
|
|
/// all the buffers have been filled.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// Returns the number of events read and the number of events lost. Events
|
|
|
|
|
|
|
|
/// are lost when user space doesn't read events fast enough and the ring
|
|
|
|
|
|
|
|
/// buffer fills up.
|
|
|
|
|
|
|
|
pub async fn read_events(
|
|
|
|
|
|
|
|
&mut self,
|
|
|
|
|
|
|
|
buffers: &mut [BytesMut],
|
|
|
|
|
|
|
|
) -> Result<Events, PerfBufferError> {
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
|
|
|
if !self.buf.readable() {
|
|
|
|
|
|
|
|
let _ = self.async_fd.readable().await?;
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
match self.buf.read_events(buffers) {
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
Ok(events) if events.read > 0 || events.lost > 0 => return Ok(events),
|
|
|
|
guard.clear_ready();
|
|
|
|
Ok(_) => continue,
|
|
|
|
|
|
|
|
Err(e) => return Err(e),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|