From 856da79166d3dc9e6a67a2d315beb255fde14766 Mon Sep 17 00:00:00 2001 From: TQ Hirsch Date: Sat, 4 May 2024 18:04:48 +0200 Subject: [PATCH] Scanning works, but is an order of magnitude slower than expected --- Cargo.lock | 231 ++++++++++++++++++++++++++++++++++++- Cargo.toml | 3 + local.toml | 8 ++ src/config.rs | 123 +++++++++++++------- src/main.rs | 49 +++++++- src/report.rs | 22 ++-- src/scanner.rs | 301 ++++++++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 685 insertions(+), 52 deletions(-) create mode 100644 local.toml diff --git a/Cargo.lock b/Cargo.lock index dd944fa..7b27fd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,55 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" + +[[package]] +name = "anstyle-parse" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.82" @@ -45,6 +94,7 @@ dependencies = [ "anyhow", "base64", "chrono", + "clap", "futures", "hex", "openssl", @@ -53,8 +103,10 @@ dependencies = [ "serde_with", "thiserror", "tokio", + "tokio-openssl", "toml", "tracing", + "tracing-subscriber", ] [[package]] @@ -135,6 +187,52 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "clap" +version = "4.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" + +[[package]] +name = "colorchoice" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -161,7 +259,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn", ] @@ -320,6 +418,12 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -386,6 +490,12 @@ dependencies = [ "serde", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" + [[package]] name = "itoa" version = "1.0.11" @@ -401,6 +511,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.154" @@ -449,6 +565,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -527,6 +653,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -689,6 +821,15 @@ dependencies = [ "syn", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "slab" version = "0.4.9" @@ -720,6 +861,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "2.0.60" @@ -751,6 +898,16 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.36" @@ -811,6 +968,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-openssl" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ffab79df67727f6acf57f1ff743091873c24c579b1e2ce4d8f53e47ded4d63d" +dependencies = [ + "futures-util", + "openssl", + "openssl-sys", + "tokio", +] + [[package]] name = "toml" version = "0.8.12" @@ -874,6 +1043,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -882,6 +1077,18 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -948,6 +1155,28 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 59c1258..e3c9483 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" anyhow = "1.0.82" base64 = "0.22.1" chrono = { version = "0.4.38", features = ["serde"] } +clap = { version = "4.5.4", features = ["derive", "cargo"] } futures = "0.3.30" hex = { version = "0.4.3", features = ["serde"] } openssl = "0.10.64" @@ -15,5 +16,7 @@ serde_json = "1.0.116" serde_with = { version = "3.8.1", features = ["base64"] } thiserror = "1.0.59" tokio = { version = "1.37.0", features = ["rt-multi-thread", "fs", "io-util", "net", "sync", "time", "macros", "parking_lot"] } +tokio-openssl = "0.6.4" toml = "0.8.12" tracing = { version = "0.1.40" } +tracing-subscriber = "0.3.18" diff --git a/local.toml b/local.toml new file mode 100644 index 0000000..2f5b32f --- /dev/null +++ b/local.toml @@ -0,0 +1,8 @@ +[targets] +hosts = ["10.24.74.8/24"] +ports = [443, 80, 8443, 636] + +[output] +format = "json" +output_file = "certs.jsonl" +issuer_file = "issuers.jsonl" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index e9b32b1..ff3efea 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,12 +7,16 @@ use std::path::Path; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use futures::future::BoxFuture; +use futures::FutureExt; +use openssl::ssl::{Ssl, SslContext, SslContextBuilder, SslMethod, SslMode, SslOptions, SslVerifyMode}; +use openssl::x509::verify::{X509CheckFlags, X509VerifyFlags}; use serde::{de, Deserialize, Deserializer}; use serde::de::{Error, Unexpected}; use tokio::sync::Semaphore; -use crate::report::JsonConfig; +use crate::report::{JsonConfig, Reporter}; #[derive(Copy, Clone, Debug)] pub enum IpRange { @@ -212,9 +216,10 @@ pub struct TopConfig { pub struct Host { pub ip: IpAddr, - pub semaphore: Arc, + pub parallelism: usize, pub ports: Vec, pub live_port: Option, + pub completed_ports: u16, } /// The subset of config needed by probe tasks. @@ -224,54 +229,90 @@ pub struct ProbeConfig { pub connect_timeout: Duration, pub handshake_timeout: Duration, pub global_semaphore: Arc, + pub reporter: Reporter, + pub ssl_ctx: SslContext, } pub struct Config { pub hosts: Vec, pub probe_config: ProbeConfig, + pub reporting_backend: BoxFuture<'static, ()>, } -pub fn load_config(path: impl AsRef) -> anyhow::Result { - let content = std::fs::read_to_string(path)?; - let top_config = toml::from_str::(&content)?; - let target_config = top_config.targets; - - let host_parallelism = target_config.host_parallelism.unwrap_or(5); // conservative default - // Construct the list of hosts and host/port pairs. - - let mut hosts_seen = HashSet::new(); - let mut hosts = Vec::new(); - let mut probes: usize = 0; - - for range in target_config.hosts { - for ip in range { - if !hosts_seen.insert(ip) { - continue; - } - hosts.push(Host { - ip, - live_port: NonZeroU16::new(target_config.host_live_port), - semaphore: Arc::new(Semaphore::new(host_parallelism)), - ports: target_config.ports.clone(), - }); - probes += target_config.ports.len(); - } +impl TopConfig { + pub fn load(path: impl AsRef) -> anyhow::Result { + let content = std::fs::read_to_string(path)?; + let top_config = toml::from_str::(&content)?; + Ok(top_config) } - // Configure the reporter - let (reporter, backend) = crate::report::configure_backend(top_config.output)?; + pub fn compile(&self) -> anyhow::Result { + let target_config = &self.targets; - let probe_config = ProbeConfig { - retry_count: target_config.retry_count.unwrap_or(1), - connect_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)), - handshake_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)), - // 900 is a sane default in case we're crossing a NAT boundary; if not, this can safely be 100's - // of thousands, depending on system resources. - global_semaphore: Arc::new(Semaphore::new(target_config.global_parallelism.unwrap_or(900))), - }; + let host_parallelism = target_config.host_parallelism.unwrap_or(5); // conservative default + // Construct the list of hosts and host/port pairs. - Ok(Config{ - hosts, - probe_config, - }) + let mut hosts_seen = HashSet::new(); + let mut hosts = Vec::new(); + let mut probes: usize = 0; + + for range in target_config.hosts.iter() { + for ip in *range { + if !hosts_seen.insert(ip) { + continue; + } + hosts.push(Host { + ip, + live_port: NonZeroU16::new(target_config.host_live_port), + parallelism: host_parallelism, + ports: target_config.ports.clone(), + completed_ports: 0, + }); + probes += target_config.ports.len(); + } + } + + // Configure the reporter + let (backend, reporter) = crate::report::configure_backend(&self.output)?; + + // Configure OpenSSL + // This is partially cribbed from openssl::ssl::connectors + let mut ctx = SslContextBuilder::new(SslMethod::tls_client())?; + let opts = SslOptions::ALL | SslOptions::NO_COMPRESSION | SslOptions::NO_SSLV2 | SslOptions::NO_SSLV3; + let opts = opts & !SslOptions::DONT_INSERT_EMPTY_FRAGMENTS; + ctx.set_options(opts); + + ctx.set_mode(SslMode::AUTO_RETRY | SslMode::ACCEPT_MOVING_WRITE_BUFFER | SslMode::ENABLE_PARTIAL_WRITE); + if openssl::version::number() >= 0x1_00_01_08_0 { + ctx.set_mode(SslMode::RELEASE_BUFFERS); + } + + // This is insecure, but this insecurity is the entire point of this program. We want to catch unknown roots and self-signed certs + ctx.set_verify(SslVerifyMode::NONE); + { + let param = ctx.verify_param_mut(); + param.set_flags(X509VerifyFlags::CRL_CHECK_ALL)?; + param.set_hostflags(X509CheckFlags::NEVER_CHECK_SUBJECT); + param.set_host("")?; + }; + ctx.set_default_verify_paths()?; + + let probe_config = ProbeConfig { + retry_count: target_config.retry_count.unwrap_or(1), + connect_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)), + handshake_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)), + // 900 is a sane default in case we're crossing a NAT boundary; if not, this can safely be 100's + // of thousands, depending on system resources. + global_semaphore: Arc::new(Semaphore::new(target_config.global_parallelism.unwrap_or(900))), + ssl_ctx: ctx.build(), + reporter, + }; + + Ok(Config{ + hosts, + probe_config, + reporting_backend: backend.boxed(), + }) + + } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 76ecdcb..13fa659 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,57 @@ +use std::path::PathBuf; +use std::time::Duration; +use clap::Parser; +use tokio::time::Instant; +use tracing::Level; use ascertain::{report, config}; pub mod scanner; +/// Tool to scan a subnet for visible TLS certificates +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Cli { + #[arg(short,long)] + config: PathBuf, +} #[tokio::main] async fn main() -> anyhow::Result<()> { - eprintln!("{}", std::mem::size_of::()); + openssl::init(); + let args = Cli::parse(); + + let config = config::TopConfig::load(args.config)?; + + // Set up logging + tracing_subscriber::fmt().with_ansi(true) + .init(); + + // TODO: add args to twiddle the config + let config = config.compile()?; + + // Start the reporting backend. + let reporting_backend = tokio::task::spawn(config.reporting_backend); + let start_time = Instant::now(); + let scanner = scanner::Scanner::start(config.probe_config, config.hosts); + + while scanner.is_running() { + tokio::time::sleep(Duration::from_secs(1)).await; + if let Ok(stats) = scanner.get_stats().await { + eprint!("\rHosts (F/C/T): {}/{}/{} Probes (F/C/T): {}/{}/{}", + stats.aborted_hosts, stats.completed_hosts, stats.total_hosts, + stats.aborted_probes, stats.completed_probes, stats.total_probes, + ); + } + } + + let stats = scanner.join().await??; + eprint!("\rHosts (F/C/T): {}/{}/{} Probes (F/C/T): {}/{}/{} COMPLETE", + stats.aborted_hosts, stats.completed_hosts, stats.total_hosts, + stats.aborted_probes, stats.completed_probes, stats.total_probes, + ); + + reporting_backend.await?; + eprintln!("Completed in {:?}", Instant::now() - start_time); + Ok(()) } diff --git a/src/report.rs b/src/report.rs index 3563437..889d27b 100644 --- a/src/report.rs +++ b/src/report.rs @@ -19,7 +19,7 @@ use thiserror::Error; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::{mpsc, RwLock}; use tokio::sync::mpsc::Sender; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use crate::config::OutputFormat; @@ -76,9 +76,15 @@ pub struct CertInfo { pub authority_key_id: Vec, #[serde(with="hex")] pub subject_key_id: Vec, - + pub verification_state: VerificationState, } +#[derive(Serialize, Debug, Copy, Clone, Eq, PartialEq)] +pub enum VerificationState { + Unknown, + Valid, + Invalid, +} fn asn1time_to_datetime(date: &Asn1TimeRef) -> anyhow::Result> { let res = Asn1Time::from_unix(0).unwrap().diff(date)?; let timestamp = res.days as i64 * 86400 + res.secs as i64; @@ -120,7 +126,8 @@ impl CertInfo { key_type: describe_key(data.public_key()?.as_ref()), signature_type: data.signature_algorithm().object().nid().short_name()?.to_owned(), authority_key_id: data.authority_key_id().map_or(Vec::new(), |id| id.as_slice().to_vec()), - subject_key_id: data.subject_key_id().map_or(Vec::new(), |id| id.as_slice().to_vec()) + subject_key_id: data.subject_key_id().map_or(Vec::new(), |id| id.as_slice().to_vec()), + verification_state: VerificationState::Unknown, }) } } @@ -222,6 +229,7 @@ impl Reporter { } pub async fn report_probe(&self, report: ProbeReport) -> Result<(), ReportingError> { + info!(ip=%report.host, "Received report"); if self.report_chan.send(report).await.is_err() { error!("Report formatter has exited early"); Err(ReportingError::ReportFormatterFailed) @@ -231,12 +239,12 @@ impl Reporter { } } -fn start_json(config: JsonConfig) -> anyhow::Result<(impl Future+Send, Reporter)> { +fn start_json(config: &JsonConfig) -> anyhow::Result<(impl Future+Send, Reporter)> { let (issuer_send, mut issuer_recv) = mpsc::channel::(5); let (report_send, mut report_recv) = mpsc::channel(5); - let report_file = tokio::fs::File::from_std(std::fs::File::create(config.output_file)?); - let issuer_writer = config.issuer_file.map(std::fs::File::create).transpose()?.map(tokio::fs::File::from_std); + let report_file = tokio::fs::File::from_std(std::fs::File::create(&config.output_file)?); + let issuer_writer = config.issuer_file.as_ref().map(std::fs::File::create).transpose()?.map(tokio::fs::File::from_std); let has_issuer = issuer_writer.is_some(); let container = config.container; let issuer_fut = async move { @@ -313,7 +321,7 @@ fn start_json(config: JsonConfig) -> anyhow::Result<(impl Future+Send /// Configure the reporting backend -pub(crate) fn configure_backend(config: OutputFormat) -> anyhow::Result<(impl Future+Send, Reporter)> { +pub(crate) fn configure_backend(config: &OutputFormat) -> anyhow::Result<(impl Future+Send, Reporter)> { match config { OutputFormat::Json(json) => start_json(json) } diff --git a/src/scanner.rs b/src/scanner.rs index 767dbdb..d99626a 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -1,4 +1,301 @@ -use crate::config::Config; +use std::future::ready; +use std::io::ErrorKind; +use std::net::{IpAddr, SocketAddr}; +use std::os::linux::raw::stat; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; -pub fn scan(config: Config) { +use anyhow::bail; +use futures::{pin_mut, StreamExt}; +use futures::FutureExt; +use futures::stream::FuturesUnordered; +use openssl::ssl::Ssl; +use openssl::x509::X509VerifyResult; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; +use tokio_openssl::SslStream; +use tracing::{error, info, trace, warn}; + +use ascertain::config::{Host, ProbeConfig}; +use ascertain::report::{CertInfo, ProbeReport, ReportError, ReportPayload, VerificationState}; + +// TODO: determine whether it's faster to send a message +struct StatsInternal { + total_hosts: usize, + total_probes: usize, + completed_hosts: AtomicUsize, + completed_probes: AtomicUsize, + aborted_probes: AtomicUsize, + aborted_hosts: AtomicUsize, +} + +pub struct Stats { + pub total_hosts: usize, + pub total_probes: usize, + pub completed_hosts: usize, + pub completed_probes: usize, + pub aborted_probes: usize, + pub aborted_hosts: usize, +} + +impl StatsInternal { + fn make_public(&self) -> Stats { + Stats { + total_hosts: self.total_hosts, + total_probes: self.total_probes, + completed_hosts: self.completed_hosts.load(Ordering::Relaxed), + completed_probes: self.completed_probes.load(Ordering::Relaxed), + aborted_hosts: self.aborted_hosts.load(Ordering::Relaxed), + aborted_probes: self.aborted_probes.load(Ordering::Relaxed), + } + } +} + +enum Command { + GetStats(oneshot::Sender), +} + +struct ScannerBackend { + probe_config: ProbeConfig, + hosts: Vec, + stats: Arc, + command_chan: mpsc::Receiver, +} + +impl ScannerBackend { + + async fn scan_probe(addr: SocketAddr, probe_config: &ProbeConfig, stats: &StatsInternal, do_ssl: bool) -> anyhow::Result<()> { + // serial scan + let scan_date = chrono::Local::now(); + let connection_token = probe_config.global_semaphore.acquire().await?; + + let conn = match tokio::net::TcpStream::connect(addr).await { + Ok(conn) => conn, + Err(error) => match error.kind() { + ErrorKind::ConnectionRefused => bail!(ReportError::ConnectionRefused), + ErrorKind::TimedOut => { + let err = ReportError::ConnectionTimeout; + probe_config.reporter.report_probe(ProbeReport { + host: addr, + scan_date, + result: ReportPayload::Error {msg: err}, + }).await?; + bail!(err); + }, + _ => { + error!(%error, "Connection failed for unexpected reason"); + bail!(error); + } + } + }; + + if !do_ssl { + // That's all we need. Skip the shutdown as we haven't written anything. + info!(%addr, "Ending liveness check"); + return Ok(()) + } + info!(%addr, "Beginning SSL connect"); + + + // Set up SSL context + let ssl = Ssl::new(probe_config.ssl_ctx.as_ref())?; + let ssl = SslStream::new(ssl, conn)?; + pin_mut!(ssl); + select! { + result = ssl.as_mut().connect() => { + if let Err(error) = result { + error!(%error, "SSL handshake failed"); + let err = ReportError::ProtocolError; + probe_config.reporter.report_probe(ProbeReport { + host: addr, + scan_date, + result: ReportPayload::Error {msg: err}, + }).await?; + bail!(err); + } + } + _ = tokio::time::sleep(probe_config.handshake_timeout) => { + let err = ReportError::HandshakeTimeout; + probe_config.reporter.report_probe(ProbeReport { + host: addr, + scan_date, + result: ReportPayload::Error {msg: err}, + }).await?; + bail!(err); + } + } + info!(%addr, "SSL handshake complete"); + + // we have a connection now; grab the SSL cert and chain and report them + let ssl_inner = ssl.ssl(); + if let Some(cert) = ssl_inner.peer_certificate() { + let mut info = CertInfo::extract(cert.as_ref())?; + info.verification_state = if ssl_inner.verify_result() == X509VerifyResult::OK { + VerificationState::Valid + } else { + VerificationState::Invalid + }; + probe_config.reporter.report_probe(ProbeReport { + scan_date, + host: addr, + result: ReportPayload::Success { + certificate: info, + } + }).await?; + } else { + warn!(%addr, "No SSL certificate received"); + } + if let Some(chain) = ssl_inner.peer_cert_chain() { + probe_config.reporter.report_issuers(chain).await; + } + drop(connection_token); + Ok(()) + } + + async fn scan_host_internal(host: &mut Host, probe_config: &ProbeConfig, stats: &StatsInternal) -> anyhow::Result<()> { + // TODO: allow multiple liveness ports (e.g., for scanning both Windows and Linux hosts) + if let Some(port) = host.live_port { + Self::scan_probe(SocketAddr::new(host.ip, port.get()), probe_config, stats, false).await?; + } + // TODO: merge liveness check with SSL check + let mut running_checks = FuturesUnordered::new(); + loop { + tokio::select! { + biased; + // Fill queue if needed + _ = ready(()), if running_checks.len() < host.parallelism && !host.ports.is_empty() => { + let port = host.ports.pop().unwrap(); + let addr = SocketAddr::new(host.ip, port); + running_checks.push(FutureExt::map(Self::scan_probe( + addr, + probe_config, + stats, + true, + ), move |res| (addr, res))); + }, + // Check if probe complete + Some((_addr, _result)) = running_checks.next() => { + stats.completed_probes.fetch_add(1, Ordering::Relaxed); + }, + else => return Ok(()), + } + } + } + + async fn scan_host(mut host: Host, probe_config: ProbeConfig, stats: Arc) -> (IpAddr, anyhow::Result<()>) { + let ip = host.ip; + let results = Self::scan_host_internal(&mut host, &probe_config, &*stats).await; + // remove aborted probes from total + stats.aborted_probes.fetch_add(host.ports.len() - host.completed_ports as usize, Ordering::Relaxed); + (ip, results) + } + + async fn run(mut self) -> anyhow::Result { + let mut tasks = tokio::task::JoinSet::new(); + let host_scale = self.probe_config.global_semaphore.available_permits(); + let host_scale = host_scale + host_scale / 16; // allow an extra 6% to cover for inefficiency in respawning hosts. + + + + // Spawn the initial set of host probes + for _ in 0..host_scale { + if let Some(host) = self.hosts.pop() { + tasks.spawn(Self::scan_host(host, self.probe_config.clone(), self.stats.clone())); + } else { + break + } + } + + loop { + info!(nhosts_remaining=tasks.len(), "Beginning iteration"); + select! { + Some(completion) = tasks.join_next() => { + + match completion { + Ok((ip, result)) => { + self.stats.completed_hosts.fetch_add(1, Ordering::Relaxed); + let error = result.err().map(tracing::field::display); + let success = error.is_none(); + // I'd love to make a scan failure a warning, but that's not an option here. + info!(%ip, success, error, "Host scan complete") + }, + Err(error) => { + self.stats.aborted_hosts.fetch_add(1, Ordering::Relaxed); + error!(%error, "Host scan failed catastrophically") + } + } + if let Some(host) = self.hosts.pop() { + // start the next one, if there is one + // If not, eventually the task set will drain and the branch will be disabled. + tasks.spawn(Self::scan_host(host, self.probe_config.clone(), self.stats.clone())); + } + if self.hosts.is_empty() && tasks.is_empty() { + break; + } + }, + Some(command) = self.command_chan.recv() => { + match command { + Command::GetStats(rchan) => { + rchan.send(self.stats.make_public()).ok(); + } + } + }, + else => { + info!("All scans complete"); + break; + } + } + } + Ok(self.stats.make_public()) + } +} + +pub struct Scanner { + join_handle: JoinHandle>, + command_handle: mpsc::Sender +} + +impl Scanner { + pub fn start(probe_config: ProbeConfig, hosts: Vec) -> Self { + let total_hosts = hosts.len(); + let total_probes = hosts.iter().map(|host| host.ports.len()).sum(); + let stats = Arc::new(StatsInternal { + total_hosts, + total_probes, + completed_probes: AtomicUsize::new(0), + completed_hosts: AtomicUsize::new(0), + aborted_hosts: AtomicUsize::new(0), + aborted_probes: AtomicUsize::new(0), + }); + + let (send_command, recv_command) = mpsc::channel(1); + + let backend = ScannerBackend { + stats, + probe_config, + hosts, + command_chan: recv_command, + }; + + Self { + command_handle: send_command, + join_handle: tokio::task::spawn(backend.run()), + } + } + + pub fn join(self) -> JoinHandle> { + self.join_handle + } + + pub async fn get_stats(&self) -> anyhow::Result { + let (send, recv) = oneshot::channel(); + self.command_handle.send(Command::GetStats(send)).await?; + + recv.await.map_err(Into::into) + } + + pub fn is_running(&self) -> bool { + !self.join_handle.is_finished() + } } \ No newline at end of file