From b915a7b4e30569dafdfdb521a1d8461d1b25409b Mon Sep 17 00:00:00 2001 From: Mateusz Faderewski Date: Tue, 2 Jul 2024 23:06:42 +0200 Subject: [PATCH] server improvements --- sw/deployer/src/sc64/link.rs | 2 +- sw/deployer/src/sc64/server.rs | 130 ++++++++++++++------------------- 2 files changed, 54 insertions(+), 78 deletions(-) diff --git a/sw/deployer/src/sc64/link.rs b/sw/deployer/src/sc64/link.rs index e82104d..029870e 100644 --- a/sw/deployer/src/sc64/link.rs +++ b/sw/deployer/src/sc64/link.rs @@ -64,7 +64,7 @@ const SERIAL_PREFIX: &str = "serial://"; const FTDI_PREFIX: &str = "ftdi://"; const RESET_TIMEOUT: Duration = Duration::from_secs(1); -const POLL_TIMEOUT: Duration = Duration::from_millis(10); +const POLL_TIMEOUT: Duration = Duration::from_millis(5); const READ_TIMEOUT: Duration = Duration::from_secs(5); const WRITE_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/sw/deployer/src/sc64/server.rs b/sw/deployer/src/sc64/server.rs index 42af2c2..735d70f 100644 --- a/sw/deployer/src/sc64/server.rs +++ b/sw/deployer/src/sc64/server.rs @@ -4,11 +4,7 @@ use super::{ list_local_devices, new_local, AsynchronousPacket, Command, DataType, Response, UsbPacket, }, }; -use std::{ - io::{Read, Write}, - net::{TcpListener, TcpStream}, - time::{Duration, Instant}, -}; +use std::io::{Read, Write}; pub enum ServerEvent { Listening(String), @@ -18,60 +14,40 @@ pub enum ServerEvent { } struct StreamHandler { - stream: TcpStream, + stream: std::net::TcpStream, + reader: std::io::BufReader, + writer: std::io::BufWriter, } -const POLL_TIMEOUT: Duration = Duration::from_millis(1); -const READ_TIMEOUT: Duration = Duration::from_secs(5); -const WRITE_TIMEOUT: Duration = Duration::from_secs(5); -const KEEPALIVE_PERIOD: Duration = Duration::from_secs(5); +const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); +const WRITE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); +const KEEPALIVE_PERIOD: std::time::Duration = std::time::Duration::from_secs(5); impl StreamHandler { - fn new(stream: TcpStream) -> std::io::Result { + fn new(stream: std::net::TcpStream) -> std::io::Result { + let reader = std::io::BufReader::new(stream.try_clone()?); + let writer = std::io::BufWriter::new(stream.try_clone()?); stream.set_read_timeout(Some(READ_TIMEOUT))?; stream.set_write_timeout(Some(WRITE_TIMEOUT))?; - Ok(StreamHandler { stream }) - } - - fn try_read_exact(&mut self, buffer: &mut [u8]) -> std::io::Result> { - let mut position = 0; - let length = buffer.len(); - let timeout = Instant::now(); - - self.stream.set_read_timeout(Some(POLL_TIMEOUT))?; - - while position < length { - match self.stream.read(&mut buffer[position..length]) { - Ok(0) => return Err(std::io::ErrorKind::UnexpectedEof.into()), - Ok(bytes) => position += bytes, - Err(error) => match error.kind() { - std::io::ErrorKind::Interrupted - | std::io::ErrorKind::TimedOut - | std::io::ErrorKind::WouldBlock => { - if position == 0 { - break; - } - } - _ => return Err(error), - }, - } - if timeout.elapsed() > READ_TIMEOUT { - return Err(std::io::ErrorKind::TimedOut.into()); - } - } - - self.stream.set_read_timeout(Some(READ_TIMEOUT))?; - - if position > 0 { - Ok(Some(())) - } else { - Ok(None) - } + Ok(StreamHandler { + stream, + reader, + writer, + }) } fn try_read_header(&mut self) -> std::io::Result> { + self.stream.set_nonblocking(true)?; let mut header = [0u8; 4]; - Ok(self.try_read_exact(&mut header)?.map(|_| header)) + let result = match self.reader.read_exact(&mut header) { + Ok(()) => Ok(Some(header)), + Err(error) => match error.kind() { + std::io::ErrorKind::WouldBlock => Ok(None), + _ => Err(error), + }, + }; + self.stream.set_nonblocking(false)?; + result } fn receive_command(&mut self) -> std::io::Result> { @@ -88,18 +64,18 @@ impl StreamHandler { let mut id_buffer = [0u8; 1]; let mut args = [0u32; 2]; - self.stream.read_exact(&mut id_buffer)?; + self.reader.read_exact(&mut id_buffer)?; let id = id_buffer[0]; - self.stream.read_exact(&mut buffer)?; + self.reader.read_exact(&mut buffer)?; args[0] = u32::from_be_bytes(buffer); - self.stream.read_exact(&mut buffer)?; + self.reader.read_exact(&mut buffer)?; args[1] = u32::from_be_bytes(buffer); - self.stream.read_exact(&mut buffer)?; + self.reader.read_exact(&mut buffer)?; let command_data_length = u32::from_be_bytes(buffer) as usize; let mut data = vec![0u8; command_data_length]; - self.stream.read_exact(&mut data)?; + self.reader.read_exact(&mut data)?; Ok(Some(Command { id, args, data })) } else { @@ -108,32 +84,32 @@ impl StreamHandler { } fn send_response(&mut self, response: Response) -> std::io::Result<()> { - self.stream + self.writer .write_all(&u32::to_be_bytes(DataType::Response.into()))?; - self.stream.write_all(&[response.id])?; - self.stream.write_all(&[response.error as u8])?; - self.stream + self.writer.write_all(&[response.id])?; + self.writer.write_all(&[response.error as u8])?; + self.writer .write_all(&(response.data.len() as u32).to_be_bytes())?; - self.stream.write_all(&response.data)?; - self.stream.flush()?; + self.writer.write_all(&response.data)?; + self.writer.flush()?; Ok(()) } fn send_packet(&mut self, packet: AsynchronousPacket) -> std::io::Result<()> { - self.stream + self.writer .write_all(&u32::to_be_bytes(DataType::Packet.into()))?; - self.stream.write_all(&[packet.id])?; - self.stream + self.writer.write_all(&[packet.id])?; + self.writer .write_all(&(packet.data.len() as u32).to_be_bytes())?; - self.stream.write_all(&packet.data)?; - self.stream.flush()?; + self.writer.write_all(&packet.data)?; + self.writer.flush()?; Ok(()) } fn send_keepalive(&mut self) -> std::io::Result<()> { - self.stream + self.writer .write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?; - self.stream.flush()?; + self.writer.flush()?; Ok(()) } } @@ -141,16 +117,9 @@ impl StreamHandler { fn server_accept_connection(port: String, connection: &mut StreamHandler) -> Result<(), Error> { let mut link = new_local(&port)?; - let mut keepalive = Instant::now(); + let mut keepalive = std::time::Instant::now(); loop { - while let Some(usb_packet) = link.receive_response_or_packet()? { - match usb_packet { - UsbPacket::Response(response) => connection.send_response(response)?, - UsbPacket::AsynchronousPacket(packet) => connection.send_packet(packet)?, - } - } - match connection.receive_command() { Ok(Some(command)) => { link.execute_command_raw(&command, true, true)?; @@ -162,8 +131,15 @@ fn server_accept_connection(port: String, connection: &mut StreamHandler) -> Res }, }; + if let Some(usb_packet) = link.receive_response_or_packet()? { + match usb_packet { + UsbPacket::Response(response) => connection.send_response(response)?, + UsbPacket::AsynchronousPacket(packet) => connection.send_packet(packet)?, + } + } + if keepalive.elapsed() > KEEPALIVE_PERIOD { - keepalive = Instant::now(); + keepalive = std::time::Instant::now(); connection.send_keepalive().ok(); } } @@ -175,7 +151,7 @@ pub fn run( event_callback: fn(ServerEvent), ) -> Result<(), Error> { let port = port.unwrap_or(list_local_devices()?[0].port.clone()); - let listener = TcpListener::bind(address)?; + let listener = std::net::TcpListener::bind(address)?; let listening_address = listener.local_addr()?; event_callback(ServerEvent::Listening(listening_address.to_string()));