From 2e011bcad60affbf8de1e0635cedaf966d24e960 Mon Sep 17 00:00:00 2001 From: Mateusz Faderewski Date: Mon, 13 Mar 2023 13:35:39 +0100 Subject: [PATCH] stable remote connection --- sw/deployer/src/main.rs | 24 ++++-- sw/deployer/src/sc64/link.rs | 137 +++++++++++++++++++---------------- 2 files changed, 92 insertions(+), 69 deletions(-) diff --git a/sw/deployer/src/main.rs b/sw/deployer/src/main.rs index a1c7aff..c632762 100644 --- a/sw/deployer/src/main.rs +++ b/sw/deployer/src/main.rs @@ -762,19 +762,31 @@ fn handle_server_command(connection: Connection, args: &ServerArgs) -> Result<() sc64::run_server(port, args.address.clone(), |event| match event { sc64::ServerEvent::Listening(address) => { - println!("{}: Listening on address [{}]", "[Server]".bold(), address) + println!( + "{}: Listening on address [{}]", + "[Server]".bold(), + address.bright_blue() + ) } - sc64::ServerEvent::Connection(peer) => { - println!("{}: New connection from [{}]", "[Server]".bold(), peer); + sc64::ServerEvent::Connected(peer) => { + println!( + "{}: New connection from [{}]", + "[Server]".bold(), + peer.bright_green() + ); } sc64::ServerEvent::Disconnected(peer) => { - println!("{}: Client disconnected [{}]", "[Server]".bold(), peer); + println!( + "{}: Client disconnected [{}]", + "[Server]".bold(), + peer.green() + ); } sc64::ServerEvent::Err(error) => { println!( - "{}: Client disconnected with error: {}", + "{}: Client disconnected - server error: {}", "[Server]".bold(), - error + error.red() ); } })?; diff --git a/sw/deployer/src/sc64/link.rs b/sw/deployer/src/sc64/link.rs index cbdee88..a8fc6e4 100644 --- a/sw/deployer/src/sc64/link.rs +++ b/sw/deployer/src/sc64/link.rs @@ -1,7 +1,7 @@ use super::error::Error; use std::{ collections::VecDeque, - io::{BufRead, BufReader, BufWriter, ErrorKind, Read, Write}, + io::{BufReader, BufWriter, ErrorKind, Read, Write}, net::{TcpListener, TcpStream}, time::{Duration, Instant}, }; @@ -165,24 +165,35 @@ struct TcpBackend { stream: TcpStream, reader: BufReader, writer: BufWriter, + header_position: usize, + header: [u8; 4], } impl TcpBackend { - fn bytes_to_read(&mut self) -> Result { - self.stream.set_nonblocking(true)?; - let result = self.reader.fill_buf(); - let length = match result { - Ok(buffer) => buffer.len(), - Err(error) => { - if error.kind() == ErrorKind::WouldBlock { - 0 - } else { - return Err(error.into()); - } + fn read_header(&mut self, block: bool) -> Result { + self.stream.set_nonblocking(!block)?; + while self.header_position != 4 { + self.header_position += + match self.reader.read(&mut self.header[self.header_position..4]) { + Ok(0) => return Err(Error::new("Unexpected end of stream")), + Ok(length) => length, + Err(error) => match error.kind() { + ErrorKind::Interrupted => 0, + ErrorKind::WouldBlock => 0, + _ => return Err(error.into()), + }, + }; + if !block { + break; } - }; + } self.stream.set_nonblocking(false)?; - return Ok(length); + if self.header_position == 4 { + self.header_position = 0; + return Ok(true); + } else { + return Ok(false); + } } } @@ -211,10 +222,8 @@ impl Backend for TcpBackend { ) -> Result, Error> { let mut buffer = [0u8; 4]; - while matches!(data_type, DataType::Response) || self.bytes_to_read()? >= 4 { - self.reader.read_exact(&mut buffer)?; - let payload_data_type: DataType = u32::from_be_bytes(buffer).try_into()?; - + while self.read_header(matches!(data_type, DataType::Response))? { + let payload_data_type: DataType = u32::from_be_bytes(self.header).try_into()?; match payload_data_type { DataType::Response => { let mut response_info = vec![0u8; 2]; @@ -278,6 +287,8 @@ fn new_tcp_backend(address: &str) -> Result { stream, reader, writer, + header_position: 0, + header: [0, 0, 0, 0], }) } @@ -386,7 +397,7 @@ pub fn list_local_devices() -> Result, Error> { pub enum ServerEvent { Listening(String), - Connection(String), + Connected(String), Disconnected(String), Err(String), } @@ -419,6 +430,7 @@ fn server_accept_connection( stream: &mut TcpStream, ) -> Result<(), Error> { let peer = stream.peer_addr()?.to_string(); + event_callback(ServerEvent::Connected(peer.clone())); stream.set_write_timeout(Some(Duration::from_secs(10)))?; stream.set_read_timeout(Some(Duration::from_secs(10)))?; @@ -430,56 +442,55 @@ fn server_accept_connection( serial_backend.reset()?; let mut packets: VecDeque = VecDeque::new(); - - let mut buffer = [0u8; 4]; - let mut keepalive = Instant::now(); - - event_callback(ServerEvent::Connection(peer.clone())); + let mut header_position = 0; + let mut header = [0u8; 4]; loop { stream.set_nonblocking(true)?; - match reader.read_exact(&mut buffer) { - Ok(()) => { - stream.set_nonblocking(false)?; - - let data_type: DataType = u32::from_be_bytes(buffer).try_into()?; - - if !matches!(data_type, DataType::Command) { - return Err(Error::new("Received data type wasn't a command data type")); - } - - let mut id_buffer = [0u8; 1]; - let mut args = [0u32; 2]; - - reader.read_exact(&mut id_buffer)?; - reader.read_exact(&mut buffer)?; - args[0] = u32::from_be_bytes(buffer); - reader.read_exact(&mut buffer)?; - args[1] = u32::from_be_bytes(buffer); - - reader.read_exact(&mut buffer)?; - let command_data_length = u32::from_be_bytes(buffer) as usize; - let mut data = vec![0u8; command_data_length]; - reader.read_exact(&mut data)?; - - serial_backend.send_command(&Command { - id: id_buffer[0], - args, - data: &data, - })?; - - continue; + header_position += match stream.read(&mut header[header_position..4]) { + Ok(0) => { + event_callback(ServerEvent::Disconnected(peer.clone())); + return Ok(()); } - Err(error) => { - if error.kind() != ErrorKind::WouldBlock { - event_callback(ServerEvent::Disconnected(peer.clone())); - return Ok(()); - } - stream.set_nonblocking(false)?; + Ok(length) => length, + Err(error) => match error.kind() { + ErrorKind::WouldBlock => 0, + _ => return Err(error.into()), + }, + }; + stream.set_nonblocking(false)?; + + if header_position == 4 { + header_position = 0; + + let data_type: DataType = u32::from_be_bytes(header).try_into()?; + + if !matches!(data_type, DataType::Command) { + return Err(Error::new("Received data type wasn't a command data type")); } - } - if let Some(response) = + + let mut buffer = [0u8; 4]; + let mut id_buffer = [0u8; 1]; + let mut args = [0u32; 2]; + + reader.read_exact(&mut id_buffer)?; + reader.read_exact(&mut buffer)?; + args[0] = u32::from_be_bytes(buffer); + reader.read_exact(&mut buffer)?; + args[1] = u32::from_be_bytes(buffer); + + reader.read_exact(&mut buffer)?; + let command_data_length = u32::from_be_bytes(buffer) as usize; + let mut data = vec![0u8; command_data_length]; + reader.read_exact(&mut data)?; + + serial_backend.send_command(&Command { + id: id_buffer[0], + args, + data: &data, + })?; + } else if let Some(response) = serial_backend.process_incoming_data(DataType::Packet, &mut packets)? { writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?;