diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index c7bf633f..70cc4def 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -6,6 +6,7 @@ use std::{ atomic::{AtomicBool, Ordering}, }, thread, + time::Duration, }; use anyhow::Context as _; @@ -19,10 +20,7 @@ use aya_obj::generated::BPF_RINGBUF_HDR_SZ; use integration_common::ring_buf::Registers; use rand::Rng as _; use test_log::test; -use tokio::{ - io::unix::AsyncFd, - time::{Duration, sleep}, -}; +use tokio::io::unix::AsyncFd; struct RingBufTest { _bpf: Ebpf, @@ -178,49 +176,43 @@ async fn ring_buf_async_with_drops() { seen += 1; } }; - use futures::future::{ - Either::{Left, Right}, - select, - }; - let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { - tokio::spawn(async { - for value in v { - ring_buf_trigger_ebpf_program(value); - } - }) - })); - let readable = { - let mut writer = writer; + // Wrap in a scope to release the mutable borrow of `async_fd`. + { + tokio::pin!(let readable = async_fd.readable_mut();); + let mut writer = + futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { + tokio::spawn(async { + for value in v { + ring_buf_trigger_ebpf_program(value); + } + }) + })); loop { - let readable = Box::pin(async_fd.readable_mut()); - writer = match select(readable, writer).await { - Left((guard, writer)) => { + match futures::future::select(&mut readable, &mut writer).await { + futures::future::Either::Left((guard, _writer)) => { let mut guard = guard.unwrap(); process_ring_buf(guard.get_inner_mut()); guard.clear_ready(); - writer } - Right((writer, readable)) => { + futures::future::Either::Right((writer, _readable)) => { writer.unwrap(); - break readable; + break; } } } - }; - // If there's more to read, we should receive a readiness notification in a timely manner. - // If we don't then, then assert that there's nothing else to read. Note that it's important - // to wait some time before attempting to read, otherwise we may catch up with the producer - // before epoll has an opportunity to send a notification; our consumer thread can race - // with the kernel epoll check. - let sleep_fut = sleep(Duration::from_millis(10)); - tokio::pin!(sleep_fut); - match select(sleep_fut, readable).await { - Left(((), _)) => {} - Right((guard, _)) => { - let mut guard = guard.unwrap(); - process_ring_buf(guard.get_inner_mut()); - guard.clear_ready(); + // If there's more to read, we should receive a readiness notification in a timely manner. + // If we don't then, then assert that there's nothing else to read. Note that it's important + // to wait some time before attempting to read, otherwise we may catch up with the producer + // before epoll has an opportunity to send a notification; our consumer thread can race with + // the kernel epoll check. + match tokio::time::timeout(Duration::from_millis(10), readable).await { + Err(tokio::time::error::Elapsed { .. }) => (), + Ok(guard) => { + let mut guard = guard.unwrap(); + process_ring_buf(guard.get_inner_mut()); + guard.clear_ready(); + } } } @@ -280,7 +272,7 @@ async fn ring_buf_async_no_drop() { for (value, duration) in data { // Sleep a tad so we feel confident that the consumer will keep up // and no messages will be dropped. - sleep(duration).await; + tokio::time::sleep(duration).await; ring_buf_trigger_ebpf_program(value); } })