Now with rudimentary support for reading incoming records

This commit is contained in:
2020-11-11 16:17:02 +01:00
parent c294e8033f
commit cfdfedb5b4
3 changed files with 45 additions and 11 deletions

View File

@@ -12,3 +12,4 @@ structopt = "0.3.20"
anyhow = "1.0.33" anyhow = "1.0.33"
thiserror = "1.0.21" thiserror = "1.0.21"
bitflags = "1.2.1" bitflags = "1.2.1"
hex = "0.4.2"

View File

@@ -5,7 +5,7 @@ use tn3270s::tn3270;
#[derive(StructOpt)] #[derive(StructOpt)]
pub struct Cli { pub struct Cli {
#[structopt(short="h", long = "host", default_value="[::1]")] #[structopt(short="h", long = "host", default_value="::1")]
host: String, host: String,
#[structopt(short="p", long = "port", default_value="2101")] #[structopt(short="p", long = "port", default_value="2101")]
port: u16, port: u16,
@@ -14,7 +14,7 @@ pub struct Cli {
fn run(mut session: tn3270::Session) -> anyhow::Result<()> { fn run(mut session: tn3270::Session) -> anyhow::Result<()> {
use tn3270::stream::*; use tn3270::stream::*;
let mut record = WriteCommand::new(WriteCommandCode::Write, WCC::RESET); let mut record = WriteCommand::new(WriteCommandCode::Write, WCC::RESET | WCC::KBD_RESTORE);
let bufsz = BufferAddressCalculator { width: 80, height: 24 }; let bufsz = BufferAddressCalculator { width: 80, height: 24 };
record.set_buffer_address(0) record.set_buffer_address(0)
.erase_unprotected_to_address(bufsz.encode_address(79, 23)) .erase_unprotected_to_address(bufsz.encode_address(79, 23))
@@ -23,6 +23,14 @@ fn run(mut session: tn3270::Session) -> anyhow::Result<()> {
.send_text("Hello from Rust!"); .send_text("Hello from Rust!");
session.send_record(record)?; session.send_record(record)?;
let record = session.receive_record(None)?;
if let Some(record) = record {
eprintln!("Incoming record: {:?}", hex::encode(record));
} else {
eprintln!("No record");
}
std::thread::sleep(Duration::from_secs(60)); std::thread::sleep(Duration::from_secs(60));
Ok(()) Ok(())
} }

View File

@@ -9,20 +9,21 @@ use libtelnet_rs::{
use std::net::TcpStream; use std::net::TcpStream;
use std::io::{Write, Read}; use std::io::{Write, Read};
use std::time::Duration; use std::time::Duration;
use std::collections::VecDeque;
pub mod stream; pub mod stream;
pub struct Session { pub struct Session {
parser: Parser, parser: Parser,
ibuf: Vec<u8>,
obuf: Vec<u8>,
stream: std::net::TcpStream, stream: std::net::TcpStream,
term_type: Option<Vec<u8>>, term_type: Option<Vec<u8>>,
is_eor: bool, is_eor: bool,
is_bin: bool, is_bin: bool,
incoming_records: VecDeque<Vec<u8>>,
cur_record: Vec<u8>, cur_record: Vec<u8>,
} }
@@ -32,8 +33,7 @@ impl Session {
pub fn new(stream: TcpStream) -> Result<Self, Error> { pub fn new(stream: TcpStream) -> Result<Self, Error> {
let mut session = Session { let mut session = Session {
parser: Parser::new(), parser: Parser::new(),
ibuf: Vec::new(), incoming_records: VecDeque::new(),
obuf: Vec::new(),
stream, stream,
term_type: None, term_type: None,
is_bin: false, is_bin: false,
@@ -57,10 +57,9 @@ impl Session {
opt.local_state && opt.remote_state opt.local_state && opt.remote_state
} }
fn process_events(&mut self, mut events: Vec<TelnetEvents>) -> Result<Vec<Vec<u8>>, Error> { fn process_events(&mut self, mut events: Vec<TelnetEvents>) -> Result<(), Error> {
let mut extra_events = Vec::new(); let mut extra_events = Vec::new();
let mut sendbuf = Vec::new(); let mut sendbuf = Vec::new();
let mut records_in = Vec::new();
while !events.is_empty() || !extra_events.is_empty() { while !events.is_empty() || !extra_events.is_empty() {
events.append(&mut extra_events); events.append(&mut extra_events);
extra_events.truncate(0); extra_events.truncate(0);
@@ -69,7 +68,7 @@ impl Session {
TelnetEvents::DataSend(ref mut data) => sendbuf.append(data), TelnetEvents::DataSend(ref mut data) => sendbuf.append(data),
TelnetEvents::DataReceive(ref mut data) => self.cur_record.append(data), TelnetEvents::DataReceive(ref mut data) => self.cur_record.append(data),
TelnetEvents::IAC(TelnetIAC { command: tn_cmd::EOR }) => TelnetEvents::IAC(TelnetIAC { command: tn_cmd::EOR }) =>
records_in.push(std::mem::replace(&mut self.cur_record, Vec::new())), self.incoming_records.push_back(std::mem::replace(&mut self.cur_record, Vec::new())),
TelnetEvents::IAC(iac) => eprintln!("Unknown IAC {}", iac.command), TelnetEvents::IAC(iac) => eprintln!("Unknown IAC {}", iac.command),
TelnetEvents::Negotiation(TelnetNegotiation { command: tn_cmd::WILL, option: tn_opt::TTYPE }) => { TelnetEvents::Negotiation(TelnetNegotiation { command: tn_cmd::WILL, option: tn_opt::TTYPE }) => {
eprintln!("WILL ttype"); eprintln!("WILL ttype");
@@ -110,8 +109,8 @@ impl Session {
} }
eprintln!("Sending: {:?}", &sendbuf); eprintln!("Sending: {:?}", &sendbuf);
self.stream.write(sendbuf.as_slice())?; self.stream.write_all(sendbuf.as_slice())?;
Ok(records_in) Ok(())
} }
fn is_ready(&self) -> bool { fn is_ready(&self) -> bool {
@@ -154,5 +153,31 @@ impl Session {
self.stream.write_all(send_data.as_slice()) self.stream.write_all(send_data.as_slice())
} }
pub fn receive_record(&mut self, timeout: Option<Duration>) -> std::io::Result<Option<Vec<u8>>> {
if !self.incoming_records.is_empty() {
return Ok(self.incoming_records.pop_front());
}
self.stream.set_read_timeout(timeout)?;
let mut buf = vec![0; 1024];
let mut len = self.stream.read(buf.as_mut_slice())?;
if len != 0 {
self.stream.set_nonblocking(true)?;
while len != 0 {
let events = self.parser.receive(&buf[..len]);
self.process_events(events);
len = match self.stream.read(buf.as_mut_slice()) {
Ok(len) => len,
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => 0,
Err(err) => return Err(err),
};
}
self.stream.set_nonblocking(false)?;
}
self.stream.set_read_timeout(None)?;
Ok(self.incoming_records.pop_front())
}
} }