feat(aya): Add task storage map type (in the user-space)

Task storage is a type of map which uses `task_struct` kernel type as
a key. When the task (process) stops, the corresponding entry is
automatically removed.

This change add support only in the user-space and tests the
functionality with a C program.
reviewable/pr1161/r3
Michal Rostecki 3 weeks ago
parent 76ca85c8c3
commit 730b8d2f32

@ -734,6 +734,7 @@ fn parse_map(
BPF_MAP_TYPE_DEVMAP => Map::DevMap(map),
BPF_MAP_TYPE_DEVMAP_HASH => Map::DevMapHash(map),
BPF_MAP_TYPE_XSKMAP => Map::XskMap(map),
BPF_MAP_TYPE_TASK_STORAGE => Map::TaskStorage(map),
m_type => {
if allow_unsupported_maps {
Map::Unsupported(map)

@ -85,6 +85,7 @@ pub mod ring_buf;
pub mod sock;
pub mod stack;
pub mod stack_trace;
pub mod task_storage;
pub mod xdp;
pub use array::{Array, PerCpuArray, ProgramArray};
@ -101,6 +102,7 @@ pub use ring_buf::RingBuf;
pub use sock::{SockHash, SockMap};
pub use stack::Stack;
pub use stack_trace::StackTraceMap;
pub use task_storage::TaskStorage;
pub use xdp::{CpuMap, DevMap, DevMapHash, XskMap};
#[derive(Error, Debug)]
@ -312,6 +314,8 @@ pub enum Map {
Stack(MapData),
/// A [`StackTraceMap`] map.
StackTraceMap(MapData),
/// A [`TaskStorage`] map.
TaskStorage(MapData),
/// An unsupported map type.
Unsupported(MapData),
/// A [`XskMap`] map.
@ -341,6 +345,7 @@ impl Map {
Self::SockMap(map) => map.obj.map_type(),
Self::Stack(map) => map.obj.map_type(),
Self::StackTraceMap(map) => map.obj.map_type(),
Self::TaskStorage(map) => map.obj.map_type(),
Self::Unsupported(map) => map.obj.map_type(),
Self::XskMap(map) => map.obj.map_type(),
}
@ -371,6 +376,7 @@ impl Map {
Self::SockMap(map) => map.pin(path),
Self::Stack(map) => map.pin(path),
Self::StackTraceMap(map) => map.pin(path),
Self::TaskStorage(map) => map.pin(path),
Self::Unsupported(map) => map.pin(path),
Self::XskMap(map) => map.pin(path),
}
@ -420,6 +426,7 @@ impl_map_pin!((V) {
BloomFilter,
Queue,
Stack,
TaskStorage,
});
impl_map_pin!((K, V) {
@ -502,6 +509,7 @@ impl_try_from_map!((V) {
Queue,
SockHash,
Stack,
TaskStorage,
});
impl_try_from_map!((K, V) {

@ -0,0 +1,152 @@
//! Task storage.
use std::{
borrow::Borrow,
ffi::c_int,
marker::PhantomData,
os::fd::{AsFd, AsRawFd},
};
use crate::{
maps::{check_kv_size, MapData, MapError},
sys::{bpf_map_lookup_elem, PidFd, SyscallError},
Pod,
};
/// Task storage is a type of map which uses `task_struct` kernel type as a
/// key. When the task (process) stops, the corresponding entry is
/// automatically removed.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 5.12.
///
/// # Examples
///
/// ```no_run
/// # let mut ebpf = aya::Ebpf::load(&[])?;
/// use aya::maps::TaskStorage;
///
/// let mut task_storage: TaskStorage<_, u32> = TaskStorage::try_from(ebpf.map_mut("TASK_STORAGE").unwrap())?;
///
/// let pid = 0;
/// let value = task_storage.get(&pid, 0)?;
/// # Ok::<(), aya::EbpfError>(())
/// ```
#[doc(alias = "BPF_MAP_TYPE_TASK_STORAGE")]
#[derive(Debug)]
pub struct TaskStorage<T, V> {
pub(crate) inner: T,
_v: PhantomData<V>,
}
impl<T: Borrow<MapData>, V: Pod> TaskStorage<T, V> {
pub(crate) fn new(map: T) -> Result<Self, MapError> {
let data = map.borrow();
check_kv_size::<c_int, V>(data)?;
Ok(Self {
inner: map,
_v: PhantomData,
})
}
/// Returns the value stored for the given `pid`.
pub fn get(&self, pid: &u32, flags: u64) -> Result<V, MapError> {
let pidfd = PidFd::open(*pid, 0).map_err(|(_, io_error)| SyscallError {
call: "pidfd_open",
io_error,
})?;
let map_fd = self.inner.borrow().fd().as_fd();
let value =
bpf_map_lookup_elem(map_fd, &pidfd.as_raw_fd(), flags).map_err(|(_, io_error)| {
SyscallError {
call: "bpf_map_lookup_elem",
io_error,
}
})?;
value.ok_or(MapError::KeyNotFound)
}
}
#[cfg(test)]
mod tests {
use std::io;
use assert_matches::assert_matches;
use aya_obj::generated::bpf_map_type::BPF_MAP_TYPE_TASK_STORAGE;
use libc::EFAULT;
use super::*;
use crate::{
maps::{
test_utils::{self, new_map},
Map,
},
sys::{override_syscall, SysResult, Syscall},
};
fn new_obj_map() -> aya_obj::Map {
test_utils::new_obj_map::<u32>(BPF_MAP_TYPE_TASK_STORAGE)
}
fn sys_error(value: i32) -> SysResult<i64> {
Err((-1, io::Error::from_raw_os_error(value)))
}
#[test]
fn test_wrong_value_size() {
let map = new_map(new_obj_map());
let map = Map::TaskStorage(map);
assert_matches!(
TaskStorage::<_, u16>::try_from(&map),
Err(MapError::InvalidValueSize {
size: 2,
expected: 4
})
);
}
#[test]
fn test_try_from_wrong_map() {
let map = new_map(new_obj_map());
let map = Map::Array(map);
assert_matches!(
TaskStorage::<_, u32>::try_from(&map),
Err(MapError::InvalidMapType { .. })
);
}
#[test]
fn test_new_ok() {
let map = new_map(new_obj_map());
assert!(TaskStorage::<_, u32>::new(&map).is_ok());
}
#[test]
fn test_try_from_ok() {
let map = new_map(new_obj_map());
let map = Map::TaskStorage(map);
assert!(TaskStorage::<_, u32>::try_from(&map).is_ok());
}
#[test]
fn test_get_pidfd_syscall_error() {
let mut map = new_map(new_obj_map());
let map = TaskStorage::<_, u32>::new(&mut map).unwrap();
override_syscall(|call| match call {
Syscall::Ebpf { .. } => Ok(1),
Syscall::PidfdOpen { .. } => sys_error(EFAULT),
_ => sys_error(EFAULT),
});
assert_matches!(
map.get(&1, 0), Err(MapError::SyscallError(
SyscallError {
call: "pidfd_open",
io_error
}
))
if io_error.raw_os_error() == Some(EFAULT)
);
}
}

@ -3,6 +3,7 @@
mod bpf;
mod netlink;
mod perf_event;
mod pid_fd;
#[cfg(test)]
mod fake;
@ -17,11 +18,12 @@ use aya_obj::generated::{bpf_attr, bpf_cmd, perf_event_attr};
pub(crate) use bpf::*;
#[cfg(test)]
pub(crate) use fake::*;
use libc::{pid_t, SYS_bpf, SYS_ioctl, SYS_perf_event_open};
use libc::{pid_t, SYS_bpf, SYS_ioctl, SYS_perf_event_open, SYS_pidfd_open};
#[doc(hidden)]
pub use netlink::netlink_set_link_up;
pub(crate) use netlink::*;
pub(crate) use perf_event::*;
pub(crate) use pid_fd::*;
use thiserror::Error;
pub(crate) type SysResult<T> = Result<T, (c_long, io::Error)>;
@ -43,6 +45,10 @@ pub(crate) enum Syscall<'a> {
request: c_int,
arg: c_int,
},
PidfdOpen {
pid: pid_t,
flags: u32,
},
}
/// A system call error.
@ -84,6 +90,11 @@ impl std::fmt::Debug for Syscall<'_> {
.field("request", request)
.field("arg", arg)
.finish(),
Self::PidfdOpen { pid, flags } => f
.debug_struct("Syscall::PidfdOpen")
.field("pid", pid)
.field("flags", flags)
.finish(),
}
}
}
@ -109,6 +120,7 @@ fn syscall(call: Syscall<'_>) -> SysResult<c_long> {
Syscall::PerfEventIoctl { fd, request, arg } => {
libc::syscall(SYS_ioctl, fd.as_raw_fd(), request, arg)
}
Syscall::PidfdOpen { pid, flags } => libc::syscall(SYS_pidfd_open, pid, flags),
}
};

@ -0,0 +1,40 @@
use std::os::fd::{AsRawFd, FromRawFd, RawFd};
use libc::pid_t;
use crate::{
sys::{SysResult, Syscall},
MockableFd,
};
/// A file descriptor of a process.
///
/// A similar type is provided by the Rust standard library as
/// [`std::os::linux::process`] as a nigtly-only experimental API. We are
/// planning to migrate to it once it stabilizes.
pub(crate) struct PidFd(MockableFd);
impl PidFd {
pub(crate) fn open(pid: u32, flags: u32) -> SysResult<Self> {
let pid_fd = pidfd_open(pid, flags)? as RawFd;
let pid_fd = unsafe { MockableFd::from_raw_fd(pid_fd) };
Ok(Self(pid_fd))
}
}
impl AsRawFd for PidFd {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
fn pidfd_open(pid: u32, flags: u32) -> SysResult<i64> {
let call = Syscall::PidfdOpen {
pid: pid as pid_t,
flags,
};
#[cfg(not(test))]
return crate::sys::syscall(call);
#[cfg(test)]
return crate::sys::TEST_SYSCALL.with(|test_impl| unsafe { test_impl.borrow()(call) });
}

@ -0,0 +1,26 @@
// clang-format off
#include <vmlinux.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_tracing.h>
// clang-format on
char _license[] SEC("license") = "GPL";
struct {
__uint(type, BPF_MAP_TYPE_TASK_STORAGE);
__uint(map_flags, BPF_F_NO_PREALLOC);
__type(key, int);
__type(value, __u32);
} task_storage SEC(".maps");
SEC("fexit/sched_post_fork")
int BPF_PROG(sched_post_fork, struct task_struct *task) {
__u32 value = 1;
bpf_task_storage_get(&task_storage, task, &value,
BPF_LOCAL_STORAGE_GET_F_CREATE);
pid_t pid = BPF_CORE_READ(task, pid);
bpf_printk("sched_post_fork: id: %d\n", pid);
return 0;
}

@ -67,6 +67,7 @@ fn main() -> Result<()> {
("main.bpf.c", false),
("multimap-btf.bpf.c", false),
("reloc.bpf.c", true),
("task_storage.bpf.c", true),
("text_64_64_reloc.c", false),
("variables_reloc.bpf.c", false),
];

@ -8,6 +8,8 @@ pub const MULTIMAP_BTF: &[u8] =
pub const RELOC_BPF: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/reloc.bpf.o"));
pub const RELOC_BTF: &[u8] =
include_bytes_aligned!(concat!(env!("OUT_DIR"), "/reloc.bpf.target.o"));
pub const TASK_STORAGE: &[u8] =
include_bytes_aligned!(concat!(env!("OUT_DIR"), "/task_storage.bpf.o"));
pub const TEXT_64_64_RELOC: &[u8] =
include_bytes_aligned!(concat!(env!("OUT_DIR"), "/text_64_64_reloc.o"));
pub const VARIABLES_RELOC: &[u8] =

@ -11,6 +11,7 @@ mod relocations;
mod ring_buf;
mod smoke;
mod strncmp;
mod task_storage;
mod tcx;
mod uprobe_cookie;
mod xdp;

@ -0,0 +1,67 @@
use std::{
process,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Condvar, Mutex,
},
thread::{self, sleep},
time::Duration,
};
use aya::{maps::TaskStorage, programs::FExit, Btf, Ebpf};
use test_log::test;
#[test]
fn test_task_storage_get() {
let mut ebpf = Ebpf::load(crate::TASK_STORAGE).unwrap();
let prog: &mut FExit = ebpf
.program_mut("sched_post_fork")
.unwrap()
.try_into()
.unwrap();
let btf = Btf::from_sys_fs().unwrap();
prog.load("sched_post_fork", &btf).unwrap();
prog.attach().unwrap();
let task_storage: TaskStorage<_, u32> =
TaskStorage::try_from(ebpf.map_mut("task_storage").unwrap()).unwrap();
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let stop = Arc::new(AtomicBool::new(false));
let child = thread::spawn({
let pair = Arc::clone(&pair);
let stop = Arc::clone(&stop);
move || {
// `task_struct.pid`[0] in the kernel doesn't differentiate between
// PID and TID
let pid = unsafe { libc::getpid() } as u32;
let tid = unsafe { libc::gettid() } as u32;
println!("pid: {pid}, tid: {tid}");
let (lock, cvar) = &*pair;
*lock.lock().unwrap() = Some(tid);
cvar.notify_one();
while !stop.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
}
});
let (lock, cvar) = &*pair;
let mut tid = lock.lock().unwrap();
while tid.is_none() {
tid = cvar.wait(tid).unwrap();
}
let tid = tid.unwrap();
sleep(Duration::from_millis(100));
let value = task_storage.get(&tid, 0).unwrap();
assert_eq!(value, 1);
stop.store(true, Ordering::Relaxed);
child.join().unwrap();
}
Loading…
Cancel
Save