Scanning works, but is an order of magnitude slower than expected

This commit is contained in:
2024-05-04 18:04:48 +02:00
parent ef4b4b7390
commit 856da79166
7 changed files with 685 additions and 52 deletions

View File

@@ -7,12 +7,16 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use futures::future::BoxFuture;
use futures::FutureExt;
use openssl::ssl::{Ssl, SslContext, SslContextBuilder, SslMethod, SslMode, SslOptions, SslVerifyMode};
use openssl::x509::verify::{X509CheckFlags, X509VerifyFlags};
use serde::{de, Deserialize, Deserializer};
use serde::de::{Error, Unexpected};
use tokio::sync::Semaphore;
use crate::report::JsonConfig;
use crate::report::{JsonConfig, Reporter};
#[derive(Copy, Clone, Debug)]
pub enum IpRange {
@@ -212,9 +216,10 @@ pub struct TopConfig {
/// Per-target scan state: the address, its remaining ports, and per-host limits.
pub struct Host {
    /// Address to scan.
    pub ip: IpAddr,
    // NOTE(review): both `semaphore` and `parallelism` appear here, but hosts are
    // constructed elsewhere in this change with `parallelism` only (no `semaphore`) —
    // this looks like a merged old/new diff view; confirm which field is current.
    pub semaphore: Arc<Semaphore>,
    /// Maximum number of concurrent probes against this host.
    pub parallelism: usize,
    /// Ports still awaiting a probe; consumed back-to-front via `pop()` during the scan.
    pub ports: Vec<u16>,
    /// Optional port used for a plain-TCP liveness check before TLS probing.
    pub live_port: Option<NonZeroU16>,
    // NOTE(review): initialized to 0 at construction and never incremented in the
    // code shown, yet used for aborted-probe accounting — verify intent.
    pub completed_ports: u16,
}
/// The subset of config needed by probe tasks.
@@ -224,54 +229,90 @@ pub struct ProbeConfig {
pub connect_timeout: Duration,
pub handshake_timeout: Duration,
pub global_semaphore: Arc<Semaphore>,
pub reporter: Reporter,
pub ssl_ctx: SslContext,
}
/// Fully-compiled runtime configuration produced by `TopConfig::compile`.
pub struct Config {
    /// The hosts to scan, each with its own port list.
    pub hosts: Vec<Host>,
    /// Settings shared by all probe tasks (timeouts, global semaphore, reporter, TLS context).
    pub probe_config: ProbeConfig,
    /// Future driving report serialization; the caller is expected to spawn and await it.
    pub reporting_backend: BoxFuture<'static, ()>,
}
pub fn load_config(path: impl AsRef<Path>) -> anyhow::Result<Config> {
let content = std::fs::read_to_string(path)?;
let top_config = toml::from_str::<TopConfig>(&content)?;
let target_config = top_config.targets;
let host_parallelism = target_config.host_parallelism.unwrap_or(5); // conservative default
// Construct the list of hosts and host/port pairs.
let mut hosts_seen = HashSet::new();
let mut hosts = Vec::new();
let mut probes: usize = 0;
for range in target_config.hosts {
for ip in range {
if !hosts_seen.insert(ip) {
continue;
}
hosts.push(Host {
ip,
live_port: NonZeroU16::new(target_config.host_live_port),
semaphore: Arc::new(Semaphore::new(host_parallelism)),
ports: target_config.ports.clone(),
});
probes += target_config.ports.len();
}
impl TopConfig {
    /// Read the file at `path` and parse it as TOML into a `TopConfig`.
    pub fn load(path: impl AsRef<Path>) -> anyhow::Result<Self> {
        let content = std::fs::read_to_string(path)?;
        let top_config = toml::from_str::<TopConfig>(&content)?;
        Ok(top_config)
    }
    // Configure the reporter
    // NOTE(review): the next statement sits at impl level, outside any fn, and cannot
    // compile as written — it appears to be removed-side diff residue interleaved into
    // this view. Verify against the actual file contents.
    let (reporter, backend) = crate::report::configure_backend(top_config.output)?;
    /// Turn the parsed TOML into runnable scan state: de-duplicated host list,
    /// probe settings, the reporter pair, and an OpenSSL client context.
    pub fn compile(&self) -> anyhow::Result<Config> {
        let target_config = &self.targets;
        // NOTE(review): this first `probe_config` is shadowed by the one built further
        // down and appears to be removed-side diff residue (it lacks the `reporter`
        // and `ssl_ctx` fields the struct declares).
        let probe_config = ProbeConfig {
            retry_count: target_config.retry_count.unwrap_or(1),
            connect_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)),
            handshake_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)),
            // 900 is a sane default in case we're crossing a NAT boundary; if not, this can safely be 100's
            // of thousands, depending on system resources.
            global_semaphore: Arc::new(Semaphore::new(target_config.global_parallelism.unwrap_or(900))),
        };
        let host_parallelism = target_config.host_parallelism.unwrap_or(5); // conservative default
        // Construct the list of hosts and host/port pairs.
        // NOTE(review): this early `Ok(Config{...})` (no trailing semicolon, and `hosts`
        // is not yet defined at this point) cannot compile as written — more apparent
        // diff residue; the real return is at the end of the function.
        Ok(Config{
            hosts,
            probe_config,
        })
        let mut hosts_seen = HashSet::new();
        let mut hosts = Vec::new();
        let mut probes: usize = 0;
        for range in target_config.hosts.iter() {
            for ip in *range {
                // De-duplicate addresses that appear in more than one configured range.
                if !hosts_seen.insert(ip) {
                    continue;
                }
                hosts.push(Host {
                    ip,
                    live_port: NonZeroU16::new(target_config.host_live_port),
                    parallelism: host_parallelism,
                    ports: target_config.ports.clone(),
                    completed_ports: 0,
                });
                probes += target_config.ports.len();
            }
        }
        // Configure the reporter
        let (backend, reporter) = crate::report::configure_backend(&self.output)?;
        // Configure OpenSSL
        // This is partially cribbed from openssl::ssl::connectors
        let mut ctx = SslContextBuilder::new(SslMethod::tls_client())?;
        let opts = SslOptions::ALL | SslOptions::NO_COMPRESSION | SslOptions::NO_SSLV2 | SslOptions::NO_SSLV3;
        let opts = opts & !SslOptions::DONT_INSERT_EMPTY_FRAGMENTS;
        ctx.set_options(opts);
        ctx.set_mode(SslMode::AUTO_RETRY | SslMode::ACCEPT_MOVING_WRITE_BUFFER | SslMode::ENABLE_PARTIAL_WRITE);
        if openssl::version::number() >= 0x1_00_01_08_0 {
            ctx.set_mode(SslMode::RELEASE_BUFFERS);
        }
        // This is insecure, but this insecurity is the entire point of this program. We want to catch unknown roots and self-signed certs
        ctx.set_verify(SslVerifyMode::NONE);
        {
            let param = ctx.verify_param_mut();
            param.set_flags(X509VerifyFlags::CRL_CHECK_ALL)?;
            param.set_hostflags(X509CheckFlags::NEVER_CHECK_SUBJECT);
            param.set_host("")?;
        };
        ctx.set_default_verify_paths()?;
        let probe_config = ProbeConfig {
            retry_count: target_config.retry_count.unwrap_or(1),
            // NOTE(review): `connect_timeout` is derived from `handshake_timeout` —
            // looks like a copy/paste slip. TODO confirm whether the config schema has
            // a separate connect-timeout setting that should be read here instead.
            connect_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)),
            handshake_timeout: Duration::from_secs_f32(target_config.handshake_timeout.unwrap_or(5.)),
            // 900 is a sane default in case we're crossing a NAT boundary; if not, this can safely be 100's
            // of thousands, depending on system resources.
            global_semaphore: Arc::new(Semaphore::new(target_config.global_parallelism.unwrap_or(900))),
            ssl_ctx: ctx.build(),
            reporter,
        };
        Ok(Config{
            hosts,
            probe_config,
        })
    }
}

