diff --git a/aya-log/Cargo.toml b/aya-log/Cargo.toml index 97424297..1eb4cec4 100644 --- a/aya-log/Cargo.toml +++ b/aya-log/Cargo.toml @@ -11,12 +11,12 @@ documentation = "https://docs.rs/aya-log" edition = "2021" [dependencies] -aya = { path = "../aya", version = "0.11.0", features=["async_tokio"] } +aya = { path = "../aya", version = "0.11.0" } aya-log-common = { path = "../aya-log-common", version = "0.1.12-dev.0", features=["userspace"] } thiserror = "1" log = "0.4" bytes = "1.1" -tokio = { version = "1.2.0" } +tokio = { version = "1.2.0", optional = true } [dev-dependencies] env_logger = "0.9" @@ -24,3 +24,6 @@ testing_logger = "0.1.1" [lib] path = "src/lib.rs" + +[features] +async_tokio = ["tokio", "aya/async_tokio"] diff --git a/aya-log/src/lib.rs b/aya-log/src/lib.rs index b56c16e6..a4f57273 100644 --- a/aya-log/src/lib.rs +++ b/aya-log/src/lib.rs @@ -63,14 +63,69 @@ use log::{error, Level, Log, Record}; use thiserror::Error; use aya::{ - maps::{ - perf::{AsyncPerfEventArray, PerfBufferError}, - MapError, - }, + maps::{perf::PerfBufferError, MapError}, util::online_cpus, Bpf, Pod, }; +fn buffers() -> Vec { + (0..10) + .map(|_| BytesMut::with_capacity(LOG_BUF_CAPACITY)) + .collect() +} + +macro_rules! read_events { + ($logger:expr, $buffers:expr, $events:expr) => { + #[allow(clippy::needless_range_loop)] + for i in 0..$events.read { + let buf = &mut $buffers[i]; + log_buf(buf, &*$logger).unwrap(); + } + }; +} + +#[cfg(feature = "async_tokio")] +fn logger_thread(bpf: &mut Bpf, logger: Arc) -> Result<(), Error> { + use aya::maps::perf::AsyncPerfEventArray; + let mut logs: AsyncPerfEventArray<_> = bpf.map_mut("AYA_LOGS")?.try_into()?; + + for cpu_id in online_cpus().map_err(Error::InvalidOnlineCpu)? { + let mut buf = logs.open(cpu_id, None)?; + + let log = logger.clone(); + tokio::spawn(async move { + let mut buffers = buffers(); + loop { + let events = buf.read_events(&mut buffers).await.unwrap(); + read_events!(log, buffers, events); + } + }); + } + + Ok(()) +} + +#[cfg(not(feature = "async_tokio"))] +fn logger_thread(bpf: &mut Bpf, logger: Arc) -> Result<(), Error> { + use aya::maps::perf::PerfEventArray; + let mut logs: PerfEventArray<_> = bpf.map_mut("AYA_LOGS")?.try_into()?; + + for cpu_id in online_cpus().map_err(Error::InvalidOnlineCpu)? { + let mut buf = logs.open(cpu_id, None)?; + + let log = logger.clone(); + std::thread::spawn(move || { + let mut buffers = buffers(); + loop { + let events = buf.read_events(&mut buffers).unwrap(); + read_events!(log, buffers, events); + } + }); + } + + Ok(()) +} + /// Log messages generated by `aya_log_ebpf` using the [log] crate. /// /// For more details see the [module level documentation](crate). @@ -90,28 +145,7 @@ impl BpfLogger { logger: T, ) -> Result { let logger = Arc::new(logger); - let mut logs: AsyncPerfEventArray<_> = bpf.map_mut("AYA_LOGS")?.try_into()?; - - for cpu_id in online_cpus().map_err(Error::InvalidOnlineCpu)? { - let mut buf = logs.open(cpu_id, None)?; - - let log = logger.clone(); - tokio::spawn(async move { - let mut buffers = (0..10) - .map(|_| BytesMut::with_capacity(LOG_BUF_CAPACITY)) - .collect::>(); - - loop { - let events = buf.read_events(&mut buffers).await.unwrap(); - - #[allow(clippy::needless_range_loop)] - for i in 0..events.read { - let buf = &mut buffers[i]; - log_buf(buf, &*log).unwrap(); - } - } - }); - } + logger_thread(bpf, logger)?; Ok(BpfLogger {}) }