pull/1328/merge
Thibaut Lapierre 2 days ago committed by GitHub
commit c88dffb883
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -16,6 +16,7 @@ mod perf_event;
mod raw_tracepoint;
mod sk_lookup;
mod sk_msg;
mod sk_reuseport;
mod sk_skb;
mod sock_ops;
mod socket_filter;
@ -42,6 +43,7 @@ use proc_macro::TokenStream;
use raw_tracepoint::RawTracePoint;
use sk_lookup::SkLookup;
use sk_msg::SkMsg;
use sk_reuseport::SkReuseport;
use sk_skb::{SkSkb, SkSkbKind};
use sock_ops::SockOps;
use socket_filter::SocketFilter;
@ -612,6 +614,67 @@ pub fn sk_lookup(attrs: TokenStream, item: TokenStream) -> TokenStream {
.into()
}
/// Marks a function as an eBPF Socket Reuseport program that can be attached to
/// a socket with SO_REUSEPORT set.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 4.19.
///
/// # Examples
///
/// Basic usage allowing kernel to handle socket selection:
///
/// ```no_run
/// use aya_ebpf::{macros::sk_reuseport, programs::{SkReuseportContext, SK_PASS, SK_DROP}};
///
/// #[sk_reuseport]
/// pub fn select_socket(_ctx: SkReuseportContext) -> u32 {
/// // Return SK_DROP to drop packet, SK_PASS to let kernel handle selection
/// SK_PASS
/// }
/// ```
///
/// Advanced usage with custom socket selection:
///
/// ```no_run
/// use aya_ebpf::{
/// macros::{sk_reuseport, map},
/// programs::{SkReuseportContext, SK_PASS, SK_DROP},
/// helpers::bpf_sk_select_reuseport,
/// maps::ReusePortSockArray,
/// EbpfContext,
/// };
///
/// #[map(name = "socket_map")]
/// static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(4, 0);
///
/// #[sk_reuseport]
/// pub fn load_balance(ctx: SkReuseportContext) -> u32 {
/// // Use packet hash for consistent load balancing
/// let socket_idx = ctx.hash() % 4;
///
/// let ret = unsafe {
/// bpf_sk_select_reuseport(
/// ctx.as_ptr() as *mut _,
/// SOCKET_MAP.as_ptr(),
/// &socket_idx as *const _ as *mut _,
/// 0
/// )
/// };
///
/// if ret == 0 { SK_PASS } else { SK_DROP }
/// }
/// ```
#[proc_macro_attribute]
pub fn sk_reuseport(attrs: TokenStream, item: TokenStream) -> TokenStream {
    // Parse the attribute and the annotated item; on success expand the
    // program wrapper, otherwise turn the parse diagnostic into tokens.
    let expanded = match SkReuseport::parse(attrs.into(), item.into()) {
        Err(diagnostic) => diagnostic.emit_as_expr_tokens(),
        Ok(program) => program.expand(),
    };
    expanded.into()
}
/// Marks a function as a cgroup device eBPF program that can be attached to a
/// cgroup.
///

@ -0,0 +1,71 @@
use proc_macro2::TokenStream;
use proc_macro2_diagnostics::{Diagnostic, SpanDiagnosticExt as _};
use quote::quote;
use syn::{ItemFn, spanned::Spanned as _};
/// Parsed form of a function annotated with `#[sk_reuseport]`.
pub(crate) struct SkReuseport {
// The user-written function; `expand` nests it inside the generated entry point.
item: ItemFn,
}
impl SkReuseport {
    /// Builds a [`SkReuseport`] from the attribute arguments and the annotated item.
    ///
    /// The attribute takes no arguments: a non-empty `attrs` stream produces an
    /// "unexpected attribute" diagnostic. Otherwise `item` is parsed as a function,
    /// and any parse failure is returned as a diagnostic.
    pub(crate) fn parse(attrs: TokenStream, item: TokenStream) -> Result<Self, Diagnostic> {
        if attrs.is_empty() {
            let item = syn::parse2(item)?;
            Ok(Self { item })
        } else {
            Err(attrs.span().error("unexpected attribute"))
        }
    }

