diff --git a/Cargo.lock b/Cargo.lock index a017eb0..7a5bf43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -367,6 +367,94 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -779,6 +867,7 @@ dependencies = [ "claims", "clap", "common", + "futures", "miette", "rustls", "serde", @@ -961,6 +1050,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" diff --git a/Cargo.toml b/Cargo.toml index e78c2b6..3649aad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ common = { path = "common" } aws-lc-rs = "1" base64 = "0.22" clap = { version = "4.5", features = ["derive"] } +futures = "0.3" miette = { version = "7", features = ["fancy"] } rcgen = "0.14" rustls = { version = "0.23", default-features = false, features = [ diff --git a/runner/Cargo.toml b/runner/Cargo.toml index adcb657..a2ff4ee 100644 --- a/runner/Cargo.toml +++ b/runner/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true [dependencies] clap.workspace = true common.workspace = true +futures.workspace = true miette.workspace = true rustls.workspace = true serde.workspace = true diff --git a/runner/src/main.rs b/runner/src/main.rs index dec1138..ed772c8 100644 --- a/runner/src/main.rs +++ b/runner/src/main.rs @@ -11,6 +11,7 @@ use common::{ BenchRecord, KeyExchangeMode, protocol::{read_payload, write_request}, }; +use futures::{StreamExt, stream::FuturesUnordered}; use miette::{Context, IntoDiagnostic}; use runner::{ args::Args, @@ -34,7 +35,7 @@ use std::{ sync::Arc, time::Instant, }; -use tokio::{net::TcpStream, sync::Semaphore, task::JoinHandle}; +use tokio::net::TcpStream; use tokio_rustls::TlsConnector; use tracing::info; use tracing_subscriber::EnvFilter; @@ -184,87 +185,67 @@ async fn run_benchmark( info!("warmup complete"); #[allow(clippy::cast_possible_truncation)] // concurrency is limited to reasonable values - let semaphore = Arc::new(Semaphore::new(config.concurrency as usize)); - let tasks = spawn_benchmark_tasks(config, &semaphore, tls_connector, server_name); - - { - let mut output = stdout(); - write_results(&mut output, tasks).await?; - output - .flush() - .into_diagnostic() - .context("failed to flush output")?; - } + let mut output = stdout(); + run_and_write(config, tls_connector, server_name, &mut output).await?; + output + .flush() + .into_diagnostic() + .context("failed to flush output")?; info!("benchmark complete"); Ok(()) } -type ReturnHandle = JoinHandle>; - -fn spawn_benchmark_tasks( - config: &runner::config::BenchmarkConfig, - semaphore: &Arc, - tls_connector: &TlsConnector, - server_name: &ServerName<'static>, -) -> Vec { - let server = config.server; - let payload_bytes = config.payload; - - (0..config.iters) - .map(|i| { - spawn_single_iteration( - i, - payload_bytes, - config.mode, - server, - semaphore.clone(), - tls_connector.clone(), - server_name.clone(), - ) - }) - .collect() -} - -fn spawn_single_iteration( +async fn run_single_iteration( i: u32, payload_bytes: u32, mode: KeyExchangeMode, server: SocketAddr, - semaphore: Arc, tls_connector: TlsConnector, server_name: ServerName<'static>, -) -> ReturnHandle { - tokio::spawn(async move { - let _permit = semaphore - .acquire() - .await - .expect("semaphore should not be closed"); +) -> miette::Result { + let result = run_iteration(server, payload_bytes, &tls_connector, &server_name).await?; - let result = run_iteration(server, payload_bytes, &tls_connector, &server_name).await?; - - Ok(BenchRecord { - iteration: u64::from(i), - mode, - payload_bytes: u64::from(payload_bytes), - tcp_ns: result.tcp, - handshake_ns: result.handshake, - ttlb_ns: result.ttlb, - }) + Ok(BenchRecord { + iteration: u64::from(i), + mode, + payload_bytes: u64::from(payload_bytes), + tcp_ns: result.tcp, + handshake_ns: result.handshake, + ttlb_ns: result.ttlb, }) } -// #[allow(clippy::future_not_send)] // dyn Write is not Send -async fn write_results( +async fn run_and_write( + config: &BenchmarkConfig, + tls_connector: &TlsConnector, + server_name: &ServerName<'static>, output: &mut W, - tasks: Vec, ) -> miette::Result<()> { - for task in tasks { - let record = task.await.into_diagnostic().context("task paniced")??; - writeln!(output, "{record}") - .into_diagnostic() - .context("failed to write record")?; + let mut in_flight = FuturesUnordered::new(); + let mut issued = 0; + + loop { + while issued < config.iters && in_flight.len() < config.concurrency as usize { + in_flight.push(run_single_iteration( + issued, + config.payload, + config.mode, + config.server, + tls_connector.clone(), + server_name.clone(), + )); + issued += 1; + } + + match in_flight.next().await { + Some(record) => writeln!(output, "{}", record?) + .into_diagnostic() + .context("failed to write record")?, + None => break, + } } + Ok(()) }