View File

@@ -1,10 +1,57 @@
use std::path::PathBuf;
use std::time::Duration;
use clap::Parser;
use tokio::time::Instant;
use tracing::Level;
use ascertain::{report, config};
pub mod scanner;
/// Tool to scan a subnet for visible TLS certificates
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
    /// Path to the TOML scan configuration file.
    #[arg(short,long)]
    config: PathBuf,
}
#[tokio::main]
/// Entry point: load and compile the config, spawn the reporting backend and the
/// scanner, then poll once a second, painting a progress line on stderr until the
/// scan completes.
async fn main() -> anyhow::Result<()> {
    // Fix: removed a leftover debug print of `size_of::<config::Host>()` that was
    // committed by accident.
    openssl::init();
    let args = Cli::parse();
    let config = config::TopConfig::load(args.config)?;
    // Set up logging
    tracing_subscriber::fmt().with_ansi(true)
        .init();
    // TODO: add args to twiddle the config
    let config = config.compile()?;
    // Start the reporting backend.
    let reporting_backend = tokio::task::spawn(config.reporting_backend);
    let start_time = Instant::now();
    let scanner = scanner::Scanner::start(config.probe_config, config.hosts);
    // Refresh a single status line (carriage return, no newline) once per second.
    while scanner.is_running() {
        tokio::time::sleep(Duration::from_secs(1)).await;
        if let Ok(stats) = scanner.get_stats().await {
            eprint!("\rHosts (F/C/T): {}/{}/{} Probes (F/C/T): {}/{}/{}",
                stats.aborted_hosts, stats.completed_hosts, stats.total_hosts,
                stats.aborted_probes, stats.completed_probes, stats.total_probes,
            );
        }
    }
    // `??`: the outer Result is the JoinHandle, the inner one the scan itself.
    let stats = scanner.join().await??;
    eprint!("\rHosts (F/C/T): {}/{}/{} Probes (F/C/T): {}/{}/{} COMPLETE",
        stats.aborted_hosts, stats.completed_hosts, stats.total_hosts,
        stats.aborted_probes, stats.completed_probes, stats.total_probes,
    );
    // Wait for all queued reports to be flushed before declaring completion.
    reporting_backend.await?;
    eprintln!("Completed in {:?}", Instant::now() - start_time);
    Ok(())
}

