feat: add compio support for aya-log

reviewable/pr1272/r1
Sherlock Holo 6 days ago
parent ccf6c4707f
commit 5f6552c714
No known key found for this signature in database
GPG Key ID: EF99C5518030AAE6

@ -64,6 +64,7 @@ rust-version = "1.85.0"
anyhow = { version = "1", default-features = false } anyhow = { version = "1", default-features = false }
assert_matches = { version = "1.5.0", default-features = false } assert_matches = { version = "1.5.0", default-features = false }
async-io = { version = "2.0", default-features = false } async-io = { version = "2.0", default-features = false }
async-global-executor = { version = "3.1.0", default-features = false }
base64 = { version = "0.22.1", default-features = false } base64 = { version = "0.22.1", default-features = false }
bindgen = { version = "0.71", default-features = false } bindgen = { version = "0.71", default-features = false }
bitflags = { version = "2.2.1", default-features = false } bitflags = { version = "2.2.1", default-features = false }
@ -71,6 +72,7 @@ bytes = { version = "1", default-features = false }
cargo_metadata = { version = "0.19.0", default-features = false } cargo_metadata = { version = "0.19.0", default-features = false }
clap = { version = "4", default-features = false } clap = { version = "4", default-features = false }
const-assert = { version = "1.0.1", default-features = false } const-assert = { version = "1.0.1", default-features = false }
compio = { version = "0.14.0", default-features = false }
dialoguer = { version = "0.11", default-features = false } dialoguer = { version = "0.11", default-features = false }
diff = { version = "0.1.13", default-features = false } diff = { version = "0.1.13", default-features = false }
env_logger = { version = "0.11", default-features = false } env_logger = { version = "0.11", default-features = false }

@ -16,13 +16,21 @@ rust-version.workspace = true
[lints] [lints]
workspace = true workspace = true
[features]
default = ["async_tokio"]
async_tokio = ["aya/async_tokio", "dep:tokio"]
async_std = ["aya/async_std", "dep:async-global-executor"]
async_compio = ["aya/async_compio", "dep:compio"]
[dependencies] [dependencies]
aya = { path = "../aya", version = "^0.13.1", features = ["async_tokio"] } aya = { path = "../aya", version = "^0.13.1" }
aya-log-common = { path = "../aya-log-common", version = "^0.1.15", default-features = false } aya-log-common = { path = "../aya-log-common", version = "^0.1.15", default-features = false }
bytes = { workspace = true } bytes = { workspace = true }
log = { workspace = true } log = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt"] } tokio = { workspace = true, features = ["rt"], optional = true }
async-global-executor = { workspace = true, optional = true }
compio = { workspace = true, optional = true }
[dev-dependencies] [dev-dependencies]
env_logger = { workspace = true } env_logger = { workspace = true }

@ -165,7 +165,8 @@ impl EbpfLogger {
let mut buf = logs.open(cpu_id, None)?; let mut buf = logs.open(cpu_id, None)?;
let log = logger.clone(); let log = logger.clone();
tokio::spawn(async move {
let fut = async move {
let mut buffers = vec![BytesMut::with_capacity(LOG_BUF_CAPACITY); 10]; let mut buffers = vec![BytesMut::with_capacity(LOG_BUF_CAPACITY); 10];
loop { loop {
@ -175,7 +176,26 @@ impl EbpfLogger {
log_buf(buf.as_ref(), &*log).unwrap(); log_buf(buf.as_ref(), &*log).unwrap();
} }
} }
}); };
#[cfg(feature = "async_tokio")]
{
tokio::spawn(fut);
}
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
{
async_global_executor::spawn(fut).detach();
}
#[cfg(all(
not(feature = "async_tokio"),
not(feature = "async_std"),
feature = "async_compio"
))]
{
compio::runtime::spawn(fut).detach();
}
} }
Ok(()) Ok(())
} }

@ -22,6 +22,7 @@ async-io = { workspace = true, optional = true }
aya-obj = { path = "../aya-obj", version = "^0.2.1", features = ["std"] } aya-obj = { path = "../aya-obj", version = "^0.2.1", features = ["std"] }
bitflags = { workspace = true } bitflags = { workspace = true }
bytes = { workspace = true } bytes = { workspace = true }
compio = { workspace = true, optional = true, features = ["default"] }
hashbrown = { workspace = true } hashbrown = { workspace = true }
libc = { workspace = true } libc = { workspace = true }
log = { workspace = true } log = { workspace = true }
@ -36,6 +37,7 @@ tempfile = { workspace = true }
[features] [features]
async_std = ["dep:async-io"] async_std = ["dep:async-io"]
async_tokio = ["tokio/net"] async_tokio = ["tokio/net"]
async_compio = ["dep:compio"]
default = [] default = []
[package.metadata.docs.rs] [package.metadata.docs.rs]

@ -68,7 +68,11 @@
//unused_results, //unused_results,
)] )]
#![cfg_attr( #![cfg_attr(
all(feature = "async_tokio", feature = "async_std"), all(
feature = "async_tokio",
feature = "async_std",
feature = "async_compio"
),
expect(unused_crate_dependencies) expect(unused_crate_dependencies)
)] )]

