From 5fa52ac5c11076ea0962d92ae8c49c87af1f3445 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Sat, 5 Jul 2025 14:45:20 -0400 Subject: [PATCH] integration-test: remove an allocation --- test/integration-test/Cargo.toml | 2 +- test/integration-test/src/tests/ring_buf.rs | 81 +++++++++------------ 2 files changed, 37 insertions(+), 46 deletions(-) diff --git a/test/integration-test/Cargo.toml b/test/integration-test/Cargo.toml index f908c4ea..dd1e5447 100644 --- a/test/integration-test/Cargo.toml +++ b/test/integration-test/Cargo.toml @@ -21,7 +21,7 @@ aya-log = { path = "../../aya-log", version = "^0.2.1", default-features = false aya-obj = { path = "../../aya-obj", version = "^0.2.1", default-features = false } env_logger = { workspace = true } epoll = { workspace = true } -futures = { workspace = true, features = ["std"] } +futures = { workspace = true, features = ["alloc"] } integration-common = { path = "../integration-common", features = ["user"] } libc = { workspace = true } log = { workspace = true } diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index c7bf633f..ba11e489 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -6,6 +6,7 @@ use std::{ atomic::{AtomicBool, Ordering}, }, thread, + time::Duration, }; use anyhow::Context as _; @@ -19,10 +20,7 @@ use aya_obj::generated::BPF_RINGBUF_HDR_SZ; use integration_common::ring_buf::Registers; use rand::Rng as _; use test_log::test; -use tokio::{ - io::unix::AsyncFd, - time::{Duration, sleep}, -}; +use tokio::io::unix::AsyncFd; struct RingBufTest { _bpf: Ebpf, @@ -178,49 +176,42 @@ async fn ring_buf_async_with_drops() { seen += 1; } }; - use futures::future::{ - Either::{Left, Right}, - select, - }; - let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { - tokio::spawn(async { - for value in v { - ring_buf_trigger_ebpf_program(value); - } - }) - })); - let readable = { - let mut writer = writer; - loop { - let readable = Box::pin(async_fd.readable_mut()); - writer = match select(readable, writer).await { - Left((guard, writer)) => { - let mut guard = guard.unwrap(); - process_ring_buf(guard.get_inner_mut()); - guard.clear_ready(); - writer - } - Right((writer, readable)) => { - writer.unwrap(); - break readable; + let mut writer = + futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { + tokio::spawn(async { + for value in v { + ring_buf_trigger_ebpf_program(value); } + }) + })); + loop { + let readable = async_fd.readable_mut(); + futures::pin_mut!(readable); + match futures::future::select(readable, &mut writer).await { + futures::future::Either::Left((guard, _writer)) => { + let mut guard = guard.unwrap(); + process_ring_buf(guard.get_inner_mut()); + guard.clear_ready(); } - } - }; + futures::future::Either::Right((writer, readable)) => { + writer.unwrap(); + + // If there's more to read, we should receive a readiness notification in a timely + // manner. If we don't then, then assert that there's nothing else to read. Note + // that it's important to wait some time before attempting to read, otherwise we may + // catch up with the producer before epoll has an opportunity to send a + // notification; our consumer thread can race with the kernel epoll check. + match tokio::time::timeout(Duration::from_millis(10), readable).await { + Err(tokio::time::error::Elapsed { .. }) => (), + Ok(guard) => { + let mut guard = guard.unwrap(); + process_ring_buf(guard.get_inner_mut()); + guard.clear_ready(); + } + } - // If there's more to read, we should receive a readiness notification in a timely manner. - // If we don't then, then assert that there's nothing else to read. Note that it's important - // to wait some time before attempting to read, otherwise we may catch up with the producer - // before epoll has an opportunity to send a notification; our consumer thread can race - // with the kernel epoll check. - let sleep_fut = sleep(Duration::from_millis(10)); - tokio::pin!(sleep_fut); - match select(sleep_fut, readable).await { - Left(((), _)) => {} - Right((guard, _)) => { - let mut guard = guard.unwrap(); - process_ring_buf(guard.get_inner_mut()); - guard.clear_ready(); + break; + } } } @@ -280,7 +271,7 @@ async fn ring_buf_async_no_drop() { for (value, duration) in data { // Sleep a tad so we feel confident that the consumer will keep up // and no messages will be dropped. - sleep(duration).await; + tokio::time::sleep(duration).await; ring_buf_trigger_ebpf_program(value); } })