diff --git a/aya/src/maps/mod.rs b/aya/src/maps/mod.rs index 2e36c95c..9ec7b170 100644 --- a/aya/src/maps/mod.rs +++ b/aya/src/maps/mod.rs @@ -40,12 +40,12 @@ use crate::{ pub mod hash_map; mod map_lock; -pub mod perf_map; +pub mod perf; pub mod program_array; pub use hash_map::HashMap; pub use map_lock::*; -pub use perf_map::PerfMap; +pub use perf::PerfEventArray; pub use program_array::ProgramArray; #[derive(Error, Debug)] diff --git a/aya/src/maps/perf_map/async_perf_map.rs b/aya/src/maps/perf/async_perf_event_array.rs similarity index 65% rename from aya/src/maps/perf_map/async_perf_map.rs rename to aya/src/maps/perf/async_perf_event_array.rs index 42157d59..afe1924f 100644 --- a/aya/src/maps/perf_map/async_perf_map.rs +++ b/aya/src/maps/perf/async_perf_event_array.rs @@ -12,23 +12,23 @@ use async_io::Async; use tokio::io::unix::AsyncFd; use crate::maps::{ - perf_map::{Events, PerfBufferError, PerfMap, PerfMapBuffer, PerfMapError}, - Map, MapRefMut, + perf::{Events, PerfBufferError, PerfEventArray, PerfEventArrayBuffer}, + Map, MapError, MapRefMut, }; -pub struct AsyncPerfMap> { - perf_map: PerfMap, +pub struct AsyncPerfEventArray> { + perf_map: PerfEventArray, } -impl> AsyncPerfMap { +impl> AsyncPerfEventArray { pub fn open( &mut self, index: u32, page_count: Option, - ) -> Result, PerfMapError> { + ) -> Result, PerfBufferError> { let buf = self.perf_map.open(index, page_count)?; let fd = buf.as_raw_fd(); - Ok(AsyncPerfMapBuffer { + Ok(AsyncPerfEventArrayBuffer { buf, #[cfg(feature = "async_tokio")] @@ -40,16 +40,16 @@ impl> AsyncPerfMap { } } -impl> AsyncPerfMap { - fn new(map: T) -> Result, PerfMapError> { - Ok(AsyncPerfMap { - perf_map: PerfMap::new(map)?, +impl> AsyncPerfEventArray { + fn new(map: T) -> Result, MapError> { + Ok(AsyncPerfEventArray { + perf_map: PerfEventArray::new(map)?, }) } } -pub struct AsyncPerfMapBuffer> { - buf: PerfMapBuffer, +pub struct AsyncPerfEventArrayBuffer> { + buf: PerfEventArrayBuffer, #[cfg(feature = "async_tokio")] async_fd: AsyncFd, @@ -59,7 +59,7 @@ pub struct AsyncPerfMapBuffer> { } #[cfg(feature = "async_tokio")] -impl> AsyncPerfMapBuffer { +impl> AsyncPerfEventArrayBuffer { pub async fn read_events( &mut self, buffers: &mut [BytesMut], @@ -80,7 +80,7 @@ impl> AsyncPerfMapBuffer { } #[cfg(feature = "async_std")] -impl> AsyncPerfMapBuffer { +impl> AsyncPerfEventArrayBuffer { pub async fn read_events( &mut self, buffers: &mut [BytesMut], @@ -99,10 +99,10 @@ impl> AsyncPerfMapBuffer { } } -impl TryFrom for AsyncPerfMap { - type Error = PerfMapError; +impl TryFrom for AsyncPerfEventArray { + type Error = MapError; - fn try_from(a: MapRefMut) -> Result, PerfMapError> { - AsyncPerfMap::new(a) + fn try_from(a: MapRefMut) -> Result, MapError> { + AsyncPerfEventArray::new(a) } } diff --git a/aya/src/maps/perf/mod.rs b/aya/src/maps/perf/mod.rs new file mode 100644 index 00000000..8fae9fc6 --- /dev/null +++ b/aya/src/maps/perf/mod.rs @@ -0,0 +1,12 @@ +//! Receive events from eBPF programs using the linux `perf` API. +//! +//! See the [`PerfEventArray` documentation](self::PerfEventArray). +#[cfg(feature = "async")] +mod async_perf_event_array; +mod perf_buffer; +mod perf_event_array; + +#[cfg(feature = "async")] +pub use async_perf_event_array::*; +pub use perf_buffer::*; +pub use perf_event_array::*; diff --git a/aya/src/maps/perf_map/perf_buffer.rs b/aya/src/maps/perf/perf_buffer.rs similarity index 96% rename from aya/src/maps/perf_map/perf_buffer.rs rename to aya/src/maps/perf/perf_buffer.rs index 77017db1..575e890d 100644 --- a/aya/src/maps/perf_map/perf_buffer.rs +++ b/aya/src/maps/perf/perf_buffer.rs @@ -19,42 +19,54 @@ use crate::{ PERF_EVENT_IOC_DISABLE, PERF_EVENT_IOC_ENABLE, }; +/// Perf buffer error. #[derive(Error, Debug)] pub enum PerfBufferError { + /// the page count value passed to [`PerfEventArray::open`] is invalid. #[error("invalid page count {page_count}, the value must be a power of two")] InvalidPageCount { page_count: usize }, + /// `perf_event_open` failed. #[error("perf_event_open failed: {io_error}")] OpenError { #[source] io_error: io::Error, }, + /// `mmap`-ping the buffer failed. #[error("mmap failed: {io_error}")] MMapError { #[source] io_error: io::Error, }, + /// The `PERF_EVENT_IOC_ENABLE` ioctl failed #[error("PERF_EVENT_IOC_ENABLE failed: {io_error}")] PerfEventEnableError { #[source] io_error: io::Error, }, + /// `read_events()` was called with no output buffers. #[error("read_events() was called with no output buffers")] NoBuffers, + /// `read_events()` was called with a buffer that is not large enough to + /// contain the next event in the perf buffer. #[error("the buffer needs to be of at least {size} bytes")] MoreSpaceNeeded { size: usize }, + /// An IO error occurred. #[error(transparent)] IOError(#[from] io::Error), } +/// Return type of `read_events()`. #[derive(Debug, PartialEq)] pub struct Events { + /// The number of events read. pub read: usize, + /// The number of events lost. pub lost: usize, } @@ -282,14 +294,14 @@ unsafe fn mmap( #[derive(Debug)] #[repr(C)] -pub struct Sample { +struct Sample { header: perf_event_header, pub size: u32, } #[repr(C)] #[derive(Debug)] -pub struct LostSamples { +struct LostSamples { header: perf_event_header, pub id: u64, pub count: u64, diff --git a/aya/src/maps/perf/perf_event_array.rs b/aya/src/maps/perf/perf_event_array.rs new file mode 100644 index 00000000..e760ffb9 --- /dev/null +++ b/aya/src/maps/perf/perf_event_array.rs @@ -0,0 +1,209 @@ +//! A map that can be used to receive events from eBPF programs using the linux [`perf`] API +//! +//! [`perf`]: https://perf.wiki.kernel.org/index.php/Main_Page. +use std::{ + convert::TryFrom, + ops::DerefMut, + os::unix::io::{AsRawFd, RawFd}, + sync::Arc, +}; + +use bytes::BytesMut; +use libc::{sysconf, _SC_PAGESIZE}; + +use crate::{ + generated::bpf_map_type::BPF_MAP_TYPE_PERF_EVENT_ARRAY, + maps::{ + perf::{Events, PerfBuffer, PerfBufferError}, + Map, MapError, MapRefMut, + }, + sys::bpf_map_update_elem, +}; + +/// A buffer that can receive events from eBPF programs. +/// +/// [`PerfEventArrayBuffer`] is a ring buffer that can receive events from eBPF +/// programs that use `bpf_perf_event_output()`. It's returned by [`PerfEventArray::open`]. +/// +/// See the [`PerfEventArray` documentation](PerfEventArray) for an overview of how to use +/// perf buffers. +pub struct PerfEventArrayBuffer> { + _map: Arc, + buf: PerfBuffer, +} + +impl> PerfEventArrayBuffer { + /// Returns true if the buffer contains events that haven't been read. + pub fn readable(&self) -> bool { + self.buf.readable() + } + + /// Reads events from the buffer. + /// + /// This method reads events into the provided slice of buffers, filling + /// each buffer in order stopping when there are no more events to read or + /// all the buffers have been filled. + /// + /// Returns the number of events read and the number of events lost. Events + /// are lost when user space doesn't read events fast enough and the ring + /// buffer fills up. + /// + /// # Errors + /// + /// [`PerfBufferError::NoBuffers`] is returned when `out_bufs` is empty. + /// + /// [`PerfBufferError::MoreSpaceNeeded { size }`](PerfBufferError) is returned when the size of the events is + /// bigger than the size of the out_bufs provided. + pub fn read_events(&mut self, out_bufs: &mut [BytesMut]) -> Result { + self.buf.read_events(out_bufs) + } +} + +impl> AsRawFd for PerfEventArrayBuffer { + fn as_raw_fd(&self) -> RawFd { + self.buf.as_raw_fd() + } +} + +/// A map that can be used to receive events from eBPF programs using the linux [`perf`] API. +/// +/// Each element of a [`PerfEventArray`] is a separate [`PerfEventArrayBuffer`] which can be used +/// to receive events sent by eBPF programs that use `bpf_perf_event_output()`. +/// +/// To receive events you need to: +/// * call [`PerfEventArray::open`] +/// * poll the returned [`PerfEventArrayBuffer`] to be notified when events are +/// inserted in the buffer +/// * call [`PerfEventArrayBuffer::read_events`] to read the events +/// +/// # Example +/// +/// A common way to use a perf array is to have one perf buffer for each +/// available CPU: +/// +/// ```no_run +/// # use aya::maps::perf::PerfEventArrayBuffer; +/// # use aya::maps::Map; +/// # use std::ops::DerefMut; +/// # struct Poll { _t: std::marker::PhantomData }; +/// # impl> Poll { +/// # fn poll_readable(&self) -> &mut [PerfEventArrayBuffer] { +/// # &mut [] +/// # } +/// # } +/// # fn poll_buffers>(bufs: Vec>) -> Poll { +/// # Poll { _t: std::marker::PhantomData } +/// # } +/// # #[derive(thiserror::Error, Debug)] +/// # enum Error { +/// # #[error(transparent)] +/// # IO(#[from] std::io::Error), +/// # #[error(transparent)] +/// # Map(#[from] aya::maps::MapError), +/// # #[error(transparent)] +/// # Bpf(#[from] aya::BpfError), +/// # #[error(transparent)] +/// # PerfBuf(#[from] aya::maps::perf::PerfBufferError), +/// # } +/// # let bpf = aya::Bpf::load(&[], None)?; +/// use aya::maps::PerfEventArray; +/// use aya::util::online_cpus; +/// use std::convert::{TryFrom, TryInto}; +/// use bytes::BytesMut; +/// +/// let mut perf_array = PerfEventArray::try_from(bpf.map_mut("EVENTS")?)?; +/// +/// // eBPF programs are going to write to the EVENTS perf array, using the id of the CPU they're +/// // running on as the array index. +/// let mut perf_buffers = Vec::new(); +/// for cpu_id in online_cpus()? { +/// // this perf buffer will receive events generated on the CPU with id cpu_id +/// perf_buffers.push(perf_array.open(cpu_id, None)?); +/// } +/// +/// let mut out_bufs = [BytesMut::with_capacity(1024)]; +/// +/// // poll the buffers to know when they have queued events +/// let poll = poll_buffers(perf_buffers); +/// loop { +/// for read_buf in poll.poll_readable() { +/// read_buf.read_events(&mut out_bufs)?; +/// // process out_bufs +/// } +/// } +/// +/// # Ok::<(), Error>(()) +/// ``` +/// +/// # Polling and avoiding lost events +/// +/// In the example above the implementation of `poll_buffers` and `poll.poll_readable()` is not +/// given. [`PerfEventArrayBuffer`] implements the [`AsRawFd`] trait, so you can implement polling +/// using any crate that can poll file descriptors, like [epoll], [mio] etc. +/// +/// Perf buffers are internally implemented as ring buffers. If your eBPF programs produce large +/// amounts of data, in order not to lose events you might want to process each +/// [`PerfEventArrayBuffer`] on a different thread. +/// +/// # Async +/// +/// If you are using [tokio] or [async-std], you should use `AsyncPerfEventArray` which +/// efficiently integrates with those and provides a nicer `Future` based API. +/// +/// [`perf`]: https://perf.wiki.kernel.org/index.php/Main_Page +/// [epoll]: https://docs.rs/epoll +/// [mio]: https://docs.rs/mio +/// [tokio]: https://docs.rs/tokio +/// [async-std]: https://docs.rs/async-std +pub struct PerfEventArray> { + map: Arc, + page_size: usize, +} + +impl> PerfEventArray { + pub(crate) fn new(map: T) -> Result, MapError> { + let map_type = map.obj.def.map_type; + if map_type != BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32 { + return Err(MapError::InvalidMapType { + map_type: map_type as u32, + })?; + } + let _fd = map.fd_or_err()?; + + Ok(PerfEventArray { + map: Arc::new(map), + // Safety: libc + page_size: unsafe { sysconf(_SC_PAGESIZE) } as usize, + }) + } + + /// Opens the perf buffer at the given index. + /// + /// The returned buffer will receive all the events eBPF programs send at the given index. + pub fn open( + &mut self, + index: u32, + page_count: Option, + ) -> Result, PerfBufferError> { + // FIXME: keep track of open buffers + + // this cannot fail as new() checks that the fd is open + let map_fd = self.map.fd_or_err().unwrap(); + let buf = PerfBuffer::open(index, self.page_size, page_count.unwrap_or(2))?; + bpf_map_update_elem(map_fd, &index, &buf.as_raw_fd(), 0) + .map_err(|(_, io_error)| io_error)?; + + Ok(PerfEventArrayBuffer { + buf, + _map: self.map.clone(), + }) + } +} + +impl TryFrom for PerfEventArray { + type Error = MapError; + + fn try_from(a: MapRefMut) -> Result, MapError> { + PerfEventArray::new(a) + } +} diff --git a/aya/src/maps/perf_map/mod.rs b/aya/src/maps/perf_map/mod.rs deleted file mode 100644 index 550b7674..00000000 --- a/aya/src/maps/perf_map/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[cfg(feature = "async")] -mod async_perf_map; -mod perf_buffer; -mod perf_map; - -#[cfg(feature = "async")] -pub use async_perf_map::*; -pub use perf_buffer::*; -pub use perf_map::*; diff --git a/aya/src/maps/perf_map/perf_map.rs b/aya/src/maps/perf_map/perf_map.rs deleted file mode 100644 index 59a3c832..00000000 --- a/aya/src/maps/perf_map/perf_map.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::{ - convert::TryFrom, - io, - ops::DerefMut, - os::unix::io::{AsRawFd, RawFd}, - sync::Arc, -}; - -use bytes::BytesMut; -use libc::{sysconf, _SC_PAGESIZE}; -use thiserror::Error; - -use crate::{ - generated::bpf_map_type::BPF_MAP_TYPE_PERF_EVENT_ARRAY, - maps::{ - perf_map::{Events, PerfBuffer, PerfBufferError}, - Map, MapError, MapRefMut, - }, - sys::bpf_map_update_elem, -}; - -#[derive(Error, Debug)] -pub enum PerfMapError { - #[error("error parsing /sys/devices/system/cpu/online")] - InvalidOnlineCpuFile, - - #[error("no CPUs specified")] - NoCpus, - - #[error("invalid cpu {cpu_id}")] - InvalidCpu { cpu_id: u32 }, - - #[error("map error: {0}")] - MapError(#[from] MapError), - - #[error("perf buffer error: {0}")] - PerfBufferError(#[from] PerfBufferError), - - #[error(transparent)] - IOError(#[from] io::Error), - - #[error("bpf_map_update_elem failed: {io_error}")] - UpdateElementError { - #[source] - io_error: io::Error, - }, -} - -pub struct PerfMapBuffer> { - _map: Arc, - buf: PerfBuffer, -} - -impl> PerfMapBuffer { - pub fn readable(&self) -> bool { - self.buf.readable() - } - - pub fn read_events(&mut self, buffers: &mut [BytesMut]) -> Result { - self.buf.read_events(buffers) - } -} - -impl> AsRawFd for PerfMapBuffer { - fn as_raw_fd(&self) -> RawFd { - self.buf.as_raw_fd() - } -} - -pub struct PerfMap> { - map: Arc, - page_size: usize, -} - -impl> PerfMap { - pub fn new(map: T) -> Result, PerfMapError> { - let map_type = map.obj.def.map_type; - if map_type != BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32 { - return Err(MapError::InvalidMapType { - map_type: map_type as u32, - })?; - } - let _fd = map.fd_or_err()?; - - Ok(PerfMap { - map: Arc::new(map), - // Safety: libc - page_size: unsafe { sysconf(_SC_PAGESIZE) } as usize, - }) - } - - pub fn open( - &mut self, - index: u32, - page_count: Option, - ) -> Result, PerfMapError> { - // FIXME: keep track of open buffers - - let map_fd = self.map.fd_or_err()?; - let buf = PerfBuffer::open(index, self.page_size, page_count.unwrap_or(2))?; - bpf_map_update_elem(map_fd, &index, &buf.as_raw_fd(), 0) - .map_err(|(_, io_error)| PerfMapError::UpdateElementError { io_error })?; - - Ok(PerfMapBuffer { - buf, - _map: self.map.clone(), - }) - } -} - -impl TryFrom for PerfMap { - type Error = PerfMapError; - - fn try_from(a: MapRefMut) -> Result, PerfMapError> { - PerfMap::new(a) - } -}