Compare commits

..

3 Commits

11 changed files with 546 additions and 91 deletions

126
Cargo.lock generated
View File

@@ -643,6 +643,7 @@ dependencies = [
"bytes",
"d3270-common",
"futures",
"lazy_static",
"rand 0.8.5",
"serde",
"serde_json",
@@ -652,6 +653,7 @@ dependencies = [
"tokio-stream",
"tracing",
"tracing-fmt",
"tracing-subscriber 0.3.17",
]
[[package]]
@@ -1119,6 +1121,16 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f"
[[package]]
name = "lock_api"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.17"
@@ -1139,6 +1151,15 @@ dependencies = [
"regex-automata",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "maybe-uninit"
version = "2.0.0"
@@ -1163,6 +1184,16 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -1204,6 +1235,12 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "owning_ref"
version = "0.4.1"
@@ -1219,6 +1256,29 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall",
"smallvec 1.10.0",
"windows-sys 0.45.0",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@@ -1395,6 +1455,15 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "regex"
version = "1.8.1"
@@ -1462,6 +1531,12 @@ version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.5"
@@ -1587,6 +1662,15 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook"
version = "0.3.15"
@@ -1633,6 +1717,12 @@ dependencies = [
"maybe-uninit",
]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "socket2"
version = "0.4.9"
@@ -1773,6 +1863,16 @@ dependencies = [
"syn 2.0.15",
]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
name = "tide"
version = "0.16.0"
@@ -1889,8 +1989,10 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite 0.2.9",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
]
@@ -1970,7 +2072,7 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "880547feb88739526e322a366be2411c41c797f0dabcddfe99a3216e5a664f71"
dependencies = [
"tracing-subscriber",
"tracing-subscriber 0.1.6",
]
[[package]]
@@ -1993,10 +2095,28 @@ dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers",
"matchers 0.0.1",
"owning_ref",
"regex",
"smallvec",
"smallvec 0.6.14",
"tracing-core",
"tracing-log",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers 0.1.0",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec 1.10.0",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]

View File

