diff --git a/sw/deployer/src/sc64/link.rs b/sw/deployer/src/sc64/link.rs index 245d587..1a14b0b 100644 --- a/sw/deployer/src/sc64/link.rs +++ b/sw/deployer/src/sc64/link.rs @@ -54,11 +54,16 @@ pub struct Response { pub error: bool, } -pub struct Packet { +pub struct AsynchronousPacket { pub id: u8, pub data: Vec, } +pub enum UsbPacket { + Response(Response), + AsynchronousPacket(AsynchronousPacket), +} + const SERIAL_PREFIX: &str = "serial://"; const FTDI_PREFIX: &str = "ftdi://"; @@ -68,16 +73,18 @@ const READ_TIMEOUT: Duration = Duration::from_secs(5); const WRITE_TIMEOUT: Duration = Duration::from_secs(5); pub trait Backend { - fn reset(&mut self) -> Result<(), Error>; - - fn close(&self); - fn read(&mut self, buffer: &mut [u8]) -> std::io::Result; fn write(&mut self, buffer: &[u8]) -> std::io::Result<()>; fn flush(&mut self) -> std::io::Result<()>; + fn reset(&mut self) -> Result<(), Error> { + Ok(()) + } + + fn close(&mut self) {} + fn purge_incoming_data(&mut self) -> std::io::Result<()> { let timeout = Instant::now(); loop { @@ -100,7 +107,7 @@ pub trait Backend { } } - fn try_read(&mut self, buffer: &mut [u8], block: bool) -> Result, Error> { + fn try_read_exact(&mut self, buffer: &mut [u8], block: bool) -> Result, Error> { let mut position = 0; let length = buffer.len(); let timeout = Instant::now(); @@ -126,11 +133,11 @@ pub trait Backend { fn try_read_header(&mut self, block: bool) -> Result, Error> { let mut header = [0u8; 4]; - Ok(self.try_read(&mut header, block)?.map(|_| header)) + Ok(self.try_read_exact(&mut header, block)?.map(|_| header)) } fn read_exact(&mut self, buffer: &mut [u8]) -> Result<(), Error> { - match self.try_read(buffer, true)? { + match self.try_read_exact(buffer, true)? { Some(()) => Ok(()), None => Err(Error::new("Unexpected end of data")), } @@ -153,7 +160,7 @@ pub trait Backend { fn process_incoming_data( &mut self, data_type: DataType, - packets: &mut VecDeque, + packets: &mut VecDeque, ) -> Result, Error> { let block = matches!(data_type, DataType::Response); @@ -175,7 +182,7 @@ pub trait Backend { self.read_exact(&mut data)?; if packet_token { - packets.push_back(Packet { id, data }); + packets.push_back(AsynchronousPacket { id, data }); if matches!(data_type, DataType::Packet) { break; } @@ -193,6 +200,18 @@ pub struct SerialBackend { } impl Backend for SerialBackend { + fn read(&mut self, buffer: &mut [u8]) -> std::io::Result { + self.device.read(buffer) + } + + fn write(&mut self, buffer: &[u8]) -> std::io::Result<()> { + self.device.write_all(buffer) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.device.flush() + } + fn reset(&mut self) -> Result<(), Error> { self.device.set_dtr(true)?; let timeout = Instant::now(); @@ -221,20 +240,6 @@ impl Backend for SerialBackend { Ok(()) } - - fn close(&self) {} - - fn read(&mut self, buffer: &mut [u8]) -> std::io::Result { - self.device.read(buffer) - } - - fn write(&mut self, buffer: &[u8]) -> std::io::Result<()> { - self.device.write_all(buffer) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.device.flush() - } } fn new_serial_backend(port: &str) -> std::io::Result { @@ -249,6 +254,18 @@ struct FtdiBackend { } impl Backend for FtdiBackend { + fn read(&mut self, buffer: &mut [u8]) -> std::io::Result { + self.device.read(buffer) + } + + fn write(&mut self, buffer: &[u8]) -> std::io::Result<()> { + self.device.write_all(buffer) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + fn reset(&mut self) -> Result<(), Error> { self.device.set_dtr(true)?; let timeout = Instant::now(); @@ -277,20 +294,6 @@ impl Backend for FtdiBackend { Ok(()) } - - fn close(&self) {} - - fn read(&mut self, buffer: &mut [u8]) -> std::io::Result { - self.device.read(buffer) - } - - fn write(&mut self, buffer: &[u8]) -> std::io::Result<()> { - self.device.write_all(buffer) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } } fn new_ftdi_backend(port: &str) -> Result { @@ -306,14 +309,6 @@ struct TcpBackend { } impl Backend for TcpBackend { - fn reset(&mut self) -> Result<(), Error> { - Ok(()) - } - - fn close(&self) { - self.stream.shutdown(std::net::Shutdown::Both).ok(); - } - fn read(&mut self, buffer: &mut [u8]) -> std::io::Result { self.reader.read(buffer) } @@ -326,6 +321,10 @@ impl Backend for TcpBackend { self.writer.flush() } + fn close(&mut self) { + self.stream.shutdown(std::net::Shutdown::Both).ok(); + } + fn send_command(&mut self, command: &Command) -> Result<(), Error> { let payload_data_type: u32 = DataType::Command.into(); self.write(&payload_data_type.to_be_bytes())?; @@ -346,7 +345,7 @@ impl Backend for TcpBackend { fn process_incoming_data( &mut self, data_type: DataType, - packets: &mut VecDeque, + packets: &mut VecDeque, ) -> Result, Error> { let block = matches!(data_type, DataType::Response); while let Some(header) = self.try_read_header(block)? { @@ -379,7 +378,7 @@ impl Backend for TcpBackend { let mut data = vec![0u8; packet_data_length]; self.read_exact(&mut data)?; - packets.push_back(Packet { + packets.push_back(AsynchronousPacket { id: packet_info[0], data, }); @@ -397,18 +396,11 @@ impl Backend for TcpBackend { } fn new_tcp_backend(address: &str) -> Result { - let stream = match TcpStream::connect(address) { - Ok(stream) => { - stream.set_write_timeout(Some(WRITE_TIMEOUT))?; - stream.set_read_timeout(Some(POLL_TIMEOUT))?; - stream - } - Err(error) => { - return Err(Error::new( - format!("Couldn't connect to [{address}]: {error}").as_str(), - )) - } - }; + let stream = TcpStream::connect(address).map_err(|error| { + Error::new(format!("Couldn't connect to [{address}]: {error}").as_str()) + })?; + stream.set_write_timeout(Some(WRITE_TIMEOUT))?; + stream.set_read_timeout(Some(POLL_TIMEOUT))?; let reader = BufReader::new(stream.try_clone()?); let writer = BufWriter::new(stream.try_clone()?); Ok(TcpBackend { @@ -439,8 +431,8 @@ fn new_remote_backend(address: &str) -> Result, Error> { } pub struct Link { - pub backend: Box, - packets: VecDeque, + backend: Box, + packets: VecDeque, } impl Link { @@ -468,7 +460,7 @@ impl Link { Ok(response.data) } - fn receive_response(&mut self) -> Result { + pub fn receive_response(&mut self) -> Result { match self .backend .process_incoming_data(DataType::Response, &mut self.packets) @@ -483,7 +475,7 @@ impl Link { } } - pub fn receive_packet(&mut self) -> Result, Error> { + pub fn receive_packet(&mut self) -> Result, Error> { if self.packets.len() == 0 { let response = self .backend @@ -494,6 +486,19 @@ impl Link { } Ok(self.packets.pop_front()) } + + pub fn receive_response_or_packet(&mut self) -> Result, Error> { + let response = self + .backend + .process_incoming_data(DataType::Packet, &mut self.packets)?; + if let Some(response) = response { + return Ok(Some(UsbPacket::Response(response))); + } + if let Some(packet) = self.packets.pop_front() { + return Ok(Some(UsbPacket::AsynchronousPacket(packet))); + } + Ok(None) + } } impl Drop for Link { diff --git a/sw/deployer/src/sc64/mod.rs b/sw/deployer/src/sc64/mod.rs index 9bb83dc..7f0ce8f 100644 --- a/sw/deployer/src/sc64/mod.rs +++ b/sw/deployer/src/sc64/mod.rs @@ -876,13 +876,8 @@ impl SC64 { impl SC64 { pub fn open_local(port: Option) -> Result { - let port = if let Some(port) = port { - port - } else { - list_local_devices()?[0].port.clone() - }; let mut sc64 = SC64 { - link: link::new_local(&port)?, + link: link::new_local(&port.unwrap_or(list_local_devices()?[0].port.clone()))?, }; sc64.check_device()?; Ok(sc64) diff --git a/sw/deployer/src/sc64/server.rs b/sw/deployer/src/sc64/server.rs index 7cce340..645cc61 100644 --- a/sw/deployer/src/sc64/server.rs +++ b/sw/deployer/src/sc64/server.rs @@ -1,5 +1,14 @@ -use super::{error::Error, link::list_local_devices}; -use std::net::{TcpListener, TcpStream}; +use super::{ + error::Error, + link::{ + list_local_devices, new_local, AsynchronousPacket, Command, DataType, Response, UsbPacket, + }, +}; +use std::{ + io::{BufReader, BufWriter, Read, Write}, + net::{TcpListener, TcpStream}, + time::{Duration, Instant}, +}; pub enum ServerEvent { Listening(String), @@ -8,241 +17,188 @@ pub enum ServerEvent { Err(String), } +struct StreamHandler { + stream: TcpStream, + reader: BufReader, + writer: BufWriter, +} + +const POLL_TIMEOUT: Duration = Duration::from_millis(1); +const READ_TIMEOUT: Duration = Duration::from_secs(5); +const WRITE_TIMEOUT: Duration = Duration::from_secs(5); +const KEEPALIVE_PERIOD: Duration = Duration::from_secs(5); + +impl StreamHandler { + fn new(stream: TcpStream) -> std::io::Result { + let reader = BufReader::new(stream.try_clone()?); + let writer = BufWriter::new(stream.try_clone()?); + stream.set_read_timeout(Some(READ_TIMEOUT))?; + stream.set_write_timeout(Some(WRITE_TIMEOUT))?; + Ok(StreamHandler { + stream, + reader, + writer, + }) + } + + fn try_read_exact(&mut self, buffer: &mut [u8]) -> std::io::Result> { + let mut position = 0; + let length = buffer.len(); + let timeout = Instant::now(); + + self.stream.set_read_timeout(Some(POLL_TIMEOUT))?; + + while position < length { + match self.reader.read(&mut buffer[position..length]) { + Ok(0) => return Err(std::io::ErrorKind::UnexpectedEof.into()), + Ok(bytes) => position += bytes, + Err(error) => match error.kind() { + std::io::ErrorKind::Interrupted + | std::io::ErrorKind::TimedOut + | std::io::ErrorKind::WouldBlock => { + if position == 0 { + break; + } + } + _ => return Err(error), + }, + } + if timeout.elapsed() > READ_TIMEOUT { + return Err(std::io::ErrorKind::TimedOut.into()); + } + } + + self.stream.set_read_timeout(Some(READ_TIMEOUT))?; + + if position > 0 { + Ok(Some(())) + } else { + Ok(None) + } + } + + fn try_read_header(&mut self) -> std::io::Result> { + let mut header = [0u8; 4]; + Ok(self.try_read_exact(&mut header)?.map(|_| header)) + } + + fn receive_command(&mut self) -> std::io::Result> { + if let Some(header) = self.try_read_header()? { + if let Ok(data_type) = TryInto::::try_into(u32::from_be_bytes(header)) { + if !matches!(data_type, DataType::Command) { + return Err(std::io::Error::other( + "Received data type was not a command data type", + )); + } + } + + let mut buffer = [0u8; 4]; + let mut id_buffer = [0u8; 1]; + let mut args = [0u32; 2]; + + self.reader.read_exact(&mut id_buffer)?; + let id = id_buffer[0]; + + self.reader.read_exact(&mut buffer)?; + args[0] = u32::from_be_bytes(buffer); + self.reader.read_exact(&mut buffer)?; + args[1] = u32::from_be_bytes(buffer); + + self.reader.read_exact(&mut buffer)?; + let command_data_length = u32::from_be_bytes(buffer) as usize; + let mut data = vec![0u8; command_data_length]; + self.reader.read_exact(&mut data)?; + + Ok(Some(Command { id, args, data })) + } else { + Ok(None) + } + } + + fn send_response(&mut self, response: Response) -> std::io::Result<()> { + self.writer + .write_all(&u32::to_be_bytes(DataType::Response.into()))?; + self.writer.write_all(&[response.id])?; + self.writer.write_all(&[response.error as u8])?; + self.writer + .write_all(&(response.data.len() as u32).to_be_bytes())?; + self.writer.write_all(&response.data)?; + self.writer.flush()?; + Ok(()) + } + + fn send_packet(&mut self, packet: AsynchronousPacket) -> std::io::Result<()> { + self.writer + .write_all(&u32::to_be_bytes(DataType::Packet.into()))?; + self.writer.write_all(&[packet.id])?; + self.writer + .write_all(&(packet.data.len() as u32).to_be_bytes())?; + self.writer.write_all(&packet.data)?; + self.writer.flush()?; + Ok(()) + } + + fn send_keepalive(&mut self) -> std::io::Result<()> { + self.writer + .write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?; + self.writer.flush()?; + Ok(()) + } +} + +fn server_accept_connection(port: String, connection: &mut StreamHandler) -> Result<(), Error> { + let mut link = new_local(&port)?; + + let mut keepalive = Instant::now(); + + loop { + while let Some(usb_packet) = link.receive_response_or_packet()? { + match usb_packet { + UsbPacket::Response(response) => connection.send_response(response)?, + UsbPacket::AsynchronousPacket(packet) => connection.send_packet(packet)?, + } + } + + match connection.receive_command() { + Ok(Some(command)) => { + link.execute_command_raw(&command, true, true)?; + } + Ok(None) => {} + Err(error) => match error.kind() { + std::io::ErrorKind::UnexpectedEof => return Ok(()), + _ => return Err(error.into()), + }, + }; + + if keepalive.elapsed() > KEEPALIVE_PERIOD { + keepalive = Instant::now(); + connection.send_keepalive().ok(); + } + } +} + pub fn run( port: Option, address: String, event_callback: fn(ServerEvent), ) -> Result<(), Error> { - let port = if let Some(port) = port { - port - } else { - list_local_devices()?[0].port.clone() - }; + let port = port.unwrap_or(list_local_devices()?[0].port.clone()); let listener = TcpListener::bind(address)?; let listening_address = listener.local_addr()?; + event_callback(ServerEvent::Listening(listening_address.to_string())); - for stream in listener.incoming() { - match stream { - Ok(mut stream) => { - let peer = stream.peer_addr()?.to_string(); - event_callback(ServerEvent::Connected(peer.clone())); - match server_accept_connection(port.clone(), &mut stream) { - Ok(()) => event_callback(ServerEvent::Disconnected(peer.clone())), - Err(error) => event_callback(ServerEvent::Err(error.to_string())), - } - } - Err(error) => match error.kind() { - _ => return Err(error.into()), - }, + for incoming in listener.incoming() { + let stream = incoming?; + let peer = stream.peer_addr()?.to_string(); + + event_callback(ServerEvent::Connected(peer.clone())); + + match server_accept_connection(port.clone(), &mut StreamHandler::new(stream)?) { + Ok(()) => event_callback(ServerEvent::Disconnected(peer.clone())), + Err(error) => event_callback(ServerEvent::Err(error.to_string())), } } Ok(()) } - -// enum Event { -// Command(Command), -// Response(Response), -// Packet(Packet), -// KeepAlive, -// Closed(Option), -// } - -fn server_accept_connection(_port: String, _stream: &mut TcpStream) -> Result<(), Error> { - // let (event_sender, event_receiver) = channel::(); - // let exit_flag = Arc::new(AtomicBool::new(false)); - - // let mut stream_writer = BufWriter::new(stream.try_clone()?); - // let mut stream_reader = stream.try_clone()?; - - // let serial = Arc::new(new_local(&port)?); - // let serial_writer = serial.clone(); - // let serial_reader = serial.clone(); - - // let stream_event_sender = event_sender.clone(); - // let stream_exit_flag = exit_flag.clone(); - // let stream_thread = thread::spawn(move || { - // let closed_sender = stream_event_sender.clone(); - // match server_stream_thread(&mut stream_reader, stream_event_sender, stream_exit_flag) { - // Ok(()) => closed_sender.send(Event::Closed(None)), - // Err(error) => closed_sender.send(Event::Closed(Some(error))), - // } - // .ok(); - // }); - - // let serial_event_sender = event_sender.clone(); - // let serial_exit_flag = exit_flag.clone(); - // let serial_thread = thread::spawn(move || { - // let closed_sender = serial_event_sender.clone(); - // match server_serial_thread(serial_reader, serial_event_sender, serial_exit_flag) { - // Ok(()) => closed_sender.send(Event::Closed(None)), - // Err(error) => closed_sender.send(Event::Closed(Some(error))), - // } - // .ok(); - // }); - - // let keepalive_event_sender = event_sender.clone(); - // let keepalive_exit_flag = exit_flag.clone(); - // let keepalive_thread = thread::spawn(move || { - // server_keepalive_thread(keepalive_event_sender, keepalive_exit_flag); - // }); - - // let result = server_process_events(&mut stream_writer, serial_writer, event_receiver); - - // exit_flag.store(true, Ordering::Relaxed); - // stream_thread.join().ok(); - // serial_thread.join().ok(); - // keepalive_thread.join().ok(); - - // result - Ok(()) -} - -// fn server_process_events( -// stream_writer: &mut BufWriter, -// link: &mut Link, -// event_receiver: Receiver, -// ) -> Result<(), Error> { -// for event in event_receiver.into_iter() { -// match event { -// Event::Command(command) => { -// // serial_writer.send_command(&command)?; -// // serial_writer. -// // link.execute_command(&command)?; -// link.execute_command(&command)?; -// } -// Event::Response(response) => { -// stream_writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?; -// stream_writer.write_all(&[response.id])?; -// stream_writer.write_all(&[response.error as u8])?; -// stream_writer.write_all(&(response.data.len() as u32).to_be_bytes())?; -// stream_writer.write_all(&response.data)?; -// stream_writer.flush()?; -// } -// Event::Packet(packet) => { -// stream_writer.write_all(&u32::to_be_bytes(DataType::Packet.into()))?; -// stream_writer.write_all(&[packet.id])?; -// stream_writer.write_all(&(packet.data.len() as u32).to_be_bytes())?; -// stream_writer.write_all(&packet.data)?; -// stream_writer.flush()?; -// } -// Event::KeepAlive => { -// stream_writer.write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?; -// stream_writer.flush()?; -// } -// Event::Closed(result) => match result { -// Some(error) => return Err(error), -// None => { -// break; -// } -// }, -// } -// } - -// Ok(()) -// } - -// fn server_stream_thread( -// stream: &mut TcpStream, -// event_sender: Sender, -// exit_flag: Arc, -// ) -> Result<(), Error> { -// let mut stream_reader = BufReader::new(stream.try_clone()?); - -// let mut header = [0u8; 4]; -// let header_length = header.len(); - -// loop { -// let mut header_position = 0; - -// let timeout = stream.read_timeout()?; -// stream.set_read_timeout(Some(Duration::from_millis(10)))?; -// while header_position < header_length { -// if exit_flag.load(Ordering::Relaxed) { -// return Ok(()); -// } -// match stream_reader.read(&mut header[header_position..header_length]) { -// Ok(0) => return Ok(()), -// Ok(bytes) => header_position += bytes, -// Err(error) => match error.kind() { -// ErrorKind::Interrupted | ErrorKind::TimedOut | ErrorKind::WouldBlock => {} -// _ => return Err(error.into()), -// }, -// } -// } -// stream.set_read_timeout(timeout)?; - -// let data_type: DataType = u32::from_be_bytes(header).try_into()?; -// if !matches!(data_type, DataType::Command) { -// return Err(Error::new("Received data type was not a command data type")); -// } - -// let mut buffer = [0u8; 4]; -// let mut id_buffer = [0u8; 1]; -// let mut args = [0u32; 2]; - -// stream_reader.read_exact(&mut id_buffer)?; -// let id = id_buffer[0]; - -// stream_reader.read_exact(&mut buffer)?; -// args[0] = u32::from_be_bytes(buffer); -// stream_reader.read_exact(&mut buffer)?; -// args[1] = u32::from_be_bytes(buffer); - -// stream_reader.read_exact(&mut buffer)?; -// let command_data_length = u32::from_be_bytes(buffer) as usize; -// let mut data = vec![0u8; command_data_length]; -// stream_reader.read_exact(&mut data)?; - -// if event_sender -// .send(Event::Command(Command { id, args, data })) -// .is_err() -// { -// break; -// } -// } - -// Ok(()) -// } - -// fn server_serial_thread( -// link: &mut Link, -// event_sender: Sender, -// exit_flag: Arc, -// ) -> Result<(), Error> { -// let mut packets: VecDeque = VecDeque::new(); - -// while !exit_flag.load(Ordering::Relaxed) { -// let response = link.backend.process_incoming_data(DataType::Packet, &mut packets)?; - -// if let Some(response) = response { -// if event_sender.send(Event::Response(response)).is_err() { -// break; -// } -// } - -// if let Some(packet) = packets.pop_front() { -// if event_sender.send(Event::Packet(packet)).is_err() { -// break; -// } -// } -// } - -// Ok(()) -// } - -// fn server_keepalive_thread(event_sender: Sender, exit_flag: Arc) { -// let mut keepalive = Instant::now(); - -// while !exit_flag.load(Ordering::Relaxed) { -// if keepalive.elapsed() >= Duration::from_secs(5) { -// keepalive = Instant::now(); -// if event_sender.send(Event::KeepAlive).is_err() { -// break; -// } -// } else { -// thread::sleep(Duration::from_millis(100)); -// } -// } -// } diff --git a/sw/deployer/src/sc64/types.rs b/sw/deployer/src/sc64/types.rs index 8699207..7c2c952 100644 --- a/sw/deployer/src/sc64/types.rs +++ b/sw/deployer/src/sc64/types.rs @@ -1,4 +1,4 @@ -use super::{link::Packet, Error}; +use super::{link::AsynchronousPacket, Error}; use std::fmt::Display; #[derive(Clone, Copy)] @@ -588,9 +588,9 @@ pub enum DataPacket { UpdateStatus(UpdateStatus), } -impl TryFrom for DataPacket { +impl TryFrom for DataPacket { type Error = Error; - fn try_from(value: Packet) -> Result { + fn try_from(value: AsynchronousPacket) -> Result { Ok(match value.id { b'B' => Self::Button, b'G' => Self::DataFlushed,