refactor(runner): bound memory usage with FuturesUnordered worker pool

This commit is contained in:
2026-02-25 17:13:36 +02:00
parent b519c4f059
commit 31f5fc5b44
4 changed files with 142 additions and 64 deletions

View File

@@ -7,6 +7,7 @@ edition.workspace = true
[dependencies]
clap.workspace = true
common.workspace = true
futures.workspace = true
miette.workspace = true
rustls.workspace = true
serde.workspace = true

View File

@@ -11,6 +11,7 @@ use common::{
BenchRecord, KeyExchangeMode,
protocol::{read_payload, write_request},
};
use futures::{StreamExt, stream::FuturesUnordered};
use miette::{Context, IntoDiagnostic};
use runner::{
args::Args,
@@ -34,7 +35,7 @@ use std::{
sync::Arc,
time::Instant,
};
use tokio::{net::TcpStream, sync::Semaphore, task::JoinHandle};
use tokio::net::TcpStream;
use tokio_rustls::TlsConnector;
use tracing::info;
use tracing_subscriber::EnvFilter;
@@ -184,87 +185,67 @@ async fn run_benchmark(
info!("warmup complete");
#[allow(clippy::cast_possible_truncation)] // concurrency is limited to reasonable values
let semaphore = Arc::new(Semaphore::new(config.concurrency as usize));
let tasks = spawn_benchmark_tasks(config, &semaphore, tls_connector, server_name);
{
let mut output = stdout();
write_results(&mut output, tasks).await?;
output
.flush()
.into_diagnostic()
.context("failed to flush output")?;
}
let mut output = stdout();
run_and_write(config, tls_connector, server_name, &mut output).await?;
output
.flush()
.into_diagnostic()
.context("failed to flush output")?;
info!("benchmark complete");
Ok(())
}
type ReturnHandle = JoinHandle<miette::Result<BenchRecord>>;
fn spawn_benchmark_tasks(
config: &runner::config::BenchmarkConfig,
semaphore: &Arc<Semaphore>,
tls_connector: &TlsConnector,
server_name: &ServerName<'static>,
) -> Vec<ReturnHandle> {
let server = config.server;
let payload_bytes = config.payload;
(0..config.iters)
.map(|i| {
spawn_single_iteration(
i,
payload_bytes,
config.mode,
server,
semaphore.clone(),
tls_connector.clone(),
server_name.clone(),
)
})
.collect()
}
fn spawn_single_iteration(
async fn run_single_iteration(
i: u32,
payload_bytes: u32,
mode: KeyExchangeMode,
server: SocketAddr,
semaphore: Arc<Semaphore>,
tls_connector: TlsConnector,
server_name: ServerName<'static>,
) -> ReturnHandle {
tokio::spawn(async move {
let _permit = semaphore
.acquire()
.await
.expect("semaphore should not be closed");
) -> miette::Result<BenchRecord> {
let result = run_iteration(server, payload_bytes, &tls_connector, &server_name).await?;
let result = run_iteration(server, payload_bytes, &tls_connector, &server_name).await?;
Ok(BenchRecord {
iteration: u64::from(i),
mode,
payload_bytes: u64::from(payload_bytes),
tcp_ns: result.tcp,
handshake_ns: result.handshake,
ttlb_ns: result.ttlb,
})
Ok(BenchRecord {
iteration: u64::from(i),
mode,
payload_bytes: u64::from(payload_bytes),
tcp_ns: result.tcp,
handshake_ns: result.handshake,
ttlb_ns: result.ttlb,
})
}
// #[allow(clippy::future_not_send)] // dyn Write is not Send
async fn write_results<W: Write + Send>(
async fn run_and_write<W: Write + Send>(
config: &BenchmarkConfig,
tls_connector: &TlsConnector,
server_name: &ServerName<'static>,
output: &mut W,
tasks: Vec<ReturnHandle>,
) -> miette::Result<()> {
for task in tasks {
let record = task.await.into_diagnostic().context("task paniced")??;
writeln!(output, "{record}")
.into_diagnostic()
.context("failed to write record")?;
let mut in_flight = FuturesUnordered::new();
let mut issued = 0;
loop {
while issued < config.iters && in_flight.len() < config.concurrency as usize {
in_flight.push(run_single_iteration(
issued,
config.payload,
config.mode,
config.server,
tls_connector.clone(),
server_name.clone(),
));
issued += 1;
}
match in_flight.next().await {
Some(record) => writeln!(output, "{}", record?)
.into_diagnostic()
.context("failed to write record")?,
None => break,
}
}
Ok(())
}