mirror of https://github.com/aya-rs/aya
aya: adds support for AF_XDP
Contains user-space functionality for AF_XDP socket interactions. Much of it is adapted from xdpilone, with higher-level abstractions layered on top. The high-level API is usable without any unsafe code on the library user's side, while the lower-level control remains available for power users who need it (and accept the safety implications). Fixes: #507
parent
119049f2a2
commit
b3aad24952
@@ -0,0 +1,104 @@
//! Rust idiomatic bindings for the AF_XDP socket interface.
//!
//! This module helps with creating suitable socket(s) from a memory allocation of chunks, sockets
//! for access to all four rings, binding to a specific `(ifname, queue_id)`, and for creating the
//! memory mapping to interact with all these queues directly.
//!
//! Please see <https://docs.kernel.org/networking/af_xdp.html> for a detailed explanation of AF_XDP.
//!
//! The entrypoint to the module is an instance of [`XdpSocketBuilder`], or for power users
//! the more low-level [`crate::Umem`].
//!
//! This module builds upon the `xdpilone` crate (<https://crates.io/crates/xdpilone>), with
//! some (optional) abstractions on top.

use std::{borrow::Cow, ffi::NulError, io::Error};

use thiserror::Error;

mod xsk;

pub use xsk::{
    BufIdx, DeviceQueue, IfInfo, ReadComplete, ReadRx, RingCons, RingProd, RingRx, RingTx, Socket,
    SocketConfig, Umem, UmemChunk, UmemConfig, User, WriteFill, WriteTx, XdpSocketBuilder,
};

/// Errors occurring when working with AF_XDP
#[derive(Error)]
pub enum XskError {
    /// Errno returned by the OS
    #[error("errno {errno}")]
    Errno {
        /// The errno
        errno: i32,
    },
    /// Error creating a [`CString`](std::ffi::CString)
    #[error("nul error")]
    NulError(#[from] NulError),

    /// Invalid option in [`XdpSocketBuilder`]
    #[error("invalid option: {0}")]
    SocketOptionError(String),

    /// Memory related errors
    #[error("memory error")]
    MemoryError(#[from] AllocationError),
}

/// Errors related to allocation of UMEM memory
#[derive(Error, Debug)]
pub enum AllocationError {
    /// The memory is not page aligned
    #[error("memory region not page aligned")]
    UmemUnaligned,
    /// The memory region is smaller than what's required by [`UmemConfig`]
    #[error("memory region too small")]
    UmemSize,
}

impl XskError {
    /// Create an error from the latest `errno`.
    pub fn last_os_error() -> Self {
        Self::Errno {
            errno: Error::last_os_error().raw_os_error().unwrap_or(-1),
        }
    }

    /// Get the string that describes the error code in `errno`.
    ///
    /// Returns [`None`] if the error is any variant other than [`XskError::Errno`].
    pub fn get_strerror(&self) -> Option<Cow<'static, str>> {
        if let Self::Errno { errno } = self {
            unsafe {
                Some(Cow::Owned(
                    std::ffi::CStr::from_ptr(libc::strerror(*errno))
                        .to_string_lossy()
                        .into_owned(),
                ))
            }
        } else {
            None
        }
    }
}
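
For illustration, a minimal sketch of how these error helpers compose at a call site (the surrounding setup is assumed):

```rust
// Minimal sketch: report a failed AF_XDP call with a readable message.
let err = XskError::last_os_error();
if let Some(description) = err.get_strerror() {
    eprintln!("AF_XDP setup failed: {description}");
}
```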

impl std::fmt::Debug for XskError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Errno { errno } => {
                let description = self
                    .get_strerror()
                    .unwrap_or_else(|| Cow::Owned("Unknown error".to_string()));
                write!(f, "Errno({}: {})", errno, description)
            }
            Self::NulError(e) => {
                write!(f, "NulError {}", e)
            }
            Self::SocketOptionError(e) => {
                write!(f, "SocketOptionError {}", e)
            }
            Self::MemoryError(e) => {
                write!(f, "MemoryError {}", e)
            }
        }
    }
}

@@ -0,0 +1,573 @@
//! XSK user-space ring implementation.
//!
//! Where it makes sense, some structs are bindings to a C header.

mod iface;
mod ring;
mod socket;
mod umem;
mod user;

use std::{
    num::NonZeroU32,
    ptr::NonNull,
    sync::{atomic::AtomicU32, Arc},
};

use aya_obj::generated::{xdp_mmap_offsets, xdp_ring_offset};
use libc::SOL_XDP;

pub use self::user::{ReadComplete, ReadRx, WriteFill, WriteTx};
use super::XskError;
use crate::af_xdp::AllocationError;

/// Internal structure shared for all rings.
///
/// TODO: copied from <xdp.h>, does everything make sense in Rust?
#[repr(C)]
#[derive(Debug)]
struct XskRing {
    /// _owned_ version of the producer head, may lag.
    cached_producer: u32,
    /// _owned_ version of the consumer head, may lag.
    cached_consumer: u32,
    /// Bit mask to quickly validate/force entry IDs.
    mask: u32,
    /// Number of entries (= mask + 1).
    size: u32,
    /// The mmaped-producer base.
    ///
    /// Note: Using lifetime static here, but we point into an `mmap` area and it is important that
    /// we do not outlive the binding. The constructor promises this.
    producer: &'static AtomicU32,
    /// The mmaped-consumer base.
    consumer: &'static AtomicU32,
    /// The mmaped-consumer ring control base.
    ring: NonNull<core::ffi::c_void>,
    /// The mmaped-consumer flags base.
    flags: NonNull<u32>,
}

/// Struct for configuring the UMEM
#[derive(Debug, Clone)]
pub struct UmemConfig {
    /// Number of entries in the fill queue.
    pub fill_size: u32,
    /// Number of entries in the completion queue.
    pub complete_size: u32,
    /// Size of data chunks in each of the ring queues.
    pub frame_size: u32,
    /// Reserved area at the start of the kernel area.
    pub headroom: u32,
    /// Flags to set with the creation calls.
    pub flags: u32,
}
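
As a quick check of the sizing arithmetic the builder uses (`frame_size * fill_size`; see `XdpSocketBuilder::build` below), assuming the defaults defined further down:

```rust
// Sizing sketch: the UMEM region must hold `fill_size` chunks of `frame_size` bytes.
let config = UmemConfig::default(); // fill_size = 2048, frame_size = 4096 (see below)
let bytes_needed = (config.frame_size as usize) * (config.fill_size as usize);
assert_eq!(bytes_needed, 8 << 20); // 8 MiB with the default configuration
```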

/// Wrapper around a socket file descriptor
pub(crate) struct SocketFd(libc::c_int);

/// Config for an XSK socket
#[derive(Debug, Default, Clone)]
pub struct SocketConfig {
    /// The number of receive descriptors in the ring.
    pub rx_size: Option<NonZeroU32>,
    /// The number of transmit descriptors in the ring.
    pub tx_size: Option<NonZeroU32>,
    /// Additional flags to pass to the `bind` call as part of `sockaddr_xdp`.
    pub bind_flags: u16,
}

/// Prior version of `xdp_ring_offset` (<= Linux 5.3).
#[repr(C)]
#[derive(Default, Debug, Copy, Clone)]
pub(crate) struct XdpRingOffsetsV1 {
    /// the relative address of the producer.
    pub producer: u64,
    /// the relative address of the consumer.
    pub consumer: u64,
    /// the relative address of the descriptor.
    pub desc: u64,
}

/// Prior version of `xdp_mmap_offsets` (<= Linux 5.3).
#[repr(C)]
#[derive(Default, Debug, Copy, Clone)]
pub(crate) struct XdpMmapOffsetsV1 {
    /// Offsets for the receive ring (kernel produced).
    pub rx: XdpRingOffsetsV1,
    /// Offsets for the transmit ring (user produced).
    pub tx: XdpRingOffsetsV1,
    /// Offsets for the fill ring (user produced).
    pub fr: XdpRingOffsetsV1,
    /// Offsets for the completion ring (kernel produced).
    pub cr: XdpRingOffsetsV1,
}

/// Represents a single frame extracted from the RX ring.
#[allow(dead_code)]
pub struct Frame<'a> {
    /// A slice of the frame's data.
    pub buffer: &'a [u8],
    /// The index of this frame in the ring.
    idx: BufIdx,
    /// A reference to the RX ring for releasing this frame later.
    ring: *mut RingRx,
}

impl Frame<'_> {
    /// Release this frame back to the kernel.
    pub fn release(self) {
        unsafe {
            (*self.ring).ring.release(1);
        }
    }
}

#[derive(Debug)]
pub(crate) struct SocketMmapOffsets {
    inner: xdp_mmap_offsets,
}

impl Default for SocketMmapOffsets {
    fn default() -> Self {
        Self {
            inner: xdp_mmap_offsets {
                rx: xdp_ring_offset {
                    producer: u64::default(),
                    consumer: u64::default(),
                    desc: u64::default(),
                    flags: u64::default(),
                },
                tx: xdp_ring_offset {
                    producer: u64::default(),
                    consumer: u64::default(),
                    desc: u64::default(),
                    flags: u64::default(),
                },
                fr: xdp_ring_offset {
                    producer: u64::default(),
                    consumer: u64::default(),
                    desc: u64::default(),
                    flags: u64::default(),
                },
                cr: xdp_ring_offset {
                    producer: u64::default(),
                    consumer: u64::default(),
                    desc: u64::default(),
                    flags: u64::default(),
                },
            },
        }
    }
}

/// The basic Umem descriptor.
///
/// This struct manages the buffers themselves, in a high-level sense, not any of the
/// communication or queues.
///
/// Compared to `libxdp` there's no link to where the queues are stored. Such a struct would necessitate
/// thread-safe access to the ring's producer and consumer queues. Instead, a `DeviceQueue` is the
/// owner of a device queue's fill/completion ring, but _not_ receive and transmission rings. All
/// other sockets with the same interface/queue depend on it but have their own packet rings.
///
/// You'll note that the fill and completion rings are a shared liveness requirement but under
/// unique control. Exactly one process has the responsibility of maintaining them and ensuring the
/// rings progress. Failing to do so impacts _all_ sockets sharing this `Umem`. The converse is not
/// true. A single socket can starve its transmission buffer or refuse accepting received packets,
/// but the worst outcome is packet loss in this queue.
///
/// The controller of the fill/completion pair also controls the associated bpf program which maps
/// packets onto the set of sockets (aka. 'XSKMAP').
pub struct Umem {
    /// The allocated shared memory region
    umem_buffer: NonNull<[u8]>,
    /// The config for the shared memory region
    config: UmemConfig,
    /// The socket
    fd: Arc<SocketFd>,
    /// Wrapper around a `ControlSet`
    devices: DeviceControl,
}

/// A raw pointer to a specific chunk in a Umem.
///
/// It's unsafe to access the frame, by design. All aspects of _managing_ the contents of the
/// kernel-shared memory are left to the user of the module.
#[derive(Clone, Copy, Debug)]
pub struct UmemChunk {
    /// The address range associated with the chunk.
    pub addr: NonNull<[u8]>,
    /// The absolute offset of this chunk from the start of the Umem.
    /// This is the basis of the address calculation shared with the kernel.
    pub offset: u64,
}

#[derive(Clone)]
struct DeviceControl {
    inner: Arc<dyn ControlSet>,
}

/// A synchronized set for tracking which `IfCtx` are taken.
trait ControlSet: Send + Sync + 'static {
    fn insert(&self, _: IfCtx) -> bool;
    #[allow(dead_code)]
    fn contains(&self, _: &IfCtx) -> bool;
    fn remove(&self, _: &IfCtx);
}

/// One prepared socket for a receive/transmit pair.
///
/// Note: it is not yet _bound_ to a specific `AF_XDP` address (device queue).
pub struct Socket {
    /// Information about the socket
    info: Arc<IfInfo>,
    /// Socket file descriptor
    fd: Arc<SocketFd>,
}

/// One device queue associated with an XDP socket.
///
/// A socket is more specifically a set of receive and transmit queues for packets (mapping to some
/// underlying hardware mapping those bytes with a network). The fill and completion queue can, in
/// theory, be shared with other sockets of the same `Umem`.
pub struct DeviceQueue {
    /// Fill and completion queues.
    fcq: DeviceRings,
    /// This is also a socket.
    socket: Socket,
    /// Reference to de-register.
    devices: DeviceControl,
}

/// An owner of receive/transmit queues.
///
/// This represents a configured version of the raw `Socket`. It allows you to map the required
/// rings and _then_ [`Umem::bind`] the socket, enabling the operations of the queues with the
/// interface.
pub struct User {
    /// A clone of the socket it was created from.
    pub socket: Socket,
    /// The configuration with which it was created.
    config: Arc<SocketConfig>,
    /// A cached version of the map describing receive/transmit queues.
    map: SocketMmapOffsets,
}

/// A receiver queue.
///
/// This also maintains the mmap of the associated queue.
// Implemented in <xsk/user.rs>
pub struct RingRx {
    ring: RingCons,
    fd: Arc<SocketFd>,
}

/// A transmitter queue.
///
/// This also maintains the mmap of the associated queue.
// Implemented in <xsk/user.rs>
pub struct RingTx {
    ring: RingProd,
    fd: Arc<SocketFd>,
}

/// Complete (cached) information about a socket.
///
/// Please allocate this; the struct is quite large. For instance, put it into an `Arc` as soon as
/// it is no longer mutable, or initialize it in-place with [`Arc::get_mut`].
#[derive(Clone, Copy)]
pub struct IfInfo {
    ctx: IfCtx,
    ifname: [libc::c_char; libc::IFNAMSIZ],
}

/// Reduced version of `IfInfo`, only retaining numeric IDs for the kernel.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct IfCtx {
    ifindex: libc::c_uint,
    queue_id: u32,
    /// The namespace cookie, associated with a *socket*.
    /// This field is filled by some surrounding struct containing the info.
    netnscookie: u64,
}

pub(crate) struct DeviceRings {
    pub prod: RingProd,
    pub cons: RingCons,
    // Proof that we obtained this. Not sure if and where we'd use it.
    #[allow(dead_code)]
    pub(crate) map: SocketMmapOffsets,
}

/// An index to an XDP buffer.
///
/// Usually passed from a call of reserved or available buffers (in [`RingProd`] and
/// [`RingCons`] respectively) to one of the access functions. This resolves the raw index to a
/// memory address in the ring buffer.
///
/// This is _not_ a pure offset, a masking is needed to access the raw offset! The kernel requires
/// the buffer count to be a power-of-two for this to be efficient. Then, producer and consumer
/// heads operate on the 32-bit number range, _silently_ mapping to the same range of indices.
/// (Similar to TCP segments, actually). Well-behaving sides will maintain the order of the two
/// numbers in this wrapping space, which stays perfectly well-defined as long as less than `2**31`
/// buffers are identified in total.
///
/// In other words, you need a configured ring to determine an exact offset or compare two indices.
///
/// This type does _not_ implement comparison traits or hashing! Nevertheless, there's nothing
/// unsafe about creating or observing this detail, so feel free to construct your own or use the
/// transparent layout to (unsafely) treat the type as a `u32` instead.
#[repr(transparent)]
#[derive(Debug, Copy, Clone)]
pub struct BufIdx(pub u32);
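
To make the masked, wrapping index arithmetic concrete, a standalone sketch (these helper names are illustrative, not part of the module):

```rust
// Sketch: a power-of-two ring resolves an index by masking, and pending
// counts stay well-defined under u32 wrap-around.
fn slot(idx: u32, mask: u32) -> u32 {
    idx & mask // mask = size - 1, with size a power of two
}

fn pending(producer: u32, consumer: u32) -> u32 {
    producer.wrapping_sub(consumer) // valid while < 2**31 buffers are in flight
}

let mask = (1u32 << 11) - 1; // a ring of 2048 entries
assert_eq!(slot(2049, mask), 1); // index 2049 lands on slot 1
assert_eq!(pending(5, u32::MAX), 6); // producer wrapped past u32::MAX
```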

/// A producer ring.
///
/// Here, user space maintains the write head and the kernel the read tail.
#[derive(Debug)]
pub struct RingProd {
    inner: XskRing,
    mmap_addr: NonNull<[u8]>,
}

/// A consumer ring.
///
/// Here, the kernel maintains the write head and user space the read tail.
#[derive(Debug)]
pub struct RingCons {
    inner: XskRing,
    mmap_addr: NonNull<[u8]>,
}

impl Default for UmemConfig {
    fn default() -> Self {
        Self {
            fill_size: 1 << 11,
            complete_size: 1 << 11,
            frame_size: 1 << 12,
            headroom: 0,
            flags: 0,
        }
    }
}

impl Drop for SocketFd {
    fn drop(&mut self) {
        let _ = unsafe { libc::close(self.0) };
    }
}

// FIXME: pending stabilization, use pointer::len directly.
// <https://doc.rust-lang.org/stable/std/primitive.pointer.html#method.len>
//
// FIXME: In 1.79 this was stabilized. Bump MSRV fine?
fn ptr_len(ptr: *mut [u8]) -> usize {
    unsafe { (*(ptr as *mut [()])).len() }
}

impl Socket {
    /// Get the raw file descriptor number underlying this socket.
    pub fn as_raw_fd(&self) -> i32 {
        self.fd.0
    }
}

impl User {
    /// Get the raw file descriptor number underlying this socket.
    pub fn as_raw_fd(&self) -> i32 {
        self.socket.as_raw_fd()
    }
}

/// Builder struct for setting up a [`Umem`] shared with the kernel,
/// and a [`User`] to enable userspace operations on the rings and socket.
///
/// # Examples
///
/// ```no_run
/// use std::num::NonZeroU32;
///
/// use aya::{Ebpf, programs::{Xdp, XdpFlags}};
/// # use aya::maps::XskMap;
/// # use aya::af_xdp::{UmemConfig, XdpSocketBuilder};
///
/// # let mut bpf = Ebpf::load_file("ebpf_programs.o")?;
/// let program: &mut Xdp = bpf.program_mut("intercept_packets").unwrap().try_into()?;
/// let mut socks: XskMap<_> = bpf.take_map("SOCKS").unwrap().try_into().unwrap();
/// program.attach("eth0", XdpFlags::default())?;
///
/// # let umem_config = UmemConfig::default();
/// let (umem, user) = XdpSocketBuilder::new()
///     .with_iface("eth0") // The interface to attach to
///     .with_queue_id(0)
///     .with_umem_config(umem_config) // If not provided, a default one is used
///     .with_rx_size(NonZeroU32::new(32).unwrap()) // One of rx_size or tx_size must be nonzero
///     .build()
///     .unwrap();
///
/// let mut fq_cq = umem.fq_cq(&user.socket).unwrap(); // Fill Queue / Completion Queue
///
/// let mut rx = user.map_rx().unwrap(); // map the RX ring into memory, get handle
///
/// umem.bind(&user).unwrap(); // bind the socket to a device
///
/// socks.set(0, rx.as_raw_fd(), 0).unwrap(); // set the socket at the given index
/// # Ok::<(), aya::EbpfError>(())
/// ```
#[derive(Default)]
pub struct XdpSocketBuilder {
    /// The interface name
    iface: Option<String>,
    /// The queue ID of the interface
    queue_id: Option<u32>,
    /// Size of the RX queue
    rx_size: Option<NonZeroU32>,
    /// Size of the TX queue
    tx_size: Option<NonZeroU32>,
    umem_config: UmemConfig,
    bind_flags: u16,
    user_memory: Option<NonNull<[u8]>>,
    page_size: usize,
}

impl XdpSocketBuilder {
    /// Creates a new builder with default configurations.
    pub fn new() -> Self {
        let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize };

        Self {
            iface: None,
            queue_id: None,
            rx_size: None,
            tx_size: None,
            umem_config: UmemConfig::default(),
            bind_flags: 0, // Default bind flags
            user_memory: None,
            page_size,
        }
    }

    /// Sets the network interface name (e.g., "eth0").
    pub fn with_iface(mut self, iface: &str) -> Self {
        self.iface = Some(iface.to_string());
        self
    }

    /// Sets the queue ID for the socket.
    pub fn with_queue_id(mut self, queue_id: u32) -> Self {
        self.queue_id = Some(queue_id);
        self
    }

    /// Sets the RX ring size.
    pub fn with_rx_size(mut self, rx_size: NonZeroU32) -> Self {
        self.rx_size = Some(rx_size);
        self
    }

    /// Sets the TX ring size.
    pub fn with_tx_size(mut self, tx_size: NonZeroU32) -> Self {
        self.tx_size = Some(tx_size);
        self
    }

    /// Configures UMEM settings.
    pub fn with_umem_config(mut self, config: UmemConfig) -> Self {
        self.umem_config = config;
        self
    }

    /// Sets additional bind flags.
    pub fn with_bind_flags(mut self, flags: u16) -> Self {
        self.bind_flags = flags;
        self
    }

    /// Use user-provided memory for UMEM.
    ///
    /// # Safety
    /// The caller must ensure that the provided memory is valid, properly aligned, and large enough
    /// for the UMEM configuration (e.g., `frame_size * fill_size`).
    pub unsafe fn with_user_memory(mut self, mem: NonNull<[u8]>) -> Result<Self, XskError> {
        let addr = mem.as_ptr() as *mut u8 as usize;
        if addr & (self.page_size - 1) != 0 {
            return Err(AllocationError::UmemUnaligned.into()); // Memory must be page-aligned
        }

        if mem.len() < (self.umem_config.frame_size * self.umem_config.fill_size) as usize {
            return Err(AllocationError::UmemSize.into());
        }
        self.user_memory = Some(mem);
        Ok(self)
    }
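
A hedged sketch of handing the builder caller-owned memory; the mmap flags mirror `allocate_page_aligned_memory` below, and the interface name is illustrative:

```rust
// Sketch: allocate a page-aligned region yourself and pass it to `with_user_memory`.
use core::ptr::NonNull;

let size = 8 << 20; // 8 MiB: enough for the default frame_size * fill_size
let ptr = unsafe {
    libc::mmap(
        std::ptr::null_mut(),
        size,
        libc::PROT_READ | libc::PROT_WRITE,
        libc::MAP_SHARED | libc::MAP_ANONYMOUS | libc::MAP_POPULATE,
        -1,
        0,
    )
};
assert_ne!(ptr, libc::MAP_FAILED);
let mem = NonNull::new(core::ptr::slice_from_raw_parts_mut(ptr as *mut u8, size)).unwrap();
// Safety: the mapping is page-aligned, large enough, and must outlive the Umem.
let builder = unsafe { XdpSocketBuilder::new().with_iface("eth0").with_user_memory(mem) }.unwrap();
```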

    /// Allocate page-aligned memory for the Umem.
    fn allocate_page_aligned_memory(&self, size: usize) -> Result<NonNull<[u8]>, XskError> {
        let aligned_mem = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED | libc::MAP_ANONYMOUS | libc::MAP_POPULATE,
                -1,
                0,
            )
        };

        if aligned_mem == libc::MAP_FAILED {
            return Err(XskError::last_os_error());
        }

        Ok(unsafe {
            NonNull::new_unchecked(core::ptr::slice_from_raw_parts_mut(
                aligned_mem as *mut u8,
                size,
            ))
        })
    }

    /// Builds and returns a configured `Socket` and its associated rings.
    ///
    /// If a user-allocated memory region is not provided, one will be allocated
    /// and used as the shared UMEM region.
    pub fn build(self) -> Result<(Umem, User), XskError> {
        let iface_name = self.iface.as_ref().ok_or(XskError::Errno {
            errno: libc::EINVAL,
        })?;

        // Create IfInfo from the interface name
        let mut iface_info = IfInfo::invalid();
        iface_info.from_name(iface_name)?;

        // Set the queue ID if provided
        if let Some(queue_id) = self.queue_id {
            iface_info.set_queue(queue_id);
        }

        // Check that at least one of rx_size or tx_size is Some
        if self.rx_size.is_none() && self.tx_size.is_none() {
            return Err(XskError::SocketOptionError(
                "both rx_size and tx_size are None".into(),
            ));
        }

        // Determine memory size based on the UMEM configuration
        let mem_size = (self.umem_config.frame_size * self.umem_config.fill_size) as usize;

        // Use user-provided memory or allocate internally
        let mem = match self.user_memory {
            Some(mem) => mem,
            None => self.allocate_page_aligned_memory(mem_size)?,
        };

        // Create the UMEM using the provided or allocated memory
        let umem = unsafe { Umem::new(self.umem_config.clone(), mem)? };

        // Create the socket
        let socket_config = SocketConfig {
            rx_size: self.rx_size,
            tx_size: self.tx_size,
            bind_flags: self.bind_flags,
        };

        let socket = Socket::with_shared(&iface_info, &umem)?;
        let rxtx = umem.rx_tx(&socket, &socket_config)?;

        Ok((umem, rxtx))
    }
}

@@ -0,0 +1,196 @@
use std::ffi::CString;

use aya_obj::generated::{xdp_mmap_offsets, xdp_statistics, XDP_MMAP_OFFSETS, XDP_STATISTICS};

use super::{IfCtx, IfInfo, SocketFd, SocketMmapOffsets};
use crate::af_xdp::{xsk::XdpMmapOffsetsV1, XskError};

impl IfInfo {
    /// Create an info referring to no device.
    ///
    /// This allows allocating an info to overwrite with more specific information.
    pub fn invalid() -> Self {
        Self {
            ctx: IfCtx {
                ifindex: 0,
                queue_id: 0,
                netnscookie: 0,
            },
            ifname: [b'\0' as libc::c_char; libc::IFNAMSIZ],
        }
    }

    /// Set the information from an interface, by name.
    ///
    /// Common interface names may be `enp8s0`, `lo`, `wg0`, etc. The interface name-to-index pair
    /// will be very similar to what would be returned by `ip link show`.
    pub fn from_name(&mut self, st: &str) -> Result<(), XskError> {
        let name = CString::new(st)?;
        let bytes = name.to_bytes_with_nul();

        if bytes.len() > self.ifname.len() {
            return Err(XskError::Errno {
                errno: libc::EINVAL,
            });
        }

        assert!(bytes.len() <= self.ifname.len());
        let bytes = unsafe { &*(bytes as *const _ as *const [libc::c_char]) };
        let index = unsafe { libc::if_nametoindex(name.as_ptr()) };

        if index == 0 {
            return Err(XskError::last_os_error())?;
        }

        self.ctx.ifindex = index;
        self.ctx.queue_id = 0;
        self.ctx.netnscookie = 0;
        self.ifname[..bytes.len()].copy_from_slice(bytes);

        Ok(())
    }

    /// Set the information from an interface, by its numeric identifier.
    ///
    /// See [`Self::from_name`].
    pub fn from_ifindex(&mut self, index: libc::c_uint) -> Result<(), XskError> {
        let err = unsafe { libc::if_indextoname(index, self.ifname.as_mut_ptr()) };

        if err.is_null() {
            return Err(XskError::last_os_error())?;
        }

        self.ctx.ifindex = index;
        self.ctx.queue_id = 0;
        self.ctx.netnscookie = 0;

        Ok(())
    }

    /// Configure the queue ID.
    ///
    /// This does _not_ guarantee that this queue is valid, or actually exists. You'll find out
    /// during the bind call. Most other ways of querying such information could suffer from TOCTOU
    /// issues in any case.
    pub fn set_queue(&mut self, queue_id: u32) {
        self.ctx.queue_id = queue_id;
    }

    /// Get the `ifindex`, the numeric ID of the interface in the kernel, for the identified interface.
    pub fn ifindex(&self) -> u32 {
        self.ctx.ifindex
    }

    /// Get the queue ID previously set with `set_queue`.
    pub fn queue_id(&self) -> u32 {
        self.ctx.queue_id
    }
}
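
A hedged usage sketch of the resolution helpers above (the interface name is chosen purely for illustration):

```rust
// Sketch: resolve an interface by name and pick queue 0, as build() does internally.
let mut info = IfInfo::invalid();
info.from_name("eth0")?;
info.set_queue(0);
assert_ne!(info.ifindex(), 0); // 0 would mean the lookup failed
```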

impl SocketMmapOffsets {
    const OPT_V1: libc::socklen_t = core::mem::size_of::<XdpMmapOffsetsV1>() as libc::socklen_t;
    const OPT_LATEST: libc::socklen_t = core::mem::size_of::<xdp_mmap_offsets>() as libc::socklen_t;

    /// Query the socket mmap offsets of an XDP socket.
    pub(crate) fn new(sock: &SocketFd) -> Result<Self, XskError> {
        let mut this = Self::default();
        this.set_from_fd(sock)?;
        Ok(this)
    }

    /// Overwrite data with the socket mmap offsets of an XDP socket.
    ///
    /// This operation is atomic: On error, the previous values are retained. On success, the
    /// attributes have been updated.
    pub(crate) fn set_from_fd(&mut self, sock: &SocketFd) -> Result<(), XskError> {
        use crate::af_xdp::xsk::{xdp_ring_offset, XdpRingOffsetsV1};

        // The flags offset was implicit in v1, placed right after the consumer.
        fn fixup_v1(v1: XdpRingOffsetsV1) -> xdp_ring_offset {
            xdp_ring_offset {
                producer: v1.producer,
                consumer: v1.consumer,
                desc: v1.desc,
                flags: v1.consumer + core::mem::size_of::<u32>() as u64,
            }
        }

        union Offsets {
            v1: XdpMmapOffsetsV1,
            latest: xdp_mmap_offsets,
            init: (),
        }

        let mut off = Offsets { init: () };
        let mut optlen: libc::socklen_t = core::mem::size_of_val(&off) as libc::socklen_t;

        let err = unsafe {
            libc::getsockopt(
                sock.0,
                super::SOL_XDP,
                XDP_MMAP_OFFSETS as i32,
                (&mut off) as *mut _ as *mut libc::c_void,
                &mut optlen,
            )
        };

        if err != 0 {
            return Err(XskError::last_os_error())?;
        }

        // The kernel reports how many bytes it wrote; that length tells us which
        // version of the struct we received.
        match optlen {
            Self::OPT_V1 => {
                let v1 = unsafe { off.v1 };

                self.inner = xdp_mmap_offsets {
                    rx: fixup_v1(v1.rx),
                    tx: fixup_v1(v1.tx),
                    fr: fixup_v1(v1.fr),
                    cr: fixup_v1(v1.cr),
                };

                Ok(())
            }
            Self::OPT_LATEST => {
                self.inner = unsafe { off.latest };
                Ok(())
            }
            _ => Err(XskError::Errno {
                errno: libc::EINVAL,
            }),
        }
    }
}

/// Statistics of an AF_XDP socket, as reported by the kernel.
#[derive(Debug, Copy, Clone)]
pub struct XdpStatistics(xdp_statistics);

impl XdpStatistics {
    pub(crate) fn new(sock: &SocketFd) -> Result<Self, XskError> {
        let mut this = Self(xdp_statistics {
            rx_dropped: 0,
            rx_invalid_descs: 0,
            tx_invalid_descs: 0,
            rx_ring_full: 0,
            rx_fill_ring_empty_descs: 0,
            tx_ring_empty_descs: 0,
        });
        this.set_from_fd(sock)?;
        Ok(this)
    }

    pub(crate) fn set_from_fd(&mut self, sock: &SocketFd) -> Result<(), XskError> {
        let mut optlen: libc::socklen_t = core::mem::size_of_val(self) as libc::socklen_t;
        let err = unsafe {
            libc::getsockopt(
                sock.0,
                super::SOL_XDP,
                XDP_STATISTICS as i32,
                &mut self.0 as *mut _ as *mut libc::c_void,
                &mut optlen,
            )
        };

        if err != 0 {
            return Err(XskError::last_os_error())?;
        }

        Ok(())
    }
}
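
A short usage sketch via the `statistics` accessors added on `DeviceQueue`/`User` further down:

```rust
// Sketch: inspect kernel-side counters for a bound socket.
let stats = fq_cq.statistics()?;
println!("{stats:?}"); // Debug-prints rx_dropped, rx_ring_full, etc.
```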

@@ -0,0 +1,59 @@
use std::sync::Arc;

use crate::af_xdp::{
    xsk::{IfInfo, Socket, SocketFd, Umem},
    XskError,
};

impl Socket {
    const SO_NETNS_COOKIE: libc::c_int = 71;
    const INIT_NS: u64 = 1;

    /// Create a new socket for a given interface.
    pub fn new(interface: &IfInfo) -> Result<Self, XskError> {
        let fd = Arc::new(SocketFd::new()?);
        Self::with_xdp_socket(interface, fd)
    }

    /// Create a socket using the FD of the `umem`.
    pub fn with_shared(interface: &IfInfo, umem: &Umem) -> Result<Self, XskError> {
        Self::with_xdp_socket(interface, umem.fd.clone())
    }

    fn with_xdp_socket(interface: &IfInfo, fd: Arc<SocketFd>) -> Result<Self, XskError> {
        let mut info = Arc::new(*interface);

        let mut netnscookie: u64 = 0;
        let mut optlen: libc::socklen_t = core::mem::size_of_val(&netnscookie) as libc::socklen_t;
        let err = unsafe {
            libc::getsockopt(
                fd.0,
                libc::SOL_SOCKET,
                Self::SO_NETNS_COOKIE,
                (&mut netnscookie) as *mut _ as *mut libc::c_void,
                &mut optlen,
            )
        };

        if err != 0 {
            // `getsockopt` returns -1 on failure; the error code itself is in `errno`.
            // Kernels without netns-cookie support report `ENOPROTOOPT`; treat those
            // as the initial namespace.
            match std::io::Error::last_os_error().raw_os_error() {
                Some(libc::ENOPROTOOPT) => netnscookie = Self::INIT_NS,
                _ => return Err(XskError::last_os_error()),
            }
        }

        // Won't reallocate in practice.
        Arc::make_mut(&mut info).ctx.netnscookie = netnscookie;

        Ok(Self { fd, info })
    }
}

impl SocketFd {
    pub(crate) fn new() -> Result<Self, XskError> {
        let fd = unsafe { libc::socket(libc::AF_XDP, libc::SOCK_RAW, 0) };
        if fd < 0 {
            return Err(XskError::last_os_error())?;
        }
        Ok(Self(fd))
    }
}
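
A hedged sketch of the sharing pattern these constructors enable (see the bind tree in `Umem::bind` below); the ring sizes are illustrative:

```rust
// Sketch: an additional rx-only socket on the same interface queue.
// Its fd differs from the Umem's own, so Umem::bind sets XDP_SHARED_UMEM for us.
let extra = Socket::new(&info)?;
let user = umem.rx_tx(
    &extra,
    &SocketConfig {
        rx_size: core::num::NonZeroU32::new(32),
        tx_size: None,
        bind_flags: 0,
    },
)?;
umem.bind(&user)?;
```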

@@ -0,0 +1,517 @@
use core::ptr::NonNull;
use std::{
    collections::BTreeSet,
    sync::{Arc, RwLock},
};

use aya_obj::generated::{
    sockaddr_xdp, xdp_desc, xdp_umem_reg, XDP_COPY, XDP_RX_RING, XDP_SHARED_UMEM, XDP_TX_RING,
    XDP_UMEM_COMPLETION_RING, XDP_UMEM_FILL_RING, XDP_UMEM_REG, XDP_USE_NEED_WAKEUP, XDP_ZEROCOPY,
};

use crate::af_xdp::{
    xsk::{
        iface::XdpStatistics, ptr_len, BufIdx, DeviceControl, DeviceQueue, DeviceRings, IfCtx,
        RingCons, RingProd, RingRx, RingTx, Socket, SocketConfig, SocketFd, SocketMmapOffsets,
        Umem, UmemChunk, UmemConfig, User,
    },
    XskError,
};

impl BufIdx {
    /// Convert a slice of raw numbers to buffer indices, in-place.
    pub fn from_slice(id: &[u32]) -> &[Self] {
        unsafe { &*(id as *const [u32] as *const [Self]) }
    }

    /// Convert a mutable slice of raw numbers to buffer indices, in-place.
    pub fn from_mut_slice(id: &mut [u32]) -> &mut [Self] {
        unsafe { &mut *(id as *mut [u32] as *mut [Self]) }
    }

    /// Convert a slice of buffer indices to raw numbers, in-place.
    pub fn to_slice(this: &[Self]) -> &[u32] {
        unsafe { &*(this as *const [Self] as *const [u32]) }
    }

    /// Convert a mutable slice of buffer indices to raw numbers, in-place.
    pub fn to_mut_slice(this: &mut [Self]) -> &mut [u32] {
        unsafe { &mut *(this as *mut [Self] as *mut [u32]) }
    }
}

impl Umem {
    /// Create a new Umem ring.
    ///
    /// # Safety
    ///
    /// The caller passes an area denoting the memory of the ring. It must be valid for the
    /// indicated buffer size and count. The caller is also responsible for keeping the mapping
    /// alive.
    ///
    /// The area must be page aligned and not exceed i64::MAX in length (on future systems where
    /// you could).
    pub unsafe fn new(config: UmemConfig, area: NonNull<[u8]>) -> Result<Self, XskError> {
        fn is_page_aligned(area: NonNull<[u8]>) -> bool {
            let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as usize;
            // TODO: use `addr()` as we don't need to expose the pointer here. Just the address as
            // an integer and no provenance-preserving cast intended.
            (area.as_ptr() as *mut u8 as usize & (page_size - 1)) == 0
        }

        assert!(config.frame_size > 0, "Invalid frame size");

        assert!(
            is_page_aligned(area),
            "UB: Bad mmap area provided, but caller is responsible for its soundness."
        );

        let area_size = ptr_len(area.as_ptr());

        assert!(
            u64::try_from(area_size).is_ok(),
            "Unhandled address space calculation"
        );

        let devices = DeviceControl {
            inner: Arc::new(SpinLockedControlSet::default()),
        };

        // Two steps:
        // 1. Create a new XDP socket in the kernel.
        // 2. Configure it with the area and size.
        // Safety: correct `socket` call.
        let umem = Self {
            config,
            fd: Arc::new(SocketFd::new()?),
            umem_buffer: area,
            devices,
        };

        Self::configure(&umem)?;

        Ok(umem)
    }

    /// Get the address associated with a buffer, if it is in-bounds.
    ///
    /// # Safety
    ///
    /// No requirements. However, please ensure that _use_ of the pointer is done properly. The
    /// pointer is guaranteed to be derived from the `area` passed in the constructor. The method
    /// guarantees that it does not _access_ any of the pointers in this process.
    pub fn frame(&self, idx: BufIdx) -> Option<UmemChunk> {
        let pitch: u32 = self.config.frame_size;
        let idx: u32 = idx.0;
        let area_size = ptr_len(self.umem_buffer.as_ptr()) as u64;

        // Validate that it fits.
        let offset = u64::from(pitch) * u64::from(idx);
        if area_size.checked_sub(u64::from(pitch)) < Some(offset) {
            return None;
        }

        // Now: area_size is converted, without loss, from an isize that denotes the [u8] length,
        // valid as guaranteed by the caller of the constructor. We have just checked:
        //
        // `[offset..offset+pitch) < area_size`.
        //
        // So all of the following is within the bounds of the constructor-guaranteed
        // address manipulation.
        let base = unsafe {
            self.umem_buffer
                .cast::<u8>()
                .as_ptr()
                .offset(offset as isize)
        };
        debug_assert!(!base.is_null(), "UB: offsetting area within produced NULL");
        let slice = core::ptr::slice_from_raw_parts_mut(base, pitch as usize);
        let addr = unsafe { NonNull::new_unchecked(slice) };
        Some(UmemChunk { addr, offset })
    }

    /// Count the number of available data frames.
    pub fn len_frames(&self) -> u32 {
        let area_size = ptr_len(self.umem_buffer.as_ptr()) as u64;
        let count = area_size / u64::from(self.config.frame_size);
        u32::try_from(count).unwrap_or(u32::MAX)
    }
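
To make the offset arithmetic concrete, a sketch assuming the default `UmemConfig` (4096-byte frames) over an 8 MiB region:

```rust
// Sketch: frame N starts at offset N * frame_size; out-of-range indices yield None.
assert_eq!(umem.len_frames(), 2048); // 8 MiB / 4096
let chunk = umem.frame(BufIdx(2)).expect("within bounds");
assert_eq!(chunk.offset, 2 * 4096);
assert!(umem.frame(BufIdx(2048)).is_none()); // first index past the end
```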

    fn configure(this: &Self) -> Result<(), XskError> {
        let mut mr = xdp_umem_reg {
            addr: this.umem_buffer.as_ptr() as *mut u8 as u64,
            len: ptr_len(this.umem_buffer.as_ptr()) as u64,
            chunk_size: this.config.frame_size,
            headroom: this.config.headroom,
            flags: this.config.flags,
            tx_metadata_len: 0,
        };

        let optlen = core::mem::size_of_val(&mr) as libc::socklen_t;
        let err = unsafe {
            libc::setsockopt(
                this.fd.0,
                super::SOL_XDP,
                XDP_UMEM_REG as i32,
                (&mut mr) as *mut _ as *mut libc::c_void,
                optlen,
            )
        };

        if err != 0 {
            return Err(XskError::last_os_error())?;
        }

        Ok(())
    }

    /// Configure the fill and completion queue for an interface queue.
    ///
    /// The caller _should_ only call this once for each interface info. However, it's not entirely
    /// incorrect to do it multiple times. Just be careful that the administration becomes extra
    /// messy. All code is written under the assumption that only one controller/writer for the
    /// user-space portions of each queue is active at a time. The kernel won't care about your
    /// broken code and race conditions writing to the same queue concurrently. It's an SPSC.
    /// Probably only the first call for each interface succeeds.
    pub fn fq_cq(&self, interface: &Socket) -> Result<DeviceQueue, XskError> {
        if !self.devices.insert(interface.info.ctx) {
            // We know this will just yield `-EBUSY` anyways.
            return Err(XskError::Errno {
                errno: libc::EINVAL,
            });
        }

        struct DropableDevice<'info>(&'info IfCtx, &'info DeviceControl);

        impl Drop for DropableDevice<'_> {
            fn drop(&mut self) {
                self.1.remove(self.0);
            }
        }

        // Okay, got a device. Let's create the queues for it. On failure, clean up.
        let _tmp_device = DropableDevice(&interface.info.ctx, &self.devices);

        let sock = &*interface.fd;
        Self::configure_cq(sock, &self.config)?;
        let map = SocketMmapOffsets::new(sock)?;

        // FIXME: should we be configuring the `cached_consumer` and `cached_producer` and
        // potentially other values, here? The setup produces a very rough clone of _just_ the ring
        // itself and none of the logic beyond.
        let prod = unsafe { RingProd::fill(sock, &map, self.config.fill_size) }?;
        let cons = unsafe { RingCons::comp(sock, &map, self.config.complete_size) }?;

        let device = DeviceQueue {
            fcq: DeviceRings { map, cons, prod },
            socket: Socket {
                info: interface.info.clone(),
                fd: interface.fd.clone(),
            },
            devices: self.devices.clone(),
        };

        core::mem::forget(_tmp_device);
        Ok(device)
    }

    /// Configure the device address for a socket.
    ///
    /// Either `rx_size` or `tx_size` must be non-zero, i.e. the call to bind will fail if none of
    /// the rings is actually configured.
    ///
    /// Note: if the underlying socket is shared then this will also bind other objects that share
    /// the underlying socket file descriptor; this is intended.
    pub fn rx_tx(&self, interface: &Socket, config: &SocketConfig) -> Result<User, XskError> {
        let sock = &*interface.fd;
        Self::configure_rt(sock, config)?;
        let map = SocketMmapOffsets::new(sock)?;

        Ok(User {
            socket: Socket {
                info: interface.info.clone(),
                fd: interface.fd.clone(),
            },
            config: Arc::new(config.clone()),
            map,
        })
    }

    /// Activate a socket by binding it to a device.
    ///
    /// This associates the umem region to these queues. This is intended for:
    ///
    /// - sockets that maintain the fill and completion ring for a device queue, i.e. `fq_cq` was
    ///   called with the socket and that network interface queue is currently being bound.
    ///
    /// - queues that the umem socket file descriptor is maintaining as a device queue, i.e. the
    ///   call to `fq_cq` used a socket created with [`Socket::with_shared`] that utilized the
    ///   [`Umem`] instance.
    ///
    /// Otherwise, when a pure rx/tx socket should be set up, use [`DeviceQueue::bind`] with the
    /// previously bound socket providing its fill/completion queues.
    ///
    /// The tree of parents should look as follows:
    ///
    /// ```text
    /// fd0: umem [+fq/cq for ifq0] [+rx/+tx]
    /// |- [fd1: socket +rx/tx on ifq0 if fd0 has fq/cq] Umem::bind(fd0, fd1)
    /// |- [fd2: socket +rx/tx on ifq0 if fd0 has fq/cq …] Umem::bind(fd0, fd2)
    /// |
    /// |- fd3: socket +fq/cq for ifq1 [+rx/tx] Umem::bind(fd0, fd3)
    /// |  |- fd4: socket +rx/tx on ifq1 DeviceQueue::bind(fd3, fd4)
    /// |  |- fd5: socket +rx/tx on ifq1 … DeviceQueue::bind(fd3, fd5)
    /// |
    /// |- fd6: socket +fq/cq for ifq2 [+rx/tx] Umem::bind(fd0, fd6)
    /// |  |- fd7: socket +rx/tx on ifq2 DeviceQueue::bind(fd6, fd7)
    /// |  |- …
    /// ```
    pub fn bind(&self, interface: &User) -> Result<(), XskError> {
        Self::bind_at(interface, &self.fd)
    }

    fn bind_at(interface: &User, umem_sock: &SocketFd) -> Result<(), XskError> {
        let mut sxdp = sockaddr_xdp {
            sxdp_flags: 0,
            sxdp_family: libc::AF_XDP as u16,
            sxdp_shared_umem_fd: 0,
            sxdp_ifindex: interface.socket.info.ctx.ifindex,
            sxdp_queue_id: interface.socket.info.ctx.queue_id,
        };

        // Note: using a separate socket with shared umem requires one dedicated configured cq for
        // the interface indicated.

        if interface.socket.fd.0 != umem_sock.0 {
            sxdp.sxdp_flags = interface.config.bind_flags | XDP_SHARED_UMEM as u16;
            sxdp.sxdp_shared_umem_fd = umem_sock.0 as u32;
        }

        if unsafe {
            libc::bind(
                interface.socket.fd.0,
                (&sxdp) as *const _ as *const libc::sockaddr,
                core::mem::size_of_val(&sxdp) as libc::socklen_t,
            )
        } != 0
        {
            return Err(XskError::last_os_error())?;
        }

        Ok(())
    }

    pub(crate) fn configure_cq(fd: &SocketFd, config: &UmemConfig) -> Result<(), XskError> {
        if unsafe {
            libc::setsockopt(
                fd.0,
                super::SOL_XDP,
                XDP_UMEM_COMPLETION_RING as i32,
                (&config.complete_size) as *const _ as *const libc::c_void,
                core::mem::size_of_val(&config.complete_size) as libc::socklen_t,
            )
        } != 0
        {
            return Err(XskError::last_os_error())?;
        }

        if unsafe {
            libc::setsockopt(
                fd.0,
                super::SOL_XDP,
                XDP_UMEM_FILL_RING as i32,
                (&config.fill_size) as *const _ as *const libc::c_void,
                core::mem::size_of_val(&config.fill_size) as libc::socklen_t,
            )
        } != 0
        {
            return Err(XskError::last_os_error())?;
        }

        Ok(())
    }

    pub(crate) fn configure_rt(fd: &SocketFd, config: &SocketConfig) -> Result<(), XskError> {
        if let Some(num) = config.rx_size {
            if unsafe {
                libc::setsockopt(
                    fd.0,
                    super::SOL_XDP,
                    XDP_RX_RING as i32,
                    (&num) as *const _ as *const libc::c_void,
                    core::mem::size_of_val(&num) as libc::socklen_t,
                )
            } != 0
            {
                return Err(XskError::last_os_error())?;
            }
        }

        if let Some(num) = config.tx_size {
            if unsafe {
                libc::setsockopt(
                    fd.0,
                    super::SOL_XDP,
                    XDP_TX_RING as i32,
                    (&num) as *const _ as *const libc::c_void,
                    core::mem::size_of_val(&num) as libc::socklen_t,
                )
            } != 0
            {
                return Err(XskError::last_os_error())?;
            }
        }

        Ok(())
    }
}

impl DeviceQueue {
    /// Get the statistics of this XDP socket.
    pub fn statistics(&self) -> Result<XdpStatistics, XskError> {
        XdpStatistics::new(&self.socket.fd)
    }

    /// Bind the socket to a device queue, activating its rx/tx queues.
    pub fn bind(&self, interface: &User) -> Result<(), XskError> {
        Umem::bind_at(interface, &self.socket.fd)
    }
}

impl User {
    /// Get the statistics of this XDP socket.
    pub fn statistics(&self) -> Result<XdpStatistics, XskError> {
        XdpStatistics::new(&self.socket.fd)
    }

    /// Map the RX ring into memory, returning a handle.
    ///
    /// Fails if no `rx_size` was passed in the configuration.
    ///
    /// FIXME: we allow mapping the ring more than once. Not a memory safety problem afaik, but a
    /// correctness problem.
    pub fn map_rx(&self) -> Result<RingRx, XskError> {
        let rx_size = self
            .config
            .rx_size
            .ok_or(XskError::Errno {
                errno: libc::EINVAL,
            })?
            .get();
        let ring = unsafe { RingCons::rx(&self.socket.fd, &self.map, rx_size) }?;
        Ok(RingRx {
            fd: self.socket.fd.clone(),
            ring,
        })
    }

    /// Map the TX ring into memory, returning a handle.
    ///
    /// Fails if no `tx_size` was passed in the configuration.
    ///
    /// FIXME: we allow mapping the ring more than once. Not a memory safety problem afaik, but a
    /// correctness problem.
    pub fn map_tx(&self) -> Result<RingTx, XskError> {
        let tx_size = self
            .config
            .tx_size
            .ok_or(XskError::Errno {
                errno: libc::EINVAL,
            })?
            .get();
        let ring = unsafe { RingProd::tx(&self.socket.fd, &self.map, tx_size) }?;
        Ok(RingTx {
            fd: self.socket.fd.clone(),
            ring,
        })
    }
}

impl SocketConfig {
    /// Flag-bit for [`Umem::bind`] that the descriptor is shared.
    ///
    /// Generally, this flag need not be passed directly. Instead, it is set internally by the
    /// library when the same `Umem` is used for multiple interface/queue combinations.
    pub const XDP_BIND_SHARED_UMEM: u16 = XDP_SHARED_UMEM as u16;
    /// Force copy-mode.
    pub const XDP_BIND_COPY: u16 = XDP_COPY as u16;
    /// Force zero-copy-mode.
    pub const XDP_BIND_ZEROCOPY: u16 = XDP_ZEROCOPY as u16;
    /// Enable support for need wakeup.
    ///
    /// Needs to be set for [`DeviceQueue::needs_wakeup`] and [`RingTx::needs_wakeup`].
    pub const XDP_BIND_NEED_WAKEUP: u16 = XDP_USE_NEED_WAKEUP as u16;
}

#[derive(Default)]
struct SpinLockedControlSet {
    inner: RwLock<BTreeSet<IfCtx>>,
}

impl core::ops::Deref for DeviceControl {
    type Target = dyn super::ControlSet;
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}

impl super::ControlSet for SpinLockedControlSet {
    fn insert(&self, ctx: IfCtx) -> bool {
        let mut lock = self.inner.write().expect("Poisoned RwLock");
        lock.insert(ctx)
    }

    fn contains(&self, ctx: &IfCtx) -> bool {
        let lock = self.inner.read().expect("Poisoned RwLock");
        lock.contains(ctx)
    }

    fn remove(&self, ctx: &IfCtx) {
        let mut lock = self.inner.write().expect("Poisoned RwLock");
        lock.remove(ctx);
    }
}

impl UmemChunk {
    /// Turn this whole chunk into a concrete descriptor for the transmit ring.
    ///
    /// If the address or offset is not as returned by the ring then the result is
    /// unspecified, but sound. And potentially safe to use, but the kernel may complain.
    pub fn as_xdp(self) -> xdp_desc {
        let len = ptr_len(self.addr.as_ptr()) as u32;
        self.as_xdp_with_len(len)
    }

    /// Turn into a descriptor with concrete length.
    ///
    /// # Panics
    ///
    /// When debug assertions are enabled, this panics if the length is longer than the address
    /// range refers to.
    pub fn as_xdp_with_len(self, len: u32) -> xdp_desc {
        debug_assert!(
            len <= ptr_len(self.addr.as_ptr()) as u32,
            "Invalid XDP descriptor length {} for chunk of size {}",
            len,
            ptr_len(self.addr.as_ptr()) as u32,
        );

        xdp_desc {
            addr: self.offset,
            len,
            options: 0,
        }
    }

    /// Extract a slice of the chunk's memory.
    ///
    /// The requested range is bounds-checked against the chunk; the call panics when
    /// `start + len` exceeds the chunk length. It remains the caller's responsibility
    /// not to read the kernel-shared memory while the kernel may be writing it.
    #[inline]
    pub fn slice_at(&self, start: usize, len: usize) -> &[u8] {
        assert!(
            start + len <= self.addr.len(),
            "out of bounds index into UmemChunk"
        );
        unsafe { &self.addr.as_ref()[start..start + len] }
    }
}
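
A short sketch tying the chunk accessors together (offsets assume the default config, as above):

```rust
// Sketch: build a TX descriptor covering the first 64 bytes of frame 0.
let chunk = umem.frame(BufIdx(0)).expect("within bounds");
let desc = chunk.as_xdp_with_len(64);
assert_eq!(desc.addr, chunk.offset);
assert_eq!(desc.len, 64);
let header = chunk.slice_at(0, 14); // e.g. peek at the Ethernet header
assert_eq!(header.len(), 14);
```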

@@ -0,0 +1,516 @@
use aya_obj::generated::xdp_desc;

use crate::af_xdp::{
    xsk::Frame, BufIdx, DeviceQueue, RingCons, RingProd, RingRx, RingTx, Umem, XskError,
};

impl DeviceQueue {
    /// Prepare some buffers for the fill ring.
    ///
    /// The argument is an upper bound of buffers. Use the resulting object to pass specific
    /// buffers to the fill queue and commit the write.
    pub fn fill(&mut self, max: u32) -> WriteFill<'_> {
        WriteFill {
            idx: BufIdxIter::reserve(&mut self.fcq.prod, max),
            queue: &mut self.fcq.prod,
        }
    }

    /// Reap some buffers from the completion ring.
    ///
    /// Returns an iterator over completed buffers.
    ///
    /// The argument is an upper bound of buffers. Use the resulting object to dequeue specific
    /// buffers from the completion queue and commit the read.
    pub fn complete(&mut self, n: u32) -> ReadComplete<'_> {
        ReadComplete {
            idx: BufIdxIter::peek(&mut self.fcq.cons, n),
            queue: &mut self.fcq.cons,
        }
    }

    /// Return the difference between the kernel's producer state and our consumer head.
    pub fn available(&self) -> u32 {
        self.fcq.cons.count_pending()
    }

    /// Return the difference between our committed producer state and the kernel's consumer head.
    pub fn pending(&self) -> u32 {
        self.fcq.prod.count_pending()
    }
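
A hedged sketch of the fill/complete cycle described here. The `insert` call and iteration over `ReadComplete` follow the xdpilone-style API this module is adapted from; both are assumptions, only `commit` and `release` are named in the docs below:

```rust
// Sketch: keep the kernel supplied with receive buffers, then reclaim
// completed ones. `commit`/`release` perform the atomic handover.
let mut writer = fq_cq.fill(16);
writer.insert(free_chunks.iter().map(|chunk| chunk.offset)); // assumed API
writer.commit();

let mut reader = fq_cq.complete(16);
for offset in &mut reader {
    // the kernel is done with the buffer at `offset`; recycle it
    free_offsets.push(offset);
}
reader.release();
```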

    /// Get the raw file descriptor of this ring.
    ///
    /// # Safety
    ///
    /// Use the file descriptor to attach the ring to an XSK map, for instance, but do not close it
    /// and avoid modifying it (unless you know what you're doing). It should be treated as a
    /// `BorrowedFd<'_>`. That said, it's not instant UB but probably delayed UB when the
    /// `DeviceQueue` modifies a reused file descriptor that it assumes to own.
    pub fn as_raw_fd(&self) -> libc::c_int {
        self.socket.fd.0
    }

    /// Query if the fill queue needs to be woken to proceed receiving.
    ///
    /// This is only accurate if [`SocketConfig::XDP_BIND_NEED_WAKEUP`] was set.
    pub fn needs_wakeup(&self) -> bool {
        self.fcq.prod.check_flags() & RingTx::XDP_RING_NEED_WAKEUP != 0
    }

    /// Poll the fill queue descriptor, to wake it up.
    pub fn wake(&mut self) {
        // A bit more complex than TX, here we do a full poll on the FD.
        let mut poll = libc::pollfd {
            fd: self.socket.fd.0,
            events: 0,
            revents: 0,
        };

        // FIXME: should somehow log this, right?
        let _err = unsafe { libc::poll(&mut poll as *mut _, 1, 0) };
    }
}

impl Drop for DeviceQueue {
    fn drop(&mut self) {
        self.devices.remove(&self.socket.info.ctx);
    }
}

impl RingRx {
    /// Receive some buffers.
    ///
    /// Returns an iterator over the descriptors.
    pub fn receive(&mut self, n: u32) -> ReadRx<'_> {
        ReadRx {
            idx: BufIdxIter::peek(&mut self.ring, n),
            queue: &mut self.ring,
        }
    }

    /// Query the number of available descriptors.
    ///
    /// This operation is advisory only. It performs a __relaxed__ atomic load of the kernel
    /// producer. An `acquire` barrier, such as performed by [`RingRx::receive`], is always needed
    /// before reading any of the written descriptors to ensure that these reads do not race with
    /// the kernel's writes.
    pub fn available(&self) -> u32 {
        self.ring.count_pending()
    }

    /// Get the raw file descriptor of this RX ring.
    ///
    /// # Safety
    ///
    /// Use the file descriptor to attach the ring to an XSK map, for instance, but do not close it
    /// and avoid modifying it (unless you know what you're doing). It should be treated as a
    /// `BorrowedFd<'_>`. That said, it's not instant UB but probably delayed UB when the `RingRx`
    /// modifies a reused file descriptor that it assumes to own.
    pub fn as_raw_fd(&self) -> libc::c_int {
        self.fd.0
    }

    /// Safely extract a frame descriptor from the RX ring.
    ///
    /// Returns a reference to the frame data if available, or `None` if no frames are ready.
    pub fn extract_frame<'a>(&mut self, umem: &'a Umem) -> Option<Frame<'a>> {
        // Check if there are any available descriptors
        if self.ring.count_available(1) < 1 {
            return None;
        }

        // Peek at the next descriptor
        let mut idx = BufIdx(0);
        let count = self.ring.peek(1..=1, &mut idx);
        if count == 0 {
            return None;
        }

        // Get the descriptor safely
        let desc = unsafe { self.ring.rx_desc(idx).as_ref() };

        // Calculate the frame address and length
        let addr = desc.addr as usize;
        let len = desc.len as usize;

        // Ensure that the address and length are within bounds
        let buffer = unsafe {
            umem.umem_buffer
                .as_ref()
                .get(addr..addr + len)
                .expect("Invalid frame bounds")
        };

        // Create a Frame abstraction
        Some(Frame {
            buffer,
            idx,
            ring: self,
        })
    }
}
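
A hedged usage sketch of the safe helper above (`handle_packet` is a hypothetical application callback, not part of the module):

```rust
// Sketch: drain ready frames, returning each ring slot once processed.
while let Some(frame) = rx.extract_frame(&umem) {
    handle_packet(frame.buffer); // hypothetical callback
    frame.release(); // hand the RX slot back to the ring
}
```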
|
||||
|
||||
impl RingTx {
|
||||
const XDP_RING_NEED_WAKEUP: u32 = 1 << 0;
|
||||
|
||||
/// Transmit some buffers.
|
||||
///
|
||||
/// Returns a proxy that can be fed descriptors.
|
||||
pub fn transmit(&mut self, n: u32) -> WriteTx<'_> {
|
||||
WriteTx {
|
||||
idx: BufIdxIter::reserve(&mut self.ring, n),
|
||||
queue: &mut self.ring,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the difference between our committed producer state and the kernel's consumer head.
|
||||
pub fn pending(&self) -> u32 {
|
||||
self.ring.count_pending()
|
||||
}
|
||||
|
||||
/// Query if the transmit queue needs to be woken to proceed receiving.
|
||||
///
|
||||
/// This is only accurate if `Umem::XDP_BIND_NEED_WAKEUP` was set.
|
||||
pub fn needs_wakeup(&self) -> bool {
|
||||
self.ring.check_flags() & Self::XDP_RING_NEED_WAKEUP != 0
|
||||
}
|
||||
|
||||
    /// Send a message (with `MSG_DONTWAIT`) to wake up the transmit queue.
    pub fn wake(&self) {
        // FIXME: should somehow log this on failure, right?
        let _ = unsafe {
            libc::sendto(
                self.fd.0,
                core::ptr::null_mut(),
                0,
                libc::MSG_DONTWAIT,
                core::ptr::null_mut(),
                0,
            )
        };
    }

    /// Get the raw file descriptor of this TX ring.
    ///
    /// # Safety
    ///
    /// Use the file descriptor to attach the ring to an XSK map, for instance, but do not close it
    /// and avoid modifying it (unless you know what you're doing). It should be treated as a
    /// `BorrowedFd<'_>`. That said, it's not instant UB but probably delayed UB when the
    /// `RingTx` modifies a reused file descriptor that it assumes to own (for instance, `wake`
    /// sends a message to it).
    pub fn as_raw_fd(&self) -> libc::c_int {
        self.fd.0
    }

    /// Submit a frame back to the kernel for transmission or reuse.
    pub fn submit_frame(&mut self, addr: u64) -> Result<(), XskError> {
        // Ensure there is space in the ring.
        if self.ring.count_free(1) < 1 {
            return Err(XskError::Errno {
                errno: libc::ENOBUFS,
            });
        }

        // Reserve space in the ring.
        let mut idx = BufIdx(0);
        self.ring.reserve(1..=1, &mut idx);

        // Write the address into the descriptor.
        unsafe {
            *self.ring.fill_addr(idx).as_mut() = addr;
        }

        // Commit the submission.
        self.ring.submit(1);

        Ok(())
    }
}

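// Usage sketch (illustrative only): enqueue descriptors and only pay for the
// wakeup syscall when the kernel requests it. `descs` is a hypothetical
// source of ready-to-send `xdp_desc` values.
#[allow(dead_code)]
fn tx_batch_sketch(tx: &mut RingTx, descs: impl Iterator<Item = xdp_desc>) {
    let mut batch = tx.transmit(32);
    batch.insert(descs);
    batch.commit();
    // End the borrow of `tx`; dropping also cancels any uncommitted slots.
    drop(batch);
    // With `XDP_BIND_NEED_WAKEUP` set, the syscall is skipped when not needed.
    if tx.needs_wakeup() {
        tx.wake();
    }
}
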
struct BufIdxIter {
    /// The base of our operation.
    base: BufIdx,
    /// The number of peeked buffers.
    buffers: u32,
    /// The number of buffers still left.
    remain: u32,
}

/// A writer to a fill queue.
///
/// Created with [`DeviceQueue::fill`].
///
/// The owner of this value should call some of the insertion methods in any order, then release
/// the writes by [`WriteFill::commit`], which performs an atomic release in the Umem queue.
#[must_use = "Does nothing unless the writes are committed"]
pub struct WriteFill<'queue> {
    idx: BufIdxIter,
    /// The queue we write to.
    queue: &'queue mut RingProd,
}

/// A reader from a completion queue.
///
/// Created with [`DeviceQueue::complete`].
///
/// The owner of this value should call some of the reader methods or iteration in any order, then
/// mark the reads by [`ReadComplete::release`], which performs an atomic release in the Umem
/// queue.
#[must_use = "Does nothing unless the reads are committed"]
pub struct ReadComplete<'queue> {
    idx: BufIdxIter,
    /// The queue we read from.
    queue: &'queue mut RingCons,
}

/// A writer to a transmission (TX) queue.
///
/// Created with [`RingTx::transmit`].
///
/// The owner of this value should call some of the insertion methods in any order, then release
/// the writes by [`WriteTx::commit`], which performs an atomic release in the Umem queue.
#[must_use = "Does nothing unless the writes are committed"]
pub struct WriteTx<'queue> {
    idx: BufIdxIter,
    /// The queue we write to.
    queue: &'queue mut RingProd,
}

/// A reader from a receive (RX) queue.
///
/// Created with [`RingRx::receive`].
///
/// The owner of this value should call some of the reader methods or iteration in any order, then
/// mark the reads by [`ReadRx::release`], which performs an atomic release in the Umem queue.
#[must_use = "Does nothing unless the reads are committed"]
pub struct ReadRx<'queue> {
    idx: BufIdxIter,
    /// The queue we read from.
    queue: &'queue mut RingCons,
}

impl Iterator for BufIdxIter {
    type Item = BufIdx;

    fn next(&mut self) -> Option<BufIdx> {
        let next = self.remain.checked_sub(1)?;
        self.remain = next;
        let ret = self.base;
        self.base.0 = self.base.0.wrapping_add(1);
        Some(ret)
    }
}

impl BufIdxIter {
    fn peek(queue: &mut RingCons, n: u32) -> Self {
        let mut this = Self {
            buffers: 0,
            remain: 0,
            base: BufIdx(0),
        };
        this.buffers = queue.peek(1..=n, &mut this.base);
        this.remain = this.buffers;
        this
    }

    fn reserve(queue: &mut RingProd, n: u32) -> Self {
        let mut this = Self {
            buffers: 0,
            remain: 0,
            base: BufIdx(0),
        };
        this.buffers = queue.reserve(1..=n, &mut this.base);
        this.remain = this.buffers;
        this
    }

    fn commit_prod(&mut self, queue: &mut RingProd) {
        // This contains an atomic write, which LLVM won't even try to optimize away.
        // But, as long as queues are filled there's a decent chance that we didn't manage to
        // reserve or fill a single buffer.
        //
        // FIXME: Should we expose this as a hint to the user? I.e. `commit_likely_empty` with a
        // hint. As well as better ways to avoid doing any work at all.
        if self.buffers > 0 {
            let count = self.buffers - self.remain;
            queue.submit(count);
            self.buffers -= count;
            self.base.0 += count;
        }
    }

    fn release_cons(&mut self, queue: &mut RingCons) {
        // See also `commit_prod`.
        if self.buffers > 0 {
            let count = self.buffers - self.remain;
            queue.release(count);
            self.buffers -= count;
            self.base.0 += count;
        }
    }
}

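// Illustration (assumed semantics of the accounting above): peek several
// entries, consume only some through the iterator, and release exactly the
// consumed count; the rest stay peeked until released or cancelled.
#[allow(dead_code)]
fn partial_release_sketch(ring: &mut RingCons) {
    let mut iter = BufIdxIter::peek(ring, 4);
    let _first = iter.next();
    // Releases `buffers - remain` (here at most 1) and advances the base.
    iter.release_cons(ring);
    // Retire the remainder so the consumer head does not stall.
    while iter.next().is_some() {}
    iter.release_cons(ring);
}
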
impl WriteFill<'_> {
    /// The total number of available slots.
    pub fn capacity(&self) -> u32 {
        self.idx.buffers
    }

    /// Insert one descriptor into the fill queue.
    ///
    /// A descriptor is an offset in the respective Umem's memory. Any offset within a chunk can
    /// be used to mark the chunk as available for fill. The kernel will overwrite the contents
    /// arbitrarily until the chunk is returned via the RX queue.
    ///
    /// Returns `true` if the insert was successful and `false` if the ring is full. It's
    /// guaranteed that the first [`WriteFill::capacity`] inserts with this function succeed.
    pub fn insert_once(&mut self, nr: u64) -> bool {
        self.insert(core::iter::once(nr)) > 0
    }

    /// Fill additional slots that were reserved.
    ///
    /// The iterator is polled only while reserved slots remain, stopping when either the iterator
    /// or the slots are exhausted. Returns the total number of slots filled.
    pub fn insert(&mut self, it: impl Iterator<Item = u64>) -> u32 {
        let mut n = 0;
        for (item, bufidx) in it.zip(self.idx.by_ref()) {
            n += 1;
            unsafe { *self.queue.fill_addr(bufidx).as_ptr() = item };
        }
        n
    }

    /// Commit the previously written buffers to the kernel.
    pub fn commit(&mut self) {
        self.idx.commit_prod(self.queue)
    }
}

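// Usage sketch (illustrative only): top up the fill ring from an iterator of
// chunk offsets; a `WriteFill` like this would come from `DeviceQueue::fill`.
#[allow(dead_code)]
fn fill_sketch(mut writer: WriteFill<'_>, chunks: impl Iterator<Item = u64>) {
    // Insert at most `capacity` offsets, then publish them to the kernel.
    let filled = writer.insert(chunks);
    debug_assert!(filled <= writer.capacity());
    writer.commit();
}
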
impl Drop for WriteFill<'_> {
    fn drop(&mut self) {
        // Unless everything is committed, roll back the cached queue state.
        if self.idx.buffers != 0 {
            self.queue.cancel(self.idx.buffers)
        }
    }
}

impl ReadComplete<'_> {
    /// The total number of available buffers.
    pub fn capacity(&self) -> u32 {
        self.idx.buffers
    }

    /// Read the next descriptor, an address of a chunk that was transmitted.
    pub fn read(&mut self) -> Option<u64> {
        let bufidx = self.idx.next()?;
        // Safety: the buffer is from that same queue by construction.
        Some(unsafe { *self.queue.comp_addr(bufidx).as_ptr() })
    }

    /// Release the consumed descriptors back to the kernel.
    pub fn release(&mut self) {
        self.idx.release_cons(self.queue)
    }
}

impl Drop for ReadComplete<'_> {
    fn drop(&mut self) {
        // Unless everything is committed, roll back the cached queue state.
        if self.idx.buffers != 0 {
            self.queue.cancel(self.idx.buffers)
        }
    }
}

impl Iterator for ReadComplete<'_> {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        self.read()
    }
}

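// Usage sketch (illustrative only): drain the completion ring via the
// `Iterator` impl above, releasing the slots before returning the addresses.
#[allow(dead_code)]
fn drain_completions(mut reader: ReadComplete<'_>) -> Vec<u64> {
    let addrs: Vec<u64> = reader.by_ref().collect();
    reader.release();
    addrs
}
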
impl WriteTx<'_> {
    /// The total number of available slots.
    pub fn capacity(&self) -> u32 {
        self.idx.buffers
    }

    /// Insert a chunk descriptor to be sent.
    ///
    /// Returns `true` if the insert was successful and `false` if the ring is full. It's
    /// guaranteed that the first [`WriteTx::capacity`] inserts with this function succeed.
    pub fn insert_once(&mut self, nr: xdp_desc) -> bool {
        self.insert(core::iter::once(nr)) > 0
    }

    /// Fill the transmit ring from an iterator.
    ///
    /// Returns the total number of enqueued descriptors. This is a `u32` as it is the common
    /// integral type for describing cardinalities of descriptors in a ring. Use an inspecting
    /// iterator for a more intrusive callback.
    pub fn insert(&mut self, it: impl Iterator<Item = xdp_desc>) -> u32 {
        let mut n = 0;
        // FIXME: incorrect iteration order? Some items may get consumed but not inserted.
        for (item, bufidx) in it.zip(self.idx.by_ref()) {
            n += 1;
            unsafe { *self.queue.tx_desc(bufidx).as_ptr() = item };
        }
        n
    }

    /// Commit the previously written buffers to the kernel.
    pub fn commit(&mut self) {
        self.idx.commit_prod(self.queue);
    }
}

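// Usage sketch (illustrative only): try to enqueue a single descriptor,
// reporting `false` when the TX ring has no free slot.
#[allow(dead_code)]
fn enqueue_one(tx: &mut RingTx, desc: xdp_desc) -> bool {
    let mut writer = tx.transmit(1);
    let ok = writer.insert_once(desc);
    writer.commit();
    ok
}
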
impl Drop for WriteTx<'_> {
    fn drop(&mut self) {
        // Unless everything is committed, roll back the cached queue state.
        if self.idx.buffers != 0 {
            self.queue.cancel(self.idx.buffers)
        }
    }
}

impl ReadRx<'_> {
    /// The total number of available buffers.
    pub fn capacity(&self) -> u32 {
        self.idx.buffers
    }

    /// Read one descriptor from the receive ring.
    pub fn read(&mut self) -> Option<xdp_desc> {
        let bufidx = self.idx.next()?;
        // Safety: the buffer is from that same queue by construction, by assumption this is within
        // the valid memory region of the mapping.
        // FIXME: queue could validate that this is aligned.
        Some(unsafe { *self.queue.rx_desc(bufidx).as_ptr() })
    }

    /// Release the consumed descriptors back to the kernel.
    pub fn release(&mut self) {
        self.idx.release_cons(self.queue)
    }
}

impl Drop for ReadRx<'_> {
    fn drop(&mut self) {
        // Unless everything is committed, roll back the cached queue state.
        if self.idx.buffers != 0 {
            self.queue.cancel(self.idx.buffers)
        }
    }
}

impl Iterator for ReadRx<'_> {
    type Item = xdp_desc;

    fn next(&mut self) -> Option<xdp_desc> {
        self.read()
    }
}