integration-test: remove an allocation

reviewable/pr1288/r1
Tamir Duberstein 3 weeks ago
parent cfff75166c
commit 29756f02b1
No known key found for this signature in database

@ -6,6 +6,7 @@ use std::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
}, },
thread, thread,
time::Duration,
}; };
use anyhow::Context as _; use anyhow::Context as _;
@ -19,10 +20,7 @@ use aya_obj::generated::BPF_RINGBUF_HDR_SZ;
use integration_common::ring_buf::Registers; use integration_common::ring_buf::Registers;
use rand::Rng as _; use rand::Rng as _;
use test_log::test; use test_log::test;
use tokio::{ use tokio::io::unix::AsyncFd;
io::unix::AsyncFd,
time::{Duration, sleep},
};
struct RingBufTest { struct RingBufTest {
_bpf: Ebpf, _bpf: Ebpf,
@ -178,49 +176,43 @@ async fn ring_buf_async_with_drops() {
seen += 1; seen += 1;
} }
}; };
use futures::future::{ // Wrap in a scope to release the mutable borrow of `async_fd`.
Either::{Left, Right}, {
select, tokio::pin!(let readable = async_fd.readable_mut(););
}; let mut writer =
let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| {
tokio::spawn(async { tokio::spawn(async {
for value in v { for value in v {
ring_buf_trigger_ebpf_program(value); ring_buf_trigger_ebpf_program(value);
} }
}) })
})); }));
let readable = {
let mut writer = writer;
loop { loop {
let readable = Box::pin(async_fd.readable_mut()); match futures::future::select(&mut readable, &mut writer).await {
writer = match select(readable, writer).await { futures::future::Either::Left((guard, _writer)) => {
Left((guard, writer)) => {
let mut guard = guard.unwrap(); let mut guard = guard.unwrap();
process_ring_buf(guard.get_inner_mut()); process_ring_buf(guard.get_inner_mut());
guard.clear_ready(); guard.clear_ready();
writer
} }
Right((writer, readable)) => { futures::future::Either::Right((writer, _readable)) => {
writer.unwrap(); writer.unwrap();
break readable; break;
} }
} }
} }
};
// If there's more to read, we should receive a readiness notification in a timely manner. // If there's more to read, we should receive a readiness notification in a timely manner.
// If we don't then, then assert that there's nothing else to read. Note that it's important // If we don't then, then assert that there's nothing else to read. Note that it's important
// to wait some time before attempting to read, otherwise we may catch up with the producer // to wait some time before attempting to read, otherwise we may catch up with the producer
// before epoll has an opportunity to send a notification; our consumer thread can race // before epoll has an opportunity to send a notification; our consumer thread can race with
// with the kernel epoll check. // the kernel epoll check.
let sleep_fut = sleep(Duration::from_millis(10)); match tokio::time::timeout(Duration::from_millis(10), readable).await {
tokio::pin!(sleep_fut); Err(tokio::time::error::Elapsed { .. }) => (),
match select(sleep_fut, readable).await { Ok(guard) => {
Left(((), _)) => {} let mut guard = guard.unwrap();
Right((guard, _)) => { process_ring_buf(guard.get_inner_mut());
let mut guard = guard.unwrap(); guard.clear_ready();
process_ring_buf(guard.get_inner_mut()); }
guard.clear_ready();
} }
} }
@ -280,7 +272,7 @@ async fn ring_buf_async_no_drop() {
for (value, duration) in data { for (value, duration) in data {
// Sleep a tad so we feel confident that the consumer will keep up // Sleep a tad so we feel confident that the consumer will keep up
// and no messages will be dropped. // and no messages will be dropped.
sleep(duration).await; tokio::time::sleep(duration).await;
ring_buf_trigger_ebpf_program(value); ring_buf_trigger_ebpf_program(value);
} }
}) })

Loading…
Cancel
Save