@@ -11,6 +11,7 @@ pub mod operation;
pub mod types;
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all="kebab-case")]
pub enum Indication {
Bell {}, // TODO: make sure this emits/parses {"bell": {}}
/// Indicates that the host connection has changed state.
@@ -61,8 +62,6 @@ pub enum Indication {
Setting(Setting),
/// I/O statistics
Stats(Stats),
/// Reports the terminal name sent to the host during TELNET negotiation
TerminalName(TerminalName),
/// Change in the scrollbar thumb
Thumb(Thumb),
/// Indicates the name of the trace file
@@ -101,6 +100,8 @@ pub enum InitializeIndication {
ScreenMode(ScreenMode),
/// Setting changed
Setting(Setting),
/// Reports the terminal name sent to the host during TELNET negotiation
TerminalName(TerminalName),
/// Scroll thumb position
Thumb(Thumb),
/// Indicates build-time TLS config

View File

@@ -61,12 +61,12 @@ pub enum ConnectionState {
TelnetPending,
ConnectedNvt,
ConnectedNvtCharmode,
#[serde(rename="connected-3270")]
Connected3270,
ConnectedUnbound,
ConnectedENvt,
ConnectedESscp,
#[serde(rename = "connected-e-tn3270e")]
ConnectedETn3270e,
ConnectedSscp,
ConnectedTn3270e,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)]
@@ -154,7 +154,7 @@ pub enum OiaField {
},
/// Host command timer (minutes:seconds)
Script {
value: String,
value: bool,
},
Timing {
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -200,8 +200,9 @@ impl OiaField {
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct Proxy {
pub name: String,
pub username: String,
pub port: u16,
pub username: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
@@ -209,14 +210,15 @@ pub struct Setting {
pub name: String,
/// I'd love something other than depending on serde_json for this.
pub value: Option<serde_json::Value>,
pub cause: ActionCause,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cause: Option<ActionCause>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone)]
pub struct ScreenMode {
pub model: u8,
pub rows: u8,
pub cols: u8,
pub columns: u8,
pub color: bool,
pub oversize: bool,
pub extended: bool,
@@ -284,7 +286,7 @@ pub enum FileTransferState {
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub struct Passthru {
pub p_tag: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -304,7 +306,7 @@ pub struct Popup {
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone)]
#[serde(rename = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum PopupType {
/// Error message from a connection attempt
ConnectError,
@@ -321,14 +323,14 @@ pub enum PopupType {
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub struct Row {
pub row: u8,
pub changes: Vec<Change>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub enum CountOrText {
Count(usize),
Text(String),
@@ -357,7 +359,7 @@ pub struct Screen {
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub struct RunResult {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub r_tag: Option<String>,
@@ -371,7 +373,7 @@ pub struct RunResult {
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub struct Scroll {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fg: Option<Color>,
@@ -380,7 +382,7 @@ pub struct Scroll {
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename = "kebab-case")]
#[serde(rename_all = "kebab-case")]
pub struct Stats {
pub bytes_received: usize,
pub bytes_sent: usize,
@@ -435,12 +437,23 @@ mod test {
#[test]
pub fn connection_state_serializes_as_expected() {
assert_eq!(
serde_json::to_string(&ConnectionState::ConnectedETn3270e).unwrap(),
r#""connected-e-tn3270e""#
serde_json::to_string(&ConnectionState::ConnectedTn3270e).unwrap(),
r#""connected-tn3270e""#
);
assert_eq!(
serde_json::to_string(&ConnectionState::ConnectedESscp).unwrap(),
r#""connected-e-sscp""#
serde_json::to_string(&ConnectionState::ConnectedSscp).unwrap(),
r#""connected-sscp""#
);
}
}
fn parse_row() {
let instr = r#"[{"row":1,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","text":"z/OS V1R13 PUT Level 1401"},{"column":26,"fg":"red","gr":"highlight,selectable","count":26},{"column":52,"fg":"red","gr":"highlight,selectable","text":"IP Address = 10.24.74.32 "}]},{"row":2,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":51},{"column":52,"fg":"red","gr":"highlight,selectable","text":"VTAM Terminal = SC0TCP05 "}]},{"row":3,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":4,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":23},{"column":24,"fg":"red","gr":"highlight,selectable","text":"Application Developer System"},{"column":52,"fg":"red","gr":"highlight,selectable","count":29}]},{"row":5,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":6,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":32},{"column":33,"fg":"red","gr":"highlight,selectable","text":"// OOOOOOO SSSSS"},{"column":52,"fg":"red","gr":"highlight,selectable","count":29}]},{"row":7,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":31},{"column":32,"fg":"red","gr":"highlight,selectable","text":"// OO OO SS"},{"column":47,"fg":"red","gr":"highlight,selectable","count":34}]},{"row":8,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":23},{"column":24,"fg":"red","gr":"highlight,selectable","text":"zzzzzz // OO OO SS"},{"column":46,"fg":"red","gr":"highlight,selectable","count":35}]},{"row":9,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":25},{"column":26,"fg":"red","gr":"highlight,selectable","text":"zz // OO OO SSSS"},{"column":47,"fg":"red","gr":"highlight,selectable","count":34}]},{"row":10,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":23},{"column":24,"fg":"red","gr":"highlight,selectable","text":"zz // OO OO SS"},{"column":49,"fg":"red","gr":"highlight,selectable","count":32}]},{"row":11,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":21},{"column":22,"fg":"red","gr":"highlight,selectable","text":"zz // OO OO SS"},{"column":48,"fg":"red","gr":"highlight,selectable","count":33}]},{"row":12,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":19},{"column":20,"fg":"red","gr":"highlight,selectable","text":"zzzzzz // OOOOOOO SSSS"},{"column":45,"fg":"red","gr":"highlight,selectable","count":36}]},{"row":13,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":14,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":15,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":19},{"column":20,"fg":"red","gr":"highlight,selectable","text":"System Customization - ADCD.Z113H.*"},{"column":55,"fg":"red","gr":"highlight,selectable","count":26}]},{"row":16,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":17,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":18,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":19,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":80}]},{"row":20,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","text":" ===> Enter \"LOGON\" followed by the TSO userid. Example \"LOGON IBMUSER\" or "}]},{"row":21,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","text":" ===> Enter L followed by the APPLID"},{"column":37,"fg":"red","gr":"highlight,selectable","count":44}]},{"row":22,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","text":" ===> Examples: \"L TSO\", \"L CICSTS41\", \"L CICSTS42\", \"L IMS11\", \"L IMS12\" "}]},{"row":23,"changes":[{"column":1,"fg":"red","gr":"highlight,selectable","count":79},{"column":80,"fg":"green","count":1}]},{"row":24,"changes":[{"column":1,"fg":"green","count":79},{"column":80,"fg":"red","gr":"highlight,selectable","count":1}]}]"#;
if let Err(err) = serde_json::from_slice::<Vec<Row>>(instr.as_bytes()) {
let pos = err.column();
println!("Parse error: {err}");
let (pre, post) = instr.split_at(err.column());
println!("Context: {pre}\x1b[1;31m{post}\x1b[0m");
panic!("{}", err);
}
}
}

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
// {"run":{"actions":[{"action":"Connect","args":["10.24.74.37:3270"]}]}}
// {"run":{"actions":"Key(a)"}}
// {"run":{"actions":[{"action":"Key","args":["a"]}]}}
// Operations
#[derive(Serialize, Deserialize, Debug, Hash, Eq, PartialEq, Clone)]
#[serde(rename_all = "kebab-case")]

View File

@@ -97,6 +97,9 @@ impl FromStr for GraphicRendition {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s == "default" {
return Ok(GraphicRendition::empty());
}
s.split(",")
.map(|attr| {
FLAG_NAMES
@@ -136,7 +139,7 @@ mod test {
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Copy, Clone)]
#[serde(rename = "camelCase")]
#[serde(rename_all = "camelCase")]
#[repr(u8)]
pub enum Color {
NeutralBlack,

View File

@@ -1,6 +1,6 @@
use crate::b3270::indication::{
Change, Connection, ConnectionState, CountOrText, Cursor, Erase, Oia, OiaFieldName, Row,
RunResult, Screen, ScreenMode, Scroll, Setting, TerminalName, Thumb, Tls, TraceFile,
RunResult, Screen, ScreenMode, Scroll, Setting, Thumb, Tls, TraceFile,
};
use crate::b3270::types::{Color, GraphicRendition, PackedAttr};
use crate::b3270::{Indication, InitializeIndication};
@@ -23,7 +23,6 @@ pub struct Tracker {
cursor: Cursor,
connection: Connection,
formatted: bool,
terminal_name: Option<TerminalName>,
trace_file: Option<String>,
tls: Option<Tls>,
@@ -62,7 +61,7 @@ impl Tracker {
self.erase.bg = erase.bg.or(self.erase.bg);
let rows = self.erase.logical_rows.unwrap_or(self.screen_mode.rows) as usize;
let cols = self.erase.logical_cols.unwrap_or(self.screen_mode.cols) as usize;
let cols = self.erase.logical_cols.unwrap_or(self.screen_mode.columns) as usize;
self.screen = vec![
vec![
@@ -92,6 +91,7 @@ impl Tracker {
| InitializeIndication::Models(_)
| InitializeIndication::Prefixes { .. }
| InitializeIndication::Proxies(_)
| InitializeIndication::TerminalName(_)
| InitializeIndication::TlsHello(_)
| InitializeIndication::Tls(_)
| InitializeIndication::TraceFile(_) => static_init.push(indicator),
@@ -169,7 +169,7 @@ impl Tracker {
self.screen_mode = *mode;
self.handle_indication(&mut Indication::Erase(Erase {
logical_rows: Some(self.screen_mode.rows),
logical_cols: Some(self.screen_mode.cols),
logical_cols: Some(self.screen_mode.columns),
fg: None,
bg: None,
}));
@@ -187,9 +187,6 @@ impl Tracker {
Indication::Setting(setting) => {
self.settings.insert(setting.name.clone(), setting.clone());
}
Indication::TerminalName(term) => {
self.terminal_name = Some(term.clone());
}
Indication::Thumb(thumb) => {
self.thumb = thumb.clone();
}
@@ -239,9 +236,6 @@ impl Tracker {
state: self.formatted,
},
];
if let Some(terminal_name) = self.terminal_name.clone() {
result.push(Indication::TerminalName(terminal_name));
}
if let Some(trace_file) = self.trace_file.clone() {
result.push(Indication::TraceFile(TraceFile {
name: Some(trace_file),
@@ -283,7 +277,7 @@ impl Tracker {
.map(Self::format_row)
.enumerate()
.map(|(row_id, changes)| Row {
row: row_id as u8 - 1,
row: row_id as u8 + 1,
changes,
})
.collect(),
@@ -297,7 +291,7 @@ impl Default for Tracker {
screen: vec![],
oia: Default::default(),
screen_mode: ScreenMode {
cols: 80,
columns: 80,
rows: 43,
color: true,
model: 4,
@@ -329,7 +323,6 @@ impl Default for Tracker {
cause: None,
},
formatted: false,
terminal_name: None,
trace_file: None,
tls: None,
static_init: vec![],

View File

@@ -9,14 +9,16 @@ edition = "2021"
serde = { version = "1.0.162", features = ["derive"]}
serde_json = "1.0.96"
anyhow = "1.0.71"
tokio = { version = "1.28.0", features = ["rt", "macros", "sync", "process", "rt-multi-thread", "bytes", "io-util"] }
tokio = { version = "1.28.0", features = ["full"] }
tide = "0.16.0"
tide-websockets = "0.4.0"
d3270-common = {path = "../d3270-common"}
bytes = "1.4.0"
tracing = "0.1.37"
tracing-fmt = "0.1.1"
tracing-subscriber = { version = "0.3.17", features = ["registry", "env-filter"] }
futures = "0.3.28"
tokio-stream = { version = "0.1.14", features = ["sync"] }
rand = "0.8.5"
base64 = "0.21.0"
base64 = "0.21.0"
lazy_static = "1.4.0"

View File

@@ -1,36 +1,26 @@
use anyhow::anyhow;
use base64::engine::general_purpose::STANDARD as B64_STANDARD;
use base64::Engine;
use bytes::Buf;
use d3270_common::b3270::indication::RunResult;
use d3270_common::b3270::operation::Action;
use d3270_common::b3270::{operation, Indication, Operation};
use d3270_common::tracker::{Disposition, Tracker};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use rand::RngCore;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::anyhow;
use base64::engine::general_purpose::STANDARD as B64_STANDARD;
use base64::Engine;
use bytes::Buf;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use rand::RngCore;
use tokio::io::{AsyncBufReadExt, AsyncWrite, BufReader, Lines};
use tokio::process::{Child, ChildStdout};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::{error, info, warn};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
pub struct B3270 {
tracker: Tracker, //
child: Child, //
comm: mpsc::Receiver<B3270Request>, //
ind_chan: broadcast::Sender<Indication>, //
child_reader: Lines<BufReader<ChildStdout>>, //
use d3270_common::b3270::indication::RunResult;
use d3270_common::b3270::operation::{Action, Run};
use d3270_common::b3270::{operation, Indication, Operation};
use d3270_common::tracker::{Disposition, Tracker};
write_buf: VecDeque<u8>,
action_response_map: HashMap<String, oneshot::Sender<RunResult>>, //
}
pub enum B3270Request {
enum B3270Request {
Action(Vec<Action>, oneshot::Sender<RunResult>),
Resync(oneshot::Sender<(Vec<Indication>, broadcast::Receiver<Indication>)>),
}
@@ -43,21 +33,41 @@ enum HandleReceiveState {
broadcast::Receiver<Indication>,
),
TryRestart(
BoxFuture<'static, Result<(), ()>>,
Pin<Box<dyn Future<Output = Result<(), ()>> + Send + Sync>>,
oneshot::Receiver<(Vec<Indication>, broadcast::Receiver<Indication>)>,
),
}
pub struct Handle {
pub struct ArbiterHandle {
sender: mpsc::Sender<B3270Request>,
receiver: Option<HandleReceiveState>,
}
impl Stream for Handle {
impl ArbiterHandle {
pub async fn send_action(
&self,
action: Action,
) -> anyhow::Result<oneshot::Receiver<RunResult>> {
self.send_actions(vec![action]).await
}
#[instrument(skip(self))]
pub async fn send_actions(
&self,
actions: Vec<Action>,
) -> anyhow::Result<oneshot::Receiver<RunResult>> {
let (os_snd, os_rcv) = oneshot::channel();
self.sender
.send(B3270Request::Action(actions, os_snd))
.await
.map_err(|_| anyhow!("Failed to send action to arbiter"))?;
Ok(os_rcv)
}
}
impl Stream for ArbiterHandle {
type Item = Indication;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// iter tracks whether any progress has been made
loop {
match self.receiver.take() {
Some(HandleReceiveState::TryRestart(mut fut, receiver)) => {
@@ -100,8 +110,11 @@ impl Stream for Handle {
self.receiver = Some(HandleReceiveState::Steady(rcvr));
return Poll::Ready(Some(msg));
}
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_)))) => {
warn!("Dropped messages from b3270 server; starting resync");
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) => {
info!(
dropped = n,
"Dropped messages from b3270 server; starting resync"
);
let (os_snd, os_rcv) = oneshot::channel();
let fut = self
.sender
@@ -110,15 +123,18 @@ impl Stream for Handle {
.map_ok(move |permit| {
permit.send(B3270Request::Resync(os_snd));
})
.map_err(|_| ())
.boxed();
self.receiver = Some(HandleReceiveState::TryRestart(fut, os_rcv));
.map_err(|_| ());
self.receiver = Some(HandleReceiveState::TryRestart(Box::pin(fut), os_rcv));
}
Poll::Ready(None) => {
warn!("Failed to receive from b3270 server");
return Poll::Ready(None);
}
Poll::Pending => return Poll::Pending,
Poll::Pending => {
self.receiver = Some(HandleReceiveState::Steady(rcvr));
return Poll::Pending;
}
},
None => {
@@ -129,12 +145,44 @@ impl Stream for Handle {
}
}
#[derive(Clone)]
pub struct ArbiterHandleRequester(mpsc::Sender<B3270Request>);
impl ArbiterHandleRequester {
#[instrument(skip(self))]
pub async fn connect(&self) -> anyhow::Result<ArbiterHandle> {
let (conn_send, conn_rcv) = oneshot::channel();
self.0
.send(B3270Request::Resync(conn_send))
.await
.map_err(|_| anyhow!("Failed to send request to arbiter"))?;
let (indications, rcvr) = conn_rcv.await?;
Ok(ArbiterHandle {
sender: self.0.clone(),
receiver: Some(HandleReceiveState::Resume(indications.into_iter(), rcvr)),
})
}
}
pub struct B3270 {
tracker: Tracker,
child: Child,
comm: mpsc::Receiver<B3270Request>,
ind_chan: broadcast::Sender<Indication>,
child_reader: Lines<BufReader<ChildStdout>>,
write_buf: VecDeque<u8>,
action_response_map: HashMap<String, oneshot::Sender<RunResult>>,
}
impl B3270 {
pub fn spawn(
mut child: Child,
initial_actions: &[Action],
) -> (
tokio::task::JoinHandle<anyhow::Error>,
mpsc::Sender<B3270Request>,
ArbiterHandleRequester,
) {
let (subproc_snd, subproc_rcv) = mpsc::channel(10);
let child_reader = child
@@ -144,16 +192,32 @@ impl B3270 {
let child_reader = BufReader::new(child_reader).lines();
// A single connect can result in a flurry of messages, so we need a big buffer
let (ind_chan, _) = broadcast::channel(100);
let mut write_buf = VecDeque::new();
// Queue any initial actions.
let act_str = serde_json::to_string(&Operation::Run(Run{
actions: initial_actions.to_vec(),
type_: Some("keybind".to_owned()),
r_tag: None,
})).unwrap();
trace!(json=%act_str, "Writing initialization action");
write_buf.extend(act_str.as_bytes());
write_buf.push_back(b'\n');
let proc = B3270 {
child,
child_reader,
tracker: Tracker::default(),
comm: subproc_rcv,
ind_chan,
write_buf: VecDeque::new(),
write_buf,
action_response_map: Default::default(),
};
(tokio::task::spawn(proc), subproc_snd)
(
tokio::task::spawn(proc.instrument(info_span!("arbiter"))),
ArbiterHandleRequester(subproc_snd),
)
}
}
@@ -167,9 +231,12 @@ impl Future for B3270 {
while let Poll::Ready(buf) = Pin::new(&mut self.child_reader).poll_next_line(cx) {
match buf {
Ok(Some(line)) => match serde_json::from_str(&line) {
Ok(ind) => indications.push(ind),
Ok(ind) => {
trace!(json = %line, "Received indication");
indications.push(ind)
}
Err(error) => {
warn!(%error, msg=line, "Failed to parse indication");
warn!(%error, msg=%line, "Failed to parse indication");
}
},
// EOF on stdin; this is a big problem
@@ -239,9 +306,10 @@ impl Future for B3270 {
type_: Some("keymap".to_owned()),
actions,
});
let result = serde_json::to_writer(&mut self.write_buf, &op);
match result {
Ok(()) => {
match serde_json::to_string(&op) {
Ok(op_str) => {
trace!(json = op_str, "Sending operation");
self.write_buf.extend(op_str.bytes());
self.write_buf.push_back(b'\n');
self.action_response_map.insert(tag, response_chan);
}

View File

@@ -0,0 +1,87 @@
use crate::arbiter::{ArbiterHandle, ArbiterHandleRequester};
use d3270_common::b3270::indication::RunResult;
use d3270_common::b3270::operation::Run;
use d3270_common::b3270::{Indication, Operation};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::FutureExt;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::warn;
pub struct GenConnection {
handle: ArbiterHandle,
waiting_actions: FuturesUnordered<ReplaceTag>,
}
struct ReplaceTag {
tag: Option<String>,
rcvr: oneshot::Receiver<RunResult>,
}
impl Future for ReplaceTag {
type Output = Option<Indication>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.rcvr.poll_unpin(cx)).ok().map(|run_res| {
Indication::RunResult(RunResult {
r_tag: self.tag.take(),
..run_res
})
}))
}
}
impl GenConnection {
pub async fn new(ahr: ArbiterHandleRequester) -> anyhow::Result<Self> {
let handle = ahr.connect().await?;
Ok(Self {
handle,
waiting_actions: FuturesUnordered::new(),
})
}
pub async fn handle_client_line(&mut self, line: String) -> anyhow::Result<()> {
let op = serde_json::from_str(&line)?;
match op {
Operation::Run(Run { actions, r_tag, .. }) => {
let rcvr = self.handle.send_actions(actions).await?;
self.waiting_actions.push(ReplaceTag { tag: r_tag, rcvr });
}
_ => warn!(json = line, "Unsupported operation from client"),
}
Ok(())
}
pub fn poll_indication(&mut self, cx: &mut Context) -> Poll<Option<Indication>> {
let mut any_can_continue = false;
match self.waiting_actions.poll_next_unpin(cx) {
Poll::Ready(Some(ind)) => {
return Poll::Ready(ind);
}
Poll::Ready(None) => {}
Poll::Pending => any_can_continue = true,
}
match self.handle.poll_next_unpin(cx) {
Poll::Ready(Some(ind)) => {
return Poll::Ready(Some(ind));
}
Poll::Ready(None) => {}
Poll::Pending => any_can_continue = true,
}
if any_can_continue {
Poll::Pending
} else {
Poll::Ready(None)
}
}
pub async fn next_indication(&mut self) -> Option<Indication> {
poll_fn(|cx| self.poll_indication(cx)).await
}
}

View File

@@ -1,13 +1,73 @@
use anyhow::anyhow;
use std::ffi::OsString;
use std::future::Future;
use std::pin::Pin;
use std::process::Stdio;
use std::str::FromStr;
use std::task::{ready, Context, Poll};
use anyhow::anyhow;
use futures::future::select_all;
use futures::FutureExt;
use tokio::task::JoinHandle;
use tracing::{error, info};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::prelude::*;
use d3270_common::b3270::operation::Action;
pub mod arbiter;
pub mod gen_connection;
pub mod tcp_server;
struct TaggedJoinHandle {
handle: JoinHandle<anyhow::Error>,
tag: &'static str,
}
impl Future for TaggedJoinHandle {
type Output = (&'static str, anyhow::Error);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let err = match ready!(self.handle.poll_unpin(cx)) {
Ok(err) => err,
Err(err) => err.into(),
};
Poll::Ready((self.tag, err))
}
}
trait JoinHandleTagExt {
fn tagged(self, tag: &'static str) -> TaggedJoinHandle;
}
impl JoinHandleTagExt for JoinHandle<anyhow::Error> {
fn tagged(self, tag: &'static str) -> TaggedJoinHandle {
TaggedJoinHandle { handle: self, tag }
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut subprocess_args = vec![OsString::from_str("-json").unwrap()];
// Configure logging
tracing_subscriber::registry()
.with(tracing_subscriber::filter::EnvFilter::from_default_env())
.with(
tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::ACTIVE)
).init();
info!("Test");
let mut subprocess_args = vec![
OsString::from_str("-json").unwrap(),
OsString::from_str("-utf8").unwrap(),
];
let mut args_iter = std::env::args_os().peekable();
let mut connect_str = None;
let mut tcp_listen = None;
args_iter.next(); // skip program name.
while let Some(arg) = args_iter.next() {
// we default to one of the ignored args
match arg.to_str().unwrap_or("-json") {
@@ -26,6 +86,16 @@ async fn main() -> anyhow::Result<()> {
})
.map(Some)?;
}
"-tcp-listen" => {
tcp_listen = args_iter
.next()
.ok_or_else(|| anyhow!("Arg required for -tcp-listen"))?
.into_string()
.map_err(|_| anyhow!("Failed to parse tcp-listen address"))?
.parse()
.map(Some)
.map_err(|_| anyhow!("Invalid listen address"))?;
}
"-e" => {
'skip: while let Some(arg) = args_iter.peek() {
if arg.to_str().unwrap_or("").starts_with("-") {
@@ -38,18 +108,31 @@ async fn main() -> anyhow::Result<()> {
}
}
let _connect_str = connect_str.ok_or_else(|| anyhow!("No connect string given"))?;
let connect_str = connect_str.ok_or_else(|| anyhow!("No connect string given"))?;
info!(args=?subprocess_args, "Starting b3270");
let subproc = tokio::process::Command::new("b3270")
.args(&subprocess_args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (_server, _server_req) = connection::B3270::spawn(subproc);
// TODO: make connection before starting listeners
let mut handles: Vec<TaggedJoinHandle> = vec![];
let (arbiter, arbiter_req) = arbiter::B3270::spawn(
subproc,
&[Action {
action: "Connect".to_owned(),
args: vec![connect_str],
}],
);
handles.push(arbiter.tagged("arbiter"));
if let Some(addr) = tcp_listen {
let tcp_listener = tcp_server::listener_proc(addr, arbiter_req.clone()).await?;
handles.push(tcp_listener.tagged("tcp_listener"));
}
let ((source, error), _, _) = select_all(handles).await;
error!(source, %error, "A core task failed");
Ok(())
}
pub mod connection;

85
d3270d/src/tcp_server.rs Normal file
View File

@@ -0,0 +1,85 @@
use std::net::SocketAddr;
use anyhow::bail;
use futures::never::Never;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::select;
use tokio::task::JoinHandle;
use tracing::{error, info, info_span, Instrument, instrument};
use crate::arbiter::ArbiterHandleRequester;
use crate::gen_connection::GenConnection;
#[instrument(skip(handle_requester))]
pub async fn listener_proc(
socket: SocketAddr,
handle_requester: ArbiterHandleRequester,
) -> anyhow::Result<JoinHandle<anyhow::Error>> {
let listener = match tokio::net::TcpListener::bind(socket.clone()).await {
Err(error) => {
error!(?socket, ?error, "Failed to bind");
return Err(error.into());
}
Ok(listener) => listener
};
let span = info_span!(target: "connection-handling", "tcp_listener", addr=%socket);
Ok(tokio::spawn(
async move {
let error = listener_task(listener, handle_requester).await.unwrap_err();
error!(%error, "TCP listener failed to accept");
error
}
.instrument(span),
))
}
async fn listener_task(
listener: TcpListener,
handle_requester: ArbiterHandleRequester,
) -> anyhow::Result<Never> {
loop {
let (conn, client_addr) = listener.accept().await?;
let handle_requester = handle_requester.clone();
let conn_span =
info_span!(target: "connection-handling", "tcp_accept", client=%client_addr);
tokio::spawn(
async move {
info!("Accepted connection");
if let Err(error) = handle_tcp_connection(conn, handle_requester).await {
error!(%error, "Connection handler failed");
} else {
info!("Connection closed");
}
}
.instrument(conn_span),
);
}
}
async fn handle_tcp_connection(
mut conn: TcpStream,
handle_requester: ArbiterHandleRequester,
) -> anyhow::Result<()> {
let (stream_rd, mut stream_wr) = conn.split();
let mut stream_rd = BufReader::new(stream_rd).lines();
let mut conn = GenConnection::new(handle_requester).await?;
loop {
select! {
line = stream_rd.next_line() => match line? {
Some(line) => conn.handle_client_line(line).await?,
None => bail!("Connection closed"),
},
ind = conn.next_indication() => match ind {
None => bail!("Arbiter lost"),
Some(ind) => {
let mut ind = serde_json::to_vec(&ind)?;
ind.push(b'\n');
stream_wr.write_all(ind.as_slice()).await?;
}
},
}
}
}