From 0352f64c0fa5365c02934078916626b1fb5c7cf5 Mon Sep 17 00:00:00 2001 From: TQ Hirsch Date: Thu, 11 May 2023 12:45:47 +0200 Subject: [PATCH] Renamed connection to arbiter, which should be clearer --- d3270d/src/{connection.rs => arbiter.rs} | 81 +++++++++++++++++------- d3270d/src/main.rs | 4 +- 2 files changed, 61 insertions(+), 24 deletions(-) rename d3270d/src/{connection.rs => arbiter.rs} (85%) diff --git a/d3270d/src/connection.rs b/d3270d/src/arbiter.rs similarity index 85% rename from d3270d/src/connection.rs rename to d3270d/src/arbiter.rs index de3a659..0e0c932 100644 --- a/d3270d/src/connection.rs +++ b/d3270d/src/arbiter.rs @@ -19,18 +19,7 @@ use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tracing::{error, info, warn}; -pub struct B3270 { - tracker: Tracker, // - child: Child, // - comm: mpsc::Receiver, // - ind_chan: broadcast::Sender, // - child_reader: Lines>, // - - write_buf: VecDeque, - action_response_map: HashMap>, // -} - -pub enum B3270Request { +enum B3270Request { Action(Vec, oneshot::Sender), Resync(oneshot::Sender<(Vec, broadcast::Receiver)>), } @@ -47,17 +36,35 @@ enum HandleReceiveState { oneshot::Receiver<(Vec, broadcast::Receiver)>, ), } -pub struct Handle { +pub struct ArbiterHandle { sender: mpsc::Sender, receiver: Option, } -impl Stream for Handle { +impl ArbiterHandle { + pub async fn send_action( + &self, + action: Action, + ) -> anyhow::Result> { + self.send_actions(vec![action]).await + } + pub async fn send_actions( + &self, + actions: Vec, + ) -> anyhow::Result> { + let (os_snd, os_rcv) = oneshot::channel(); + self.sender + .send(B3270Request::Action(actions, os_snd)) + .await + .map_err(|_| anyhow!("Failed to send action to arbiter"))?; + Ok(os_rcv) + } +} + +impl Stream for ArbiterHandle { type Item = Indication; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // iter tracks whether any progress has been made - loop { match self.receiver.take() { Some(HandleReceiveState::TryRestart(mut fut, receiver)) => { @@ -118,7 +125,10 @@ impl Stream for Handle { warn!("Failed to receive from b3270 server"); return Poll::Ready(None); } - Poll::Pending => return Poll::Pending, + Poll::Pending => { + self.receiver = Some(HandleReceiveState::Steady(rcvr)); + return Poll::Pending; + } }, None => { @@ -129,13 +139,40 @@ impl Stream for Handle { } } +#[derive(Clone)] +pub struct B3270HandleRequester(mpsc::Sender); + +impl B3270HandleRequester { + pub async fn connect(&self) -> anyhow::Result { + let (conn_send, conn_rcv) = oneshot::channel(); + self.0 + .send(B3270Request::Resync(conn_send)) + .await + .map_err(|_| anyhow!("Failed to send request to arbiter"))?; + + let (indications, rcvr) = conn_rcv.await?; + Ok(ArbiterHandle { + sender: self.0.clone(), + receiver: Some(HandleReceiveState::Resume(indications.into_iter(), rcvr)), + }) + } +} + +pub struct B3270 { + tracker: Tracker, + child: Child, + comm: mpsc::Receiver, + ind_chan: broadcast::Sender, + child_reader: Lines>, + + write_buf: VecDeque, + action_response_map: HashMap>, +} + impl B3270 { pub fn spawn( mut child: Child, - ) -> ( - tokio::task::JoinHandle, - mpsc::Sender, - ) { + ) -> (tokio::task::JoinHandle, B3270HandleRequester) { let (subproc_snd, subproc_rcv) = mpsc::channel(10); let child_reader = child .stdout @@ -153,7 +190,7 @@ impl B3270 { write_buf: VecDeque::new(), action_response_map: Default::default(), }; - (tokio::task::spawn(proc), subproc_snd) + (tokio::task::spawn(proc), B3270HandleRequester(subproc_snd)) } } diff --git a/d3270d/src/main.rs b/d3270d/src/main.rs index 90e515f..5a9d55d 100644 --- a/d3270d/src/main.rs +++ b/d3270d/src/main.rs @@ -46,10 +46,10 @@ async fn main() -> anyhow::Result<()> { .stdout(Stdio::piped()) .spawn()?; - let (_server, _server_req) = connection::B3270::spawn(subproc); + let (_server, _server_req) = arbiter::B3270::spawn(subproc); // TODO: make connection before starting listeners Ok(()) } -pub mod connection; +pub mod arbiter;