From c8952b4ee8bdd99db2f1e7e4c4fc7430e83cc523 Mon Sep 17 00:00:00 2001 From: Mateusz Faderewski Date: Fri, 10 Mar 2023 12:25:26 +0100 Subject: [PATCH] deployer server fixes --- sw/deployer/src/sc64/link.rs | 60 +++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/sw/deployer/src/sc64/link.rs b/sw/deployer/src/sc64/link.rs index 69ae1eb..cbdee88 100644 --- a/sw/deployer/src/sc64/link.rs +++ b/sw/deployer/src/sc64/link.rs @@ -3,13 +3,14 @@ use std::{ collections::VecDeque, io::{BufRead, BufReader, BufWriter, ErrorKind, Read, Write}, net::{TcpListener, TcpStream}, - time::Duration, + time::{Duration, Instant}, }; enum DataType { Command, Response, Packet, + KeepAlive, } impl From for u32 { @@ -18,6 +19,7 @@ impl From for u32 { DataType::Command => 1, DataType::Response => 2, DataType::Packet => 3, + DataType::KeepAlive => 0xCAFEBEEF, } } } @@ -29,6 +31,7 @@ impl TryFrom for DataType { 1 => Self::Command, 2 => Self::Response, 3 => Self::Packet, + 0xCAFEBEEF => Self::KeepAlive, _ => return Err(Error::new("Unknown data type")), }) } @@ -150,11 +153,10 @@ impl Backend for SerialBackend { } fn new_serial_backend(port: &str) -> Result { - let mut backend = SerialBackend { - serial: serialport::new(port, 115_200) - .timeout(Duration::from_secs(10)) - .open()?, - }; + let serial = serialport::new(port, 115_200) + .timeout(Duration::from_secs(10)) + .open()?; + let mut backend = SerialBackend { serial }; backend.reset()?; Ok(backend) } @@ -248,6 +250,7 @@ impl Backend for TcpBackend { break; } } + DataType::KeepAlive => {} _ => return Err(Error::new("Unexpected payload data type received")), }; } @@ -415,10 +418,13 @@ fn server_accept_connection( event_callback: fn(ServerEvent), stream: &mut TcpStream, ) -> Result<(), Error> { + let peer = stream.peer_addr()?.to_string(); + stream.set_write_timeout(Some(Duration::from_secs(10)))?; stream.set_read_timeout(Some(Duration::from_secs(10)))?; - let peer = stream.peer_addr()?.to_string(); + let mut reader = BufReader::new(stream.try_clone()?); + let mut writer = BufWriter::new(stream.try_clone()?); let mut serial_backend = new_serial_backend(port)?; serial_backend.reset()?; @@ -427,11 +433,13 @@ fn server_accept_connection( let mut buffer = [0u8; 4]; + let mut keepalive = Instant::now(); + event_callback(ServerEvent::Connection(peer.clone())); loop { stream.set_nonblocking(true)?; - match stream.read_exact(&mut buffer) { + match reader.read_exact(&mut buffer) { Ok(()) => { stream.set_nonblocking(false)?; @@ -444,16 +452,16 @@ fn server_accept_connection( let mut id_buffer = [0u8; 1]; let mut args = [0u32; 2]; - stream.read_exact(&mut id_buffer)?; - stream.read_exact(&mut buffer)?; + reader.read_exact(&mut id_buffer)?; + reader.read_exact(&mut buffer)?; args[0] = u32::from_be_bytes(buffer); - stream.read_exact(&mut buffer)?; + reader.read_exact(&mut buffer)?; args[1] = u32::from_be_bytes(buffer); - stream.read_exact(&mut buffer)?; + reader.read_exact(&mut buffer)?; let command_data_length = u32::from_be_bytes(buffer) as usize; let mut data = vec![0u8; command_data_length]; - stream.read_exact(&mut data)?; + reader.read_exact(&mut data)?; serial_backend.send_command(&Command { id: id_buffer[0], @@ -474,18 +482,22 @@ fn server_accept_connection( if let Some(response) = serial_backend.process_incoming_data(DataType::Packet, &mut packets)? { - stream.write_all(&u32::to_be_bytes(DataType::Response.into()))?; - stream.write_all(&[response.id])?; - stream.write_all(&[response.error as u8])?; - stream.write_all(&(response.data.len() as u32).to_be_bytes())?; - stream.write_all(&response.data)?; - stream.flush()?; + writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?; + writer.write_all(&[response.id])?; + writer.write_all(&[response.error as u8])?; + writer.write_all(&(response.data.len() as u32).to_be_bytes())?; + writer.write_all(&response.data)?; + writer.flush()?; } else if let Some(packet) = packets.pop_front() { - stream.write_all(&u32::to_be_bytes(DataType::Packet.into()))?; - stream.write_all(&[packet.id])?; - stream.write_all(&(packet.data.len() as u32).to_be_bytes())?; - stream.write_all(&packet.data)?; - stream.flush()?; + writer.write_all(&u32::to_be_bytes(DataType::Packet.into()))?; + writer.write_all(&[packet.id])?; + writer.write_all(&(packet.data.len() as u32).to_be_bytes())?; + writer.write_all(&packet.data)?; + writer.flush()?; + } else if keepalive.elapsed() > Duration::from_secs(5) { + keepalive = Instant::now(); + writer.write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?; + writer.flush()?; } else { std::thread::sleep(Duration::from_millis(1)); }