deployer server fixes

This commit is contained in:
Mateusz Faderewski 2023-03-10 12:25:26 +01:00
parent 3c2128b811
commit c8952b4ee8

View File

@ -3,13 +3,14 @@ use std::{
collections::VecDeque, collections::VecDeque,
io::{BufRead, BufReader, BufWriter, ErrorKind, Read, Write}, io::{BufRead, BufReader, BufWriter, ErrorKind, Read, Write},
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
time::Duration, time::{Duration, Instant},
}; };
enum DataType { enum DataType {
Command, Command,
Response, Response,
Packet, Packet,
KeepAlive,
} }
impl From<DataType> for u32 { impl From<DataType> for u32 {
@ -18,6 +19,7 @@ impl From<DataType> for u32 {
DataType::Command => 1, DataType::Command => 1,
DataType::Response => 2, DataType::Response => 2,
DataType::Packet => 3, DataType::Packet => 3,
DataType::KeepAlive => 0xCAFEBEEF,
} }
} }
} }
@ -29,6 +31,7 @@ impl TryFrom<u32> for DataType {
1 => Self::Command, 1 => Self::Command,
2 => Self::Response, 2 => Self::Response,
3 => Self::Packet, 3 => Self::Packet,
0xCAFEBEEF => Self::KeepAlive,
_ => return Err(Error::new("Unknown data type")), _ => return Err(Error::new("Unknown data type")),
}) })
} }
@ -150,11 +153,10 @@ impl Backend for SerialBackend {
} }
fn new_serial_backend(port: &str) -> Result<SerialBackend, Error> { fn new_serial_backend(port: &str) -> Result<SerialBackend, Error> {
let mut backend = SerialBackend { let serial = serialport::new(port, 115_200)
serial: serialport::new(port, 115_200) .timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10)) .open()?;
.open()?, let mut backend = SerialBackend { serial };
};
backend.reset()?; backend.reset()?;
Ok(backend) Ok(backend)
} }
@ -248,6 +250,7 @@ impl Backend for TcpBackend {
break; break;
} }
} }
DataType::KeepAlive => {}
_ => return Err(Error::new("Unexpected payload data type received")), _ => return Err(Error::new("Unexpected payload data type received")),
}; };
} }
@ -415,10 +418,13 @@ fn server_accept_connection(
event_callback: fn(ServerEvent), event_callback: fn(ServerEvent),
stream: &mut TcpStream, stream: &mut TcpStream,
) -> Result<(), Error> { ) -> Result<(), Error> {
let peer = stream.peer_addr()?.to_string();
stream.set_write_timeout(Some(Duration::from_secs(10)))?; stream.set_write_timeout(Some(Duration::from_secs(10)))?;
stream.set_read_timeout(Some(Duration::from_secs(10)))?; stream.set_read_timeout(Some(Duration::from_secs(10)))?;
let peer = stream.peer_addr()?.to_string(); let mut reader = BufReader::new(stream.try_clone()?);
let mut writer = BufWriter::new(stream.try_clone()?);
let mut serial_backend = new_serial_backend(port)?; let mut serial_backend = new_serial_backend(port)?;
serial_backend.reset()?; serial_backend.reset()?;
@ -427,11 +433,13 @@ fn server_accept_connection(
let mut buffer = [0u8; 4]; let mut buffer = [0u8; 4];
let mut keepalive = Instant::now();
event_callback(ServerEvent::Connection(peer.clone())); event_callback(ServerEvent::Connection(peer.clone()));
loop { loop {
stream.set_nonblocking(true)?; stream.set_nonblocking(true)?;
match stream.read_exact(&mut buffer) { match reader.read_exact(&mut buffer) {
Ok(()) => { Ok(()) => {
stream.set_nonblocking(false)?; stream.set_nonblocking(false)?;
@ -444,16 +452,16 @@ fn server_accept_connection(
let mut id_buffer = [0u8; 1]; let mut id_buffer = [0u8; 1];
let mut args = [0u32; 2]; let mut args = [0u32; 2];
stream.read_exact(&mut id_buffer)?; reader.read_exact(&mut id_buffer)?;
stream.read_exact(&mut buffer)?; reader.read_exact(&mut buffer)?;
args[0] = u32::from_be_bytes(buffer); args[0] = u32::from_be_bytes(buffer);
stream.read_exact(&mut buffer)?; reader.read_exact(&mut buffer)?;
args[1] = u32::from_be_bytes(buffer); args[1] = u32::from_be_bytes(buffer);
stream.read_exact(&mut buffer)?; reader.read_exact(&mut buffer)?;
let command_data_length = u32::from_be_bytes(buffer) as usize; let command_data_length = u32::from_be_bytes(buffer) as usize;
let mut data = vec![0u8; command_data_length]; let mut data = vec![0u8; command_data_length];
stream.read_exact(&mut data)?; reader.read_exact(&mut data)?;
serial_backend.send_command(&Command { serial_backend.send_command(&Command {
id: id_buffer[0], id: id_buffer[0],
@ -474,18 +482,22 @@ fn server_accept_connection(
if let Some(response) = if let Some(response) =
serial_backend.process_incoming_data(DataType::Packet, &mut packets)? serial_backend.process_incoming_data(DataType::Packet, &mut packets)?
{ {
stream.write_all(&u32::to_be_bytes(DataType::Response.into()))?; writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?;
stream.write_all(&[response.id])?; writer.write_all(&[response.id])?;
stream.write_all(&[response.error as u8])?; writer.write_all(&[response.error as u8])?;
stream.write_all(&(response.data.len() as u32).to_be_bytes())?; writer.write_all(&(response.data.len() as u32).to_be_bytes())?;
stream.write_all(&response.data)?; writer.write_all(&response.data)?;
stream.flush()?; writer.flush()?;
} else if let Some(packet) = packets.pop_front() { } else if let Some(packet) = packets.pop_front() {
stream.write_all(&u32::to_be_bytes(DataType::Packet.into()))?; writer.write_all(&u32::to_be_bytes(DataType::Packet.into()))?;
stream.write_all(&[packet.id])?; writer.write_all(&[packet.id])?;
stream.write_all(&(packet.data.len() as u32).to_be_bytes())?; writer.write_all(&(packet.data.len() as u32).to_be_bytes())?;
stream.write_all(&packet.data)?; writer.write_all(&packet.data)?;
stream.flush()?; writer.flush()?;
} else if keepalive.elapsed() > Duration::from_secs(5) {
keepalive = Instant::now();
writer.write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?;
writer.flush()?;
} else { } else {
std::thread::sleep(Duration::from_millis(1)); std::thread::sleep(Duration::from_millis(1));
} }