diff --git a/aya-ebpf-macros/src/lib.rs b/aya-ebpf-macros/src/lib.rs index d5811d53..634e6287 100644 --- a/aya-ebpf-macros/src/lib.rs +++ b/aya-ebpf-macros/src/lib.rs @@ -16,6 +16,7 @@ mod perf_event; mod raw_tracepoint; mod sk_lookup; mod sk_msg; +mod sk_reuseport; mod sk_skb; mod sock_ops; mod socket_filter; @@ -42,6 +43,7 @@ use proc_macro::TokenStream; use raw_tracepoint::RawTracePoint; use sk_lookup::SkLookup; use sk_msg::SkMsg; +use sk_reuseport::SkReuseport; use sk_skb::{SkSkb, SkSkbKind}; use sock_ops::SockOps; use socket_filter::SocketFilter; @@ -612,6 +614,67 @@ pub fn sk_lookup(attrs: TokenStream, item: TokenStream) -> TokenStream { .into() } +/// Marks a function as an eBPF Socket Reuseport program that can be attached to +/// a socket with SO_REUSEPORT set. +/// +/// # Minimum kernel version +/// +/// The minimum kernel version required to use this feature is 4.19 +/// +/// # Examples +/// +/// Basic usage allowing kernel to handle socket selection: +/// +/// ```no_run +/// use aya_ebpf::{macros::sk_reuseport, programs::{SkReuseportContext, SK_PASS, SK_DROP}}; +/// +/// #[sk_reuseport] +/// pub fn select_socket(_ctx: SkReuseportContext) -> u32 { +/// // Return SK_DROP to drop packet, SK_PASS to let kernel handle selection +/// SK_PASS +/// } +/// ``` +/// +/// Advanced usage with custom socket selection: +/// +/// ```no_run +/// use aya_ebpf::{ +/// macros::{sk_reuseport, map}, +/// programs::{SkReuseportContext, SK_PASS, SK_DROP}, +/// helpers::bpf_sk_select_reuseport, +/// maps::ReusePortSockArray, +/// EbpfContext, +/// }; +/// +/// #[map(name = "socket_map")] +/// static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(4, 0); +/// +/// #[sk_reuseport] +/// pub fn load_balance(ctx: SkReuseportContext) -> u32 { +/// // Use packet hash for consistent load balancing +/// let socket_idx = ctx.hash() % 4; +/// +/// let ret = unsafe { +/// bpf_sk_select_reuseport( +/// ctx.as_ptr() as *mut _, +/// SOCKET_MAP.as_ptr(), +/// &socket_idx as *const _ as *mut _, +/// 0 +/// ) +/// }; +/// +/// if ret == 0 { SK_PASS } else { SK_DROP } +/// } +/// ``` +#[proc_macro_attribute] +pub fn sk_reuseport(attrs: TokenStream, item: TokenStream) -> TokenStream { + match SkReuseport::parse(attrs.into(), item.into()) { + Ok(prog) => prog.expand(), + Err(err) => err.emit_as_expr_tokens(), + } + .into() +} + /// Marks a function as a cgroup device eBPF program that can be attached to a /// cgroup. /// diff --git a/aya-ebpf-macros/src/sk_reuseport.rs b/aya-ebpf-macros/src/sk_reuseport.rs new file mode 100644 index 00000000..7407c216 --- /dev/null +++ b/aya-ebpf-macros/src/sk_reuseport.rs @@ -0,0 +1,71 @@ +use proc_macro2::TokenStream; +use proc_macro2_diagnostics::{Diagnostic, SpanDiagnosticExt as _}; +use quote::quote; +use syn::{ItemFn, spanned::Spanned as _}; + +pub(crate) struct SkReuseport { + item: ItemFn, +} + +impl SkReuseport { + pub(crate) fn parse(attrs: TokenStream, item: TokenStream) -> Result { + if !attrs.is_empty() { + return Err(attrs.span().error("unexpected attribute")); + } + let item = syn::parse2(item)?; + Ok(Self { item }) + } + + pub(crate) fn expand(&self) -> TokenStream { + let Self { item } = self; + let ItemFn { + attrs: _, + vis, + sig, + block: _, + } = item; + let fn_name = &sig.ident; + quote! { + #[unsafe(no_mangle)] + #[unsafe(link_section = "sk_reuseport")] + #vis fn #fn_name(ctx: *mut ::aya_ebpf::bindings::sk_reuseport_md) -> u32 { + return #fn_name(::aya_ebpf::programs::SkReuseportContext::new(ctx)); + + #item + } + } + } +} + +#[cfg(test)] +mod tests { + use syn::parse_quote; + + use super::*; + + #[test] + fn test_sk_reuseport() { + let prog = SkReuseport::parse( + parse_quote! {}, + parse_quote! { + fn prog(ctx: &mut ::aya_ebpf::programs::SkReuseportContext) -> u32 { + 0 + } + }, + ) + .unwrap(); + let expanded = prog.expand(); + let expected = quote! { + #[unsafe(no_mangle)] + #[unsafe(link_section = "sk_reuseport")] + fn prog(ctx: *mut ::aya_ebpf::bindings::sk_reuseport_md) -> u32 { + return prog(::aya_ebpf::programs::SkReuseportContext::new(ctx)); + + fn prog(ctx: &mut ::aya_ebpf::programs::SkReuseportContext) -> u32 { + 0 + } + } + }; + assert_eq!(expected.to_string(), expanded.to_string()); + } +} diff --git a/aya-obj/src/obj.rs b/aya-obj/src/obj.rs index ece738ef..93ddae90 100644 --- a/aya-obj/src/obj.rs +++ b/aya-obj/src/obj.rs @@ -215,7 +215,7 @@ pub struct Function { /// - `lwt_in`, `lwt_out`, `lwt_seg6local`, `lwt_xmit` /// - `raw_tp.w+`, `raw_tracepoint.w+` /// - `action` -/// - `sk_reuseport/migrate`, `sk_reuseport` +/// - `sk_reuseport/migrate` /// - `syscall` /// - `struct_ops+` /// - `fmod_ret+`, `fmod_ret.s+` @@ -268,6 +268,7 @@ pub enum ProgramSection { FlowDissector, Extension, SkLookup, + SkReuseport, CgroupSock { attach_type: CgroupSockAttachType, }, @@ -427,6 +428,7 @@ impl FromStr for ProgramSection { "flow_dissector" => FlowDissector, "freplace" => Extension, "sk_lookup" => SkLookup, + "sk_reuseport" => SkReuseport, "iter" => Iter { sleepable: false }, "iter.s" => Iter { sleepable: true }, _ => { diff --git a/aya/src/bpf.rs b/aya/src/bpf.rs index c1b67955..32e89567 100644 --- a/aya/src/bpf.rs +++ b/aya/src/bpf.rs @@ -25,8 +25,8 @@ use crate::{ BtfTracePoint, CgroupDevice, CgroupSkb, CgroupSkbAttachType, CgroupSock, CgroupSockAddr, CgroupSockopt, CgroupSysctl, Extension, FEntry, FExit, FlowDissector, Iter, KProbe, LircMode2, Lsm, PerfEvent, ProbeKind, Program, ProgramData, ProgramError, RawTracePoint, - SchedClassifier, SkLookup, SkMsg, SkSkb, SkSkbKind, SockOps, SocketFilter, TracePoint, - UProbe, Xdp, + SchedClassifier, SkLookup, SkMsg, SkReuseport, SkSkb, SkSkbKind, SockOps, SocketFilter, + TracePoint, UProbe, Xdp, }, sys::{ bpf_load_btf, is_bpf_cookie_supported, is_bpf_global_data_supported, @@ -429,6 +429,7 @@ impl<'a> EbpfLoader<'a> { | ProgramSection::PerfEvent | ProgramSection::RawTracePoint | ProgramSection::SkLookup + | ProgramSection::SkReuseport | ProgramSection::FlowDissector | ProgramSection::CgroupSock { attach_type: _ } | ProgramSection::CgroupDevice => {} @@ -670,6 +671,9 @@ impl<'a> EbpfLoader<'a> { ProgramSection::SkLookup => Program::SkLookup(SkLookup { data: ProgramData::new(prog_name, obj, btf_fd, *verifier_log_level), }), + ProgramSection::SkReuseport => Program::SkReuseport(SkReuseport { + data: ProgramData::new(prog_name, obj, btf_fd, *verifier_log_level), + }), ProgramSection::CgroupSock { attach_type, .. } => { Program::CgroupSock(CgroupSock { data: ProgramData::new(prog_name, obj, btf_fd, *verifier_log_level), @@ -716,6 +720,7 @@ fn parse_map( BPF_MAP_TYPE_PERCPU_HASH => Map::PerCpuHashMap(map), BPF_MAP_TYPE_LRU_PERCPU_HASH => Map::PerCpuLruHashMap(map), BPF_MAP_TYPE_PERF_EVENT_ARRAY => Map::PerfEventArray(map), + BPF_MAP_TYPE_REUSEPORT_SOCKARRAY => Map::ReusePortSockArray(map), BPF_MAP_TYPE_RINGBUF => Map::RingBuf(map), BPF_MAP_TYPE_SOCKHASH => Map::SockHash(map), BPF_MAP_TYPE_SOCKMAP => Map::SockMap(map), diff --git a/aya/src/maps/mod.rs b/aya/src/maps/mod.rs index 0d3e0ebf..44a33b77 100644 --- a/aya/src/maps/mod.rs +++ b/aya/src/maps/mod.rs @@ -95,7 +95,7 @@ pub use lpm_trie::LpmTrie; pub use perf::PerfEventArray; pub use queue::Queue; pub use ring_buf::RingBuf; -pub use sock::{SockHash, SockMap}; +pub use sock::{ReusePortSockArray, SockHash, SockMap}; pub use stack::Stack; pub use stack_trace::StackTraceMap; pub use xdp::{CpuMap, DevMap, DevMapHash, XskMap}; @@ -297,6 +297,8 @@ pub enum Map { ProgramArray(MapData), /// A [`Queue`] map. Queue(MapData), + /// A [`ReusePortSockArray`] map. + ReusePortSockArray(MapData), /// A [`RingBuf`] map. RingBuf(MapData), /// A [`SockHash`] map @@ -331,6 +333,7 @@ impl Map { Self::PerfEventArray(map) => map.obj.map_type(), Self::ProgramArray(map) => map.obj.map_type(), Self::Queue(map) => map.obj.map_type(), + Self::ReusePortSockArray(map) => map.obj.map_type(), Self::RingBuf(map) => map.obj.map_type(), Self::SockHash(map) => map.obj.map_type(), Self::SockMap(map) => map.obj.map_type(), @@ -362,6 +365,7 @@ impl Map { Self::ProgramArray(map) => map.pin(path), Self::Queue(map) => map.pin(path), Self::RingBuf(map) => map.pin(path), + Self::ReusePortSockArray(map) => map.pin(path), Self::SockHash(map) => map.pin(path), Self::SockMap(map) => map.pin(path), Self::Stack(map) => map.pin(path), @@ -480,6 +484,7 @@ impl_try_from_map!(() { DevMapHash, PerfEventArray, ProgramArray, + ReusePortSockArray, RingBuf, SockMap, StackTraceMap, diff --git a/aya/src/maps/sock/mod.rs b/aya/src/maps/sock/mod.rs index b1015f2c..e63c4267 100644 --- a/aya/src/maps/sock/mod.rs +++ b/aya/src/maps/sock/mod.rs @@ -1,4 +1,5 @@ //! Socket maps. +mod reuseport_sock_array; mod sock_hash; mod sock_map; @@ -7,6 +8,7 @@ use std::{ os::fd::{AsFd, BorrowedFd}, }; +pub use reuseport_sock_array::ReusePortSockArray; pub use sock_hash::SockHash; pub use sock_map::SockMap; diff --git a/aya/src/maps/sock/reuseport_sock_array.rs b/aya/src/maps/sock/reuseport_sock_array.rs new file mode 100644 index 00000000..502be21c --- /dev/null +++ b/aya/src/maps/sock/reuseport_sock_array.rs @@ -0,0 +1,171 @@ +//! An array of sockets for SO_REUSEPORT selection. + +use std::{ + borrow::{Borrow, BorrowMut}, + os::fd::{AsFd as _, AsRawFd, RawFd}, +}; + +use crate::{ + maps::{MapData, MapError, MapFd, MapKeys, check_bounds, check_kv_size, sock::SockMapFd}, + sys::{SyscallError, bpf_map_delete_elem, bpf_map_update_elem}, +}; + +/// An array of sockets that can be shared between eBPF programs and user space. +/// +/// `ReusePortSockArray` stores sockets that participate in SO_REUSEPORT groups. +/// eBPF programs of type `BPF_PROG_TYPE_SK_REUSEPORT` can use this map with the +/// `bpf_sk_select_reuseport()` helper to select specific sockets for incoming +/// connections. +/// +/// # Minimum kernel version +/// +/// The minimum kernel version required to use this feature is 4.19. +/// +/// # Examples +/// +/// ```no_run +/// # let mut bpf = aya::Ebpf::load(&[])?; +/// use aya::maps::ReusePortSockArray; +/// use aya::programs::SkReuseport; +/// use std::os::fd::{AsRawFd, FromRawFd}; +/// use std::net::TcpListener; +/// use libc::{socket, setsockopt, bind, listen, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEPORT, sockaddr_in}; +/// +/// // Create socket with SO_REUSEPORT enabled +/// let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) }; +/// let enable = 1i32; +/// unsafe { +/// setsockopt( +/// socket_fd, +/// SOL_SOCKET, +/// SO_REUSEPORT, +/// &enable as *const _ as *const _, +/// std::mem::size_of_val(&enable) as u32, +/// ); +/// } +/// +/// // Bind and listen (setup details omitted for brevity) +/// // ... bind(socket_fd, &addr, addr_len) and listen(socket_fd, backlog) ... +/// let socket = unsafe { TcpListener::from_raw_fd(socket_fd) }; +/// +/// // Load the socket array map and populate it +/// let mut socket_array: ReusePortSockArray<_> = bpf.take_map("socket_map").unwrap().try_into()?; +/// socket_array.set(0, &socket, 0)?; +/// +/// // Load and attach the SK_REUSEPORT program +/// let prog: &mut SkReuseport = bpf.program_mut("select_socket").unwrap().try_into()?; +/// prog.load()?; +/// prog.attach(&socket)?; +/// # Ok::<(), Box>(()) +/// ``` +/// +/// # Complete Setup Example +/// +/// This example shows proper SO_REUSEPORT socket group setup: +/// +/// ```no_run +/// # let mut bpf = aya::Ebpf::load(&[])?; +/// use aya::maps::ReusePortSockArray; +/// use aya::programs::SkReuseport; +/// use std::net::TcpListener; +/// use std::os::fd::{AsRawFd, FromRawFd}; +/// use libc::{socket, setsockopt, bind, listen, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEPORT, sockaddr_in}; +/// +/// // Create multiple sockets in SO_REUSEPORT group +/// let port = 8080u16; +/// let addr = sockaddr_in { +/// sin_family: AF_INET as u16, +/// sin_port: port.to_be(), +/// sin_addr: libc::in_addr { s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be() }, +/// sin_zero: [0; 8], +/// }; +/// +/// let enable = 1i32; +/// let mut sockets = Vec::new(); +/// +/// // Create 4 SO_REUSEPORT sockets +/// for _ in 0..4 { +/// let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) }; +/// unsafe { +/// // Set SO_REUSEPORT before binding (required for reuseport groups) +/// setsockopt(socket_fd, SOL_SOCKET, SO_REUSEPORT, +/// &enable as *const _ as *const _, std::mem::size_of_val(&enable) as u32); +/// bind(socket_fd, &addr as *const _ as *const libc::sockaddr, +/// std::mem::size_of::() as u32); +/// listen(socket_fd, 1024); +/// } +/// sockets.push(unsafe { TcpListener::from_raw_fd(socket_fd) }); +/// } +/// +/// // Load and populate the socket array map +/// let mut socket_array: ReusePortSockArray<_> = bpf.take_map("socket_map").unwrap().try_into()?; +/// for (i, socket) in sockets.iter().enumerate() { +/// socket_array.set(i as u32, socket, 0)?; +/// } +/// +/// // Load and attach the SK_REUSEPORT program to first socket in group +/// let prog: &mut SkReuseport = bpf.program_mut("load_balancer").unwrap().try_into()?; +/// prog.load()?; +/// prog.attach(&sockets[0])?; +/// # Ok::<(), Box>(()) +/// ``` +#[doc(alias = "BPF_MAP_TYPE_REUSEPORT_SOCKARRAY")] +pub struct ReusePortSockArray { + pub(crate) inner: T, +} + +impl> ReusePortSockArray { + pub(crate) fn new(map: T) -> Result { + let data = map.borrow(); + check_kv_size::(data)?; + + Ok(Self { inner: map }) + } + + /// An iterator over the indices of the array that point to a socket. The iterator item type + /// is `Result`. + pub fn indices(&self) -> MapKeys<'_, u32> { + MapKeys::new(self.inner.borrow()) + } + + /// Returns the map's file descriptor. + /// + /// The returned file descriptor can be used with [`SkReuseport`](crate::programs::SkReuseport) programs. + pub fn fd(&self) -> &SockMapFd { + let fd: &MapFd = self.inner.borrow().fd(); + // TODO(https://github.com/rust-lang/rfcs/issues/3066): avoid this unsafe. + // SAFETY: `SockMapFd` is #[repr(transparent)] over `MapFd`. + unsafe { std::mem::transmute(fd) } + } +} + +impl> ReusePortSockArray { + /// Stores a socket into the map at the given index. + /// + /// The socket will be available for selection by `bpf_sk_select_reuseport()` helper + /// using the provided index. + pub fn set(&mut self, index: u32, socket: &I, flags: u64) -> Result<(), MapError> { + let data = self.inner.borrow_mut(); + let fd = data.fd().as_fd(); + check_bounds(data, index)?; + bpf_map_update_elem(fd, Some(&index), &socket.as_raw_fd(), flags) + .map_err(|io_error| SyscallError { + call: "bpf_map_update_elem", + io_error, + }) + .map_err(Into::into) + } + + /// Removes the socket stored at `index` from the map. + pub fn clear_index(&mut self, index: &u32) -> Result<(), MapError> { + let data = self.inner.borrow_mut(); + let fd = data.fd().as_fd(); + check_bounds(data, *index)?; + bpf_map_delete_elem(fd, index) + .map_err(|io_error| SyscallError { + call: "bpf_map_delete_elem", + io_error, + }) + .map_err(Into::into) + } +} \ No newline at end of file diff --git a/aya/src/maps/sock/sock_map.rs b/aya/src/maps/sock/sock_map.rs index d139730a..facd9106 100644 --- a/aya/src/maps/sock/sock_map.rs +++ b/aya/src/maps/sock/sock_map.rs @@ -63,7 +63,7 @@ impl> SockMap { Ok(Self { inner: map }) } - /// An iterator over the indices of the array that point to a program. The iterator item type + /// An iterator over the indices of the array that point to a socket. The iterator item type /// is `Result`. pub fn indices(&self) -> MapKeys<'_, u32> { MapKeys::new(self.inner.borrow()) diff --git a/aya/src/programs/mod.rs b/aya/src/programs/mod.rs index fa1e41ec..c74078c8 100644 --- a/aya/src/programs/mod.rs +++ b/aya/src/programs/mod.rs @@ -62,6 +62,7 @@ pub mod perf_event; pub mod raw_trace_point; pub mod sk_lookup; pub mod sk_msg; +pub mod sk_reuseport; pub mod sk_skb; pub mod sock_ops; pub mod socket_filter; @@ -114,6 +115,7 @@ pub use crate::programs::{ raw_trace_point::RawTracePoint, sk_lookup::SkLookup, sk_msg::SkMsg, + sk_reuseport::{SkReuseport, SkReuseportError}, sk_skb::{SkSkb, SkSkbKind}, sock_ops::SockOps, socket_filter::{SocketFilter, SocketFilterError}, @@ -200,6 +202,10 @@ pub enum ProgramError { #[error(transparent)] SocketFilterError(#[from] SocketFilterError), + /// An error occurred while working with a [`SkReuseport`] program. + #[error(transparent)] + SkReuseportError(#[from] SkReuseportError), + /// An error occurred while working with an [`Xdp`] program. #[error(transparent)] XdpError(#[from] XdpError), @@ -319,6 +325,8 @@ pub enum Program { Extension(Extension), /// A [`SkLookup`] program SkLookup(SkLookup), + /// A [`SkReuseport`] program + SkReuseport(SkReuseport), /// A [`CgroupSock`] program CgroupSock(CgroupSock), /// A [`CgroupDevice`] program @@ -353,6 +361,7 @@ impl Program { Self::Extension(_) => Extension::PROGRAM_TYPE, Self::CgroupSockAddr(_) => CgroupSockAddr::PROGRAM_TYPE, Self::SkLookup(_) => SkLookup::PROGRAM_TYPE, + Self::SkReuseport(_) => SkReuseport::PROGRAM_TYPE, Self::CgroupSock(_) => CgroupSock::PROGRAM_TYPE, Self::CgroupDevice(_) => CgroupDevice::PROGRAM_TYPE, Self::Iter(_) => Iter::PROGRAM_TYPE, @@ -386,6 +395,7 @@ impl Program { Self::Extension(p) => p.pin(path), Self::CgroupSockAddr(p) => p.pin(path), Self::SkLookup(p) => p.pin(path), + Self::SkReuseport(p) => p.pin(path), Self::CgroupSock(p) => p.pin(path), Self::CgroupDevice(p) => p.pin(path), Self::Iter(p) => p.pin(path), @@ -418,6 +428,7 @@ impl Program { Self::Extension(mut p) => p.unload(), Self::CgroupSockAddr(mut p) => p.unload(), Self::SkLookup(mut p) => p.unload(), + Self::SkReuseport(mut p) => p.unload(), Self::CgroupSock(mut p) => p.unload(), Self::CgroupDevice(mut p) => p.unload(), Self::Iter(mut p) => p.unload(), @@ -452,6 +463,7 @@ impl Program { Self::Extension(p) => p.fd(), Self::CgroupSockAddr(p) => p.fd(), Self::SkLookup(p) => p.fd(), + Self::SkReuseport(p) => p.fd(), Self::CgroupSock(p) => p.fd(), Self::CgroupDevice(p) => p.fd(), Self::Iter(p) => p.fd(), @@ -487,6 +499,7 @@ impl Program { Self::Extension(p) => p.info(), Self::CgroupSockAddr(p) => p.info(), Self::SkLookup(p) => p.info(), + Self::SkReuseport(p) => p.info(), Self::CgroupSock(p) => p.info(), Self::CgroupDevice(p) => p.info(), Self::Iter(p) => p.info(), @@ -802,6 +815,7 @@ impl_program_unload!( Extension, CgroupSockAddr, SkLookup, + SkReuseport, SockOps, CgroupSock, CgroupDevice, @@ -844,6 +858,7 @@ impl_fd!( Extension, CgroupSockAddr, SkLookup, + SkReuseport, SockOps, CgroupSock, CgroupDevice, @@ -951,6 +966,7 @@ impl_program_pin!( Extension, CgroupSockAddr, SkLookup, + SkReuseport, SockOps, CgroupSock, CgroupDevice, @@ -992,6 +1008,7 @@ impl_from_pin!( FlowDissector, Extension, SkLookup, + SkReuseport, SockOps, CgroupDevice, Iter, @@ -1101,6 +1118,7 @@ impl_from_prog_info!( unsafe FExit, Extension, SkLookup, + SkReuseport, CgroupDevice, Iter, ); @@ -1157,6 +1175,7 @@ impl_try_from_program!( Extension, CgroupSockAddr, SkLookup, + SkReuseport, CgroupSock, CgroupDevice, Iter, @@ -1185,6 +1204,7 @@ impl_info!( Extension, CgroupSockAddr, SkLookup, + SkReuseport, SockOps, CgroupSock, CgroupDevice, diff --git a/aya/src/programs/sk_reuseport.rs b/aya/src/programs/sk_reuseport.rs new file mode 100644 index 00000000..6f38912d --- /dev/null +++ b/aya/src/programs/sk_reuseport.rs @@ -0,0 +1,164 @@ +//! Socket load balancing with SO_REUSEPORT. +use std::{ + io, mem, + os::fd::{AsFd, AsRawFd as _, RawFd}, +}; + +use aya_obj::generated::{ + bpf_attach_type::BPF_SK_REUSEPORT_SELECT, bpf_prog_type::BPF_PROG_TYPE_SK_REUSEPORT, +}; +use libc::{SOL_SOCKET, setsockopt}; +use thiserror::Error; + +use crate::programs::{Link, ProgramData, ProgramError, ProgramType, id_as_key, load_program}; + +/// SO_ATTACH_REUSEPORT_EBPF socket option constant. +const SO_ATTACH_REUSEPORT_EBPF: i32 = 52; + +/// SO_DETACH_REUSEPORT_BPF socket option constant. +const SO_DETACH_REUSEPORT_BPF: i32 = 68; + +/// The type returned when attaching a [`SkReuseport`] fails. +#[derive(Debug, Error)] +pub enum SkReuseportError { + /// Setting the `SO_ATTACH_REUSEPORT_EBPF` socket option failed. + #[error("setsockopt SO_ATTACH_REUSEPORT_EBPF failed")] + SoAttachReuseportEbpfError { + /// original [`io::Error`] + #[source] + io_error: io::Error, + }, +} + +/// A program used to select a socket within a SO_REUSEPORT group. +/// +/// [`SkReuseport`] programs are attached to sockets with SO_REUSEPORT set to +/// provide programmable socket selection when multiple sockets are listening +/// on the same port. The program decides which socket in the reuseport group +/// should handle an incoming connection or packet. +/// +/// # Minimum kernel version +/// +/// The minimum kernel version required to use this feature is 4.19. +/// +/// # Examples +/// +/// ```no_run +/// # #[derive(Debug, thiserror::Error)] +/// # enum Error { +/// # #[error(transparent)] +/// # IO(#[from] std::io::Error), +/// # #[error(transparent)] +/// # Map(#[from] aya::maps::MapError), +/// # #[error(transparent)] +/// # Program(#[from] aya::programs::ProgramError), +/// # #[error(transparent)] +/// # Ebpf(#[from] aya::EbpfError) +/// # } +/// # let mut bpf = aya::Ebpf::load(&[])?; +/// use std::net::TcpListener; +/// use aya::programs::SkReuseport; +/// +/// let listener = TcpListener::bind("127.0.0.1:8080")?; +/// let program: &mut SkReuseport = bpf.program_mut("select_socket").unwrap().try_into()?; +/// program.load()?; +/// program.attach(listener)?; +/// # Ok::<(), Error>(()) +/// ``` +#[derive(Debug)] +#[doc(alias = "BPF_PROG_TYPE_SK_REUSEPORT")] +pub struct SkReuseport { + pub(crate) data: ProgramData, +} + +impl SkReuseport { + /// The type of the program according to the kernel. + pub const PROGRAM_TYPE: ProgramType = ProgramType::SkReuseport; + + /// Loads the program inside the kernel. + pub fn load(&mut self) -> Result<(), ProgramError> { + self.data.expected_attach_type = Some(BPF_SK_REUSEPORT_SELECT); + load_program(BPF_PROG_TYPE_SK_REUSEPORT, &mut self.data) + } + + /// Attaches the program to the given socket. + /// + /// The returned value can be used to detach, see [SkReuseport::detach]. + pub fn attach(&mut self, socket: T) -> Result { + let prog_fd = self.fd()?; + let prog_fd = prog_fd.as_fd(); + let prog_fd = prog_fd.as_raw_fd(); + let socket = socket.as_fd(); + let socket = socket.as_raw_fd(); + + let ret = unsafe { + setsockopt( + socket, + SOL_SOCKET, + SO_ATTACH_REUSEPORT_EBPF, + &prog_fd as *const _ as *const _, + mem::size_of::() as u32, + ) + }; + if ret < 0 { + return Err(SkReuseportError::SoAttachReuseportEbpfError { + io_error: io::Error::last_os_error(), + } + .into()); + } + + self.data.links.insert(SkReuseportLink { socket, prog_fd }) + } + + /// Detaches the program. + /// + /// See [`Self::attach`]. + pub fn detach(&mut self, link_id: SkReuseportLinkId) -> Result<(), ProgramError> { + self.data.links.remove(link_id) + } + + /// Takes ownership of the link referenced by the provided `link_id`. + /// + /// The caller takes the responsibility of managing the lifetime of the link. When the returned + /// [`SkReuseportLink`] is dropped, the link is detached. + pub fn take_link( + &mut self, + link_id: SkReuseportLinkId, + ) -> Result { + self.data.links.forget(link_id) + } +} + +/// The type returned by [`SkReuseport::attach`]. Can be passed to [`SkReuseport::detach`]. +#[derive(Debug, Hash, Eq, PartialEq)] +pub struct SkReuseportLinkId(RawFd, RawFd); + +/// A SkReuseport Link +#[derive(Debug)] +pub struct SkReuseportLink { + socket: RawFd, + prog_fd: RawFd, +} + +impl Link for SkReuseportLink { + type Id = SkReuseportLinkId; + + fn id(&self) -> Self::Id { + SkReuseportLinkId(self.socket, self.prog_fd) + } + + fn detach(self) -> Result<(), ProgramError> { + unsafe { + setsockopt( + self.socket, + SOL_SOCKET, + SO_DETACH_REUSEPORT_BPF, + &self.prog_fd as *const _ as *const _, + mem::size_of::() as u32, + ); + } + Ok(()) + } +} + +id_as_key!(SkReuseportLink, SkReuseportLinkId); diff --git a/ebpf/aya-ebpf/src/maps/mod.rs b/ebpf/aya-ebpf/src/maps/mod.rs index ead24dc3..a4975760 100644 --- a/ebpf/aya-ebpf/src/maps/mod.rs +++ b/ebpf/aya-ebpf/src/maps/mod.rs @@ -13,6 +13,7 @@ pub mod per_cpu_array; pub mod perf; pub mod program_array; pub mod queue; +pub mod reuseport_sock_array; pub mod ring_buf; pub mod sock_hash; pub mod sock_map; @@ -28,6 +29,7 @@ pub use per_cpu_array::PerCpuArray; pub use perf::{PerfEventArray, PerfEventByteArray}; pub use program_array::ProgramArray; pub use queue::Queue; +pub use reuseport_sock_array::ReusePortSockArray; pub use ring_buf::RingBuf; pub use sock_hash::SockHash; pub use sock_map::SockMap; diff --git a/ebpf/aya-ebpf/src/maps/reuseport_sock_array.rs b/ebpf/aya-ebpf/src/maps/reuseport_sock_array.rs new file mode 100644 index 00000000..1a2808cf --- /dev/null +++ b/ebpf/aya-ebpf/src/maps/reuseport_sock_array.rs @@ -0,0 +1,60 @@ +use core::{cell::UnsafeCell, mem}; + +use crate::{ + bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_REUSEPORT_SOCKARRAY}, + maps::PinningType, +}; + +/// An array of sockets for use with SO_REUSEPORT socket selection. +/// +/// `ReusePortSockArray` is used to store sockets that participate in SO_REUSEPORT +/// groups. eBPF programs of type `BPF_PROG_TYPE_SK_REUSEPORT` can use this map +/// with the `bpf_sk_select_reuseport()` helper to select specific sockets for +/// incoming connections. +/// +/// # Minimum kernel version +/// +/// The minimum kernel version required to use this feature is 4.19. +#[repr(transparent)] +pub struct ReusePortSockArray { + def: UnsafeCell, +} + +unsafe impl Sync for ReusePortSockArray {} + +impl ReusePortSockArray { + /// Creates a new `ReusePortSockArray` with the specified maximum number of entries. + pub const fn with_max_entries(max_entries: u32, flags: u32) -> ReusePortSockArray { + ReusePortSockArray { + def: UnsafeCell::new(bpf_map_def { + type_: BPF_MAP_TYPE_REUSEPORT_SOCKARRAY, + key_size: mem::size_of::() as u32, + value_size: mem::size_of::() as u32, + max_entries, + map_flags: flags, + id: 0, + pinning: PinningType::None as u32, + }), + } + } + + /// Creates a new pinned `ReusePortSockArray` with the specified maximum number of entries. + pub const fn pinned(max_entries: u32, flags: u32) -> ReusePortSockArray { + ReusePortSockArray { + def: UnsafeCell::new(bpf_map_def { + type_: BPF_MAP_TYPE_REUSEPORT_SOCKARRAY, + key_size: mem::size_of::() as u32, + value_size: mem::size_of::() as u32, + max_entries, + map_flags: flags, + id: 0, + pinning: PinningType::ByName as u32, + }), + } + } + + /// Returns a raw pointer to the map definition for use with helpers. + pub fn as_ptr(&self) -> *mut ::aya_ebpf_cty::c_void { + self.def.get() as *mut _ + } +} \ No newline at end of file diff --git a/ebpf/aya-ebpf/src/programs/mod.rs b/ebpf/aya-ebpf/src/programs/mod.rs index f95b38de..fe4e8c91 100644 --- a/ebpf/aya-ebpf/src/programs/mod.rs +++ b/ebpf/aya-ebpf/src/programs/mod.rs @@ -10,6 +10,7 @@ pub mod retprobe; pub mod sk_buff; pub mod sk_lookup; pub mod sk_msg; +pub mod sk_reuseport; pub mod sock; pub mod sock_addr; pub mod sock_ops; @@ -32,6 +33,7 @@ pub use retprobe::RetProbeContext; pub use sk_buff::SkBuffContext; pub use sk_lookup::SkLookupContext; pub use sk_msg::SkMsgContext; +pub use sk_reuseport::{SK_DROP, SK_PASS, SkReuseportContext}; pub use sock::SockContext; pub use sock_addr::SockAddrContext; pub use sock_ops::SockOpsContext; diff --git a/ebpf/aya-ebpf/src/programs/sk_reuseport.rs b/ebpf/aya-ebpf/src/programs/sk_reuseport.rs new file mode 100644 index 00000000..4e7c73fc --- /dev/null +++ b/ebpf/aya-ebpf/src/programs/sk_reuseport.rs @@ -0,0 +1,162 @@ +//! Socket load balancing with SO_REUSEPORT programs. +//! +//! This module provides context and constants for BPF_PROG_TYPE_SK_REUSEPORT programs +//! which allow custom load balancing logic for SO_REUSEPORT socket groups. +//! +//! # Basic Usage +//! +//! ```no_run +//! use aya_ebpf::{macros::sk_reuseport, programs::{SkReuseportContext, SK_PASS, SK_DROP}}; +//! +//! #[sk_reuseport] +//! pub fn load_balancer(ctx: SkReuseportContext) -> u32 { +//! // Allow packet through - kernel will balance among sockets +//! SK_PASS +//! } +//! ``` +//! +//! # Advanced Socket Selection +//! +//! For explicit socket selection, use `bpf_sk_select_reuseport()` with socket arrays: +//! +//! ```no_run +//! use aya_ebpf::{ +//! macros::{sk_reuseport, map}, +//! programs::{SkReuseportContext, SK_PASS, SK_DROP}, +//! helpers::bpf_sk_select_reuseport, +//! maps::ReusePortSockArray, +//! EbpfContext, +//! }; +//! +//! #[map(name = "socket_map")] +//! static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(10, 0); +//! +//! #[sk_reuseport] +//! pub fn select_worker(ctx: SkReuseportContext) -> u32 { +//! // Custom logic to determine worker index +//! let worker_id: u32 = 2; +//! +//! // Select specific socket using helper +//! let ret = unsafe { +//! bpf_sk_select_reuseport( +//! ctx.as_ptr() as *mut _, +//! SOCKET_MAP.as_ptr(), +//! &worker_id as *const _ as *mut _, +//! 0 +//! ) +//! }; +//! +//! // Return SK_DROP on error, SK_PASS on success +//! if ret == 0 { +//! SK_PASS +//! } else { +//! SK_DROP +//! } +//! } +//! ``` +//! +//! # Context Field Access Example +//! +//! Access packet metadata for custom load balancing decisions: +//! +//! ```no_run +//! use aya_ebpf::{ +//! macros::{sk_reuseport, map}, +//! programs::{SkReuseportContext, SK_PASS, SK_DROP}, +//! helpers::bpf_sk_select_reuseport, +//! maps::ReusePortSockArray, +//! EbpfContext, +//! }; +//! +//! #[map(name = "socket_map")] +//! static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(4, 0); +//! +//! #[sk_reuseport] +//! pub fn hash_based_selection(ctx: SkReuseportContext) -> u32 { +//! // Use packet hash for consistent load balancing +//! let socket_idx = ctx.hash() % 4; +//! +//! // Only handle TCP traffic +//! if ctx.ip_protocol() == 6 { // IPPROTO_TCP +//! let ret = unsafe { +//! bpf_sk_select_reuseport( +//! ctx.as_ptr() as *mut _, +//! SOCKET_MAP.as_ptr(), +//! &socket_idx as *const _ as *mut _, +//! 0 +//! ) +//! }; +//! +//! if ret == 0 { +//! SK_PASS +//! } else { +//! SK_DROP +//! } +//! } else { +//! // Let kernel handle non-TCP traffic +//! SK_PASS +//! } +//! } +//! ``` + +use core::ffi::c_void; + +use crate::{EbpfContext, bindings::sk_reuseport_md}; + +/// SK_PASS: Allow packet through and let kernel handle socket selection +pub const SK_PASS: u32 = 1; + +/// SK_DROP: Drop the packet +pub const SK_DROP: u32 = 0; + +pub struct SkReuseportContext { + pub md: *mut sk_reuseport_md, +} + +impl SkReuseportContext { + pub fn new(md: *mut sk_reuseport_md) -> SkReuseportContext { + SkReuseportContext { md } + } + + /// Returns the start of the directly accessible data. + pub fn data(&self) -> usize { + unsafe { (*self.md).__bindgen_anon_1.data as usize } + } + + /// Returns the end of the directly accessible data. + pub fn data_end(&self) -> usize { + unsafe { (*self.md).__bindgen_anon_2.data_end as usize } + } + + /// Returns the total packet length. + #[expect(clippy::len_without_is_empty)] + pub fn len(&self) -> u32 { + unsafe { (*self.md).len } + } + + /// Returns the ethernet protocol from the packet (network byte order). + pub fn eth_protocol(&self) -> u32 { + unsafe { (*self.md).eth_protocol } + } + + /// Returns the IP protocol (e.g., IPPROTO_TCP, IPPROTO_UDP). + pub fn ip_protocol(&self) -> u32 { + unsafe { (*self.md).ip_protocol } + } + + /// Returns whether the socket is bound to an INANY address. + pub fn bind_inany(&self) -> u32 { + unsafe { (*self.md).bind_inany } + } + + /// Returns the hash of the packet's 4-tuple for load balancing. + pub fn hash(&self) -> u32 { + unsafe { (*self.md).hash } + } +} + +impl EbpfContext for SkReuseportContext { + fn as_ptr(&self) -> *mut c_void { + self.md as *mut _ + } +} diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml index b0573882..e976d9ff 100644 --- a/test/integration-ebpf/Cargo.toml +++ b/test/integration-ebpf/Cargo.toml @@ -68,6 +68,10 @@ path = "src/ring_buf.rs" name = "simple_prog" path = "src/simple_prog.rs" +[[bin]] +name = "sk_reuseport" +path = "src/sk_reuseport.rs" + [[bin]] name = "strncmp" path = "src/strncmp.rs" diff --git a/test/integration-ebpf/src/sk_reuseport.rs b/test/integration-ebpf/src/sk_reuseport.rs new file mode 100644 index 00000000..fc43f944 --- /dev/null +++ b/test/integration-ebpf/src/sk_reuseport.rs @@ -0,0 +1,65 @@ +#![no_std] +#![no_main] + +use aya_ebpf::{ + macros::{map, sk_reuseport}, + maps::ReusePortSockArray, + programs::{SK_PASS, SK_DROP, SkReuseportContext}, + helpers::bpf_sk_select_reuseport, + EbpfContext as _, +}; +#[cfg(not(test))] +extern crate ebpf_panic; + +#[map(name = "socket_map")] +static SOCKET_MAP: ReusePortSockArray = ReusePortSockArray::with_max_entries(10, 0); + +#[sk_reuseport] +pub fn select_socket(_ctx: SkReuseportContext) -> u32 { + // Return SK_PASS to allow packet and let kernel handle socket selection + // Return SK_DROP would drop the packet + SK_PASS +} + +#[sk_reuseport] +pub fn test_context_access(ctx: SkReuseportContext) -> u32 { + // Test accessing context fields + let _len = ctx.len(); + let _hash = ctx.hash(); + let _ip_protocol = ctx.ip_protocol(); + let _eth_protocol = ctx.eth_protocol(); + let _bind_inany = ctx.bind_inany(); + let _data = ctx.data(); + let _data_end = ctx.data_end(); + + // Always pass for testing + SK_PASS +} + +#[sk_reuseport] +pub fn test_helper_usage(ctx: SkReuseportContext) -> u32 { + // Use hash-based socket selection with helper + let socket_idx = ctx.hash() % 4; + + // Only handle TCP traffic (protocol 6) + if ctx.ip_protocol() == 6 { + let ret = unsafe { + bpf_sk_select_reuseport( + ctx.as_ptr() as *mut _, + SOCKET_MAP.as_ptr(), + &socket_idx as *const _ as *mut _, + 0 + ) + }; + + // Return result based on helper success + if ret == 0 { + SK_PASS + } else { + SK_DROP + } + } else { + // Let kernel handle non-TCP traffic + SK_PASS + } +} diff --git a/test/integration-test/src/lib.rs b/test/integration-test/src/lib.rs index f7cc30c6..ee2195bf 100644 --- a/test/integration-test/src/lib.rs +++ b/test/integration-test/src/lib.rs @@ -49,12 +49,13 @@ bpf_file!( RELOCATIONS => "relocations", RING_BUF => "ring_buf", SIMPLE_PROG => "simple_prog", + SK_REUSEPORT => "sk_reuseport", STRNCMP => "strncmp", TCX => "tcx", TEST => "test", TWO_PROGS => "two_progs", - XDP_SEC => "xdp_sec", UPROBE_COOKIE => "uprobe_cookie", + XDP_SEC => "xdp_sec", ); #[cfg(test)] diff --git a/test/integration-test/src/tests.rs b/test/integration-test/src/tests.rs index 43894366..d12271a9 100644 --- a/test/integration-test/src/tests.rs +++ b/test/integration-test/src/tests.rs @@ -10,6 +10,7 @@ mod raw_tracepoint; mod rbpf; mod relocations; mod ring_buf; +mod sk_reuseport; mod smoke; mod strncmp; mod tcx; diff --git a/test/integration-test/src/tests/sk_reuseport.rs b/test/integration-test/src/tests/sk_reuseport.rs new file mode 100644 index 00000000..f9f34f60 --- /dev/null +++ b/test/integration-test/src/tests/sk_reuseport.rs @@ -0,0 +1,307 @@ +use std::{net::TcpListener, os::fd::{AsFd as _, AsRawFd as _, FromRawFd as _}, time::Duration}; + +use aya::{ + Ebpf, + maps::ReusePortSockArray, + programs::{SkReuseport, loaded_programs}, +}; +use libc::{setsockopt, SOL_SOCKET, SO_REUSEPORT, socket, bind, listen, AF_INET, SOCK_STREAM, sockaddr_in}; +use tokio::time::sleep; + +use crate::utils::NetNsGuard; + +#[tokio::test] +async fn sk_reuseport_load() { + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + let prog: &mut SkReuseport = ebpf + .program_mut("select_socket") + .unwrap() + .try_into() + .unwrap(); + + // Test that the program loads successfully + prog.load().unwrap(); + + // Test that it's properly loaded + let info = prog.info().unwrap(); + assert_eq!( + info.program_type().unwrap(), + aya::programs::ProgramType::SkReuseport + ); +} + +#[tokio::test] +async fn sk_reuseport_loaded_programs_iteration() { + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + let prog: &mut SkReuseport = ebpf + .program_mut("select_socket") + .unwrap() + .try_into() + .unwrap(); + + let programs = loaded_programs().collect::, _>>().unwrap(); + assert!(!programs.iter().any(|p| matches!( + p.program_type().unwrap(), + aya::programs::ProgramType::SkReuseport + ))); + + prog.load().unwrap(); + sleep(Duration::from_millis(500)).await; + + let programs = loaded_programs().collect::, _>>().unwrap(); + assert!(programs.iter().any(|p| matches!( + p.program_type().unwrap(), + aya::programs::ProgramType::SkReuseport + ))); +} + +#[tokio::test] +async fn sk_reuseport_map_operations() { + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + + // Test that we can access the ReusePortSockArray map + // Note: This test checks map creation and basic operations + // In a real scenario, you would need actual SO_REUSEPORT sockets + let map: ReusePortSockArray<_> = ebpf.take_map("socket_map") + .expect("socket_map should exist") + .try_into() + .expect("map should convert to ReusePortSockArray"); + + // Test that the map has the correct properties + let fd = map.fd(); + assert!(!fd.as_fd().as_raw_fd() < 0, "Map fd should be valid"); + + // Test indices iterator (array maps have all indices pre-allocated) + let indices_count = map.indices().count(); + assert_eq!(indices_count, 10, "Array map should have all 10 indices available"); +} + +#[test_log::test] +fn sk_reuseport_attach_detach() { + let _netns = NetNsGuard::new(); + + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + let prog: &mut SkReuseport = ebpf + .program_mut("select_socket") + .unwrap() + .try_into() + .unwrap(); + + prog.load().unwrap(); + + // Create a socket with SO_REUSEPORT enabled - a simpler approach + // First create a normal listener to get a port, then create SO_REUSEPORT sockets + let temp_listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let local_addr = temp_listener.local_addr().unwrap(); + drop(temp_listener); // Release the port + + // Create socket with SO_REUSEPORT before binding + let socket_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) }; + assert!(socket_fd >= 0, "Failed to create socket"); + + let enable = 1i32; + unsafe { + let ret = setsockopt( + socket_fd, + SOL_SOCKET, + SO_REUSEPORT, + &enable as *const _ as *const _, + std::mem::size_of_val(&enable) as u32, + ); + assert_eq!(ret, 0, "Failed to set SO_REUSEPORT"); + } + + // Manually bind and listen + let addr = sockaddr_in { + sin_family: AF_INET as u16, + sin_port: local_addr.port().to_be(), + sin_addr: libc::in_addr { + s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be(), + }, + sin_zero: [0; 8], + }; + + let bind_result = unsafe { + bind( + socket_fd, + &addr as *const _ as *const libc::sockaddr, + std::mem::size_of::() as u32, + ) + }; + assert_eq!(bind_result, 0, "Failed to bind socket"); + + let listen_result = unsafe { listen(socket_fd, 1024) }; + assert_eq!(listen_result, 0, "Failed to listen on socket"); + + // Convert to TcpListener + let listener = unsafe { TcpListener::from_raw_fd(socket_fd) }; + + // Test program attachment + let link_id = prog.attach(&listener).unwrap(); + + // Test program detachment + prog.detach(link_id).unwrap(); +} + +#[test_log::test] +fn sk_reuseport_socket_array_operations() { + let _netns = NetNsGuard::new(); + + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + + // Get the socket array map + let mut socket_array: ReusePortSockArray<_> = ebpf + .take_map("socket_map") + .unwrap() + .try_into() + .unwrap(); + + // Create multiple SO_REUSEPORT sockets that bind to the same port + let temp_listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let local_addr = temp_listener.local_addr().unwrap(); + drop(temp_listener); // Release the port + + // Create first socket with SO_REUSEPORT + let socket1_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) }; + assert!(socket1_fd >= 0, "Failed to create socket1"); + + // Create second socket with SO_REUSEPORT + let socket2_fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) }; + assert!(socket2_fd >= 0, "Failed to create socket2"); + + let enable = 1i32; + // Set SO_REUSEPORT on both sockets before binding + unsafe { + let ret1 = setsockopt( + socket1_fd, + SOL_SOCKET, + SO_REUSEPORT, + &enable as *const _ as *const _, + std::mem::size_of_val(&enable) as u32, + ); + let ret2 = setsockopt( + socket2_fd, + SOL_SOCKET, + SO_REUSEPORT, + &enable as *const _ as *const _, + std::mem::size_of_val(&enable) as u32, + ); + assert_eq!(ret1, 0, "Failed to set SO_REUSEPORT on socket1"); + assert_eq!(ret2, 0, "Failed to set SO_REUSEPORT on socket2"); + } + + // Bind both sockets to the same address to create SO_REUSEPORT group + let addr = sockaddr_in { + sin_family: AF_INET as u16, + sin_port: local_addr.port().to_be(), + sin_addr: libc::in_addr { + s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be(), + }, + sin_zero: [0; 8], + }; + + unsafe { + let bind_result1 = bind( + socket1_fd, + &addr as *const _ as *const libc::sockaddr, + std::mem::size_of::() as u32, + ); + let bind_result2 = bind( + socket2_fd, + &addr as *const _ as *const libc::sockaddr, + std::mem::size_of::() as u32, + ); + assert_eq!(bind_result1, 0, "Failed to bind socket1"); + assert_eq!(bind_result2, 0, "Failed to bind socket2"); + + let listen_result1 = listen(socket1_fd, 1024); + let listen_result2 = listen(socket2_fd, 1024); + assert_eq!(listen_result1, 0, "Failed to listen on socket1"); + assert_eq!(listen_result2, 0, "Failed to listen on socket2"); + } + + // Convert to TcpListeners + let listener1 = unsafe { TcpListener::from_raw_fd(socket1_fd) }; + let listener2 = unsafe { TcpListener::from_raw_fd(socket2_fd) }; + + // Test storing sockets in the array + socket_array.set(0, &listener1, 0).unwrap(); + socket_array.set(1, &listener2, 0).unwrap(); + + // Test removing sockets from the array + socket_array.clear_index(&0).unwrap(); + socket_array.clear_index(&1).unwrap(); +} + +#[test_log::test] +fn sk_reuseport_error_conditions() { + let _netns = NetNsGuard::new(); + + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + + // Get the socket array map + let mut socket_array: ReusePortSockArray<_> = ebpf + .take_map("socket_map") + .unwrap() + .try_into() + .unwrap(); + + let socket = TcpListener::bind("127.0.0.1:0").unwrap(); + + // Test bounds checking - should fail for out-of-bounds index + let result = socket_array.set(100, &socket, 0); + assert!(result.is_err(), "Setting socket at out-of-bounds index should fail"); + + let result = socket_array.clear_index(&100); + assert!(result.is_err(), "Clearing out-of-bounds index should fail"); +} + +#[tokio::test] +async fn sk_reuseport_context_access() { + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + let prog: &mut SkReuseport = ebpf + .program_mut("test_context_access") + .unwrap() + .try_into() + .unwrap(); + + // Test that the program loads successfully with context field access + prog.load().unwrap(); + + // Test that it's properly loaded + let info = prog.info().unwrap(); + assert_eq!( + info.program_type().unwrap(), + aya::programs::ProgramType::SkReuseport + ); +} + +#[tokio::test] +async fn sk_reuseport_helper_usage() { + let mut ebpf = Ebpf::load(crate::SK_REUSEPORT).unwrap(); + let prog: &mut SkReuseport = ebpf + .program_mut("test_helper_usage") + .unwrap() + .try_into() + .unwrap(); + + // Test that the program loads successfully with helper usage + prog.load().unwrap(); + + // Test that it's properly loaded + let info = prog.info().unwrap(); + assert_eq!( + info.program_type().unwrap(), + aya::programs::ProgramType::SkReuseport + ); + + // Test that we can access the socket map used by the helper + let map: ReusePortSockArray<_> = ebpf.take_map("socket_map") + .expect("socket_map should exist") + .try_into() + .expect("map should convert to ReusePortSockArray"); + + // Verify map properties + let indices_count = map.indices().count(); + assert_eq!(indices_count, 10, "Array map should have all 10 indices available"); +}