diff --git a/Cargo.lock b/Cargo.lock index 046d140..c46b4cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -643,6 +643,7 @@ dependencies = [ "bytes", "d3270-common", "futures", + "lazy_static", "rand 0.8.5", "serde", "serde_json", @@ -1119,6 +1120,16 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f" +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.17" @@ -1219,6 +1230,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "smallvec 1.10.0", + "windows-sys 0.45.0", +] + [[package]] name = "percent-encoding" version = "2.2.0" @@ -1395,6 +1429,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" version = "1.8.1" @@ -1462,6 +1505,12 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "scratch" version = "1.0.5" @@ -1633,6 +1682,12 @@ dependencies = [ "maybe-uninit", ] +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + [[package]] name = "socket2" version = "0.4.9" @@ -1889,8 +1944,10 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite 0.2.9", "signal-hook-registry", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -1996,7 +2053,7 @@ dependencies = [ "matchers", "owning_ref", "regex", - "smallvec", + "smallvec 0.6.14", "tracing-core", "tracing-log", ] diff --git a/d3270d/Cargo.toml b/d3270d/Cargo.toml index ed5e544..7bf24c7 100644 --- a/d3270d/Cargo.toml +++ b/d3270d/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" serde = { version = "1.0.162", features = ["derive"]} serde_json = "1.0.96" anyhow = "1.0.71" -tokio = { version = "1.28.0", features = ["rt", "macros", "sync", "process", "rt-multi-thread", "bytes", "io-util"] } +tokio = { version = "1.28.0", features = ["full"] } tide = "0.16.0" tide-websockets = "0.4.0" d3270-common = {path = "../d3270-common"} @@ -19,4 +19,5 @@ tracing-fmt = "0.1.1" futures = "0.3.28" tokio-stream = { version = "0.1.14", features = ["sync"] } rand = "0.8.5" -base64 = "0.21.0" \ No newline at end of file +base64 = "0.21.0" +lazy_static = "1.4.0" \ No newline at end of file diff --git a/d3270d/src/arbiter.rs b/d3270d/src/arbiter.rs index 0e0c932..29887ac 100644 --- a/d3270d/src/arbiter.rs +++ b/d3270d/src/arbiter.rs @@ -1,23 +1,24 @@ -use anyhow::anyhow; -use base64::engine::general_purpose::STANDARD as B64_STANDARD; -use base64::Engine; -use bytes::Buf; -use d3270_common::b3270::indication::RunResult; -use d3270_common::b3270::operation::Action; -use d3270_common::b3270::{operation, Indication, Operation}; -use d3270_common::tracker::{Disposition, Tracker}; -use futures::future::BoxFuture; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; -use rand::RngCore; use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; + +use anyhow::anyhow; +use base64::engine::general_purpose::STANDARD as B64_STANDARD; +use base64::Engine; +use bytes::Buf; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; +use rand::RngCore; use tokio::io::{AsyncBufReadExt, AsyncWrite, BufReader, Lines}; use tokio::process::{Child, ChildStdout}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; -use tracing::{error, info, warn}; +use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; + +use d3270_common::b3270::indication::RunResult; +use d3270_common::b3270::operation::Action; +use d3270_common::b3270::{operation, Indication, Operation}; +use d3270_common::tracker::{Disposition, Tracker}; enum B3270Request { Action(Vec, oneshot::Sender), @@ -32,7 +33,7 @@ enum HandleReceiveState { broadcast::Receiver, ), TryRestart( - BoxFuture<'static, Result<(), ()>>, + Pin> + Send + Sync>>, oneshot::Receiver<(Vec, broadcast::Receiver)>, ), } @@ -48,6 +49,8 @@ impl ArbiterHandle { ) -> anyhow::Result> { self.send_actions(vec![action]).await } + + #[instrument(skip(self))] pub async fn send_actions( &self, actions: Vec, @@ -107,8 +110,11 @@ impl Stream for ArbiterHandle { self.receiver = Some(HandleReceiveState::Steady(rcvr)); return Poll::Ready(Some(msg)); } - Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_)))) => { - warn!("Dropped messages from b3270 server; starting resync"); + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => { + info!( + dropped = n, + "Dropped messages from b3270 server; starting resync" + ); let (os_snd, os_rcv) = oneshot::channel(); let fut = self .sender @@ -117,9 +123,9 @@ impl Stream for ArbiterHandle { .map_ok(move |permit| { permit.send(B3270Request::Resync(os_snd)); }) - .map_err(|_| ()) - .boxed(); - self.receiver = Some(HandleReceiveState::TryRestart(fut, os_rcv)); + .map_err(|_| ()); + + self.receiver = Some(HandleReceiveState::TryRestart(Box::pin(fut), os_rcv)); } Poll::Ready(None) => { warn!("Failed to receive from b3270 server"); @@ -140,9 +146,10 @@ impl Stream for ArbiterHandle { } #[derive(Clone)] -pub struct B3270HandleRequester(mpsc::Sender); +pub struct ArbiterHandleRequester(mpsc::Sender); -impl B3270HandleRequester { +impl ArbiterHandleRequester { + #[instrument(skip(self))] pub async fn connect(&self) -> anyhow::Result { let (conn_send, conn_rcv) = oneshot::channel(); self.0 @@ -172,7 +179,11 @@ pub struct B3270 { impl B3270 { pub fn spawn( mut child: Child, - ) -> (tokio::task::JoinHandle, B3270HandleRequester) { + initial_actions: &[Action], + ) -> ( + tokio::task::JoinHandle, + ArbiterHandleRequester, + ) { let (subproc_snd, subproc_rcv) = mpsc::channel(10); let child_reader = child .stdout @@ -181,16 +192,27 @@ impl B3270 { let child_reader = BufReader::new(child_reader).lines(); // A single connect can result in a flurry of messages, so we need a big buffer let (ind_chan, _) = broadcast::channel(100); + + let mut write_buf = VecDeque::new(); + // Queue any initial actions. + for action in initial_actions { + serde_json::to_writer(&mut write_buf, action).unwrap(); + write_buf.push_back(b'\n'); + } + let proc = B3270 { child, child_reader, tracker: Tracker::default(), comm: subproc_rcv, ind_chan, - write_buf: VecDeque::new(), + write_buf, action_response_map: Default::default(), }; - (tokio::task::spawn(proc), B3270HandleRequester(subproc_snd)) + ( + tokio::task::spawn(proc.instrument(info_span!("arbiter"))), + ArbiterHandleRequester(subproc_snd), + ) } } @@ -204,7 +226,10 @@ impl Future for B3270 { while let Poll::Ready(buf) = Pin::new(&mut self.child_reader).poll_next_line(cx) { match buf { Ok(Some(line)) => match serde_json::from_str(&line) { - Ok(ind) => indications.push(ind), + Ok(ind) => { + trace!(json = line, "Received indication"); + indications.push(ind) + } Err(error) => { warn!(%error, msg=line, "Failed to parse indication"); } @@ -276,9 +301,10 @@ impl Future for B3270 { type_: Some("keymap".to_owned()), actions, }); - let result = serde_json::to_writer(&mut self.write_buf, &op); - match result { - Ok(()) => { + match serde_json::to_string(&op) { + Ok(op_str) => { + trace!(json = op_str, "Sending operation"); + self.write_buf.extend(op_str.bytes()); self.write_buf.push_back(b'\n'); self.action_response_map.insert(tag, response_chan); } diff --git a/d3270d/src/gen_connection.rs b/d3270d/src/gen_connection.rs new file mode 100644 index 0000000..a25eb1a --- /dev/null +++ b/d3270d/src/gen_connection.rs @@ -0,0 +1,87 @@ +use crate::arbiter::{ArbiterHandle, ArbiterHandleRequester}; +use d3270_common::b3270::indication::RunResult; +use d3270_common::b3270::operation::Run; +use d3270_common::b3270::{Indication, Operation}; +use futures::stream::FuturesUnordered; +use futures::stream::StreamExt; +use futures::FutureExt; +use std::future::{poll_fn, Future}; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; +use tokio::sync::oneshot; +use tracing::warn; + +pub struct GenConnection { + handle: ArbiterHandle, + waiting_actions: FuturesUnordered, +} + +struct ReplaceTag { + tag: Option, + rcvr: oneshot::Receiver, +} + +impl Future for ReplaceTag { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(ready!(self.rcvr.poll_unpin(cx)).ok().map(|run_res| { + Indication::RunResult(RunResult { + r_tag: self.tag.take(), + ..run_res + }) + })) + } +} + +impl GenConnection { + pub async fn new(ahr: ArbiterHandleRequester) -> anyhow::Result { + let handle = ahr.connect().await?; + Ok(Self { + handle, + waiting_actions: FuturesUnordered::new(), + }) + } + + pub async fn handle_client_line(&mut self, line: String) -> anyhow::Result<()> { + let op = serde_json::from_str(&line)?; + match op { + Operation::Run(Run { actions, r_tag, .. }) => { + let rcvr = self.handle.send_actions(actions).await?; + self.waiting_actions.push(ReplaceTag { tag: r_tag, rcvr }); + } + _ => warn!(json = line, "Unsupported operation from client"), + } + Ok(()) + } + + pub fn poll_indication(&mut self, cx: &mut Context) -> Poll> { + let mut any_can_continue = false; + + match self.waiting_actions.poll_next_unpin(cx) { + Poll::Ready(Some(ind)) => { + return Poll::Ready(ind); + } + Poll::Ready(None) => {} + Poll::Pending => any_can_continue = true, + } + + match self.handle.poll_next_unpin(cx) { + Poll::Ready(Some(ind)) => { + return Poll::Ready(Some(ind)); + } + Poll::Ready(None) => {} + Poll::Pending => any_can_continue = true, + } + + if any_can_continue { + Poll::Pending + } else { + Poll::Ready(None) + } + } + + pub async fn next_indication(&mut self) -> Option { + poll_fn(|cx| self.poll_indication(cx)).await + } +} diff --git a/d3270d/src/main.rs b/d3270d/src/main.rs index 5a9d55d..4703c4d 100644 --- a/d3270d/src/main.rs +++ b/d3270d/src/main.rs @@ -1,13 +1,55 @@ -use anyhow::anyhow; use std::ffi::OsString; +use std::future::Future; +use std::pin::Pin; use std::process::Stdio; use std::str::FromStr; +use std::task::{ready, Context, Poll}; + +use anyhow::anyhow; +use futures::future::select_all; +use futures::FutureExt; +use tokio::task::JoinHandle; +use tracing::error; + +use d3270_common::b3270::operation::Action; + +pub mod arbiter; +pub mod gen_connection; +pub mod tcp_server; + +struct TaggedJoinHandle { + handle: JoinHandle, + tag: &'static str, +} + +impl Future for TaggedJoinHandle { + type Output = (&'static str, anyhow::Error); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let err = match ready!(self.handle.poll_unpin(cx)) { + Ok(err) => err, + Err(err) => err.into(), + }; + Poll::Ready((self.tag, err)) + } +} + +trait JoinHandleTagExt { + fn tagged(self, tag: &'static str) -> TaggedJoinHandle; +} + +impl JoinHandleTagExt for JoinHandle { + fn tagged(self, tag: &'static str) -> TaggedJoinHandle { + TaggedJoinHandle { handle: self, tag } + } +} #[tokio::main] async fn main() -> anyhow::Result<()> { let mut subprocess_args = vec![OsString::from_str("-json").unwrap()]; let mut args_iter = std::env::args_os().peekable(); let mut connect_str = None; + let mut tcp_listen = None; while let Some(arg) = args_iter.next() { // we default to one of the ignored args match arg.to_str().unwrap_or("-json") { @@ -26,6 +68,16 @@ async fn main() -> anyhow::Result<()> { }) .map(Some)?; } + "-tcp-listen" => { + tcp_listen = args_iter + .next() + .ok_or_else(|| anyhow!("Arg required for -tcp-listen"))? + .into_string() + .map_err(|_| anyhow!("Failed to parse tcp-listen address"))? + .parse() + .map(Some) + .map_err(|_| anyhow!("Invalid listen address"))?; + } "-e" => { 'skip: while let Some(arg) = args_iter.peek() { if arg.to_str().unwrap_or("").starts_with("-") { @@ -38,7 +90,7 @@ async fn main() -> anyhow::Result<()> { } } - let _connect_str = connect_str.ok_or_else(|| anyhow!("No connect string given"))?; + let connect_str = connect_str.ok_or_else(|| anyhow!("No connect string given"))?; let subproc = tokio::process::Command::new("b3270") .args(&subprocess_args) @@ -46,10 +98,22 @@ async fn main() -> anyhow::Result<()> { .stdout(Stdio::piped()) .spawn()?; - let (_server, _server_req) = arbiter::B3270::spawn(subproc); - // TODO: make connection before starting listeners + let mut handles: Vec = vec![]; + + let (arbiter, arbiter_req) = arbiter::B3270::spawn( + subproc, + &[Action { + action: "Connect".to_owned(), + args: vec![connect_str], + }], + ); + handles.push(arbiter.tagged("arbiter")); + if let Some(addr) = tcp_listen { + let tcp_listener = tcp_server::listener_proc(addr, arbiter_req.clone()).await?; + handles.push(tcp_listener.tagged("tcp_listener")); + } + let ((source, error), _, _) = select_all(handles).await; + error!(source, %error, "A core task failed"); Ok(()) } - -pub mod arbiter; diff --git a/d3270d/src/tcp_server.rs b/d3270d/src/tcp_server.rs new file mode 100644 index 0000000..b2496bb --- /dev/null +++ b/d3270d/src/tcp_server.rs @@ -0,0 +1,78 @@ +use std::net::SocketAddr; + +use anyhow::bail; +use futures::never::Never; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::task::JoinHandle; +use tracing::{error, info, info_span, Instrument}; + +use crate::arbiter::ArbiterHandleRequester; +use crate::gen_connection::GenConnection; + +pub async fn listener_proc( + socket: SocketAddr, + handle_requester: ArbiterHandleRequester, +) -> anyhow::Result> { + let listener = tokio::net::TcpListener::bind(socket.clone()).await?; + let span = info_span!(target: "connection-handling", "tcp_listener", addr=%socket); + Ok(tokio::spawn( + async move { + let error = listener_task(listener, handle_requester).await.unwrap_err(); + error!(%error, "TCP listener failed to accept"); + error + } + .instrument(span), + )) +} + +async fn listener_task( + listener: TcpListener, + handle_requester: ArbiterHandleRequester, +) -> anyhow::Result { + loop { + let (conn, client_addr) = listener.accept().await?; + let handle_requester = handle_requester.clone(); + let conn_span = + info_span!(target: "connection-handling", "tcp_accept", client=%client_addr); + tokio::spawn( + async move { + info!("Accepted connection"); + if let Err(error) = handle_tcp_connection(conn, handle_requester).await { + error!(%error, "Connection handler failed"); + } else { + info!("Connection closed"); + } + } + .instrument(conn_span), + ); + } +} + +async fn handle_tcp_connection( + mut conn: TcpStream, + handle_requester: ArbiterHandleRequester, +) -> anyhow::Result<()> { + let (stream_rd, mut stream_wr) = conn.split(); + let mut stream_rd = BufReader::new(stream_rd).lines(); + + let mut conn = GenConnection::new(handle_requester).await?; + + loop { + select! { + line = stream_rd.next_line() => match line? { + Some(line) => conn.handle_client_line(line).await?, + None => bail!("Connection closed"), + }, + ind = conn.next_indication() => match ind { + None => bail!("Arbiter lost"), + Some(ind) => { + let mut ind = serde_json::to_vec(&ind)?; + ind.push(b'\n'); + stream_wr.write_all(ind.as_slice()).await?; + } + }, + } + } +}