View File

@@ -19,7 +19,7 @@ use thiserror::Error;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc::Sender;
use tracing::{error, warn};
use tracing::{error, info, warn};
use crate::config::OutputFormat;
@@ -76,9 +76,15 @@ pub struct CertInfo {
pub authority_key_id: Vec<u8>,
#[serde(with="hex")]
pub subject_key_id: Vec<u8>,
pub verification_state: VerificationState,
}
#[derive(Serialize, Debug, Copy, Clone, Eq, PartialEq)]
pub enum VerificationState {
Unknown,
Valid,
Invalid,
}
fn asn1time_to_datetime(date: &Asn1TimeRef) -> anyhow::Result<chrono::DateTime<Utc>> {
let res = Asn1Time::from_unix(0).unwrap().diff(date)?;
let timestamp = res.days as i64 * 86400 + res.secs as i64;
@@ -120,7 +126,8 @@ impl CertInfo {
key_type: describe_key(data.public_key()?.as_ref()),
signature_type: data.signature_algorithm().object().nid().short_name()?.to_owned(),
authority_key_id: data.authority_key_id().map_or(Vec::new(), |id| id.as_slice().to_vec()),
subject_key_id: data.subject_key_id().map_or(Vec::new(), |id| id.as_slice().to_vec())
subject_key_id: data.subject_key_id().map_or(Vec::new(), |id| id.as_slice().to_vec()),
verification_state: VerificationState::Unknown,
})
}
}
@@ -222,6 +229,7 @@ impl Reporter {
}
pub async fn report_probe(&self, report: ProbeReport) -> Result<(), ReportingError> {
info!(ip=%report.host, "Received report");
if self.report_chan.send(report).await.is_err() {
error!("Report formatter has exited early");
Err(ReportingError::ReportFormatterFailed)
@@ -231,12 +239,12 @@ impl Reporter {
}
}
fn start_json(config: JsonConfig) -> anyhow::Result<(impl Future<Output=()>+Send, Reporter)> {
fn start_json(config: &JsonConfig) -> anyhow::Result<(impl Future<Output=()>+Send, Reporter)> {
let (issuer_send, mut issuer_recv) = mpsc::channel::<X509>(5);
let (report_send, mut report_recv) = mpsc::channel(5);
let report_file = tokio::fs::File::from_std(std::fs::File::create(config.output_file)?);
let issuer_writer = config.issuer_file.map(std::fs::File::create).transpose()?.map(tokio::fs::File::from_std);
let report_file = tokio::fs::File::from_std(std::fs::File::create(&config.output_file)?);
let issuer_writer = config.issuer_file.as_ref().map(std::fs::File::create).transpose()?.map(tokio::fs::File::from_std);
let has_issuer = issuer_writer.is_some();
let container = config.container;
let issuer_fut = async move {
@@ -313,7 +321,7 @@ fn start_json(config: JsonConfig) -> anyhow::Result<(impl Future<Output=()>+Send
/// Configure the reporting backend
pub(crate) fn configure_backend(config: OutputFormat) -> anyhow::Result<(impl Future<Output=()>+Send, Reporter)> {
pub(crate) fn configure_backend(config: &OutputFormat) -> anyhow::Result<(impl Future<Output=()>+Send, Reporter)> {
match config {
OutputFormat::Json(json) => start_json(json)
}

View File

@@ -1,4 +1,301 @@
use crate::config::Config;
use std::future::ready;
use std::io::ErrorKind;
use std::net::{IpAddr, SocketAddr};
use std::os::linux::raw::stat;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
pub fn scan(config: Config) {
use anyhow::bail;
use futures::{pin_mut, StreamExt};
use futures::FutureExt;
use futures::stream::FuturesUnordered;
use openssl::ssl::Ssl;
use openssl::x509::X509VerifyResult;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio_openssl::SslStream;
use tracing::{error, info, trace, warn};
use ascertain::config::{Host, ProbeConfig};
use ascertain::report::{CertInfo, ProbeReport, ReportError, ReportPayload, VerificationState};
// TODO: determine whether it's faster to send a message
/// Scan counters shared (behind an `Arc`) between the backend and its host tasks.
struct StatsInternal {
    // Fixed totals, computed once at startup.
    total_hosts: usize,
    total_probes: usize,
    // Live counters, bumped with Relaxed atomics from concurrent tasks.
    completed_hosts: AtomicUsize,
    completed_probes: AtomicUsize,
    aborted_probes: AtomicUsize,
    aborted_hosts: AtomicUsize,
}
/// Plain point-in-time snapshot of scan progress, produced from `StatsInternal`
/// and handed across the command channel to callers.
pub struct Stats {
    pub total_hosts: usize,
    pub total_probes: usize,
    pub completed_hosts: usize,
    pub completed_probes: usize,
    pub aborted_probes: usize,
    pub aborted_hosts: usize,
}
impl StatsInternal {
    /// Snapshot the live atomic counters into a plain `Stats` value.
    ///
    /// Relaxed loads are fine here: the snapshot is advisory progress reporting,
    /// not synchronization.
    fn make_public(&self) -> Stats {
        let completed_hosts = self.completed_hosts.load(Ordering::Relaxed);
        let completed_probes = self.completed_probes.load(Ordering::Relaxed);
        let aborted_hosts = self.aborted_hosts.load(Ordering::Relaxed);
        let aborted_probes = self.aborted_probes.load(Ordering::Relaxed);
        Stats {
            total_hosts: self.total_hosts,
            total_probes: self.total_probes,
            completed_hosts,
            completed_probes,
            aborted_hosts,
            aborted_probes,
        }
    }
}
/// Requests the frontend `Scanner` handle can send to the backend task.
enum Command {
    // Reply on the provided channel with a snapshot of the current counters.
    GetStats(oneshot::Sender<Stats>),
}
/// State owned by the spawned scanning task.
struct ScannerBackend {
    /// Settings shared by every probe (timeouts, semaphore, reporter, TLS context).
    probe_config: ProbeConfig,
    // Hosts not yet scheduled; popped from the back as task slots free up.
    hosts: Vec<Host>,
    stats: Arc<StatsInternal>,
    // Commands arriving from the frontend `Scanner` handle.
    command_chan: mpsc::Receiver<Command>,
}
impl ScannerBackend {
    /// Probe one socket address: acquire a global permit, TCP-connect, and — when
    /// `do_ssl` is true — perform a TLS handshake and report the observed
    /// certificate (or the error) through `probe_config.reporter`.
    ///
    /// NOTE(review): the `stats` parameter is never read in this function — confirm
    /// whether per-probe counters were meant to be updated here.
    /// NOTE(review): `probe_config.connect_timeout` is never applied to the
    /// `TcpStream::connect` call; the `TimedOut` branch relies on whatever timeout
    /// the OS imposes. TODO confirm that is intended.
    async fn scan_probe(addr: SocketAddr, probe_config: &ProbeConfig, stats: &StatsInternal, do_ssl: bool) -> anyhow::Result<()> {
        // serial scan
        let scan_date = chrono::Local::now();
        // Global concurrency cap: the permit is held until the probe finishes.
        let connection_token = probe_config.global_semaphore.acquire().await?;
        let conn = match tokio::net::TcpStream::connect(addr).await {
            Ok(conn) => conn,
            Err(error) => match error.kind() {
                // NOTE(review): a refused connection bails WITHOUT sending a
                // ProbeReport, unlike the timeout branch below — confirm intended.
                ErrorKind::ConnectionRefused => bail!(ReportError::ConnectionRefused),
                ErrorKind::TimedOut => {
                    let err = ReportError::ConnectionTimeout;
                    probe_config.reporter.report_probe(ProbeReport {
                        host: addr,
                        scan_date,
                        result: ReportPayload::Error {msg: err},
                    }).await?;
                    bail!(err);
                },
                _ => {
                    error!(%error, "Connection failed for unexpected reason");
                    bail!(error);
                }
            }
        };
        if !do_ssl {
            // That's all we need. Skip the shutdown as we haven't written anything.
            info!(%addr, "Ending liveness check");
            return Ok(())
        }
        info!(%addr, "Beginning SSL connect");
        // Set up SSL context
        let ssl = Ssl::new(probe_config.ssl_ctx.as_ref())?;
        let ssl = SslStream::new(ssl, conn)?;
        pin_mut!(ssl);
        // Race the handshake against the configured handshake timeout; either
        // failure path emits an error report before bailing.
        select! {
            result = ssl.as_mut().connect() => {
                if let Err(error) = result {
                    error!(%error, "SSL handshake failed");
                    let err = ReportError::ProtocolError;
                    probe_config.reporter.report_probe(ProbeReport {
                        host: addr,
                        scan_date,
                        result: ReportPayload::Error {msg: err},
                    }).await?;
                    bail!(err);
                }
            }
            _ = tokio::time::sleep(probe_config.handshake_timeout) => {
                let err = ReportError::HandshakeTimeout;
                probe_config.reporter.report_probe(ProbeReport {
                    host: addr,
                    scan_date,
                    result: ReportPayload::Error {msg: err},
                }).await?;
                bail!(err);
            }
        }
        info!(%addr, "SSL handshake complete");
        // we have a connection now; grab the SSL cert and chain and report them
        let ssl_inner = ssl.ssl();
        if let Some(cert) = ssl_inner.peer_certificate() {
            let mut info = CertInfo::extract(cert.as_ref())?;
            // Verification is NONE at handshake time; record what OpenSSL's
            // verify_result says after the fact.
            info.verification_state = if ssl_inner.verify_result() == X509VerifyResult::OK {
                VerificationState::Valid
            } else {
                VerificationState::Invalid
            };
            probe_config.reporter.report_probe(ProbeReport {
                scan_date,
                host: addr,
                result: ReportPayload::Success {
                    certificate: info,
                }
            }).await?;
        } else {
            warn!(%addr, "No SSL certificate received");
        }
        if let Some(chain) = ssl_inner.peer_cert_chain() {
            probe_config.reporter.report_issuers(chain).await;
        }
        // Release the global-semaphore permit.
        drop(connection_token);
        Ok(())
    }
    /// Scan all of one host's ports with at most `host.parallelism` probes in
    /// flight, after an optional plain-TCP liveness check (whose failure aborts
    /// the whole host via `?`).
    ///
    /// NOTE(review): `host.completed_ports` is never incremented here even though
    /// probes complete (only `stats.completed_probes` is bumped) — see the
    /// aborted-probe accounting in `scan_host`.
    async fn scan_host_internal(host: &mut Host, probe_config: &ProbeConfig, stats: &StatsInternal) -> anyhow::Result<()> {
        // TODO: allow multiple liveness ports (e.g., for scanning both Windows and Linux hosts)
        if let Some(port) = host.live_port {
            Self::scan_probe(SocketAddr::new(host.ip, port.get()), probe_config, stats, false).await?;
        }
        // TODO: merge liveness check with SSL check
        let mut running_checks = FuturesUnordered::new();
        loop {
            tokio::select! {
                biased;
                // Fill queue if needed
                _ = ready(()), if running_checks.len() < host.parallelism && !host.ports.is_empty() => {
                    let port = host.ports.pop().unwrap();
                    let addr = SocketAddr::new(host.ip, port);
                    running_checks.push(FutureExt::map(Self::scan_probe(
                        addr,
                        probe_config,
                        stats,
                        true,
                    ), move |res| (addr, res)));
                },
                // Check if probe complete
                Some((_addr, _result)) = running_checks.next() => {
                    stats.completed_probes.fetch_add(1, Ordering::Relaxed);
                },
                else => return Ok(()),
            }
        }
    }
    /// Task wrapper: scan one host, then account for probes that never ran.
    async fn scan_host(mut host: Host, probe_config: ProbeConfig, stats: Arc<StatsInternal>) -> (IpAddr, anyhow::Result<()>) {
        let ip = host.ip;
        let results = Self::scan_host_internal(&mut host, &probe_config, &*stats).await;
        // remove aborted probes from total
        // NOTE(review): `completed_ports` stays 0 as written, so this counts only the
        // ports never popped off the queue; probes popped but still in flight when the
        // scan errored are not counted anywhere — confirm intended.
        stats.aborted_probes.fetch_add(host.ports.len() - host.completed_ports as usize, Ordering::Relaxed);
        (ip, results)
    }
    /// Backend driver: keep up to ~global-permits-worth of host tasks alive,
    /// respawning one per completion, while answering stats commands; returns a
    /// final counter snapshot once all hosts are done.
    async fn run(mut self) -> anyhow::Result<Stats> {
        let mut tasks = tokio::task::JoinSet::new();
        let host_scale = self.probe_config.global_semaphore.available_permits();
        let host_scale = host_scale + host_scale / 16; // allow an extra 6% to cover for inefficiency in respawning hosts.
        // Spawn the initial set of host probes
        for _ in 0..host_scale {
            if let Some(host) = self.hosts.pop() {
                tasks.spawn(Self::scan_host(host, self.probe_config.clone(), self.stats.clone()));
            } else {
                break
            }
        }
        loop {
            // NOTE(review): this logs at info level on every loop iteration (each host
            // completion and each command) — a candidate for the slowdown named in the
            // commit message; consider trace level.
            info!(nhosts_remaining=tasks.len(), "Beginning iteration");
            select! {
                Some(completion) = tasks.join_next() => {
                    match completion {
                        Ok((ip, result)) => {
                            self.stats.completed_hosts.fetch_add(1, Ordering::Relaxed);
                            let error = result.err().map(tracing::field::display);
                            let success = error.is_none();
                            // I'd love to make a scan failure a warning, but that's not an option here.
                            info!(%ip, success, error, "Host scan complete")
                        },
                        Err(error) => {
                            self.stats.aborted_hosts.fetch_add(1, Ordering::Relaxed);
                            error!(%error, "Host scan failed catastrophically")
                        }
                    }
                    if let Some(host) = self.hosts.pop() {
                        // start the next one, if there is one
                        // If not, eventually the task set will drain and the branch will be disabled.
                        tasks.spawn(Self::scan_host(host, self.probe_config.clone(), self.stats.clone()));
                    }
                    if self.hosts.is_empty() && tasks.is_empty() {
                        break;
                    }
                },
                Some(command) = self.command_chan.recv() => {
                    match command {
                        Command::GetStats(rchan) => {
                            // Requester may have given up; ignore a failed reply.
                            rchan.send(self.stats.make_public()).ok();
                        }
                    }
                },
                else => {
                    info!("All scans complete");
                    break;
                }
            }
        }
        Ok(self.stats.make_public())
    }
}
/// Handle to a running scan: owns the backend task and a command channel to it.
pub struct Scanner {
    // Resolves to the final Stats when the backend task finishes.
    join_handle: JoinHandle<anyhow::Result<Stats>>,
    // Commands (currently just stats requests) are sent to the backend here.
    command_handle: mpsc::Sender<Command>
}
impl Scanner {
pub fn start(probe_config: ProbeConfig, hosts: Vec<Host>) -> Self {
let total_hosts = hosts.len();
let total_probes = hosts.iter().map(|host| host.ports.len()).sum();
let stats = Arc::new(StatsInternal {
total_hosts,
total_probes,
completed_probes: AtomicUsize::new(0),
completed_hosts: AtomicUsize::new(0),
aborted_hosts: AtomicUsize::new(0),
aborted_probes: AtomicUsize::new(0),
});
let (send_command, recv_command) = mpsc::channel(1);
let backend = ScannerBackend {
stats,
probe_config,
hosts,
command_chan: recv_command,
};
Self {
command_handle: send_command,
join_handle: tokio::task::spawn(backend.run()),
}
}
pub fn join(self) -> JoinHandle<anyhow::Result<Stats>> {
self.join_handle
}
pub async fn get_stats(&self) -> anyhow::Result<Stats> {
let (send, recv) = oneshot::channel();
self.command_handle.send(Command::GetStats(send)).await?;
recv.await.map_err(Into::into)
}
pub fn is_running(&self) -> bool {
!self.join_handle.is_finished()
}
}