Renamed connection to arbiter, which should be clearer
This commit is contained in:
@@ -19,18 +19,7 @@ use tokio::sync::{broadcast, mpsc, oneshot};
|
|||||||
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
|
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
|
||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
pub struct B3270 {
|
enum B3270Request {
|
||||||
tracker: Tracker, //
|
|
||||||
child: Child, //
|
|
||||||
comm: mpsc::Receiver<B3270Request>, //
|
|
||||||
ind_chan: broadcast::Sender<Indication>, //
|
|
||||||
child_reader: Lines<BufReader<ChildStdout>>, //
|
|
||||||
|
|
||||||
write_buf: VecDeque<u8>,
|
|
||||||
action_response_map: HashMap<String, oneshot::Sender<RunResult>>, //
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum B3270Request {
|
|
||||||
Action(Vec<Action>, oneshot::Sender<RunResult>),
|
Action(Vec<Action>, oneshot::Sender<RunResult>),
|
||||||
Resync(oneshot::Sender<(Vec<Indication>, broadcast::Receiver<Indication>)>),
|
Resync(oneshot::Sender<(Vec<Indication>, broadcast::Receiver<Indication>)>),
|
||||||
}
|
}
|
||||||
@@ -47,17 +36,35 @@ enum HandleReceiveState {
|
|||||||
oneshot::Receiver<(Vec<Indication>, broadcast::Receiver<Indication>)>,
|
oneshot::Receiver<(Vec<Indication>, broadcast::Receiver<Indication>)>,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
pub struct Handle {
|
pub struct ArbiterHandle {
|
||||||
sender: mpsc::Sender<B3270Request>,
|
sender: mpsc::Sender<B3270Request>,
|
||||||
receiver: Option<HandleReceiveState>,
|
receiver: Option<HandleReceiveState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for Handle {
|
impl ArbiterHandle {
|
||||||
|
pub async fn send_action(
|
||||||
|
&self,
|
||||||
|
action: Action,
|
||||||
|
) -> anyhow::Result<oneshot::Receiver<RunResult>> {
|
||||||
|
self.send_actions(vec![action]).await
|
||||||
|
}
|
||||||
|
pub async fn send_actions(
|
||||||
|
&self,
|
||||||
|
actions: Vec<Action>,
|
||||||
|
) -> anyhow::Result<oneshot::Receiver<RunResult>> {
|
||||||
|
let (os_snd, os_rcv) = oneshot::channel();
|
||||||
|
self.sender
|
||||||
|
.send(B3270Request::Action(actions, os_snd))
|
||||||
|
.await
|
||||||
|
.map_err(|_| anyhow!("Failed to send action to arbiter"))?;
|
||||||
|
Ok(os_rcv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for ArbiterHandle {
|
||||||
type Item = Indication;
|
type Item = Indication;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
// iter tracks whether any progress has been made
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.receiver.take() {
|
match self.receiver.take() {
|
||||||
Some(HandleReceiveState::TryRestart(mut fut, receiver)) => {
|
Some(HandleReceiveState::TryRestart(mut fut, receiver)) => {
|
||||||
@@ -118,7 +125,10 @@ impl Stream for Handle {
|
|||||||
warn!("Failed to receive from b3270 server");
|
warn!("Failed to receive from b3270 server");
|
||||||
return Poll::Ready(None);
|
return Poll::Ready(None);
|
||||||
}
|
}
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => {
|
||||||
|
self.receiver = Some(HandleReceiveState::Steady(rcvr));
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
None => {
|
None => {
|
||||||
@@ -129,13 +139,40 @@ impl Stream for Handle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct B3270HandleRequester(mpsc::Sender<B3270Request>);
|
||||||
|
|
||||||
|
impl B3270HandleRequester {
|
||||||
|
pub async fn connect(&self) -> anyhow::Result<ArbiterHandle> {
|
||||||
|
let (conn_send, conn_rcv) = oneshot::channel();
|
||||||
|
self.0
|
||||||
|
.send(B3270Request::Resync(conn_send))
|
||||||
|
.await
|
||||||
|
.map_err(|_| anyhow!("Failed to send request to arbiter"))?;
|
||||||
|
|
||||||
|
let (indications, rcvr) = conn_rcv.await?;
|
||||||
|
Ok(ArbiterHandle {
|
||||||
|
sender: self.0.clone(),
|
||||||
|
receiver: Some(HandleReceiveState::Resume(indications.into_iter(), rcvr)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct B3270 {
|
||||||
|
tracker: Tracker,
|
||||||
|
child: Child,
|
||||||
|
comm: mpsc::Receiver<B3270Request>,
|
||||||
|
ind_chan: broadcast::Sender<Indication>,
|
||||||
|
child_reader: Lines<BufReader<ChildStdout>>,
|
||||||
|
|
||||||
|
write_buf: VecDeque<u8>,
|
||||||
|
action_response_map: HashMap<String, oneshot::Sender<RunResult>>,
|
||||||
|
}
|
||||||
|
|
||||||
impl B3270 {
|
impl B3270 {
|
||||||
pub fn spawn(
|
pub fn spawn(
|
||||||
mut child: Child,
|
mut child: Child,
|
||||||
) -> (
|
) -> (tokio::task::JoinHandle<anyhow::Error>, B3270HandleRequester) {
|
||||||
tokio::task::JoinHandle<anyhow::Error>,
|
|
||||||
mpsc::Sender<B3270Request>,
|
|
||||||
) {
|
|
||||||
let (subproc_snd, subproc_rcv) = mpsc::channel(10);
|
let (subproc_snd, subproc_rcv) = mpsc::channel(10);
|
||||||
let child_reader = child
|
let child_reader = child
|
||||||
.stdout
|
.stdout
|
||||||
@@ -153,7 +190,7 @@ impl B3270 {
|
|||||||
write_buf: VecDeque::new(),
|
write_buf: VecDeque::new(),
|
||||||
action_response_map: Default::default(),
|
action_response_map: Default::default(),
|
||||||
};
|
};
|
||||||
(tokio::task::spawn(proc), subproc_snd)
|
(tokio::task::spawn(proc), B3270HandleRequester(subproc_snd))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -46,10 +46,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
.spawn()?;
|
.spawn()?;
|
||||||
|
|
||||||
let (_server, _server_req) = connection::B3270::spawn(subproc);
|
let (_server, _server_req) = arbiter::B3270::spawn(subproc);
|
||||||
// TODO: make connection before starting listeners
|
// TODO: make connection before starting listeners
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod connection;
|
pub mod arbiter;
|
||||||
|
|||||||
Reference in New Issue
Block a user