|
|
|
@ -16,6 +16,75 @@ use crate::maps::{
|
|
|
|
|
Map, MapError, MapRefMut,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// A `Future` based map that can be used to receive events from eBPF programs using the linux
|
|
|
|
|
/// [`perf`](https://perf.wiki.kernel.org/index.php/Main_Page) API.
|
|
|
|
|
///
|
|
|
|
|
/// This is the async version of [`PerfEventArray`], which provides integration
|
|
|
|
|
/// with [tokio](https://docs.rs/tokio) and [async-std](https:/docs.rs/async-std) and a nice `Future` based API.
|
|
|
|
|
///
|
|
|
|
|
/// To receive events you need to:
|
|
|
|
|
/// * call [`AsyncPerfEventArray::open`]
|
|
|
|
|
/// * call [`AsyncPerfEventArrayBuffer::read_events`] to read the events
|
|
|
|
|
///
|
|
|
|
|
/// # Example
|
|
|
|
|
///
|
|
|
|
|
/// ```no_run
|
|
|
|
|
/// # #[derive(thiserror::Error, Debug)]
|
|
|
|
|
/// # enum Error {
|
|
|
|
|
/// # #[error(transparent)]
|
|
|
|
|
/// # IO(#[from] std::io::Error),
|
|
|
|
|
/// # #[error(transparent)]
|
|
|
|
|
/// # Map(#[from] aya::maps::MapError),
|
|
|
|
|
/// # #[error(transparent)]
|
|
|
|
|
/// # Bpf(#[from] aya::BpfError),
|
|
|
|
|
/// # #[error(transparent)]
|
|
|
|
|
/// # PerfBuf(#[from] aya::maps::perf::PerfBufferError),
|
|
|
|
|
/// # }
|
|
|
|
|
/// # async fn try_main() -> Result<(), Error> {
|
|
|
|
|
/// # use async_std::task;
|
|
|
|
|
/// # let bpf = aya::Bpf::load(&[], None)?;
|
|
|
|
|
/// use aya::maps::perf::{AsyncPerfEventArray, PerfBufferError};
|
|
|
|
|
/// use aya::util::online_cpus;
|
|
|
|
|
/// use std::convert::TryFrom;
|
|
|
|
|
/// use futures::future;
|
|
|
|
|
/// use bytes::BytesMut;
|
|
|
|
|
///
|
|
|
|
|
/// // try to convert the PERF_ARRAY map to an AsyncPerfEventArray
|
|
|
|
|
/// let mut perf_array = AsyncPerfEventArray::try_from(bpf.map_mut("PERF_ARRAY")?)?;
|
|
|
|
|
///
|
|
|
|
|
/// let mut futs = Vec::new();
|
|
|
|
|
/// for cpu_id in online_cpus()? {
|
|
|
|
|
/// // open a separate perf buffer for each cpu
|
|
|
|
|
/// let mut buf = perf_array.open(cpu_id, None)?;
|
|
|
|
|
///
|
|
|
|
|
/// // process each perf buffer in a separate task
|
|
|
|
|
/// // NOTE: use async_std::task::spawn with async-std and tokio::spawn with tokio
|
|
|
|
|
/// futs.push(task::spawn(async move {
|
|
|
|
|
/// let mut buffers = (0..10)
|
|
|
|
|
/// .map(|_| BytesMut::with_capacity(1024))
|
|
|
|
|
/// .collect::<Vec<_>>();
|
|
|
|
|
///
|
|
|
|
|
/// loop {
|
|
|
|
|
/// // wait for events
|
|
|
|
|
/// let events = buf.read_events(&mut buffers).await?;
|
|
|
|
|
///
|
|
|
|
|
/// // events.read contains the number of events that have been read,
|
|
|
|
|
/// // and is always <= buffers.len()
|
|
|
|
|
/// for i in 0..events.read {
|
|
|
|
|
/// let buf = &mut buffers[i];
|
|
|
|
|
/// // process buf
|
|
|
|
|
/// }
|
|
|
|
|
/// }
|
|
|
|
|
///
|
|
|
|
|
/// Ok::<_, PerfBufferError>(())
|
|
|
|
|
/// }));
|
|
|
|
|
/// }
|
|
|
|
|
///
|
|
|
|
|
///
|
|
|
|
|
/// future::join_all(futs).await;
|
|
|
|
|
/// # Ok(())
|
|
|
|
|
/// # }
|
|
|
|
|
/// ```
|
|
|
|
|
pub struct AsyncPerfEventArray<T: DerefMut<Target = Map>> {
|
|
|
|
|
perf_map: PerfEventArray<T>,
|
|
|
|
|
}
|
|
|
|
@ -34,7 +103,7 @@ impl<T: DerefMut<Target = Map>> AsyncPerfEventArray<T> {
|
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
|
async_fd: AsyncFd::new(fd)?,
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "async_std")]
|
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
|
async_fd: Async::new(fd)?,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@ -48,13 +117,20 @@ impl<T: DerefMut<Target = Map>> AsyncPerfEventArray<T> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A `Future` based ring buffer that can receive events from eBPF programs.
|
|
|
|
|
///
|
|
|
|
|
/// [`AsyncPerfEventArrayBuffer`] is a ring buffer that can receive events from eBPF programs that
|
|
|
|
|
/// use `bpf_perf_event_output()`. It's returned by [`AsyncPerfEventArray::open`].
|
|
|
|
|
///
|
|
|
|
|
/// See the [`AsyncPerfEventArray` documentation](AsyncPerfEventArray) for an overview of how to
|
|
|
|
|
/// use perf buffers.
|
|
|
|
|
pub struct AsyncPerfEventArrayBuffer<T: DerefMut<Target = Map>> {
|
|
|
|
|
buf: PerfEventArrayBuffer<T>,
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "async_tokio")]
|
|
|
|
|
async_fd: AsyncFd<RawFd>,
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "async_std")]
|
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
|
async_fd: Async<RawFd>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -79,7 +155,7 @@ impl<T: DerefMut<Target = Map>> AsyncPerfEventArrayBuffer<T> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "async_std")]
|
|
|
|
|
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
|
|
|
|
|
impl<T: DerefMut<Target = Map>> AsyncPerfEventArrayBuffer<T> {
|
|
|
|
|
pub async fn read_events(
|
|
|
|
|
&mut self,
|
|
|
|
|