@ -92,8 +92,19 @@ pub use bloom_filter::BloomFilter;
pub use hash_map::{HashMap, PerCpuHashMap}; pub use hash_map::{HashMap, PerCpuHashMap};
pub use info::{MapInfo, MapType, loaded_maps}; pub use info::{MapInfo, MapType, loaded_maps};
pub use lpm_trie::LpmTrie; pub use lpm_trie::LpmTrie;
#[cfg(any(feature = "async_tokio", feature = "async_std"))] #[cfg(any(
#[cfg_attr(docsrs, doc(cfg(any(feature = "async_tokio", feature = "async_std"))))] feature = "async_tokio",
feature = "async_std",
feature = "async_compio"
))]
#[cfg_attr(
docsrs,
doc(cfg(any(
feature = "async_tokio",
feature = "async_std",
feature = "async_compio"
)))
)]
pub use perf::AsyncPerfEventArray; pub use perf::AsyncPerfEventArray;
pub use perf::PerfEventArray; pub use perf::PerfEventArray;
pub use queue::Queue; pub use queue::Queue;
@ -487,8 +498,8 @@ impl_try_from_map!(() {
SockMap, SockMap,
StackTraceMap, StackTraceMap,
XskMap, XskMap,
#[cfg(any(feature = "async_tokio", feature = "async_std"))] #[cfg(any(feature = "async_tokio", feature = "async_std", feature = "async_compio"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "async_tokio", feature = "async_std"))))] #[cfg_attr(docsrs, doc(cfg(any(feature = "async_tokio", feature = "async_std", feature = "async_compio"))))]
AsyncPerfEventArray from PerfEventArray, AsyncPerfEventArray from PerfEventArray,
}); });

@ -1,3 +1,9 @@
#[cfg(all(
not(feature = "async_tokio"),
not(feature = "async_std"),
feature = "async_compio"
))]
use std::os::fd::AsRawFd as _;
use std::{ use std::{
borrow::{Borrow, BorrowMut}, borrow::{Borrow, BorrowMut},
path::Path, path::Path,
@ -10,6 +16,18 @@ use std::{
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))] #[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
use async_io::Async; use async_io::Async;
use bytes::BytesMut; use bytes::BytesMut;
#[cfg(all(
not(feature = "async_tokio"),
not(feature = "async_std"),
feature = "async_compio"
))]
use compio::{
driver::{
SharedFd,
op::{Interest, PollOnce},
},
runtime,
};
#[cfg(feature = "async_tokio")] #[cfg(feature = "async_tokio")]
use tokio::io::unix::AsyncFd; use tokio::io::unix::AsyncFd;
@ -172,6 +190,16 @@ impl<T: BorrowMut<MapData>> AsyncPerfEventArrayBuffer<T> {
unsafe { buf.get_mut() } unsafe { buf.get_mut() }
}; };
#[cfg(all(
not(feature = "async_tokio"),
not(feature = "async_std"),
feature = "async_compio"
))]
{
let poll_once = PollOnce::new(SharedFd::new(buf.as_raw_fd()), Interest::Readable);
runtime::submit(poll_once).await.0?;
}
let events = buf.read_events(buffers)?; let events = buf.read_events(buffers)?;
const EMPTY: Events = Events { read: 0, lost: 0 }; const EMPTY: Events = Events { read: 0, lost: 0 };
if events != EMPTY { if events != EMPTY {

@ -2,14 +2,36 @@
//! `perf` API. //! `perf` API.
//! //!
//! See [`PerfEventArray`] and [`AsyncPerfEventArray`]. //! See [`PerfEventArray`] and [`AsyncPerfEventArray`].
#[cfg(any(feature = "async_tokio", feature = "async_std"))] #[cfg(any(
#[cfg_attr(docsrs, doc(cfg(any(feature = "async_tokio", feature = "async_std"))))] feature = "async_tokio",
feature = "async_std",
feature = "async_compio"
))]
#[cfg_attr(
docsrs,
doc(cfg(any(
feature = "async_tokio",
feature = "async_std",
feature = "async_compio"
)))
)]
mod async_perf_event_array; mod async_perf_event_array;
mod perf_buffer; mod perf_buffer;
mod perf_event_array; mod perf_event_array;
#[cfg(any(feature = "async_tokio", feature = "async_std"))] #[cfg(any(
#[cfg_attr(docsrs, doc(cfg(any(feature = "async_tokio", feature = "async_std"))))] feature = "async_tokio",
feature = "async_std",
feature = "async_compio"
))]
#[cfg_attr(
docsrs,
doc(cfg(any(
feature = "async_tokio",
feature = "async_std",
feature = "async_compio"
)))
)]
pub use async_perf_event_array::*; pub use async_perf_event_array::*;
pub use perf_buffer::*; pub use perf_buffer::*;
pub use perf_event_array::*; pub use perf_event_array::*;

Loading…
Cancel
Save