aya-log: Make tokio optional

Use tokio only when `async_tokio` feature is enabled, otherwise use
`std::thread`.

Tested on: https://github.com/vadorovsky/aya-examples/tree/main/notokio

Signed-off-by: Michal Rostecki <vadorovsky@gmail.com>
pull/396/head
Michal Rostecki 3 years ago
parent a93a975cc6
commit c04cae32b2

@ -11,12 +11,12 @@ documentation = "https://docs.rs/aya-log"
edition = "2021"
[dependencies]
aya = { path = "../aya", version = "0.11.0", features=["async_tokio"] }
aya = { path = "../aya", version = "0.11.0" }
aya-log-common = { path = "../aya-log-common", version = "0.1.12-dev.0", features=["userspace"] }
thiserror = "1"
log = "0.4"
bytes = "1.1"
tokio = { version = "1.2.0" }
tokio = { version = "1.2.0", optional = true }
[dev-dependencies]
env_logger = "0.9"
@ -24,3 +24,6 @@ testing_logger = "0.1.1"
[lib]
path = "src/lib.rs"
[features]
async_tokio = ["tokio", "aya/async_tokio"]

@ -63,14 +63,69 @@ use log::{error, Level, Log, Record};
use thiserror::Error;
use aya::{
maps::{
perf::{AsyncPerfEventArray, PerfBufferError},
MapError,
},
maps::{perf::PerfBufferError, MapError},
util::online_cpus,
Bpf, Pod,
};
fn buffers() -> Vec<BytesMut> {
(0..10)
.map(|_| BytesMut::with_capacity(LOG_BUF_CAPACITY))
.collect()
}
macro_rules! read_events {
($logger:expr, $buffers:expr, $events:expr) => {
#[allow(clippy::needless_range_loop)]
for i in 0..$events.read {
let buf = &mut $buffers[i];
log_buf(buf, &*$logger).unwrap();
}
};
}
#[cfg(feature = "async_tokio")]
fn logger_thread<T: Log + 'static>(bpf: &mut Bpf, logger: Arc<T>) -> Result<(), Error> {
use aya::maps::perf::AsyncPerfEventArray;
let mut logs: AsyncPerfEventArray<_> = bpf.map_mut("AYA_LOGS")?.try_into()?;
for cpu_id in online_cpus().map_err(Error::InvalidOnlineCpu)? {
let mut buf = logs.open(cpu_id, None)?;
let log = logger.clone();
tokio::spawn(async move {
let mut buffers = buffers();
loop {
let events = buf.read_events(&mut buffers).await.unwrap();
read_events!(log, buffers, events);
}
});
}
Ok(())
}
#[cfg(not(feature = "async_tokio"))]
fn logger_thread<T: Log + 'static>(bpf: &mut Bpf, logger: Arc<T>) -> Result<(), Error> {
use aya::maps::perf::PerfEventArray;
let mut logs: PerfEventArray<_> = bpf.map_mut("AYA_LOGS")?.try_into()?;
for cpu_id in online_cpus().map_err(Error::InvalidOnlineCpu)? {
let mut buf = logs.open(cpu_id, None)?;
let log = logger.clone();
std::thread::spawn(move || {
let mut buffers = buffers();
loop {
let events = buf.read_events(&mut buffers).unwrap();
read_events!(log, buffers, events);
}
});
}
Ok(())
}
/// Log messages generated by `aya_log_ebpf` using the [log] crate.
///
/// For more details see the [module level documentation](crate).
@ -90,28 +145,7 @@ impl BpfLogger {
logger: T,
) -> Result<BpfLogger, Error> {
let logger = Arc::new(logger);
let mut logs: AsyncPerfEventArray<_> = bpf.map_mut("AYA_LOGS")?.try_into()?;
for cpu_id in online_cpus().map_err(Error::InvalidOnlineCpu)? {
let mut buf = logs.open(cpu_id, None)?;
let log = logger.clone();
tokio::spawn(async move {
let mut buffers = (0..10)
.map(|_| BytesMut::with_capacity(LOG_BUF_CAPACITY))
.collect::<Vec<_>>();
loop {
let events = buf.read_events(&mut buffers).await.unwrap();
#[allow(clippy::needless_range_loop)]
for i in 0..events.read {
let buf = &mut buffers[i];
log_buf(buf, &*log).unwrap();
}
}
});
}
logger_thread(bpf, logger)?;
Ok(BpfLogger {})
}

Loading…
Cancel
Save