bpf, perf_map: make maps usable from multiple threads

Change PerfMap API so that individual buffers can be read from multiple
threads.

Change the way maps are stored in the `Bpf` struct from RefCell to a
custom RwLock.
pull/1/head
Alessandro Decina 4 years ago
parent d7c91efb2d
commit d4e282535b

@ -10,6 +10,7 @@ thiserror = "1"
object = "0.23"
bytes = "1"
lazy_static = "1"
parking_lot = { version = "0.11.1", features = ["send_guard"] }
[dev-dependencies]
matches = "0.1.8"

@ -1,15 +1,9 @@
use std::{
cell::{Ref, RefCell, RefMut},
collections::HashMap,
convert::TryFrom,
error::Error,
io,
};
use std::{collections::HashMap, convert::TryFrom, error::Error, io};
use thiserror::Error;
use crate::{
maps::{Map, MapError},
maps::{Map, MapError, MapLock, MapRef, MapRefMut},
obj::{btf::BtfError, Object, ParseError},
programs::{KProbe, Program, ProgramData, ProgramError, SocketFilter, TracePoint, UProbe, Xdp},
sys::bpf_map_update_elem_ptr,
@ -46,7 +40,7 @@ pub(crate) struct bpf_map_def {
#[derive(Debug)]
pub struct Bpf {
maps: HashMap<String, RefCell<Map>>,
maps: HashMap<String, MapLock>,
programs: HashMap<String, Program>,
}
@ -97,29 +91,43 @@ impl Bpf {
Ok(Bpf {
maps: maps
.drain(..)
.map(|map| (map.obj.name.clone(), RefCell::new(map)))
.map(|map| (map.obj.name.clone(), MapLock::new(map)))
.collect(),
programs,
})
}
pub fn map<'a, 'slf: 'a, T: TryFrom<Ref<'a, Map>>>(
&'slf self,
pub fn map<T: TryFrom<MapRef>>(
&self,
name: &str,
) -> Result<Option<T>, <T as TryFrom<Ref<'a, Map>>>::Error> {
) -> Result<Option<T>, <T as TryFrom<MapRef>>::Error>
where
<T as TryFrom<MapRef>>::Error: From<MapError>,
{
self.maps
.get(name)
.map(|cell| T::try_from(cell.borrow()))
.map(|lock| {
T::try_from(lock.try_read().map_err(|_| MapError::BorrowError {
name: name.to_owned(),
})?)
})
.transpose()
}
pub fn map_mut<'a, 'slf: 'a, T: TryFrom<RefMut<'a, Map>>>(
&'slf self,
pub fn map_mut<T: TryFrom<MapRefMut>>(
&self,
name: &str,
) -> Result<Option<T>, <T as TryFrom<RefMut<'a, Map>>>::Error> {
) -> Result<Option<T>, <T as TryFrom<MapRefMut>>::Error>
where
<T as TryFrom<MapRefMut>>::Error: From<MapError>,
{
self.maps
.get(name)
.map(|cell| T::try_from(cell.borrow_mut()))
.map(|lock| {
T::try_from(lock.try_write().map_err(|_| MapError::BorrowError {
name: name.to_owned(),
})?)
})
.transpose()
}

@ -9,5 +9,6 @@ pub mod maps;
mod obj;
pub mod programs;
mod sys;
pub mod util;
pub use bpf::*;

@ -1,5 +1,4 @@
use std::{
cell::{Ref, RefMut},
convert::TryFrom,
marker::PhantomData,
mem,
@ -8,7 +7,7 @@ use std::{
use crate::{
generated::bpf_map_type::BPF_MAP_TYPE_HASH,
maps::{IterableMap, Map, MapError, MapIter, MapKeys},
maps::{IterableMap, Map, MapError, MapIter, MapKeys, MapRef, MapRefMut},
sys::{
bpf_map_delete_elem, bpf_map_lookup_and_delete_elem, bpf_map_lookup_elem,
bpf_map_update_elem,
@ -97,35 +96,35 @@ impl<T: Deref<Target = Map>, K: Pod, V: Pod> IterableMap<K, V> for HashMap<T, K,
}
}
impl<'a, K: Pod, V: Pod> TryFrom<Ref<'a, Map>> for HashMap<Ref<'a, Map>, K, V> {
impl<K: Pod, V: Pod> TryFrom<MapRef> for HashMap<MapRef, K, V> {
type Error = MapError;
fn try_from(inner: Ref<'a, Map>) -> Result<HashMap<Ref<'a, Map>, K, V>, MapError> {
HashMap::new(inner)
fn try_from(a: MapRef) -> Result<HashMap<MapRef, K, V>, MapError> {
HashMap::new(a)
}
}
impl<'a, K: Pod, V: Pod> TryFrom<RefMut<'a, Map>> for HashMap<RefMut<'a, Map>, K, V> {
impl<K: Pod, V: Pod> TryFrom<MapRefMut> for HashMap<MapRefMut, K, V> {
type Error = MapError;
fn try_from(inner: RefMut<'a, Map>) -> Result<HashMap<RefMut<'a, Map>, K, V>, MapError> {
HashMap::new(inner)
fn try_from(a: MapRefMut) -> Result<HashMap<MapRefMut, K, V>, MapError> {
HashMap::new(a)
}
}
impl<'a, K: Pod, V: Pod> TryFrom<&'a Map> for HashMap<&'a Map, K, V> {
type Error = MapError;
fn try_from(inner: &'a Map) -> Result<HashMap<&'a Map, K, V>, MapError> {
HashMap::new(inner)
fn try_from(a: &'a Map) -> Result<HashMap<&'a Map, K, V>, MapError> {
HashMap::new(a)
}
}
impl<'a, K: Pod, V: Pod> TryFrom<&'a mut Map> for HashMap<&'a mut Map, K, V> {
type Error = MapError;
fn try_from(inner: &'a mut Map) -> Result<HashMap<&'a mut Map, K, V>, MapError> {
HashMap::new(inner)
fn try_from(a: &'a mut Map) -> Result<HashMap<&'a mut Map, K, V>, MapError> {
HashMap::new(a)
}
}

@ -0,0 +1,79 @@
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{
mem,
ops::{Deref, DerefMut},
sync::Arc,
};
use crate::maps::Map;
pub(crate) struct MapLockError;
/* FIXME: write a full RwLock implementation that doesn't use borrowing guards
* so that try_read() and try_write() don't have to use the ugly lifetime
* extension hack */
#[derive(Debug)]
pub(crate) struct MapLock {
inner: Arc<RwLock<Map>>,
}
impl MapLock {
pub(crate) fn new(map: Map) -> MapLock {
MapLock {
inner: Arc::new(RwLock::new(map)),
}
}
pub(crate) fn try_read(&self) -> Result<MapRef, MapLockError> {
let lock: Option<RwLockReadGuard<'static, Map>> =
unsafe { mem::transmute(self.inner.try_read()) };
lock.map(|guard| MapRef {
_lock: self.inner.clone(),
guard,
})
.ok_or(MapLockError)
}
pub(crate) fn try_write(&self) -> Result<MapRefMut, MapLockError> {
let lock: Option<RwLockWriteGuard<'static, Map>> =
unsafe { mem::transmute(self.inner.try_write()) };
lock.map(|guard| MapRefMut {
_lock: self.inner.clone(),
guard,
})
.ok_or(MapLockError)
}
}
pub struct MapRef {
_lock: Arc<RwLock<Map>>,
guard: RwLockReadGuard<'static, Map>,
}
pub struct MapRefMut {
_lock: Arc<RwLock<Map>>,
guard: RwLockWriteGuard<'static, Map>,
}
impl Deref for MapRef {
type Target = Map;
fn deref(&self) -> &Map {
&*self.guard
}
}
impl Deref for MapRefMut {
type Target = Map;
fn deref(&self) -> &Map {
&*self.guard
}
}
impl DerefMut for MapRefMut {
fn deref_mut(&mut self) -> &mut Map {
&mut *self.guard
}
}

@ -8,11 +8,13 @@ use crate::{
};
mod hash_map;
mod map_lock;
mod perf_map;
mod program_array;
pub use hash_map::*;
pub use perf_map::*;
pub(crate) use map_lock::*;
pub use perf_map::{PerfMap, PerfMapBuffer};
pub use program_array::*;
#[derive(Error, Debug)]
@ -64,6 +66,12 @@ pub enum MapError {
#[error("the BPF_MAP_GET_NEXT_KEY syscall failed with code {code} io_error {io_error}")]
GetNextKeyFailed { code: i64, io_error: io::Error },
#[error("map `{name}` is borrowed mutably")]
BorrowError { name: String },
#[error("map `{name}` is already borrowed")]
BorrowMutError { name: String },
}
#[derive(Debug)]

@ -1,12 +1,13 @@
use std::{
cell::RefMut,
convert::TryFrom,
ffi::c_void,
fs, io, mem,
io, mem,
ops::DerefMut,
ptr, slice,
str::FromStr,
sync::atomic::{self, AtomicPtr, Ordering},
sync::{
atomic::{self, AtomicPtr, Ordering},
Arc,
},
};
use bytes::BytesMut;
@ -20,13 +21,11 @@ use crate::{
bpf_map_type::BPF_MAP_TYPE_PERF_EVENT_ARRAY, perf_event_header, perf_event_mmap_page,
perf_event_type::*,
},
maps::{Map, MapError},
maps::{Map, MapError, MapLockWriteGuard},
sys::{bpf_map_update_elem, perf_event_ioctl, perf_event_open},
RawFd, PERF_EVENT_IOC_DISABLE, PERF_EVENT_IOC_ENABLE,
};
const ONLINE_CPUS: &str = "/sys/devices/system/cpu/online";
#[derive(Error, Debug)]
pub enum PerfBufferError {
#[error("invalid page count {page_count}, the value must be a power of two")]
@ -274,18 +273,24 @@ pub enum PerfMapError {
},
}
pub struct PerfMapBuffer<T: DerefMut<Target = Map>> {
_map: Arc<T>,
buf: PerfBuffer,
}
impl<T: DerefMut<Target = Map>> PerfMapBuffer<T> {
pub fn read_events(&mut self, buffers: &mut [BytesMut]) -> Result<Events, PerfBufferError> {
self.buf.read_events(buffers)
}
}
pub struct PerfMap<T: DerefMut<Target = Map>> {
map: T,
cpu_fds: Vec<(u32, RawFd)>,
buffers: Vec<Option<PerfBuffer>>,
map: Arc<T>,
page_size: usize,
}
impl<T: DerefMut<Target = Map>> PerfMap<T> {
pub fn new(
map: T,
cpu_ids: Option<Vec<u32>>,
page_count: Option<usize>,
) -> Result<PerfMap<T>, PerfMapError> {
pub fn new(map: T) -> Result<PerfMap<T>, PerfMapError> {
let map_type = map.obj.def.map_type;
if map_type != BPF_MAP_TYPE_PERF_EVENT_ARRAY {
return Err(MapError::InvalidMapType {
@ -293,95 +298,38 @@ impl<T: DerefMut<Target = Map>> PerfMap<T> {
})?;
}
let mut cpu_ids = match cpu_ids {
Some(ids) => ids,
None => online_cpus().map_err(|_| PerfMapError::InvalidOnlineCpuFile)?,
};
if cpu_ids.is_empty() {
return Err(PerfMapError::NoCpus);
}
cpu_ids.sort();
let min_cpu = cpu_ids.first().unwrap();
let max_cpu = cpu_ids.last().unwrap();
let mut buffers = (*min_cpu..=*max_cpu).map(|_| None).collect::<Vec<_>>();
let map_fd = map.fd_or_err()?;
let page_size = unsafe { sysconf(_SC_PAGESIZE) } as usize;
let mut cpu_fds = Vec::new();
for cpu_id in &cpu_ids {
let buf = PerfBuffer::open(*cpu_id, page_size, page_count.unwrap_or(2))?;
bpf_map_update_elem(map_fd, cpu_id, &buf.fd, 0)
.map_err(|(_, io_error)| PerfMapError::UpdateElementFailed { io_error })?;
cpu_fds.push((*cpu_id, buf.fd));
buffers[*cpu_id as usize] = Some(buf);
}
Ok(PerfMap {
map,
cpu_fds,
buffers,
map: Arc::new(map),
// Safety: libc
page_size: unsafe { sysconf(_SC_PAGESIZE) } as usize,
})
}
pub fn cpu_file_descriptors(&self) -> &[(u32, RawFd)] {
self.cpu_fds.as_slice()
}
pub fn read_cpu_events(
pub fn open(
&mut self,
cpu_id: u32,
buffers: &mut [BytesMut],
) -> Result<Events, PerfMapError> {
let buf = match self.buffers.get_mut(cpu_id as usize) {
None | Some(None) => return Err(PerfMapError::InvalidCpu { cpu_id }),
Some(Some(buf)) => buf,
};
Ok(buf.read_events(buffers)?)
}
}
index: u32,
page_count: Option<usize>,
) -> Result<PerfMapBuffer<T>, PerfMapError> {
// FIXME: keep track of open buffers
impl<'a> TryFrom<RefMut<'a, Map>> for PerfMap<RefMut<'a, Map>> {
type Error = PerfMapError;
let map_fd = self.map.fd_or_err()?;
let buf = PerfBuffer::open(index, self.page_size, page_count.unwrap_or(2))?;
bpf_map_update_elem(map_fd, &index, &buf.fd, 0)
.map_err(|(_, io_error)| PerfMapError::UpdateElementFailed { io_error })?;
fn try_from(a: RefMut<'a, Map>) -> Result<PerfMap<RefMut<'a, Map>>, PerfMapError> {
PerfMap::new(a, None, None)
Ok(PerfMapBuffer {
buf,
_map: self.map.clone(),
})
}
}
impl<'a> TryFrom<&'a mut Map> for PerfMap<&'a mut Map> {
impl TryFrom<MapLockWriteGuard> for PerfMap<MapLockWriteGuard> {
type Error = PerfMapError;
fn try_from(a: &'a mut Map) -> Result<PerfMap<&'a mut Map>, PerfMapError> {
PerfMap::new(a, None, None)
}
}
pub fn online_cpus() -> Result<Vec<u32>, ()> {
let data = fs::read_to_string(ONLINE_CPUS).map_err(|_| ())?;
parse_online_cpus(data.trim())
}
fn parse_online_cpus(data: &str) -> Result<Vec<u32>, ()> {
let mut cpus = Vec::new();
for range in data.split(',') {
cpus.extend({
match range
.splitn(2, '-')
.map(u32::from_str)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| ())?
.as_slice()
{
&[] | &[_, _, _, ..] => return Err(()),
&[start] => start..=start,
&[start, end] => start..=end,
}
})
fn try_from(a: MapLockWriteGuard) -> Result<PerfMap<MapLockWriteGuard>, PerfMapError> {
PerfMap::new(a)
}
Ok(cpus)
}
#[cfg_attr(test, allow(unused_variables))]
@ -425,21 +373,7 @@ mod tests {
generated::perf_event_mmap_page,
sys::{override_syscall, Syscall, TEST_MMAP_RET},
};
use std::{convert::TryInto, fmt::Debug, iter::FromIterator, mem};
#[test]
fn test_parse_online_cpus() {
assert_eq!(parse_online_cpus("0").unwrap(), vec![0]);
assert_eq!(parse_online_cpus("0,1").unwrap(), vec![0, 1]);
assert_eq!(parse_online_cpus("0,1,2").unwrap(), vec![0, 1, 2]);
assert_eq!(parse_online_cpus("0-7").unwrap(), Vec::from_iter(0..=7));
assert_eq!(parse_online_cpus("0-3,4-7").unwrap(), Vec::from_iter(0..=7));
assert_eq!(parse_online_cpus("0-5,6,7").unwrap(), Vec::from_iter(0..=7));
assert!(parse_online_cpus("").is_err());
assert!(parse_online_cpus("0-1,2-").is_err());
assert!(parse_online_cpus("foo").is_err());
}
use std::{convert::TryInto, fmt::Debug, mem};
const PAGE_SIZE: usize = 4096;
union MMappedBuf {

@ -0,0 +1,117 @@
use std::{
convert::TryFrom,
io,
ops::DerefMut,
os::unix::io::{AsRawFd, RawFd},
sync::Arc,
};
use bytes::BytesMut;
use libc::{sysconf, _SC_PAGESIZE};
use thiserror::Error;
use crate::{
generated::bpf_map_type::BPF_MAP_TYPE_PERF_EVENT_ARRAY,
maps::{
perf_map::{Events, PerfBuffer, PerfBufferError},
Map, MapError, MapRefMut,
},
sys::bpf_map_update_elem,
};
#[derive(Error, Debug)]
pub enum PerfMapError {
#[error("error parsing /sys/devices/system/cpu/online")]
InvalidOnlineCpuFile,
#[error("no CPUs specified")]
NoCpus,
#[error("invalid cpu {cpu_id}")]
InvalidCpu { cpu_id: u32 },
#[error("map error: {0}")]
MapError(#[from] MapError),
#[error("perf buffer error: {0}")]
PerfBufferError(#[from] PerfBufferError),
#[error(transparent)]
IOError(#[from] io::Error),
#[error("bpf_map_update_elem failed: {io_error}")]
UpdateElementError {
#[source]
io_error: io::Error,
},
}
pub struct PerfMapBuffer<T: DerefMut<Target = Map>> {
_map: Arc<T>,
buf: PerfBuffer,
}
impl<T: DerefMut<Target = Map>> PerfMapBuffer<T> {
pub fn readable(&self) -> bool {
self.buf.readable()
}
pub fn read_events(&mut self, buffers: &mut [BytesMut]) -> Result<Events, PerfBufferError> {
self.buf.read_events(buffers)
}
}
impl<T: DerefMut<Target = Map>> AsRawFd for PerfMapBuffer<T> {
fn as_raw_fd(&self) -> RawFd {
self.buf.as_raw_fd()
}
}
pub struct PerfMap<T: DerefMut<Target = Map>> {
map: Arc<T>,
page_size: usize,
}
impl<T: DerefMut<Target = Map>> PerfMap<T> {
pub fn new(map: T) -> Result<PerfMap<T>, PerfMapError> {
let map_type = map.obj.def.map_type;
if map_type != BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32 {
return Err(MapError::InvalidMapType {
map_type: map_type as u32,
})?;
}
let _fd = map.fd_or_err()?;
Ok(PerfMap {
map: Arc::new(map),
// Safety: libc
page_size: unsafe { sysconf(_SC_PAGESIZE) } as usize,
})
}
pub fn open(
&mut self,
index: u32,
page_count: Option<usize>,
) -> Result<PerfMapBuffer<T>, PerfMapError> {
// FIXME: keep track of open buffers
let map_fd = self.map.fd_or_err()?;
let buf = PerfBuffer::open(index, self.page_size, page_count.unwrap_or(2))?;
bpf_map_update_elem(map_fd, &index, &buf.as_raw_fd(), 0)
.map_err(|(_, io_error)| PerfMapError::UpdateElementError { io_error })?;
Ok(PerfMapBuffer {
buf,
_map: self.map.clone(),
})
}
}
impl TryFrom<MapRefMut> for PerfMap<MapRefMut> {
type Error = PerfMapError;
fn try_from(a: MapRefMut) -> Result<PerfMap<MapRefMut>, PerfMapError> {
PerfMap::new(a)
}
}

@ -1,5 +1,4 @@
use std::{
cell::{Ref, RefMut},
convert::TryFrom,
mem,
ops::{Deref, DerefMut},
@ -8,7 +7,7 @@ use std::{
use crate::{
generated::bpf_map_type::BPF_MAP_TYPE_PROG_ARRAY,
maps::{IterableMap, Map, MapError, MapIter, MapKeys},
maps::{IterableMap, Map, MapError, MapIter, MapKeys, MapRef, MapRefMut},
programs::ProgramFd,
sys::{
bpf_map_delete_elem, bpf_map_lookup_and_delete_elem, bpf_map_lookup_elem,
@ -111,18 +110,18 @@ impl<T: Deref<Target = Map>> IterableMap<u32, RawFd> for ProgramArray<T> {
}
}
impl<'a> TryFrom<Ref<'a, Map>> for ProgramArray<Ref<'a, Map>> {
impl TryFrom<MapRef> for ProgramArray<MapRef> {
type Error = MapError;
fn try_from(inner: Ref<'a, Map>) -> Result<ProgramArray<Ref<'a, Map>>, MapError> {
ProgramArray::new(inner)
fn try_from(a: MapRef) -> Result<ProgramArray<MapRef>, MapError> {
ProgramArray::new(a)
}
}
impl<'a> TryFrom<RefMut<'a, Map>> for ProgramArray<RefMut<'a, Map>> {
impl TryFrom<MapRefMut> for ProgramArray<MapRefMut> {
type Error = MapError;
fn try_from(inner: RefMut<'a, Map>) -> Result<ProgramArray<RefMut<'a, Map>>, MapError> {
ProgramArray::new(inner)
fn try_from(a: MapRefMut) -> Result<ProgramArray<MapRefMut>, MapError> {
ProgramArray::new(a)
}
}

@ -50,7 +50,6 @@ fn syscall(call: Syscall) -> SysResult {
#[cfg(not(test))]
unsafe fn syscall_impl(call: Syscall) -> SysResult {
use libc::{SYS_bpf, SYS_perf_event_open};
use std::mem;
use Syscall::*;
let ret = match call {

@ -0,0 +1,54 @@
use std::{fs, io, str::FromStr};
const ONLINE_CPUS: &str = "/sys/devices/system/cpu/online";
pub fn online_cpus() -> Result<Vec<u32>, io::Error> {
let data = fs::read_to_string(ONLINE_CPUS)?;
parse_online_cpus(data.trim()).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
format!("unexpected {} format", ONLINE_CPUS),
)
})
}
fn parse_online_cpus(data: &str) -> Result<Vec<u32>, ()> {
let mut cpus = Vec::new();
for range in data.split(',') {
cpus.extend({
match range
.splitn(2, '-')
.map(u32::from_str)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| ())?
.as_slice()
{
&[] | &[_, _, _, ..] => return Err(()),
&[start] => start..=start,
&[start, end] => start..=end,
}
})
}
Ok(cpus)
}
#[cfg(test)]
mod tests {
use std::iter::FromIterator;
use super::*;
#[test]
fn test_parse_online_cpus() {
assert_eq!(parse_online_cpus("0").unwrap(), vec![0]);
assert_eq!(parse_online_cpus("0,1").unwrap(), vec![0, 1]);
assert_eq!(parse_online_cpus("0,1,2").unwrap(), vec![0, 1, 2]);
assert_eq!(parse_online_cpus("0-7").unwrap(), Vec::from_iter(0..=7));
assert_eq!(parse_online_cpus("0-3,4-7").unwrap(), Vec::from_iter(0..=7));
assert_eq!(parse_online_cpus("0-5,6,7").unwrap(), Vec::from_iter(0..=7));
assert!(parse_online_cpus("").is_err());
assert!(parse_online_cpus("0-1,2-").is_err());
assert!(parse_online_cpus("foo").is_err());
}
}
Loading…
Cancel
Save