mirror of https://github.com/aya-rs/aya
aya: add support for BPF_PROG_TYPE_SK_REUSEPORT
Implements SK_REUSEPORT support to enable programmable socket selection within SO_REUSEPORT groups. - #[sk_reuseport] macro for eBPF programs - SkReuseport program type with load/attach methods - SkReuseportContext with sk_reuseport_md field access - ReusePortSockArray map for socket management - SK_PASS/SK_DROP constants for return values - Documentation and usage examples - Some integration tests This allows load balancing decisions to be made programmatically with SO_REUSEPORT set. Fixes: #215reviewable/pr1328/r1
parent
4fe920f761
commit
6f8fd230d4
@ -0,0 +1,71 @@
|
||||
use proc_macro2::TokenStream;
|
||||
use proc_macro2_diagnostics::{Diagnostic, SpanDiagnosticExt as _};
|
||||
use quote::quote;
|
||||
use syn::{ItemFn, spanned::Spanned as _};
|
||||
|
||||
pub(crate) struct SkReuseport {
|
||||
item: ItemFn,
|
||||
}
|
||||
|
||||
impl SkReuseport {
|
||||
pub(crate) fn parse(attrs: TokenStream, item: TokenStream) -> Result<Self, Diagnostic> {
|
||||
if !attrs.is_empty() {
|
||||
return Err(attrs.span().error("unexpected attribute"));
|
||||
}
|
||||
let item = syn::parse2(item)?;
|
||||
Ok(Self { item })
|
||||
}
|
||||
|
||||
pub(crate) fn expand(&self) -> TokenStream {
|
||||
let Self { item } = self;
|
||||
let ItemFn {
|
||||
attrs: _,
|
||||
vis,
|
||||
sig,
|
||||
block: _,
|
||||
} = item;
|
||||
let fn_name = &sig.ident;
|
||||
quote! {
|
||||
#[unsafe(no_mangle)]
|
||||
#[unsafe(link_section = "sk_reuseport")]
|
||||
#vis fn #fn_name(ctx: *mut ::aya_ebpf::bindings::sk_reuseport_md) -> u32 {
|
||||
return #fn_name(::aya_ebpf::programs::SkReuseportContext::new(ctx));
|
||||
|
||||
#item
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use syn::parse_quote;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_sk_reuseport() {
|
||||
let prog = SkReuseport::parse(
|
||||
parse_quote! {},
|
||||
parse_quote! {
|
||||
fn prog(ctx: &mut ::aya_ebpf::programs::SkReuseportContext) -> u32 {
|
||||
0
|
||||
}
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
let expanded = prog.expand();
|
||||
let expected = quote! {
|
||||
#[unsafe(no_mangle)]
|
||||
#[unsafe(link_section = "sk_reuseport")]
|
||||
fn prog(ctx: *mut ::aya_ebpf::bindings::sk_reuseport_md) -> u32 {
|
||||
return prog(::aya_ebpf::programs::SkReuseportContext::new(ctx));
|
||||
|
||||
fn prog(ctx: &mut ::aya_ebpf::programs::SkReuseportContext) -> u32 {
|
||||
0
|
||||
}
|
||||
}
|
||||
};
|
||||
assert_eq!(expected.to_string(), expanded.to_string());
|
||||
}
|
||||
}
|
@ -0,0 +1,171 @@
|
||||
//! An array of sockets for SO_REUSEPORT selection.
|
||||
|
||||
use std::{
|
||||
borrow::{Borrow, BorrowMut},
|
||||
os::fd::{AsFd as _, AsRawFd, RawFd},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
maps::{MapData, MapError, MapFd, MapKeys, check_bounds, check_kv_size, sock::SockMapFd},
|
||||
sys::{SyscallError, bpf_map_delete_elem, bpf_map_update_elem},
|
||||
};
|
||||
|
||||
/// An array of sockets that can be shared between eBPF programs and user space.
|
||||
///
|
||||
/// `ReusePortSockArray` stores sockets that participate in SO_REUSEPORT groups.
|
||||
/// eBPF programs of type `BPF_PROG_TYPE_SK_REUSEPORT` can use this map with the
|
||||
/// `bpf_sk_select_reuseport()` helper to select specific sockets for incoming
|
||||
/// connections.
|
||||
///
|
||||
/// # Minimum kernel version
|
||||
///
|
||||
/// The minimum kernel version required to use this feature is 4.19.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # let mut bpf = aya::Ebpf::load(&[])?;
|
||||
/// use aya::maps::ReusePortSockArray;
|
||||
/// use aya::programs::SkReuseport;
|
||||
/// use std::os::fd::{AsRawFd, FromRawFd};
|
||||
/// use std::net::TcpListener;
|
||||
/// use libc::{socket, setsockopt, bind, listen, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEPORT, sockaddr_in};
|
||||
///
|
||||
/// // Create socket with SO_REUSEPORT enabled
|
||||
/// let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
|
||||
/// let enable = 1i32;
|
||||
/// unsafe {
|
||||
/// setsockopt(
|
||||
/// socket_fd,
|
||||
/// SOL_SOCKET,
|
||||
/// SO_REUSEPORT,
|
||||
/// &enable as *const _ as *const _,
|
||||
/// std::mem::size_of_val(&enable) as u32,
|
||||
/// );
|
||||
/// }
|
||||
///
|
||||
/// // Bind and listen (setup details omitted for brevity)
|
||||
/// // ... bind(socket_fd, &addr, addr_len) and listen(socket_fd, backlog) ...
|
||||
/// let socket = unsafe { TcpListener::from_raw_fd(socket_fd) };
|
||||
///
|
||||
/// // Load the socket array map and populate it
|
||||
/// let mut socket_array: ReusePortSockArray<_> = bpf.take_map("socket_map").unwrap().try_into()?;
|
||||
/// socket_array.set(0, &socket, 0)?;
|
||||
///
|
||||
/// // Load and attach the SK_REUSEPORT program
|
||||
/// let prog: &mut SkReuseport = bpf.program_mut("select_socket").unwrap().try_into()?;
|
||||
/// prog.load()?;
|
||||
/// prog.attach(&socket)?;
|
||||
/// # Ok::<(), Box<dyn std::error::Error>>(())
|
||||
/// ```
|
||||
///
|
||||
/// # Complete Setup Example
|
||||
///
|
||||
/// This example shows proper SO_REUSEPORT socket group setup:
|
||||
///
|
||||
/// ```no_run
|
||||
/// # let mut bpf = aya::Ebpf::load(&[])?;
|
||||
/// use aya::maps::ReusePortSockArray;
|
||||
/// use aya::programs::SkReuseport;
|
||||
/// use std::net::TcpListener;
|
||||
/// use std::os::fd::{AsRawFd, FromRawFd};
|
||||
/// use libc::{socket, setsockopt, bind, listen, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEPORT, sockaddr_in};
|
||||
///
|
||||
/// // Create multiple sockets in SO_REUSEPORT group
|
||||
/// let port = 8080u16;
|
||||
/// let addr = sockaddr_in {
|
||||
/// sin_family: AF_INET as u16,
|
||||
/// sin_port: port.to_be(),
|
||||
/// sin_addr: libc::in_addr { s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be() },
|
||||
/// sin_zero: [0; 8],
|
||||
/// };
|
||||
///
|
||||
/// let enable = 1i32;
|
||||
/// let mut sockets = Vec::new();
|
||||
///
|
||||
/// // Create 4 SO_REUSEPORT sockets
|
||||
/// for _ in 0..4 {
|
||||
/// let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
|
||||
/// unsafe {
|
||||
/// // Set SO_REUSEPORT before binding (required for reuseport groups)
|
||||
/// setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT,
|
||||
/// &enable as *const _ as *const _, std::mem::size_of_val(&enable) as u32);
|
||||
/// bind(socket_fd, &addr as *const _ as *const libc::sockaddr,
|
||||
/// std::mem::size_of::<sockaddr_in>() as u32);
|
||||
/// listen(socket_fd, 1024);
|
||||
/// }
|
||||
/// sockets.push(unsafe { TcpListener::from_raw_fd(socket_fd) });
|
||||
/// }
|
||||
///
|
||||
/// // Load and populate the socket array map
|
||||
/// let mut socket_array: ReusePortSockArray<_> = bpf.take_map("socket_map").unwrap().try_into()?;
|
||||
/// for (i, socket) in sockets.iter().enumerate() {
|
||||
/// socket_array.set(i as u32, socket, 0)?;
|
||||
/// }
|
||||
///
|
||||
/// // Load and attach the SK_REUSEPORT program to first socket in group
|
||||
/// let prog: &mut SkReuseport = bpf.program_mut("load_balancer").unwrap().try_into()?;
|
||||
/// prog.load()?;
|
||||
/// prog.attach(&sockets[0])?;
|
||||
/// # Ok::<(), Box<dyn std::error::Error>>(())
|
||||
/// ```
|
||||
#[doc(alias = "BPF_MAP_TYPE_REUSEPORT_SOCKARRAY")]
|
||||
pub struct ReusePortSockArray<T> {
|
||||
pub(crate) inner: T,
|
||||
}
|
||||
|
||||
impl<T: Borrow<MapData>> ReusePortSockArray<T> {
|
||||
pub(crate) fn new(map: T) -> Result<Self, MapError> {
|
||||
let data = map.borrow();
|
||||
check_kv_size::<u32, RawFd>(data)?;
|
||||
|
||||
Ok(Self { inner: map })
|
||||
}
|
||||
|
||||
/// An iterator over the indices of the array that point to a socket. The iterator item type
|
||||
/// is `Result<u32, MapError>`.
|
||||
pub fn indices(&self) -> MapKeys<'_, u32> {
|
||||
MapKeys::new(self.inner.borrow())
|
||||
}
|
||||
|
||||
/// Returns the map's file descriptor.
|
||||
///
|
||||
/// The returned file descriptor can be used with [`SkReuseport`](crate::programs::SkReuseport) programs.
|
||||
pub fn fd(&self) -> &SockMapFd {
|
||||
let fd: &MapFd = self.inner.borrow().fd();
|
||||
// TODO(https://github.com/rust-lang/rfcs/issues/3066): avoid this unsafe.
|
||||
// SAFETY: `SockMapFd` is #[repr(transparent)] over `MapFd`.
|
||||
unsafe { std::mem::transmute(fd) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BorrowMut<MapData>> ReusePortSockArray<T> {
|
||||
/// Stores a socket into the map at the given index.
|
||||
///
|
||||
/// The socket will be available for selection by `bpf_sk_select_reuseport()` helper
|
||||
/// using the provided index.
|
||||
pub fn set<I: AsRawFd>(&mut self, index: u32, socket: &I, flags: u64) -> Result<(), MapError> {
|
||||
let data = self.inner.borrow_mut();
|
||||
let fd = data.fd().as_fd();
|
||||
check_bounds(data, index)?;
|
||||
bpf_map_update_elem(fd, Some(&index), &socket.as_raw_fd(), flags)
|
||||
.map_err(|io_error| SyscallError {
|
||||
call: "bpf_map_update_elem",
|
||||
io_error,
|
||||
})
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Removes the socket stored at `index` from the map.
|
||||
pub fn clear_index(&mut self, index: &u32) -> Result<(), MapError> {
|
||||
let data = self.inner.borrow_mut();
|
||||
let fd = data.fd().as_fd();
|
||||
check_bounds(data, *index)?;
|
||||
bpf_map_delete_elem(fd, index)
|
||||
.map_err(|io_error| SyscallError {
|
||||
call: "bpf_map_delete_elem",
|
||||
io_error,
|
||||
})
|
||||
.map_err(Into::into)
|
||||
}
|
||||
}
|
@ -0,0 +1,164 @@
|
||||
//! Socket load balancing with SO_REUSEPORT.
|
||||
use std::{
|
||||
io, mem,
|
||||
os::fd::{AsFd, AsRawFd as _, RawFd},
|
||||
};
|
||||
|
||||
use aya_obj::generated::{
|
||||
bpf_attach_type::BPF_SK_REUSEPORT_SELECT, bpf_prog_type::BPF_PROG_TYPE_SK_REUSEPORT,
|
||||
};
|
||||
use libc::{SOL_SOCKET, setsockopt};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::programs::{Link, ProgramData, ProgramError, ProgramType, id_as_key, load_program};
|
||||
|
||||
/// SO_ATTACH_REUSEPORT_EBPF socket option constant.
|
||||
const SO_ATTACH_REUSEPORT_EBPF: i32 = 52;
|
||||
|
||||
/// SO_DETACH_REUSEPORT_BPF socket option constant.
|
||||
const SO_DETACH_REUSEPORT_BPF: i32 = 68;
|
||||
|
||||
/// The type returned when attaching a [`SkReuseport`] fails.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum SkReuseportError {
|
||||
/// Setting the `SO_ATTACH_REUSEPORT_EBPF` socket option failed.
|
||||
#[error("setsockopt SO_ATTACH_REUSEPORT_EBPF failed")]
|
||||
SoAttachReuseportEbpfError {
|
||||
/// original [`io::Error`]
|
||||
#[source]
|
||||
io_error: io::Error,
|
||||
},
|
||||
}
|
||||
|
||||
/// A program used to select a socket within a SO_REUSEPORT group.
|
||||
///
|
||||
/// [`SkReuseport`] programs are attached to sockets with SO_REUSEPORT set to
|
||||
/// provide programmable socket selection when multiple sockets are listening
|
||||
/// on the same port. The program decides which socket in the reuseport group
|
||||
/// should handle an incoming connection or packet.
|
||||
///
|
||||
/// # Minimum kernel version
|
||||
///
|
||||
/// The minimum kernel version required to use this feature is 4.19.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # #[derive(Debug, thiserror::Error)]
|
||||
/// # enum Error {
|
||||
/// # #[error(transparent)]
|
||||
/// # IO(#[from] std::io::Error),
|
||||
/// # #[error(transparent)]
|
||||
/// # Map(#[from] aya::maps::MapError),
|
||||
/// # #[error(transparent)]
|
||||
/// # Program(#[from] aya::programs::ProgramError),
|
||||
/// # #[error(transparent)]
|
||||
/// # Ebpf(#[from] aya::EbpfError)
|
||||
/// # }
|
||||
/// # let mut bpf = aya::Ebpf::load(&[])?;
|
||||
/// use std::net::TcpListener;
|
||||
/// use aya::programs::SkReuseport;
|
||||
///
|
||||
/// let listener = TcpListener::bind("127.0.0.1:8080")?;
|
||||
/// let program: &mut SkReuseport = bpf.program_mut("select_socket").unwrap().try_into()?;
|
||||
/// program.load()?;
|
||||
/// program.attach(listener)?;
|
||||
/// # Ok::<(), Error>(())
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
#[doc(alias = "BPF_PROG_TYPE_SK_REUSEPORT")]
|
||||
pub struct SkReuseport {
|
||||
pub(crate) data: ProgramData<SkReuseportLink>,
|
||||
}
|
||||
|
||||
impl SkReuseport {
|
||||
/// The type of the program according to the kernel.
|
||||
pub const PROGRAM_TYPE: ProgramType = ProgramType::SkReuseport;
|
||||
|
||||
/// Loads the program inside the kernel.
|
||||
pub fn load(&mut self) -> Result<(), ProgramError> {
|
||||
self.data.expected_attach_type = Some(BPF_SK_REUSEPORT_SELECT);
|
||||
load_program(BPF_PROG_TYPE_SK_REUSEPORT, &mut self.data)
|
||||
}
|
||||
|
||||
/// Attaches the program to the given socket.
|
||||
///
|
||||
/// The returned value can be used to detach, see [SkReuseport::detach].
|
||||
pub fn attach<T: AsFd>(&mut self, socket: T) -> Result<SkReuseportLinkId, ProgramError> {
|
||||
let prog_fd = self.fd()?;
|
||||
let prog_fd = prog_fd.as_fd();
|
||||
let prog_fd = prog_fd.as_raw_fd();
|
||||
let socket = socket.as_fd();
|
||||
let socket = socket.as_raw_fd();
|
||||
|
||||
let ret = unsafe {
|
||||
setsockopt(
|
||||
socket,
|
||||
SOL_SOCKET,
|
||||
SO_ATTACH_REUSEPORT_EBPF,
|
||||
&prog_fd as *const _ as *const _,
|
||||
mem::size_of::<RawFd>() as u32,
|
||||
)
|
||||
};
|
||||
if ret < 0 {
|
||||
return Err(SkReuseportError::SoAttachReuseportEbpfError {
|
||||
io_error: io::Error::last_os_error(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
self.data.links.insert(SkReuseportLink { socket, prog_fd })
|
||||
}
|
||||
|
||||
/// Detaches the program.
|
||||
///
|
||||
/// See [`Self::attach`].
|
||||
pub fn detach(&mut self, link_id: SkReuseportLinkId) -> Result<(), ProgramError> {
|
||||
self.data.links.remove(link_id)
|
||||
}
|
||||
|
||||
/// Takes ownership of the link referenced by the provided `link_id`.
|
||||
///
|
||||
/// The caller takes the responsibility of managing the lifetime of the link. When the returned
|
||||
/// [`SkReuseportLink`] is dropped, the link is detached.
|
||||
pub fn take_link(
|
||||
&mut self,
|
||||
link_id: SkReuseportLinkId,
|
||||
) -> Result<SkReuseportLink, ProgramError> {
|
||||
self.data.links.forget(link_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// The type returned by [`SkReuseport::attach`]. Can be passed to [`SkReuseport::detach`].
|
||||
#[derive(Debug, Hash, Eq, PartialEq)]
|
||||
pub struct SkReuseportLinkId(RawFd, RawFd);
|
||||
|
||||
/// A SkReuseport Link
|
||||
#[derive(Debug)]
|
||||
pub struct SkReuseportLink {
|
||||
socket: RawFd,
|
||||
prog_fd: RawFd,
|
||||
}
|
||||
|
||||
impl Link for SkReuseportLink {
|
||||
type Id = SkReuseportLinkId;
|
||||
|
||||
fn id(&self) -> Self::Id {
|
||||
SkReuseportLinkId(self.socket, self.prog_fd)
|
||||
}
|
||||
|
||||
fn detach(self) -> Result<(), ProgramError> {
|
||||
unsafe {
|
||||
setsockopt(
|
||||
self.socket,
|
||||
SOL_SOCKET,
|
||||
SO_DETACH_REUSEPORT_BPF,
|
||||
&self.prog_fd as *const _ as *const _,
|
||||
mem::size_of::<RawFd>() as u32,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
id_as_key!(SkReuseportLink, SkReuseportLinkId);
|
@ -0,0 +1,60 @@
|
||||
use core::{cell::UnsafeCell, mem};
|
||||
|
||||
use crate::{
|
||||
bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_REUSEPORT_SOCKARRAY},
|
||||
maps::PinningType,
|
||||
};
|
||||
|
||||
/// An array of sockets for use with SO_REUSEPORT socket selection.
|
||||
///
|
||||
/// `ReusePortSockArray` is used to store sockets that participate in SO_REUSEPORT
|
||||
/// groups. eBPF programs of type `BPF_PROG_TYPE_SK_REUSEPORT` can use this map
|
||||
/// with the `bpf_sk_select_reuseport()` helper to select specific sockets for
|
||||
/// incoming connections.
|
||||
///
|
||||
/// # Minimum kernel version
|
||||
///
|
||||
/// The minimum kernel version required to use this feature is 4.19.
|
||||
#[repr(transparent)]
|
||||
pub struct ReusePortSockArray {
|
||||
def: UnsafeCell<bpf_map_def>,
|
||||
}
|
||||
|
||||
unsafe impl Sync for ReusePortSockArray {}
|
||||
|
||||
impl ReusePortSockArray {
|
||||
/// Creates a new `ReusePortSockArray` with the specified maximum number of entries.
|
||||
pub const fn with_max_entries(max_entries: u32, flags: u32) -> ReusePortSockArray {
|
||||
ReusePortSockArray {
|
||||
def: UnsafeCell::new(bpf_map_def {
|
||||
type_: BPF_MAP_TYPE_REUSEPORT_SOCKARRAY,
|
||||
key_size: mem::size_of::<u32>() as u32,
|
||||
value_size: mem::size_of::<u32>() as u32,
|
||||
max_entries,
|
||||
map_flags: flags,
|
||||
id: 0,
|
||||
pinning: PinningType::None as u32,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new pinned `ReusePortSockArray` with the specified maximum number of entries.
|
||||
pub const fn pinned(max_entries: u32, flags: u32) -> ReusePortSockArray {
|
||||
ReusePortSockArray {
|
||||
def: UnsafeCell::new(bpf_map_def {
|
||||
type_: BPF_MAP_TYPE_REUSEPORT_SOCKARRAY,
|
||||
key_size: mem::size_of::<u32>() as u32,
|
||||
value_size: mem::size_of::<u32>() as u32,
|
||||
max_entries,
|
||||
map_flags: flags,
|
||||
id: 0,
|
||||
pinning: PinningType::ByName as u32,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a raw pointer to the map definition for use with helpers.
|
||||
pub fn as_ptr(&self) -> *mut ::aya_ebpf_cty::c_void {
|
||||
self.def.get() as *mut _
|
||||
}
|
||||
}
|
@ -0,0 +1,162 @@
|
||||
//! Socket load balancing with SO_REUSEPORT programs.
|
||||
//!
|
||||
//! This module provides context and constants for BPF_PROG_TYPE_SK_REUSEPORT programs
|
||||
//! which allow custom load balancing logic for SO_REUSEPORT socket groups.
|
||||
//!
|
||||
//! # Basic Usage
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use aya_ebpf::{macros::sk_reuseport, programs::{SkReuseportContext, SK_PASS, SK_DROP}};
|
||||
//!
|
||||
//! #[sk_reuseport]
|
||||
//! pub fn load_balancer(ctx: SkReuseportContext) -> u32 {
|
||||
//! // Allow packet through - kernel will balance among sockets
|
||||
//! SK_PASS
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! # Advanced Socket Selection
|
||||
//!
|
||||
//! For explicit socket selection, use `bpf_sk_select_reuseport()` with socket arrays:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use aya_ebpf::{
|
||||
//! macros::{sk_reuseport, map},
|
||||
//! programs::{SkReuseportContext, SK_PASS, SK_DROP},
|
||||
//! helpers::bpf_sk_select_reuseport,
|
||||
//! maps::ReusePortSockArray,
|
||||
//! EbpfContext,
|
||||
//! };
|
||||
//!
|
||||
//! #[map(name = "socket_map")]
|
||||
//! static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(10, 0);
|
||||
//!
|
||||
//! #[sk_reuseport]
|
||||
//! pub fn select_worker(ctx: SkReuseportContext) -> u32 {
|
||||
//! // Custom logic to determine worker index
|
||||
//! let worker_id: u32 = 2;
|
||||
//!
|
||||
//! // Select specific socket using helper
|
||||
//! let ret = unsafe {
|
||||
//! bpf_sk_select_reuseport(
|
||||
//! ctx.as_ptr() as *mut _,
|
||||
//! SOCKET_MAP.as_ptr(),
|
||||
//! &worker_id as *const _ as *mut _,
|
||||
//! 0
|
||||
//! )
|
||||
//! };
|
||||
//!
|
||||
//! // Return SK_DROP on error, SK_PASS on success
|
||||
//! if ret == 0 {
|
||||
//! SK_PASS
|
||||
//! } else {
|
||||
//! SK_DROP
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! # Context Field Access Example
|
||||
//!
|
||||
//! Access packet metadata for custom load balancing decisions:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use aya_ebpf::{
|
||||
//! macros::{sk_reuseport, map},
|
||||
//! programs::{SkReuseportContext, SK_PASS, SK_DROP},
|
||||
//! helpers::bpf_sk_select_reuseport,
|
||||
//! maps::ReusePortSockArray,
|
||||
//! EbpfContext,
|
||||
//! };
|
||||
//!
|
||||
//! #[map(name = "socket_map")]
|
||||
//! static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(4, 0);
|
||||
//!
|
||||
//! #[sk_reuseport]
|
||||
//! pub fn hash_based_selection(ctx: SkReuseportContext) -> u32 {
|
||||
//! // Use packet hash for consistent load balancing
|
||||
//! let socket_idx = ctx.hash() % 4;
|
||||
//!
|
||||
//! // Only handle TCP traffic
|
||||
//! if ctx.ip_protocol() == 6 { // IPPROTO_TCP
|
||||
//! let ret = unsafe {
|
||||
//! bpf_sk_select_reuseport(
|
||||
//! ctx.as_ptr() as *mut _,
|
||||
//! SOCKET_MAP.as_ptr(),
|
||||
//! &socket_idx as *const _ as *mut _,
|
||||
//! 0
|
||||
//! )
|
||||
//! };
|
||||
//!
|
||||
//! if ret == 0 {
|
||||
//! SK_PASS
|
||||
//! } else {
|
||||
//! SK_DROP
|
||||
//! }
|
||||
//! } else {
|
||||
//! // Let kernel handle non-TCP traffic
|
||||
//! SK_PASS
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
use core::ffi::c_void;
|
||||
|
||||
use crate::{EbpfContext, bindings::sk_reuseport_md};
|
||||
|
||||
/// SK_PASS: Allow packet through and let kernel handle socket selection
|
||||
pub const SK_PASS: u32 = 1;
|
||||
|
||||
/// SK_DROP: Drop the packet
|
||||
pub const SK_DROP: u32 = 0;
|
||||
|
||||
pub struct SkReuseportContext {
|
||||
pub md: *mut sk_reuseport_md,
|
||||
}
|
||||
|
||||
impl SkReuseportContext {
|
||||
pub fn new(md: *mut sk_reuseport_md) -> SkReuseportContext {
|
||||
SkReuseportContext { md }
|
||||
}
|
||||
|
||||
/// Returns the start of the directly accessible data.
|
||||
pub fn data(&self) -> usize {
|
||||
unsafe { (*self.md).__bindgen_anon_1.data as usize }
|
||||
}
|
||||
|
||||
/// Returns the end of the directly accessible data.
|
||||
pub fn data_end(&self) -> usize {
|
||||
unsafe { (*self.md).__bindgen_anon_2.data_end as usize }
|
||||
}
|
||||
|
||||
/// Returns the total packet length.
|
||||
#[expect(clippy::len_without_is_empty)]
|
||||
pub fn len(&self) -> u32 {
|
||||
unsafe { (*self.md).len }
|
||||
}
|
||||
|
||||
/// Returns the ethernet protocol from the packet (network byte order).
|
||||
pub fn eth_protocol(&self) -> u32 {
|
||||
unsafe { (*self.md).eth_protocol }
|
||||
}
|
||||
|
||||
/// Returns the IP protocol (e.g., IPPROTO_TCP, IPPROTO_UDP).
|
||||
pub fn ip_protocol(&self) -> u32 {
|
||||
unsafe { (*self.md).ip_protocol }
|
||||
}
|
||||
|
||||
/// Returns whether the socket is bound to an INANY address.
|
||||
pub fn bind_inany(&self) -> u32 {
|
||||
unsafe { (*self.md).bind_inany }
|
||||
}
|
||||
|
||||
/// Returns the hash of the packet's 4-tuple for load balancing.
|
||||
pub fn hash(&self) -> u32 {
|
||||
unsafe { (*self.md).hash }
|
||||
}
|
||||
}
|
||||
|
||||
impl EbpfContext for SkReuseportContext {
|
||||
fn as_ptr(&self) -> *mut c_void {
|
||||
self.md as *mut _
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
#![no_std]
|
||||
#![no_main]
|
||||
|
||||
use aya_ebpf::{
|
||||
macros::{map, sk_reuseport},
|
||||
maps::ReusePortSockArray,
|
||||
programs::{SK_PASS, SK_DROP, SkReuseportContext},
|
||||
helpers::bpf_sk_select_reuseport,
|
||||
EbpfContext as _,
|
||||
};
|
||||
#[cfg(not(test))]
|
||||
extern crate ebpf_panic;
|
||||
|
||||
#[map(name = "socket_map")]
|
||||
static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(10, 0);
|
||||
|
||||
#[sk_reuseport]
|
||||
pub fn select_socket(_ctx: SkReuseportContext) -> u32 {
|
||||
// Return SK_PASS to allow packet and let kernel handle socket selection
|
||||
// Return SK_DROP would drop the packet
|
||||
SK_PASS
|
||||
}
|
||||
|
||||
#[sk_reuseport]
|
||||
pub fn test_context_access(ctx: SkReuseportContext) -> u32 {
|
||||
// Test accessing context fields
|
||||
let _len = ctx.len();
|
||||
let _hash = ctx.hash();
|
||||
let _ip_protocol = ctx.ip_protocol();
|
||||
let _eth_protocol = ctx.eth_protocol();
|
||||
let _bind_inany = ctx.bind_inany();
|
||||
let _data = ctx.data();
|
||||
let _data_end = ctx.data_end();
|
||||
|
||||
// Always pass for testing
|
||||
SK_PASS
|
||||
}
|
||||
|
||||
#[sk_reuseport]
|
||||
pub fn test_helper_usage(ctx: SkReuseportContext) -> u32 {
|
||||
// Use hash-based socket selection with helper
|
||||
let socket_idx = ctx.hash() % 4;
|
||||
|
||||
// Only handle TCP traffic (protocol 6)
|
||||
if ctx.ip_protocol() == 6 {
|
||||
let ret = unsafe {
|
||||
bpf_sk_select_reuseport(
|
||||
ctx.as_ptr() as *mut _,
|
||||
SOCKET_MAP.as_ptr(),
|
||||
&socket_idx as *const _ as *mut _,
|
||||
0
|
||||
)
|
||||
};
|
||||
|
||||
// Return result based on helper success
|
||||
if ret == 0 {
|
||||
SK_PASS
|
||||
} else {
|
||||
SK_DROP
|
||||
}
|
||||
} else {
|
||||
// Let kernel handle non-TCP traffic
|
||||
SK_PASS
|
||||
}
|
||||
}
|
@ -0,0 +1,307 @@
|
||||
use std::{net::TcpListener, os::fd::{AsFd as _, AsRawFd as _, FromRawFd as _}, time::Duration};
|
||||
|
||||
use aya::{
|
||||
Ebpf,
|
||||
maps::ReusePortSockArray,
|
||||
programs::{SkReuseport, loaded_programs},
|
||||
};
|
||||
use libc::{setsockopt, SOL_SOCKET, SO_REUSEPORT, socket, bind, listen, AF_INET, SOCK_STREAM, sockaddr_in};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::utils::NetNsGuard;
|
||||
|
||||
#[tokio::test]
|
||||
async fn sk_reuseport_load() {
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
let prog: &mut SkReuseport = ebpf
|
||||
.program_mut("select_socket")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
// Test that the program loads successfully
|
||||
prog.load().unwrap();
|
||||
|
||||
// Test that it's properly loaded
|
||||
let info = prog.info().unwrap();
|
||||
assert_eq!(
|
||||
info.program_type().unwrap(),
|
||||
aya::programs::ProgramType::SkReuseport
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sk_reuseport_loaded_programs_iteration() {
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
let prog: &mut SkReuseport = ebpf
|
||||
.program_mut("select_socket")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
let programs = loaded_programs().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
assert!(!programs.iter().any(|p| matches!(
|
||||
p.program_type().unwrap(),
|
||||
aya::programs::ProgramType::SkReuseport
|
||||
)));
|
||||
|
||||
prog.load().unwrap();
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
|
||||
let programs = loaded_programs().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
assert!(programs.iter().any(|p| matches!(
|
||||
p.program_type().unwrap(),
|
||||
aya::programs::ProgramType::SkReuseport
|
||||
)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sk_reuseport_map_operations() {
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
|
||||
// Test that we can access the ReusePortSockArray map
|
||||
// Note: This test checks map creation and basic operations
|
||||
// In a real scenario, you would need actual SO_REUSEPORT sockets
|
||||
let map: ReusePortSockArray<_> = ebpf.take_map("socket_map")
|
||||
.expect("socket_map should exist")
|
||||
.try_into()
|
||||
.expect("map should convert to ReusePortSockArray");
|
||||
|
||||
// Test that the map has the correct properties
|
||||
let fd = map.fd();
|
||||
assert!(!fd.as_fd().as_raw_fd() < 0, "Map fd should be valid");
|
||||
|
||||
// Test indices iterator (array maps have all indices pre-allocated)
|
||||
let indices_count = map.indices().count();
|
||||
assert_eq!(indices_count, 10, "Array map should have all 10 indices available");
|
||||
}
|
||||
|
||||
#[test_log::test]
|
||||
fn sk_reuseport_attach_detach() {
|
||||
let _netns = NetNsGuard::new();
|
||||
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
let prog: &mut SkReuseport = ebpf
|
||||
.program_mut("select_socket")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
prog.load().unwrap();
|
||||
|
||||
// Create a socket with SO_REUSEPORT enabled - a simpler approach
|
||||
// First create a normal listener to get a port, then create SO_REUSEPORT sockets
|
||||
let temp_listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let local_addr = temp_listener.local_addr().unwrap();
|
||||
drop(temp_listener); // Release the port
|
||||
|
||||
// Create socket with SO_REUSEPORT before binding
|
||||
let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
|
||||
assert!(socket_fd >= 0, "Failed to create socket");
|
||||
|
||||
let enable = 1i32;
|
||||
unsafe {
|
||||
let ret = setsockopt(
|
||||
socket_fd,
|
||||
SOL_SOCKET,
|
||||
SO_REUSEPORT,
|
||||
&enable as *const _ as *const _,
|
||||
std::mem::size_of_val(&enable) as u32,
|
||||
);
|
||||
assert_eq!(ret, 0, "Failed to set SO_REUSEPORT");
|
||||
}
|
||||
|
||||
// Manually bind and listen
|
||||
let addr = sockaddr_in {
|
||||
sin_family: AF_INET as u16,
|
||||
sin_port: local_addr.port().to_be(),
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be(),
|
||||
},
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
|
||||
let bind_result = unsafe {
|
||||
bind(
|
||||
socket_fd,
|
||||
&addr as *const _ as *const libc::sockaddr,
|
||||
std::mem::size_of::<sockaddr_in>() as u32,
|
||||
)
|
||||
};
|
||||
assert_eq!(bind_result, 0, "Failed to bind socket");
|
||||
|
||||
let listen_result = unsafe { listen(socket_fd, 1024) };
|
||||
assert_eq!(listen_result, 0, "Failed to listen on socket");
|
||||
|
||||
// Convert to TcpListener
|
||||
let listener = unsafe { TcpListener::from_raw_fd(socket_fd) };
|
||||
|
||||
// Test program attachment
|
||||
let link_id = prog.attach(&listener).unwrap();
|
||||
|
||||
// Test program detachment
|
||||
prog.detach(link_id).unwrap();
|
||||
}
|
||||
|
||||
#[test_log::test]
|
||||
fn sk_reuseport_socket_array_operations() {
|
||||
let _netns = NetNsGuard::new();
|
||||
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
|
||||
// Get the socket array map
|
||||
let mut socket_array: ReusePortSockArray<_> = ebpf
|
||||
.take_map("socket_map")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
// Create multiple SO_REUSEPORT sockets that bind to the same port
|
||||
let temp_listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let local_addr = temp_listener.local_addr().unwrap();
|
||||
drop(temp_listener); // Release the port
|
||||
|
||||
// Create first socket with SO_REUSEPORT
|
||||
let socket1_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
|
||||
assert!(socket1_fd >= 0, "Failed to create socket1");
|
||||
|
||||
// Create second socket with SO_REUSEPORT
|
||||
let socket2_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
|
||||
assert!(socket2_fd >= 0, "Failed to create socket2");
|
||||
|
||||
let enable = 1i32;
|
||||
// Set SO_REUSEPORT on both sockets before binding
|
||||
unsafe {
|
||||
let ret1 = setsockopt(
|
||||
socket1_fd,
|
||||
SOL_SOCKET,
|
||||
SO_REUSEPORT,
|
||||
&enable as *const _ as *const _,
|
||||
std::mem::size_of_val(&enable) as u32,
|
||||
);
|
||||
let ret2 = setsockopt(
|
||||
socket2_fd,
|
||||
SOL_SOCKET,
|
||||
SO_REUSEPORT,
|
||||
&enable as *const _ as *const _,
|
||||
std::mem::size_of_val(&enable) as u32,
|
||||
);
|
||||
assert_eq!(ret1, 0, "Failed to set SO_REUSEPORT on socket1");
|
||||
assert_eq!(ret2, 0, "Failed to set SO_REUSEPORT on socket2");
|
||||
}
|
||||
|
||||
// Bind both sockets to the same address to create SO_REUSEPORT group
|
||||
let addr = sockaddr_in {
|
||||
sin_family: AF_INET as u16,
|
||||
sin_port: local_addr.port().to_be(),
|
||||
sin_addr: libc::in_addr {
|
||||
s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be(),
|
||||
},
|
||||
sin_zero: [0; 8],
|
||||
};
|
||||
|
||||
unsafe {
|
||||
let bind_result1 = bind(
|
||||
socket1_fd,
|
||||
&addr as *const _ as *const libc::sockaddr,
|
||||
std::mem::size_of::<sockaddr_in>() as u32,
|
||||
);
|
||||
let bind_result2 = bind(
|
||||
socket2_fd,
|
||||
&addr as *const _ as *const libc::sockaddr,
|
||||
std::mem::size_of::<sockaddr_in>() as u32,
|
||||
);
|
||||
assert_eq!(bind_result1, 0, "Failed to bind socket1");
|
||||
assert_eq!(bind_result2, 0, "Failed to bind socket2");
|
||||
|
||||
let listen_result1 = listen(socket1_fd, 1024);
|
||||
let listen_result2 = listen(socket2_fd, 1024);
|
||||
assert_eq!(listen_result1, 0, "Failed to listen on socket1");
|
||||
assert_eq!(listen_result2, 0, "Failed to listen on socket2");
|
||||
}
|
||||
|
||||
// Convert to TcpListeners
|
||||
let listener1 = unsafe { TcpListener::from_raw_fd(socket1_fd) };
|
||||
let listener2 = unsafe { TcpListener::from_raw_fd(socket2_fd) };
|
||||
|
||||
// Test storing sockets in the array
|
||||
socket_array.set(0, &listener1, 0).unwrap();
|
||||
socket_array.set(1, &listener2, 0).unwrap();
|
||||
|
||||
// Test removing sockets from the array
|
||||
socket_array.clear_index(&0).unwrap();
|
||||
socket_array.clear_index(&1).unwrap();
|
||||
}
|
||||
|
||||
#[test_log::test]
|
||||
fn sk_reuseport_error_conditions() {
|
||||
let _netns = NetNsGuard::new();
|
||||
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
|
||||
// Get the socket array map
|
||||
let mut socket_array: ReusePortSockArray<_> = ebpf
|
||||
.take_map("socket_map")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
let socket = TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
|
||||
// Test bounds checking - should fail for out-of-bounds index
|
||||
let result = socket_array.set(100, &socket, 0);
|
||||
assert!(result.is_err(), "Setting socket at out-of-bounds index should fail");
|
||||
|
||||
let result = socket_array.clear_index(&100);
|
||||
assert!(result.is_err(), "Clearing out-of-bounds index should fail");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sk_reuseport_context_access() {
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
let prog: &mut SkReuseport = ebpf
|
||||
.program_mut("test_context_access")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
// Test that the program loads successfully with context field access
|
||||
prog.load().unwrap();
|
||||
|
||||
// Test that it's properly loaded
|
||||
let info = prog.info().unwrap();
|
||||
assert_eq!(
|
||||
info.program_type().unwrap(),
|
||||
aya::programs::ProgramType::SkReuseport
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sk_reuseport_helper_usage() {
|
||||
let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap();
|
||||
let prog: &mut SkReuseport = ebpf
|
||||
.program_mut("test_helper_usage")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
// Test that the program loads successfully with helper usage
|
||||
prog.load().unwrap();
|
||||
|
||||
// Test that it's properly loaded
|
||||
let info = prog.info().unwrap();
|
||||
assert_eq!(
|
||||
info.program_type().unwrap(),
|
||||
aya::programs::ProgramType::SkReuseport
|
||||
);
|
||||
|
||||
// Test that we can access the socket map used by the helper
|
||||
let map: ReusePortSockArray<_> = ebpf.take_map("socket_map")
|
||||
.expect("socket_map should exist")
|
||||
.try_into()
|
||||
.expect("map should convert to ReusePortSockArray");
|
||||
|
||||
// Verify map properties
|
||||
let indices_count = map.indices().count();
|
||||
assert_eq!(indices_count, 10, "Array map should have all 10 indices available");
|
||||
}
|
Loading…
Reference in New Issue