Implemented TCP listener

This commit is contained in:
2023-05-11 15:43:00 +02:00
parent 0352f64c0f
commit 49c30aad34
6 changed files with 349 additions and 36 deletions

59
Cargo.lock generated
View File

@@ -643,6 +643,7 @@ dependencies = [
"bytes",
"d3270-common",
"futures",
"lazy_static",
"rand 0.8.5",
"serde",
"serde_json",
@@ -1119,6 +1120,16 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f"
[[package]]
name = "lock_api"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.17"
@@ -1219,6 +1230,29 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall",
"smallvec 1.10.0",
"windows-sys 0.45.0",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@@ -1395,6 +1429,15 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "regex"
version = "1.8.1"
@@ -1462,6 +1505,12 @@ version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.5"
@@ -1633,6 +1682,12 @@ dependencies = [
"maybe-uninit",
]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "socket2"
version = "0.4.9"
@@ -1889,8 +1944,10 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite 0.2.9",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
]
@@ -1996,7 +2053,7 @@ dependencies = [
"matchers",
"owning_ref",
"regex",
"smallvec",
"smallvec 0.6.14",
"tracing-core",
"tracing-log",
]

View File

@@ -9,7 +9,7 @@ edition = "2021"
serde = { version = "1.0.162", features = ["derive"]}
serde_json = "1.0.96"
anyhow = "1.0.71"
tokio = { version = "1.28.0", features = ["rt", "macros", "sync", "process", "rt-multi-thread", "bytes", "io-util"] }
tokio = { version = "1.28.0", features = ["full"] }
tide = "0.16.0"
tide-websockets = "0.4.0"
d3270-common = {path = "../d3270-common"}
@@ -19,4 +19,5 @@ tracing-fmt = "0.1.1"
futures = "0.3.28"
tokio-stream = { version = "0.1.14", features = ["sync"] }
rand = "0.8.5"
base64 = "0.21.0"
base64 = "0.21.0"
lazy_static = "1.4.0"

View File

