From fdc4dad5ff88a419d982f67568f5271c69e73f0a Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Sat, 13 Feb 2021 09:31:19 +0000 Subject: [PATCH] maps: add AsyncPerfMap When the async_tokio or async_std features are enabled, AsyncPerfMap provides an async version of PerfMap which returns a future from read_events() --- aya/Cargo.toml | 12 ++- aya/src/maps/perf_map/async_perf_map.rs | 108 ++++++++++++++++++++++++ aya/src/maps/perf_map/mod.rs | 4 + aya/src/maps/perf_map/perf_buffer.rs | 12 ++- aya/src/maps/perf_map/perf_map.rs | 4 + 5 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 aya/src/maps/perf_map/async_perf_map.rs diff --git a/aya/Cargo.toml b/aya/Cargo.toml index 7f578b46..fe8ab69b 100644 --- a/aya/Cargo.toml +++ b/aya/Cargo.toml @@ -11,6 +11,16 @@ object = "0.23" bytes = "1" lazy_static = "1" parking_lot = { version = "0.11.1", features = ["send_guard"] } +futures = "0.3.12" +tokio = { version = "1.2.0", features = ["macros", "rt", "rt-multi-thread", "net"], optional = true } +async-std = { version = "1.9.0", optional = true } +async-io = { version = "1.3", optional = true } [dev-dependencies] -matches = "0.1.8" \ No newline at end of file +matches = "0.1.8" + +[features] +default = [] +async = [] +async_tokio = ["tokio", "async"] +async_std = ["async-std", "async-io", "async"] \ No newline at end of file diff --git a/aya/src/maps/perf_map/async_perf_map.rs b/aya/src/maps/perf_map/async_perf_map.rs new file mode 100644 index 00000000..42157d59 --- /dev/null +++ b/aya/src/maps/perf_map/async_perf_map.rs @@ -0,0 +1,108 @@ +use bytes::BytesMut; +use std::{ + convert::TryFrom, + ops::DerefMut, + os::unix::prelude::{AsRawFd, RawFd}, +}; + +#[cfg(feature = "async_std")] +use async_io::Async; + +#[cfg(feature = "async_tokio")] +use tokio::io::unix::AsyncFd; + +use crate::maps::{ + perf_map::{Events, PerfBufferError, PerfMap, PerfMapBuffer, PerfMapError}, + Map, MapRefMut, +}; + +pub struct AsyncPerfMap> { + perf_map: PerfMap, +} + +impl> AsyncPerfMap { + pub fn open( + &mut self, + index: u32, + page_count: Option, + ) -> Result, PerfMapError> { + let buf = self.perf_map.open(index, page_count)?; + let fd = buf.as_raw_fd(); + Ok(AsyncPerfMapBuffer { + buf, + + #[cfg(feature = "async_tokio")] + async_fd: AsyncFd::new(fd)?, + + #[cfg(feature = "async_std")] + async_fd: Async::new(fd)?, + }) + } +} + +impl> AsyncPerfMap { + fn new(map: T) -> Result, PerfMapError> { + Ok(AsyncPerfMap { + perf_map: PerfMap::new(map)?, + }) + } +} + +pub struct AsyncPerfMapBuffer> { + buf: PerfMapBuffer, + + #[cfg(feature = "async_tokio")] + async_fd: AsyncFd, + + #[cfg(feature = "async_std")] + async_fd: Async, +} + +#[cfg(feature = "async_tokio")] +impl> AsyncPerfMapBuffer { + pub async fn read_events( + &mut self, + buffers: &mut [BytesMut], + ) -> Result { + loop { + let mut guard = self.async_fd.readable_mut().await?; + + match self.buf.read_events(buffers) { + Ok(events) if events.read > 0 || events.lost > 0 => return Ok(events), + Ok(_) => { + guard.clear_ready(); + continue; + } + Err(e) => return Err(e), + } + } + } +} + +#[cfg(feature = "async_std")] +impl> AsyncPerfMapBuffer { + pub async fn read_events( + &mut self, + buffers: &mut [BytesMut], + ) -> Result { + loop { + if !self.buf.readable() { + let _ = self.async_fd.readable().await?; + } + + match self.buf.read_events(buffers) { + Ok(events) if events.read > 0 || events.lost > 0 => return Ok(events), + Ok(_) => continue, + Err(e) => return Err(e), + } + } + } +} + +impl TryFrom for AsyncPerfMap { + type Error = PerfMapError; + + fn try_from(a: MapRefMut) -> Result, PerfMapError> { + AsyncPerfMap::new(a) + } +} diff --git a/aya/src/maps/perf_map/mod.rs b/aya/src/maps/perf_map/mod.rs index b47bc2eb..550b7674 100644 --- a/aya/src/maps/perf_map/mod.rs +++ b/aya/src/maps/perf_map/mod.rs @@ -1,5 +1,9 @@ +#[cfg(feature = "async")] +mod async_perf_map; mod perf_buffer; mod perf_map; +#[cfg(feature = "async")] +pub use async_perf_map::*; pub use perf_buffer::*; pub use perf_map::*; diff --git a/aya/src/maps/perf_map/perf_buffer.rs b/aya/src/maps/perf_map/perf_buffer.rs index bc700fd6..0759610d 100644 --- a/aya/src/maps/perf_map/perf_buffer.rs +++ b/aya/src/maps/perf_map/perf_buffer.rs @@ -105,7 +105,17 @@ impl PerfBuffer { Ok(perf_buf) } - pub fn read_events(&mut self, buffers: &mut [BytesMut]) -> Result { + pub(crate) fn readable(&self) -> bool { + let header = self.buf.load(Ordering::SeqCst); + let head = unsafe { (*header).data_head } as usize; + let tail = unsafe { (*header).data_tail } as usize; + head != tail + } + + pub(crate) fn read_events( + &mut self, + buffers: &mut [BytesMut], + ) -> Result { if buffers.is_empty() { return Err(PerfBufferError::NoBuffers); } diff --git a/aya/src/maps/perf_map/perf_map.rs b/aya/src/maps/perf_map/perf_map.rs index d3dcb166..7c151dc0 100644 --- a/aya/src/maps/perf_map/perf_map.rs +++ b/aya/src/maps/perf_map/perf_map.rs @@ -45,6 +45,10 @@ pub struct PerfMapBuffer> { } impl> PerfMapBuffer { + pub fn readable(&self) -> bool { + self.buf.readable() + } + pub fn read_events(&mut self, buffers: &mut [BytesMut]) -> Result { self.buf.read_events(buffers) }