    /// Emits the program entry point.
    ///
    /// The generated function keeps the user's name and visibility, is placed in
    /// the `sk_reuseport` link section, takes the raw `sk_reuseport_md` pointer and
    /// forwards it, wrapped in a `SkReuseportContext`, to the user's function,
    /// which is nested inside the generated body.
    pub(crate) fn expand(&self) -> TokenStream {
        let user_fn = &self.item;
        let visibility = &user_fn.vis;
        let entry_name = &user_fn.sig.ident;
        quote! {
            #[unsafe(no_mangle)]
            #[unsafe(link_section = "sk_reuseport")]
            #visibility fn #entry_name(ctx: *mut ::aya_ebpf::bindings::sk_reuseport_md) -> u32 {
                return #entry_name(::aya_ebpf::programs::SkReuseportContext::new(ctx));

                #user_fn
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use syn::parse_quote;

    use super::*;

    /// Expanding a minimal program yields the expected wrapper tokens.
    #[test]
    fn test_sk_reuseport() {
        let expected = quote! {
            #[unsafe(no_mangle)]
            #[unsafe(link_section = "sk_reuseport")]
            fn prog(ctx: *mut ::aya_ebpf::bindings::sk_reuseport_md) -> u32 {
                return prog(::aya_ebpf::programs::SkReuseportContext::new(ctx));

                fn prog(ctx: &mut ::aya_ebpf::programs::SkReuseportContext) -> u32 {
                    0
                }
            }
        };

        let parsed = SkReuseport::parse(
            parse_quote! {},
            parse_quote! {
                fn prog(ctx: &mut ::aya_ebpf::programs::SkReuseportContext) -> u32 {
                    0
                }
            },
        )
        .unwrap();
        let expanded = parsed.expand();

        // Compare the rendered token strings.
        assert_eq!(expected.to_string(), expanded.to_string());
    }
}

@ -215,7 +215,7 @@ pub struct Function {
/// - `lwt_in`, `lwt_out`, `lwt_seg6local`, `lwt_xmit`
/// - `raw_tp.w+`, `raw_tracepoint.w+`
/// - `action`
/// - `sk_reuseport/migrate`, `sk_reuseport`
/// - `sk_reuseport/migrate`
/// - `syscall`
/// - `struct_ops+`
/// - `fmod_ret+`, `fmod_ret.s+`
@ -268,6 +268,7 @@ pub enum ProgramSection {
FlowDissector,
Extension,
SkLookup,
SkReuseport,
CgroupSock {
attach_type: CgroupSockAttachType,
},
@ -427,6 +428,7 @@ impl FromStr for ProgramSection {
"flow_dissector" => FlowDissector,
"freplace" => Extension,
"sk_lookup" => SkLookup,
"sk_reuseport" => SkReuseport,
"iter" => Iter { sleepable: false },
"iter.s" => Iter { sleepable: true },
_ => {

@ -25,8 +25,8 @@ use crate::{
BtfTracePoint, CgroupDevice, CgroupSkb, CgroupSkbAttachType, CgroupSock, CgroupSockAddr,
CgroupSockopt, CgroupSysctl, Extension, FEntry, FExit, FlowDissector, Iter, KProbe,
LircMode2, Lsm, PerfEvent, ProbeKind, Program, ProgramData, ProgramError, RawTracePoint,
SchedClassifier, SkLookup, SkMsg, SkSkb, SkSkbKind, SockOps, SocketFilter, TracePoint,
UProbe, Xdp,
SchedClassifier, SkLookup, SkMsg, SkReuseport, SkSkb, SkSkbKind, SockOps, SocketFilter,
TracePoint, UProbe, Xdp,
},
sys::{
bpf_load_btf, is_bpf_cookie_supported, is_bpf_global_data_supported,
@ -429,6 +429,7 @@ impl<'a> EbpfLoader<'a> {
| ProgramSection::PerfEvent
| ProgramSection::RawTracePoint
| ProgramSection::SkLookup
| ProgramSection::SkReuseport
| ProgramSection::FlowDissector
| ProgramSection::CgroupSock { attach_type: _ }
| ProgramSection::CgroupDevice => {}
@ -670,6 +671,9 @@ impl<'a> EbpfLoader<'a> {
ProgramSection::SkLookup => Program::SkLookup(SkLookup {
data: ProgramData::new(prog_name, obj, btf_fd, *verifier_log_level),
}),
ProgramSection::SkReuseport => Program::SkReuseport(SkReuseport {
data: ProgramData::new(prog_name, obj, btf_fd, *verifier_log_level),
}),
ProgramSection::CgroupSock { attach_type, .. } => {
Program::CgroupSock(CgroupSock {
data: ProgramData::new(prog_name, obj, btf_fd, *verifier_log_level),
@ -716,6 +720,7 @@ fn parse_map(
BPF_MAP_TYPE_PERCPU_HASH => Map::PerCpuHashMap(map),
BPF_MAP_TYPE_LRU_PERCPU_HASH => Map::PerCpuLruHashMap(map),
BPF_MAP_TYPE_PERF_EVENT_ARRAY => Map::PerfEventArray(map),
BPF_MAP_TYPE_REUSEPORT_SOCKARRAY => Map::ReusePortSockArray(map),
BPF_MAP_TYPE_RINGBUF => Map::RingBuf(map),
BPF_MAP_TYPE_SOCKHASH => Map::SockHash(map),
BPF_MAP_TYPE_SOCKMAP => Map::SockMap(map),

@ -95,7 +95,7 @@ pub use lpm_trie::LpmTrie;
pub use perf::PerfEventArray;
pub use queue::Queue;
pub use ring_buf::RingBuf;
pub use sock::{SockHash, SockMap};
pub use sock::{ReusePortSockArray, SockHash, SockMap};
pub use stack::Stack;
pub use stack_trace::StackTraceMap;
pub use xdp::{CpuMap, DevMap, DevMapHash, XskMap};
@ -297,6 +297,8 @@ pub enum Map {
ProgramArray(MapData),
/// A [`Queue`] map.
Queue(MapData),
/// A [`ReusePortSockArray`] map.
ReusePortSockArray(MapData),
/// A [`RingBuf`] map.
RingBuf(MapData),
/// A [`SockHash`] map
@ -331,6 +333,7 @@ impl Map {
Self::PerfEventArray(map) => map.obj.map_type(),
Self::ProgramArray(map) => map.obj.map_type(),
Self::Queue(map) => map.obj.map_type(),
Self::ReusePortSockArray(map) => map.obj.map_type(),
Self::RingBuf(map) => map.obj.map_type(),
Self::SockHash(map) => map.obj.map_type(),
Self::SockMap(map) => map.obj.map_type(),
@ -362,6 +365,7 @@ impl Map {
Self::ProgramArray(map) => map.pin(path),
Self::Queue(map) => map.pin(path),
Self::RingBuf(map) => map.pin(path),
Self::ReusePortSockArray(map) => map.pin(path),
Self::SockHash(map) => map.pin(path),
Self::SockMap(map) => map.pin(path),
Self::Stack(map) => map.pin(path),
@ -480,6 +484,7 @@ impl_try_from_map!(() {
DevMapHash,
PerfEventArray,
ProgramArray,
ReusePortSockArray,
RingBuf,
SockMap,
StackTraceMap,

@ -1,4 +1,5 @@
//! Socket maps.
mod reuseport_sock_array;
mod sock_hash;
mod sock_map;
@ -7,6 +8,7 @@ use std::{
os::fd::{AsFd, BorrowedFd},
};
pub use reuseport_sock_array::ReusePortSockArray;
pub use sock_hash::SockHash;
pub use sock_map::SockMap;

@ -0,0 +1,171 @@
//! An array of sockets for SO_REUSEPORT selection.
use std::{
borrow::{Borrow, BorrowMut},
os::fd::{AsFd as _, AsRawFd, RawFd},
};
use crate::{
maps::{MapData, MapError, MapFd, MapKeys, check_bounds, check_kv_size, sock::SockMapFd},
sys::{SyscallError, bpf_map_delete_elem, bpf_map_update_elem},
};
/// An array of sockets that can be shared between eBPF programs and user space.
///
/// `ReusePortSockArray` stores sockets that participate in SO_REUSEPORT groups.
/// eBPF programs of type `BPF_PROG_TYPE_SK_REUSEPORT` can use this map with the
/// `bpf_sk_select_reuseport()` helper to select specific sockets for incoming
/// connections.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 4.19.
///
/// # Examples
///
/// ```no_run
/// # let mut bpf = aya::Ebpf::load(&[])?;
/// use aya::maps::ReusePortSockArray;
/// use aya::programs::SkReuseport;
/// use std::os::fd::{AsRawFd, FromRawFd};
/// use std::net::TcpListener;
/// use libc::{socket, setsockopt, bind, listen, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEPORT, sockaddr_in};
///
/// // Create socket with SO_REUSEPORT enabled
/// let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
/// let enable = 1i32;
/// unsafe {
/// setsockopt(
/// socket_fd,
/// SOL_SOCKET,
/// SO_REUSEPORT,
/// &enable as *const _ as *const _,
/// std::mem::size_of_val(&enable) as u32,
/// );
/// }
///
/// // Bind and listen (setup details omitted for brevity)
/// // ... bind(socket_fd, &addr, addr_len) and listen(socket_fd, backlog) ...
/// let socket = unsafe { TcpListener::from_raw_fd(socket_fd) };
///
/// // Load the socket array map and populate it
/// let mut socket_array: ReusePortSockArray<_> = bpf.take_map("socket_map").unwrap().try_into()?;
/// socket_array.set(0, &socket, 0)?;
///
/// // Load and attach the SK_REUSEPORT program
/// let prog: &mut SkReuseport = bpf.program_mut("select_socket").unwrap().try_into()?;
/// prog.load()?;
/// prog.attach(&socket)?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
///
/// # Complete Setup Example
///
/// This example shows proper SO_REUSEPORT socket group setup:
///
/// ```no_run
/// # let mut bpf = aya::Ebpf::load(&[])?;
/// use aya::maps::ReusePortSockArray;
/// use aya::programs::SkReuseport;
/// use std::net::TcpListener;
/// use std::os::fd::{AsRawFd, FromRawFd};
/// use libc::{socket, setsockopt, bind, listen, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEPORT, sockaddr_in};
///
/// // Create multiple sockets in SO_REUSEPORT group
/// let port = 8080u16;
/// let addr = sockaddr_in {
/// sin_family: AF_INET as u16,
/// sin_port: port.to_be(),
/// sin_addr: libc::in_addr { s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be() },
/// sin_zero: [0; 8],
/// };
///
/// let enable = 1i32;
/// let mut sockets = Vec::new();
///
/// // Create 4 SO_REUSEPORT sockets
/// for _ in 0..4 {
/// let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
/// unsafe {
/// // Set SO_REUSEPORT before binding (required for reuseport groups)
/// setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT,
/// &enable as *const _ as *const _, std::mem::size_of_val(&enable) as u32);
/// bind(socket_fd, &addr as *const _ as *const libc::sockaddr,
/// std::mem::size_of::<sockaddr_in>() as u32);
/// listen(socket_fd, 1024);
/// }
/// sockets.push(unsafe { TcpListener::from_raw_fd(socket_fd) });
/// }
///
/// // Load and populate the socket array map
/// let mut socket_array: ReusePortSockArray<_> = bpf.take_map("socket_map").unwrap().try_into()?;
/// for (i, socket) in sockets.iter().enumerate() {
/// socket_array.set(i as u32, socket, 0)?;
/// }
///
/// // Load and attach the SK_REUSEPORT program to first socket in group
/// let prog: &mut SkReuseport = bpf.program_mut("load_balancer").unwrap().try_into()?;
/// prog.load()?;
/// prog.attach(&sockets[0])?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[doc(alias = "BPF_MAP_TYPE_REUSEPORT_SOCKARRAY")]
pub struct ReusePortSockArray<T> {
// Underlying map handle; the impls in this file require `T: Borrow<MapData>`
// for read access and `T: BorrowMut<MapData>` for `set` / `clear_index`.
pub(crate) inner: T,
}
impl<T: Borrow<MapData>> ReusePortSockArray<T> {
    /// Wraps `map`, first checking that its key and value sizes match a `u32`
    /// index and a [`RawFd`] value.
    pub(crate) fn new(map: T) -> Result<Self, MapError> {
        check_kv_size::<u32, RawFd>(map.borrow())?;
        Ok(Self { inner: map })
    }

    /// An iterator over the indices of the array that point to a socket. The iterator item type
    /// is `Result<u32, MapError>`.
    pub fn indices(&self) -> MapKeys<'_, u32> {
        let map_data: &MapData = self.inner.borrow();
        MapKeys::new(map_data)
    }

    /// Returns the map's file descriptor.
    ///
    /// The returned file descriptor can be used with [`SkReuseport`](crate::programs::SkReuseport) programs.
    pub fn fd(&self) -> &SockMapFd {
        let map_fd: &MapFd = self.inner.borrow().fd();
        // TODO(https://github.com/rust-lang/rfcs/issues/3066): avoid this unsafe.
        // SAFETY: `SockMapFd` is #[repr(transparent)] over `MapFd`.
        unsafe { std::mem::transmute(map_fd) }
    }
}
impl<T: BorrowMut<MapData>> ReusePortSockArray<T> {
    /// Stores a socket into the map at the given index.
    ///
    /// The socket will be available for selection by `bpf_sk_select_reuseport()` helper
    /// using the provided index.
    ///
    /// # Errors
    ///
    /// Fails if `index` does not pass the map's bounds check, or if the
    /// `bpf_map_update_elem` syscall reports an error.
    pub fn set<I: AsRawFd>(&mut self, index: u32, socket: &I, flags: u64) -> Result<(), MapError> {
        let map_data = self.inner.borrow_mut();
        let map_fd = map_data.fd().as_fd();
        check_bounds(map_data, index)?;

        // The value written to the map is the socket's raw file descriptor.
        let socket_fd = socket.as_raw_fd();
        bpf_map_update_elem(map_fd, Some(&index), &socket_fd, flags).map_err(|io_error| {
            SyscallError {
                call: "bpf_map_update_elem",
                io_error,
            }
        })?;
        Ok(())
    }

    /// Removes the socket stored at `index` from the map.
    ///
    /// # Errors
    ///
    /// Fails if `index` does not pass the map's bounds check, or if the
    /// `bpf_map_delete_elem` syscall reports an error.
    pub fn clear_index(&mut self, index: &u32) -> Result<(), MapError> {
        let map_data = self.inner.borrow_mut();
        let map_fd = map_data.fd().as_fd();
        check_bounds(map_data, *index)?;

        bpf_map_delete_elem(map_fd, index).map_err(|io_error| SyscallError {
            call: "bpf_map_delete_elem",
            io_error,
        })?;
        Ok(())
    }
}

@ -63,7 +63,7 @@ impl<T: Borrow<MapData>> SockMap<T> {
Ok(Self { inner: map })
}
/// An iterator over the indices of the array that point to a program. The iterator item type
/// An iterator over the indices of the array that point to a socket. The iterator item type
/// is `Result<u32, MapError>`.
pub fn indices(&self) -> MapKeys<'_, u32> {
MapKeys::new(self.inner.borrow())

@ -62,6 +62,7 @@ pub mod perf_event;
pub mod raw_trace_point;
pub mod sk_lookup;
pub mod sk_msg;
pub mod sk_reuseport;
pub mod sk_skb;
pub mod sock_ops;
pub mod socket_filter;
@ -114,6 +115,7 @@ pub use crate::programs::{
raw_trace_point::RawTracePoint,
sk_lookup::SkLookup,
sk_msg::SkMsg,
sk_reuseport::{SkReuseport, SkReuseportError},
sk_skb::{SkSkb, SkSkbKind},
sock_ops::SockOps,
socket_filter::{SocketFilter, SocketFilterError},
@ -200,6 +202,10 @@ pub enum ProgramError {
#[error(transparent)]
SocketFilterError(#[from] SocketFilterError),
/// An error occurred while working with a [`SkReuseport`] program.
#[error(transparent)]
SkReuseportError(#[from] SkReuseportError),
/// An error occurred while working with an [`Xdp`] program.
#[error(transparent)]
XdpError(#[from] XdpError),
@ -319,6 +325,8 @@ pub enum Program {
Extension(Extension),
/// A [`SkLookup`] program
SkLookup(SkLookup),
/// A [`SkReuseport`] program
SkReuseport(SkReuseport),
/// A [`CgroupSock`] program
CgroupSock(CgroupSock),
/// A [`CgroupDevice`] program
@ -353,6 +361,7 @@ impl Program {
Self::Extension(_) => Extension::PROGRAM_TYPE,
Self::CgroupSockAddr(_) => CgroupSockAddr::PROGRAM_TYPE,
Self::SkLookup(_) => SkLookup::PROGRAM_TYPE,
Self::SkReuseport(_) => SkReuseport::PROGRAM_TYPE,
Self::CgroupSock(_) => CgroupSock::PROGRAM_TYPE,
Self::CgroupDevice(_) => CgroupDevice::PROGRAM_TYPE,
Self::Iter(_) => Iter::PROGRAM_TYPE,
@ -386,6 +395,7 @@ impl Program {
Self::Extension(p) => p.pin(path),
Self::CgroupSockAddr(p) => p.pin(path),
Self::SkLookup(p) => p.pin(path),
Self::SkReuseport(p) => p.pin(path),
Self::CgroupSock(p) => p.pin(path),
Self::CgroupDevice(p) => p.pin(path),
Self::Iter(p) => p.pin(path),
@ -418,6 +428,7 @@ impl Program {
Self::Extension(mut p) => p.unload(),
Self::CgroupSockAddr(mut p) => p.unload(),
Self::SkLookup(mut p) => p.unload(),
Self::SkReuseport(mut p) => p.unload(),
Self::CgroupSock(mut p) => p.unload(),
Self::CgroupDevice(mut p) => p.unload(),
Self::Iter(mut p) => p.unload(),
@ -452,6 +463,7 @@ impl Program {
Self::Extension(p) => p.fd(),
Self::CgroupSockAddr(p) => p.fd(),
Self::SkLookup(p) => p.fd(),
Self::SkReuseport(p) => p.fd(),
Self::CgroupSock(p) => p.fd(),
Self::CgroupDevice(p) => p.fd(),
Self::Iter(p) => p.fd(),
@ -487,6 +499,7 @@ impl Program {
Self::Extension(p) => p.info(),
Self::CgroupSockAddr(p) => p.info(),
Self::SkLookup(p) => p.info(),
Self::SkReuseport(p) => p.info(),
Self::CgroupSock(p) => p.info(),
Self::CgroupDevice(p) => p.info(),
Self::Iter(p) => p.info(),
@ -802,6 +815,7 @@ impl_program_unload!(
Extension,
CgroupSockAddr,
SkLookup,
SkReuseport,
SockOps,
CgroupSock,
CgroupDevice,
@ -844,6 +858,7 @@ impl_fd!(
Extension,
CgroupSockAddr,
SkLookup,
SkReuseport,
SockOps,
CgroupSock,
CgroupDevice,
@ -951,6 +966,7 @@ impl_program_pin!(
Extension,
CgroupSockAddr,
SkLookup,
SkReuseport,
SockOps,
CgroupSock,
CgroupDevice,
@ -992,6 +1008,7 @@ impl_from_pin!(
FlowDissector,
Extension,
SkLookup,
SkReuseport,
SockOps,
CgroupDevice,
Iter,
@ -1101,6 +1118,7 @@ impl_from_prog_info!(
unsafe FExit,
Extension,
SkLookup,
SkReuseport,
CgroupDevice,
Iter,
);
@ -1157,6 +1175,7 @@ impl_try_from_program!(
Extension,
CgroupSockAddr,
SkLookup,
SkReuseport,
CgroupSock,
CgroupDevice,
Iter,
@ -1185,6 +1204,7 @@ impl_info!(
Extension,
CgroupSockAddr,
SkLookup,
SkReuseport,
SockOps,
CgroupSock,
CgroupDevice,

@ -0,0 +1,164 @@
//! Socket load balancing with SO_REUSEPORT.
use std::{
io, mem,
os::fd::{AsFd, AsRawFd as _, RawFd},
};
use aya_obj::generated::{
bpf_attach_type::BPF_SK_REUSEPORT_SELECT, bpf_prog_type::BPF_PROG_TYPE_SK_REUSEPORT,
};
use libc::{SOL_SOCKET, setsockopt};
use thiserror::Error;
use crate::programs::{Link, ProgramData, ProgramError, ProgramType, id_as_key, load_program};
/// `SO_ATTACH_REUSEPORT_EBPF` socket option number, passed to `setsockopt` when attaching.
// NOTE(review): hard-coded value; presumably the generic Linux numbering. Some
// architectures may number socket options differently — confirm, and consider
// using the `libc` constant if one is available for all supported targets.
const SO_ATTACH_REUSEPORT_EBPF: i32 = 52;
/// `SO_DETACH_REUSEPORT_BPF` socket option number, passed to `setsockopt` when detaching.
// NOTE(review): hard-coded value, same caveat as for the attach option — confirm.
const SO_DETACH_REUSEPORT_BPF: i32 = 68;
/// The type returned when attaching a [`SkReuseport`] fails.
#[derive(Debug, Error)]
pub enum SkReuseportError {
/// Setting the `SO_ATTACH_REUSEPORT_EBPF` socket option failed.
#[error("setsockopt SO_ATTACH_REUSEPORT_EBPF failed")]
SoAttachReuseportEbpfError {
/// The original [`io::Error`], captured right after the failed `setsockopt` call.
#[source]
io_error: io::Error,
},
}
/// A program used to select a socket within a SO_REUSEPORT group.
///
/// [`SkReuseport`] programs are attached to sockets with SO_REUSEPORT set to
/// provide programmable socket selection when multiple sockets are listening
/// on the same port. The program decides which socket in the reuseport group
/// should handle an incoming connection or packet.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 4.19.
///
/// # Examples
///
/// ```no_run
/// # #[derive(Debug, thiserror::Error)]
/// # enum Error {
/// # #[error(transparent)]
/// # IO(#[from] std::io::Error),
/// # #[error(transparent)]
/// # Map(#[from] aya::maps::MapError),
/// # #[error(transparent)]
/// # Program(#[from] aya::programs::ProgramError),
/// # #[error(transparent)]
/// # Ebpf(#[from] aya::EbpfError)
/// # }
/// # let mut bpf = aya::Ebpf::load(&[])?;
/// use std::net::TcpListener;
/// use aya::programs::SkReuseport;
///
/// // NOTE: as described above, the socket is expected to have SO_REUSEPORT set;
/// // a plain `TcpListener::bind` is used here only to keep the example short.
/// let listener = TcpListener::bind("127.0.0.1:8080")?;
/// let program: &mut SkReuseport = bpf.program_mut("select_socket").unwrap().try_into()?;
/// program.load()?;
/// // Pass the listener by reference: `attach` only records the raw file
/// // descriptor, so passing it by value would drop (and close) the socket
/// // as soon as `attach` returns.
/// program.attach(&listener)?;
/// # Ok::<(), Error>(())
/// ```
#[derive(Debug)]
#[doc(alias = "BPF_PROG_TYPE_SK_REUSEPORT")]
pub struct SkReuseport {
// Program state shared with the generic program helpers, including the
// collection of links created by `attach`.
pub(crate) data: ProgramData<SkReuseportLink>,
}
impl SkReuseport {
/// The type of the program according to the kernel.
pub const PROGRAM_TYPE: ProgramType = ProgramType::SkReuseport;
/// Loads the program inside the kernel.
pub fn load(&mut self) -> Result<(), ProgramError> {
self.data.expected_attach_type = Some(BPF_SK_REUSEPORT_SELECT);
load_program(BPF_PROG_TYPE_SK_REUSEPORT, &mut self.data)
}
/// Attaches the program to the given socket.
///
/// The returned value can be used to detach, see [SkReuseport::detach].
pub fn attach<T: AsFd>(&mut self, socket: T) -> Result<SkReuseportLinkId, ProgramError> {
let prog_fd = self.fd()?;
let prog_fd = prog_fd.as_fd();
let prog_fd = prog_fd.as_raw_fd();
let socket = socket.as_fd();
let socket = socket.as_raw_fd();
let ret = unsafe {
setsockopt(
socket,
SOL_SOCKET,
SO_ATTACH_REUSEPORT_EBPF,
&prog_fd as *const _ as *const _,
mem::size_of::<RawFd>() as u32,
)
};
if ret < 0 {
return Err(SkReuseportError::SoAttachReuseportEbpfError {
io_error: io::Error::last_os_error(),
}
.into());
}
self.data.links.insert(SkReuseportLink { socket, prog_fd })
}
/// Detaches the program.
///
/// See [`Self::attach`].
pub fn detach(&mut self, link_id: SkReuseportLinkId) -> Result<(), ProgramError> {
self.data.links.remove(link_id)
}
/// Takes ownership of the link referenced by the provided `link_id`.
///
/// The caller takes the responsibility of managing the lifetime of the link. When the returned
/// [`SkReuseportLink`] is dropped, the link is detached.
pub fn take_link(
&mut self,
link_id: SkReuseportLinkId,
) -> Result<SkReuseportLink, ProgramError> {
self.data.links.forget(link_id)
}
}
/// The type returned by [`SkReuseport::attach`]. Can be passed to [`SkReuseport::detach`].
///
/// Holds the socket's raw file descriptor followed by the program's raw file descriptor.
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct SkReuseportLinkId(RawFd, RawFd);
/// A link between a [`SkReuseport`] program and the socket it was attached to.
#[derive(Debug)]
pub struct SkReuseportLink {
// Raw file descriptor of the socket the program was attached to.
socket: RawFd,
// Raw file descriptor of the attached program.
prog_fd: RawFd,
}
impl Link for SkReuseportLink {
    type Id = SkReuseportLinkId;

    /// Identifies the link by its (socket fd, program fd) pair.
    fn id(&self) -> Self::Id {
        let socket = self.socket;
        let prog_fd = self.prog_fd;
        SkReuseportLinkId(socket, prog_fd)
    }

    /// Issues `SO_DETACH_REUSEPORT_BPF` on the recorded socket.
    ///
    /// The result of `setsockopt` is not inspected; this method always
    /// returns `Ok(())`.
    fn detach(self) -> Result<(), ProgramError> {
        let socket = self.socket;
        let prog_fd = self.prog_fd;
        let optlen = mem::size_of::<RawFd>() as u32;

        // SAFETY: the option value points at `prog_fd`, a live local, and
        // `optlen` is exactly the size of that value.
        let _ = unsafe {
            setsockopt(
                socket,
                SOL_SOCKET,
                SO_DETACH_REUSEPORT_BPF,
                &prog_fd as *const _ as *const _,
                optlen,
            )
        };
        Ok(())
    }
}
id_as_key!(SkReuseportLink, SkReuseportLinkId);

@ -13,6 +13,7 @@ pub mod per_cpu_array;
pub mod perf;
pub mod program_array;
pub mod queue;
pub mod reuseport_sock_array;
pub mod ring_buf;
pub mod sock_hash;
pub mod sock_map;
@ -28,6 +29,7 @@ pub use per_cpu_array::PerCpuArray;
pub use perf::{PerfEventArray, PerfEventByteArray};
pub use program_array::ProgramArray;
pub use queue::Queue;
pub use reuseport_sock_array::ReusePortSockArray;
pub use ring_buf::RingBuf;
pub use sock_hash::SockHash;
pub use sock_map::SockMap;

@ -0,0 +1,60 @@
use core::{cell::UnsafeCell, mem};
use crate::{
bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_REUSEPORT_SOCKARRAY},
maps::PinningType,
};
/// An array of sockets for use with SO_REUSEPORT socket selection.
///
/// `ReusePortSockArray` is used to store sockets that participate in SO_REUSEPORT
/// groups. eBPF programs of type `BPF_PROG_TYPE_SK_REUSEPORT` can use this map
/// with the `bpf_sk_select_reuseport()` helper to select specific sockets for
/// incoming connections.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 4.19.
#[repr(transparent)]
pub struct ReusePortSockArray {
// Map definition record; `as_ptr` hands out a raw pointer to it for use with helpers.
def: UnsafeCell<bpf_map_def>,
}
// `Sync` is required for the map to be declared as a `static`.
// NOTE(review): no safety justification is recorded here; presumably the
// definition is never mutated through shared references by program code — confirm.
unsafe impl Sync for ReusePortSockArray {}
impl ReusePortSockArray {
    /// Creates a new `ReusePortSockArray` with the specified maximum number of entries.
    pub const fn with_max_entries(max_entries: u32, flags: u32) -> ReusePortSockArray {
        Self::with_pinning(max_entries, flags, PinningType::None as u32)
    }

    /// Creates a new pinned `ReusePortSockArray` with the specified maximum number of entries.
    pub const fn pinned(max_entries: u32, flags: u32) -> ReusePortSockArray {
        Self::with_pinning(max_entries, flags, PinningType::ByName as u32)
    }

    /// Shared constructor: builds the map definition with `u32`-sized keys and
    /// values and the given pinning mode.
    const fn with_pinning(max_entries: u32, map_flags: u32, pinning: u32) -> Self {
        let def = bpf_map_def {
            type_: BPF_MAP_TYPE_REUSEPORT_SOCKARRAY,
            key_size: mem::size_of::<u32>() as u32,
            value_size: mem::size_of::<u32>() as u32,
            max_entries,
            map_flags,
            id: 0,
            pinning,
        };
        Self {
            def: UnsafeCell::new(def),
        }
    }

    /// Returns a raw pointer to the map definition for use with helpers.
    pub fn as_ptr(&self) -> *mut ::aya_ebpf_cty::c_void {
        self.def.get().cast()
    }
}

@ -10,6 +10,7 @@ pub mod retprobe;
pub mod sk_buff;
pub mod sk_lookup;
pub mod sk_msg;
pub mod sk_reuseport;
pub mod sock;
pub mod sock_addr;
pub mod sock_ops;
@ -32,6 +33,7 @@ pub use retprobe::RetProbeContext;
pub use sk_buff::SkBuffContext;
pub use sk_lookup::SkLookupContext;
pub use sk_msg::SkMsgContext;
pub use sk_reuseport::{SK_DROP, SK_PASS, SkReuseportContext};
pub use sock::SockContext;
pub use sock_addr::SockAddrContext;
pub use sock_ops::SockOpsContext;

@ -0,0 +1,162 @@
//! Socket load balancing with SO_REUSEPORT programs.
//!
//! This module provides context and constants for BPF_PROG_TYPE_SK_REUSEPORT programs
//! which allow custom load balancing logic for SO_REUSEPORT socket groups.
//!
//! # Basic Usage
//!
//! ```no_run
//! use aya_ebpf::{macros::sk_reuseport, programs::{SkReuseportContext, SK_PASS, SK_DROP}};
//!
//! #[sk_reuseport]
//! pub fn load_balancer(ctx: SkReuseportContext) -> u32 {
//! // Allow packet through - kernel will balance among sockets
//! SK_PASS
//! }
//! ```
//!
//! # Advanced Socket Selection
//!
//! For explicit socket selection, use `bpf_sk_select_reuseport()` with socket arrays:
//!
//! ```no_run
//! use aya_ebpf::{
//! macros::{sk_reuseport, map},
//! programs::{SkReuseportContext, SK_PASS, SK_DROP},
//! helpers::bpf_sk_select_reuseport,
//! maps::ReusePortSockArray,
//! EbpfContext,
//! };
//!
//! #[map(name = "socket_map")]
//! static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(10, 0);
//!
//! #[sk_reuseport]
//! pub fn select_worker(ctx: SkReuseportContext) -> u32 {
//! // Custom logic to determine worker index
//! let worker_id: u32 = 2;
//!
//! // Select specific socket using helper
//! let ret = unsafe {
//! bpf_sk_select_reuseport(
//! ctx.as_ptr() as *mut _,
//! SOCKET_MAP.as_ptr(),
//! &worker_id as *const _ as *mut _,
//! 0
//! )
//! };
//!
//! // Return SK_DROP on error, SK_PASS on success
//! if ret == 0 {
//! SK_PASS
//! } else {
//! SK_DROP
//! }
//! }
//! ```
//!
//! # Context Field Access Example
//!
//! Access packet metadata for custom load balancing decisions:
//!
//! ```no_run
//! use aya_ebpf::{
//! macros::{sk_reuseport, map},
//! programs::{SkReuseportContext, SK_PASS, SK_DROP},
//! helpers::bpf_sk_select_reuseport,
//! maps::ReusePortSockArray,
//! EbpfContext,
//! };
//!
//! #[map(name = "socket_map")]
//! static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(4, 0);
//!
//! #[sk_reuseport]
//! pub fn hash_based_selection(ctx: SkReuseportContext) -> u32 {
//! // Use packet hash for consistent load balancing
//! let socket_idx = ctx.hash() % 4;
//!
//! // Only handle TCP traffic
//! if ctx.ip_protocol() == 6 { // IPPROTO_TCP
//! let ret = unsafe {
//! bpf_sk_select_reuseport(
//! ctx.as_ptr() as *mut _,
//! SOCKET_MAP.as_ptr(),
//! &socket_idx as *const _ as *mut _,
//! 0
//! )
//! };
//!
//! if ret == 0 {
//! SK_PASS
//! } else {
//! SK_DROP
//! }
//! } else {
//! // Let kernel handle non-TCP traffic
//! SK_PASS
//! }
//! }
//! ```
use core::ffi::c_void;
use crate::{EbpfContext, bindings::sk_reuseport_md};
/// `SK_PASS`: program return value that allows the packet through and lets the
/// kernel handle socket selection.
pub const SK_PASS: u32 = 1;
/// `SK_DROP`: program return value that drops the packet.
pub const SK_DROP: u32 = 0;
/// Context passed to `sk_reuseport` programs: a thin wrapper around the raw
/// [`sk_reuseport_md`] pointer, with accessors for its fields.
pub struct SkReuseportContext {
/// Raw pointer to the `sk_reuseport_md` this context wraps.
pub md: *mut sk_reuseport_md,
}
impl SkReuseportContext {
pub fn new(md: *mut sk_reuseport_md) -> SkReuseportContext {
SkReuseportContext { md }
}
/// Returns the start of the directly accessible data.
pub fn data(&self) -> usize {
unsafe { (*self.md).__bindgen_anon_1.data as usize }
}
/// Returns the end of the directly accessible data.
pub fn data_end(&self) -> usize {
unsafe { (*self.md).__bindgen_anon_2.data_end as usize }
}
/// Returns the total packet length.
#[expect(clippy::len_without_is_empty)]
pub fn len(&self) -> u32 {
unsafe { (*self.md).len }
}
/// Returns the ethernet protocol from the packet (network byte order).
pub fn eth_protocol(&self) -> u32 {
unsafe { (*self.md).eth_protocol }
}
/// Returns the IP protocol (e.g., IPPROTO_TCP, IPPROTO_UDP).
pub fn ip_protocol(&self) -> u32 {
unsafe { (*self.md).ip_protocol }
}
/// Returns whether the socket is bound to an INANY address.
pub fn bind_inany(&self) -> u32 {
unsafe { (*self.md).bind_inany }
}
/// Returns the hash of the packet's 4-tuple for load balancing.
pub fn hash(&self) -> u32 {
unsafe { (*self.md).hash }
}
}
impl EbpfContext for SkReuseportContext {
    /// Exposes the wrapped `sk_reuseport_md` pointer as an untyped pointer.
    fn as_ptr(&self) -> *mut c_void {
        self.md.cast()
    }
}

@ -72,6 +72,10 @@ path = "src/ring_buf.rs"
name = "simple_prog"
path = "src/simple_prog.rs"
[[bin]]
name = "sk_reuseport"
path = "src/sk_reuseport.rs"
[[bin]]
name = "strncmp"
path = "src/strncmp.rs"

@ -0,0 +1,65 @@
#![no_std]
#![no_main]
use aya_ebpf::{
macros::{map, sk_reuseport},
maps::ReusePortSockArray,
programs::{SK_PASS, SK_DROP, SkReuseportContext},
helpers::bpf_sk_select_reuseport,
EbpfContext as _,
};
#[cfg(not(test))]
extern crate ebpf_panic;
// Socket array with room for 10 entries, exported under the name "socket_map";
// `test_helper_usage` passes it to `bpf_sk_select_reuseport`.
#[map(name = "socket_map")]
static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(10, 0);
// Minimal program: accepts every packet without selecting a socket itself.
#[sk_reuseport]
pub fn select_socket(_ctx: SkReuseportContext) -> u32 {
// Returning SK_PASS allows the packet and lets the kernel handle socket selection.
// Returning SK_DROP instead would drop the packet.
SK_PASS
}
#[sk_reuseport]
pub fn test_context_access(ctx: SkReuseportContext) -> u32 {
    // Call every context accessor once, in declaration order; the values
    // themselves are not used.
    let _ = (
        ctx.len(),
        ctx.hash(),
        ctx.ip_protocol(),
        ctx.eth_protocol(),
        ctx.bind_inany(),
        ctx.data(),
        ctx.data_end(),
    );

    // Always pass for testing
    SK_PASS
}
#[sk_reuseport]
pub fn test_helper_usage(ctx: SkReuseportContext) -> u32 {
    // IP protocol number for TCP.
    const TCP: u32 = 6;

    // Derive the socket index from the packet hash.
    let socket_idx = ctx.hash() % 4;

    // Non-TCP traffic is left to the kernel's own selection.
    if ctx.ip_protocol() != TCP {
        return SK_PASS;
    }

    let ret = unsafe {
        bpf_sk_select_reuseport(
            ctx.as_ptr() as *mut _,
            SOCKET_MAP.as_ptr(),
            &socket_idx as *const _ as *mut _,
            0,
        )
    };

    // Zero from the helper means success; anything else drops the packet.
    match ret {
        0 => SK_PASS,
        _ => SK_DROP,
    }
}

@ -50,12 +50,13 @@ bpf_file!(
RELOCATIONS => "relocations",
RING_BUF => "ring_buf",
SIMPLE_PROG => "simple_prog",
SK_REUSEPORT => "sk_reuseport",
STRNCMP => "strncmp",
TCX => "tcx",
TEST => "test",
TWO_PROGS => "two_progs",
XDP_SEC => "xdp_sec",
UPROBE_COOKIE => "uprobe_cookie",
XDP_SEC => "xdp_sec",
);
#[cfg(test)]

@ -11,6 +11,7 @@ mod raw_tracepoint;
mod rbpf;
mod relocations;
mod ring_buf;
mod sk_reuseport;
mod smoke;
mod strncmp;
mod tcx;

@ -0,0 +1,307 @@
use std::{net::TcpListener, os::fd::{AsFd as _, AsRawFd as _, FromRawFd as _}, time::Duration};
use aya::{
Ebpf,
maps::ReusePortSockArray,
programs::{SkReuseport, loaded_programs},
};
use libc::{setsockopt, SOL_SOCKET, SO_REUSEPORT, socket, bind, listen, AF_INET, SOCK_STREAM, sockaddr_in};
use tokio::time::sleep;
use crate::utils::NetNsGuard;
#[tokio::test]
async fn sk_reuseport_load() {
    let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
    let program = ebpf.program_mut("select_socket").unwrap();
    let prog: &mut SkReuseport = program.try_into().unwrap();

    // Loading must succeed...
    prog.load().unwrap();

    // ...and the loaded program must report the SkReuseport type.
    let loaded_type = prog.info().unwrap().program_type().unwrap();
    assert_eq!(loaded_type, aya::programs::ProgramType::SkReuseport);
}
#[tokio::test]
async fn sk_reuseport_loaded_programs_iteration() {
    // True when any program returned by `loaded_programs()` reports the
    // SkReuseport type.
    // NOTE(review): `loaded_programs()` presumably lists programs system-wide,
    // so the "not loaded yet" assertion below could be affected by other tests
    // loading an SkReuseport program concurrently — confirm.
    let sk_reuseport_is_loaded = || {
        let programs = loaded_programs().collect::<Result<Vec<_>, _>>().unwrap();
        programs.iter().any(|p| {
            matches!(
                p.program_type().unwrap(),
                aya::programs::ProgramType::SkReuseport
            )
        })
    };

    let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
    let prog: &mut SkReuseport = ebpf
        .program_mut("select_socket")
        .unwrap()
        .try_into()
        .unwrap();

    // Before loading, no SkReuseport program is listed.
    assert!(!sk_reuseport_is_loaded());

    prog.load().unwrap();
    sleep(Duration::from_millis(500)).await;

    // After loading, at least one is.
    assert!(sk_reuseport_is_loaded());
}
#[tokio::test]
async fn sk_reuseport_map_operations() {
    let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();

    // Test that we can access the ReusePortSockArray map
    // Note: This test checks map creation and basic operations
    // In a real scenario, you would need actual SO_REUSEPORT sockets
    let map: ReusePortSockArray<_> = ebpf
        .take_map("socket_map")
        .expect("socket_map should exist")
        .try_into()
        .expect("map should convert to ReusePortSockArray");

    // Test that the map has the correct properties.
    // The check is written as an explicit `>= 0`: the previous `!fd < 0` form
    // parsed as `(!fd) < 0`, i.e. a bitwise NOT on the integer, which only
    // coincidentally expressed the same condition.
    let raw_fd = map.fd().as_fd().as_raw_fd();
    assert!(raw_fd >= 0, "Map fd should be valid");

    // Test indices iterator (array maps have all indices pre-allocated)
    let indices_count = map.indices().count();
    assert_eq!(indices_count, 10, "Array map should have all 10 indices available");
}
/// Attaches the `select_socket` program to a listening `SO_REUSEPORT` socket
/// and then detaches it again.
///
/// The socket is created through `libc` so that `SO_REUSEPORT` can be set
/// before `bind`. Ownership of the descriptor is handed to a `TcpListener`
/// immediately after creation so it is closed even if a later setup assertion
/// panics.
#[test_log::test]
fn sk_reuseport_attach_detach() {
    let _netns = NetNsGuard::new();
    let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
    let prog: &mut SkReuseport = ebpf
        .program_mut("select_socket")
        .unwrap()
        .try_into()
        .unwrap();
    prog.load().unwrap();

    // Let the OS pick a free port with a throwaway listener, then release it
    // so the SO_REUSEPORT socket below can bind to the same port.
    let temp_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let local_addr = temp_listener.local_addr().unwrap();
    drop(temp_listener);

    // SAFETY: `socket` is called with constant, valid arguments.
    let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
    assert!(socket_fd >= 0, "Failed to create socket");
    // SAFETY: `socket_fd` was just returned by `socket` and has no other owner.
    // Taking ownership now guarantees the descriptor is closed on every exit
    // path, including assertion failures below.
    let listener = unsafe { TcpListener::from_raw_fd(socket_fd) };

    // SO_REUSEPORT must be enabled before binding.
    let enable = 1i32;
    // SAFETY: `socket_fd` is open (owned by `listener`) and the option pointer
    // and length describe the live `enable` value.
    let ret = unsafe {
        setsockopt(
            socket_fd,
            SOL_SOCKET,
            SO_REUSEPORT,
            &enable as *const _ as *const _,
            std::mem::size_of_val(&enable) as libc::socklen_t,
        )
    };
    assert_eq!(ret, 0, "Failed to set SO_REUSEPORT");

    // Bind to 127.0.0.1 on the probed port; both fields are in network byte order.
    let addr = sockaddr_in {
        sin_family: AF_INET as u16,
        sin_port: local_addr.port().to_be(),
        sin_addr: libc::in_addr {
            s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be(),
        },
        sin_zero: [0; 8],
    };
    // SAFETY: `addr` is a fully initialised `sockaddr_in` and the length
    // passed matches its size.
    let bind_result = unsafe {
        bind(
            socket_fd,
            &addr as *const _ as *const libc::sockaddr,
            std::mem::size_of::<sockaddr_in>() as libc::socklen_t,
        )
    };
    assert_eq!(bind_result, 0, "Failed to bind socket");

    // SAFETY: `socket_fd` is an open, bound stream socket.
    let listen_result = unsafe { listen(socket_fd, 1024) };
    assert_eq!(listen_result, 0, "Failed to listen on socket");

    // Attach, then detach using the returned link id.
    let link_id = prog.attach(&listener).unwrap();
    prog.detach(link_id).unwrap();
}
/// Stores two listening `SO_REUSEPORT` sockets bound to the same port in
/// `socket_map` and then clears both slots again.
///
/// Each socket is built by a local helper that takes ownership of the raw
/// descriptor right after creation, so no descriptor is leaked if a setup
/// assertion panics.
#[test_log::test]
fn sk_reuseport_socket_array_operations() {
    /// Creates a listening TCP socket on `127.0.0.1:port` with `SO_REUSEPORT`
    /// enabled before binding. `label` is only used in assertion messages.
    fn reuseport_listener(port: u16, label: &str) -> TcpListener {
        // SAFETY: `socket` is called with constant, valid arguments.
        let fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
        assert!(fd >= 0, "Failed to create {label}");
        // SAFETY: `fd` was just returned by `socket` and has no other owner.
        let listener = unsafe { TcpListener::from_raw_fd(fd) };

        let enable = 1i32;
        // SAFETY: `fd` is open (owned by `listener`) and the option pointer
        // and length describe the live `enable` value.
        let ret = unsafe {
            setsockopt(
                fd,
                SOL_SOCKET,
                SO_REUSEPORT,
                &enable as *const _ as *const _,
                std::mem::size_of_val(&enable) as libc::socklen_t,
            )
        };
        assert_eq!(ret, 0, "Failed to set SO_REUSEPORT on {label}");

        // Address fields are in network byte order.
        let addr = sockaddr_in {
            sin_family: AF_INET as u16,
            sin_port: port.to_be(),
            sin_addr: libc::in_addr {
                s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be(),
            },
            sin_zero: [0; 8],
        };
        // SAFETY: `addr` is a fully initialised `sockaddr_in` and the length
        // passed matches its size.
        let bind_result = unsafe {
            bind(
                fd,
                &addr as *const _ as *const libc::sockaddr,
                std::mem::size_of::<sockaddr_in>() as libc::socklen_t,
            )
        };
        assert_eq!(bind_result, 0, "Failed to bind {label}");

        // SAFETY: `fd` is an open, bound stream socket.
        let listen_result = unsafe { listen(fd, 1024) };
        assert_eq!(listen_result, 0, "Failed to listen on {label}");

        listener
    }

    let _netns = NetNsGuard::new();
    let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();

    let mut socket_array: ReusePortSockArray<_> = ebpf
        .take_map("socket_map")
        .unwrap()
        .try_into()
        .unwrap();

    // Let the OS pick a free port with a throwaway listener, then release it
    // so both SO_REUSEPORT sockets can bind to that same port.
    let temp_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = temp_listener.local_addr().unwrap().port();
    drop(temp_listener);

    // Two sockets bound to the same address with SO_REUSEPORT set.
    let listener1 = reuseport_listener(port, "socket1");
    let listener2 = reuseport_listener(port, "socket2");

    // Store both sockets in the array.
    socket_array.set(0, &listener1, 0).unwrap();
    socket_array.set(1, &listener2, 0).unwrap();

    // Remove them again.
    socket_array.clear_index(&0).unwrap();
    socket_array.clear_index(&1).unwrap();
}
/// Checks that `set` and `clear_index` on `socket_map` both return an error
/// for index 100, which the test treats as out of bounds.
#[test_log::test]
fn sk_reuseport_error_conditions() {
    let _netns = NetNsGuard::new();
    let mut bpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();

    let raw_map = bpf.take_map("socket_map").unwrap();
    let mut sockets: ReusePortSockArray<_> = raw_map.try_into().unwrap();

    let listener = TcpListener::bind("127.0.0.1:0").unwrap();

    // Storing a socket past the end of the array must be rejected.
    let set_outcome = sockets.set(100, &listener, 0);
    assert!(
        set_outcome.is_err(),
        "Setting socket at out-of-bounds index should fail"
    );

    // Clearing the same out-of-range slot must be rejected as well.
    let clear_outcome = sockets.clear_index(&100);
    assert!(
        clear_outcome.is_err(),
        "Clearing out-of-bounds index should fail"
    );
}
/// Loads the `test_context_access` program and checks that its reported
/// program type is `SkReuseport`.
#[tokio::test]
async fn sk_reuseport_context_access() {
    let mut bpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();

    let program = bpf.program_mut("test_context_access").unwrap();
    let reuseport: &mut SkReuseport = program.try_into().unwrap();

    // The program accesses context fields; loading it is the check.
    reuseport.load().unwrap();

    // Confirm the program info reports the expected type.
    let reported_type = reuseport.info().unwrap().program_type().unwrap();
    assert_eq!(reported_type, aya::programs::ProgramType::SkReuseport);
}
/// Loads the `test_helper_usage` program, checks that its reported program
/// type is `SkReuseport`, and checks that `socket_map` exposes 10 indices.
#[tokio::test]
async fn sk_reuseport_helper_usage() {
    let mut bpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();

    let program = bpf.program_mut("test_helper_usage").unwrap();
    let reuseport: &mut SkReuseport = program.try_into().unwrap();

    // The program uses a helper; loading it is the first check.
    reuseport.load().unwrap();

    // Confirm the program info reports the expected type.
    let reported_type = reuseport.info().unwrap().program_type().unwrap();
    assert_eq!(reported_type, aya::programs::ProgramType::SkReuseport);

    // The socket map used by the helper must be present and convertible.
    let raw_map = bpf.take_map("socket_map").expect("socket_map should exist");
    let sockets: ReusePortSockArray<_> = raw_map
        .try_into()
        .expect("map should convert to ReusePortSockArray");

    // Every slot of the array map is expected to be yielded.
    assert_eq!(
        sockets.indices().count(),
        10,
        "Array map should have all 10 indices available"
    );
}
Loading…
Cancel
Save