@@ -1,23 +1,24 @@
use anyhow::anyhow;
use base64::engine::general_purpose::STANDARD as B64_STANDARD;
use base64::Engine;
use bytes::Buf;
use d3270_common::b3270::indication::RunResult;
use d3270_common::b3270::operation::Action;
use d3270_common::b3270::{operation, Indication, Operation};
use d3270_common::tracker::{Disposition, Tracker};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use rand::RngCore;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::anyhow;
use base64::engine::general_purpose::STANDARD as B64_STANDARD;
use base64::Engine;
use bytes::Buf;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use rand::RngCore;
use tokio::io::{AsyncBufReadExt, AsyncWrite, BufReader, Lines};
use tokio::process::{Child, ChildStdout};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::{error, info, warn};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
use d3270_common::b3270::indication::RunResult;
use d3270_common::b3270::operation::Action;
use d3270_common::b3270::{operation, Indication, Operation};
use d3270_common::tracker::{Disposition, Tracker};
enum B3270Request {
Action(Vec<Action>, oneshot::Sender<RunResult>),
@@ -32,7 +33,7 @@ enum HandleReceiveState {
broadcast::Receiver<Indication>,
),
TryRestart(
BoxFuture<'static, Result<(), ()>>,
Pin<Box<dyn Future<Output = Result<(), ()>> + Send + Sync>>,
oneshot::Receiver<(Vec<Indication>, broadcast::Receiver<Indication>)>,
),
}
@@ -48,6 +49,8 @@ impl ArbiterHandle {
) -> anyhow::Result<oneshot::Receiver<RunResult>> {
self.send_actions(vec![action]).await
}
#[instrument(skip(self))]
pub async fn send_actions(
&self,
actions: Vec<Action>,
@@ -107,8 +110,11 @@ impl Stream for ArbiterHandle {
self.receiver = Some(HandleReceiveState::Steady(rcvr));
return Poll::Ready(Some(msg));
}
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_)))) => {
warn!("Dropped messages from b3270 server; starting resync");
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => {
info!(
dropped = n,
"Dropped messages from b3270 server; starting resync"
);
let (os_snd, os_rcv) = oneshot::channel();
let fut = self
.sender
@@ -117,9 +123,9 @@ impl Stream for ArbiterHandle {
.map_ok(move |permit| {
permit.send(B3270Request::Resync(os_snd));
})
.map_err(|_| ())
.boxed();
self.receiver = Some(HandleReceiveState::TryRestart(fut, os_rcv));
.map_err(|_| ());
self.receiver = Some(HandleReceiveState::TryRestart(Box::pin(fut), os_rcv));
}
Poll::Ready(None) => {
warn!("Failed to receive from b3270 server");
@@ -140,9 +146,10 @@ impl Stream for ArbiterHandle {
}
#[derive(Clone)]
pub struct B3270HandleRequester(mpsc::Sender<B3270Request>);
pub struct ArbiterHandleRequester(mpsc::Sender<B3270Request>);
impl B3270HandleRequester {
impl ArbiterHandleRequester {
#[instrument(skip(self))]
pub async fn connect(&self) -> anyhow::Result<ArbiterHandle> {
let (conn_send, conn_rcv) = oneshot::channel();
self.0
@@ -172,7 +179,11 @@ pub struct B3270 {
impl B3270 {
pub fn spawn(
mut child: Child,
) -> (tokio::task::JoinHandle<anyhow::Error>, B3270HandleRequester) {
initial_actions: &[Action],
) -> (
tokio::task::JoinHandle<anyhow::Error>,
ArbiterHandleRequester,
) {
let (subproc_snd, subproc_rcv) = mpsc::channel(10);
let child_reader = child
.stdout
@@ -181,16 +192,27 @@ impl B3270 {
let child_reader = BufReader::new(child_reader).lines();
// A single connect can result in a flurry of messages, so we need a big buffer
let (ind_chan, _) = broadcast::channel(100);
let mut write_buf = VecDeque::new();
// Queue any initial actions.
for action in initial_actions {
serde_json::to_writer(&mut write_buf, action).unwrap();
write_buf.push_back(b'\n');
}
let proc = B3270 {
child,
child_reader,
tracker: Tracker::default(),
comm: subproc_rcv,
ind_chan,
write_buf: VecDeque::new(),
write_buf,
action_response_map: Default::default(),
};
(tokio::task::spawn(proc), B3270HandleRequester(subproc_snd))
(
tokio::task::spawn(proc.instrument(info_span!("arbiter"))),
ArbiterHandleRequester(subproc_snd),
)
}
}
@@ -204,7 +226,10 @@ impl Future for B3270 {
while let Poll::Ready(buf) = Pin::new(&mut self.child_reader).poll_next_line(cx) {
match buf {
Ok(Some(line)) => match serde_json::from_str(&line) {
Ok(ind) => indications.push(ind),
Ok(ind) => {
trace!(json = line, "Received indication");
indications.push(ind)
}
Err(error) => {
warn!(%error, msg=line, "Failed to parse indication");
}
@@ -276,9 +301,10 @@ impl Future for B3270 {
type_: Some("keymap".to_owned()),
actions,
});
let result = serde_json::to_writer(&mut self.write_buf, &op);
match result {
Ok(()) => {
match serde_json::to_string(&op) {
Ok(op_str) => {
trace!(json = op_str, "Sending operation");
self.write_buf.extend(op_str.bytes());
self.write_buf.push_back(b'\n');
self.action_response_map.insert(tag, response_chan);
}

View File

@@ -0,0 +1,87 @@
use crate::arbiter::{ArbiterHandle, ArbiterHandleRequester};
use d3270_common::b3270::indication::RunResult;
use d3270_common::b3270::operation::Run;
use d3270_common::b3270::{Indication, Operation};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::warn;
pub struct GenConnection {
handle: ArbiterHandle,
waiting_actions: FuturesUnordered<ReplaceTag>,
}
struct ReplaceTag {
tag: Option<String>,
rcvr: oneshot::Receiver<RunResult>,
}
impl Future for ReplaceTag {
type Output = Option<Indication>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.rcvr.poll_unpin(cx)).ok().map(|run_res| {
Indication::RunResult(RunResult {
r_tag: self.tag.take(),
..run_res
})
}))
}
}
impl GenConnection {
pub async fn new(ahr: ArbiterHandleRequester) -> anyhow::Result<Self> {
let handle = ahr.connect().await?;
Ok(Self {
handle,
waiting_actions: FuturesUnordered::new(),
})
}
pub async fn handle_client_line(&mut self, line: String) -> anyhow::Result<()> {
let op = serde_json::from_str(&line)?;
match op {
Operation::Run(Run { actions, r_tag, .. }) => {
let rcvr = self.handle.send_actions(actions).await?;
self.waiting_actions.push(ReplaceTag { tag: r_tag, rcvr });
}
_ => warn!(json = line, "Unsupported operation from client"),
}
Ok(())
}
pub fn poll_indication(&mut self, cx: &mut Context) -> Poll<Option<Indication>> {
let mut any_can_continue = false;
match self.waiting_actions.poll_next_unpin(cx) {
Poll::Ready(Some(ind)) => {
return Poll::Ready(ind);
}
Poll::Ready(None) => {}
Poll::Pending => any_can_continue = true,
}
match self.handle.poll_next_unpin(cx) {
Poll::Ready(Some(ind)) => {
return Poll::Ready(Some(ind));
}
Poll::Ready(None) => {}
Poll::Pending => any_can_continue = true,
}
if any_can_continue {
Poll::Pending
} else {
Poll::Ready(None)
}
}
pub async fn next_indication(&mut self) -> Option<Indication> {
poll_fn(|cx| self.poll_indication(cx)).await
}
}

View File

@@ -1,13 +1,55 @@
use anyhow::anyhow;
use std::ffi::OsString;
use std::future::Future;
use std::pin::Pin;
use std::process::Stdio;
use std::str::FromStr;
use std::task::{ready, Context, Poll};
use anyhow::anyhow;
use futures::future::select_all;
use futures::FutureExt;
use tokio::task::JoinHandle;
use tracing::error;
use d3270_common::b3270::operation::Action;
pub mod arbiter;
pub mod gen_connection;
pub mod tcp_server;
struct TaggedJoinHandle {
handle: JoinHandle<anyhow::Error>,
tag: &'static str,
}
impl Future for TaggedJoinHandle {
type Output = (&'static str, anyhow::Error);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let err = match ready!(self.handle.poll_unpin(cx)) {
Ok(err) => err,
Err(err) => err.into(),
};
Poll::Ready((self.tag, err))
}
}
trait JoinHandleTagExt {
fn tagged(self, tag: &'static str) -> TaggedJoinHandle;
}
impl JoinHandleTagExt for JoinHandle<anyhow::Error> {
fn tagged(self, tag: &'static str) -> TaggedJoinHandle {
TaggedJoinHandle { handle: self, tag }
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut subprocess_args = vec![OsString::from_str("-json").unwrap()];
let mut args_iter = std::env::args_os().peekable();
let mut connect_str = None;
let mut tcp_listen = None;
while let Some(arg) = args_iter.next() {
// we default to one of the ignored args
match arg.to_str().unwrap_or("-json") {
@@ -26,6 +68,16 @@ async fn main() -> anyhow::Result<()> {
})
.map(Some)?;
}
"-tcp-listen" => {
tcp_listen = args_iter
.next()
.ok_or_else(|| anyhow!("Arg required for -tcp-listen"))?
.into_string()
.map_err(|_| anyhow!("Failed to parse tcp-listen address"))?
.parse()
.map(Some)
.map_err(|_| anyhow!("Invalid listen address"))?;
}
"-e" => {
'skip: while let Some(arg) = args_iter.peek() {
if arg.to_str().unwrap_or("").starts_with("-") {
@@ -38,7 +90,7 @@ async fn main() -> anyhow::Result<()> {
}
}
let _connect_str = connect_str.ok_or_else(|| anyhow!("No connect string given"))?;
let connect_str = connect_str.ok_or_else(|| anyhow!("No connect string given"))?;
let subproc = tokio::process::Command::new("b3270")
.args(&subprocess_args)
@@ -46,10 +98,22 @@ async fn main() -> anyhow::Result<()> {
.stdout(Stdio::piped())
.spawn()?;
let (_server, _server_req) = arbiter::B3270::spawn(subproc);
// TODO: make connection before starting listeners
let mut handles: Vec<TaggedJoinHandle> = vec![];
let (arbiter, arbiter_req) = arbiter::B3270::spawn(
subproc,
&[Action {
action: "Connect".to_owned(),
args: vec![connect_str],
}],
);
handles.push(arbiter.tagged("arbiter"));
if let Some(addr) = tcp_listen {
let tcp_listener = tcp_server::listener_proc(addr, arbiter_req.clone()).await?;
handles.push(tcp_listener.tagged("tcp_listener"));
}
let ((source, error), _, _) = select_all(handles).await;
error!(source, %error, "A core task failed");
Ok(())
}
pub mod arbiter;

78
d3270d/src/tcp_server.rs Normal file
View File

@@ -0,0 +1,78 @@
use std::net::SocketAddr;
use anyhow::bail;
use futures::never::Never;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::select;
use tokio::task::JoinHandle;
use tracing::{error, info, info_span, Instrument};
use crate::arbiter::ArbiterHandleRequester;
use crate::gen_connection::GenConnection;
pub async fn listener_proc(
socket: SocketAddr,
handle_requester: ArbiterHandleRequester,
) -> anyhow::Result<JoinHandle<anyhow::Error>> {
let listener = tokio::net::TcpListener::bind(socket.clone()).await?;
let span = info_span!(target: "connection-handling", "tcp_listener", addr=%socket);
Ok(tokio::spawn(
async move {
let error = listener_task(listener, handle_requester).await.unwrap_err();
error!(%error, "TCP listener failed to accept");
error
}
.instrument(span),
))
}
async fn listener_task(
listener: TcpListener,
handle_requester: ArbiterHandleRequester,
) -> anyhow::Result<Never> {
loop {
let (conn, client_addr) = listener.accept().await?;
let handle_requester = handle_requester.clone();
let conn_span =
info_span!(target: "connection-handling", "tcp_accept", client=%client_addr);
tokio::spawn(
async move {
info!("Accepted connection");
if let Err(error) = handle_tcp_connection(conn, handle_requester).await {
error!(%error, "Connection handler failed");
} else {
info!("Connection closed");
}
}
.instrument(conn_span),
);
}
}
async fn handle_tcp_connection(
mut conn: TcpStream,
handle_requester: ArbiterHandleRequester,
) -> anyhow::Result<()> {
let (stream_rd, mut stream_wr) = conn.split();
let mut stream_rd = BufReader::new(stream_rd).lines();
let mut conn = GenConnection::new(handle_requester).await?;
loop {
select! {
line = stream_rd.next_line() => match line? {
Some(line) => conn.handle_client_line(line).await?,
None => bail!("Connection closed"),
},
ind = conn.next_indication() => match ind {
None => bail!("Arbiter lost"),
Some(ind) => {
let mut ind = serde_json::to_vec(&ind)?;
ind.push(b'\n');
stream_wr.write_all(ind.as_slice()).await?;
}
},
}
}
}