From b942d33ec6d01a300fadf199a9d6808e94acf6fa Mon Sep 17 00:00:00 2001 From: Mateusz Faderewski Date: Thu, 16 Mar 2023 14:37:16 +0100 Subject: [PATCH] server performance increased --- sw/deployer/Cargo.lock | 119 +++++---- sw/deployer/Cargo.toml | 3 +- sw/deployer/src/main.rs | 75 +++--- sw/deployer/src/sc64/error.rs | 2 +- sw/deployer/src/sc64/link.rs | 483 +++++++++++++++++++++++----------- sw/deployer/src/sc64/mod.rs | 10 +- sw/deployer/src/sc64/types.rs | 4 +- 7 files changed, 452 insertions(+), 244 deletions(-) diff --git a/sw/deployer/Cargo.lock b/sw/deployer/Cargo.lock index 8fd0126..686903a 100644 --- a/sw/deployer/Cargo.lock +++ b/sw/deployer/Cargo.lock @@ -9,7 +9,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0e9889e6db118d49d88d84728d0e964d973a5680befb5f85f55141beea5c20b" dependencies = [ "libc", - "mach", + "mach 0.1.2", ] [[package]] @@ -20,7 +20,7 @@ checksum = "99696c398cbaf669d2368076bdb3d627fb0ce51a26899d7c61228c5c0af3bf4a" dependencies = [ "CoreFoundation-sys", "libc", - "mach", + "mach 0.1.2", ] [[package]] @@ -125,9 +125,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.23" +version = "0.4.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" +checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b" dependencies = [ "iana-time-zone", "js-sys", @@ -281,7 +281,7 @@ version = "3.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbcf33c2a618cbe41ee43ae6e9f2e48368cd9f9db2896f10167d8d762679f639" dependencies = [ - "nix", + "nix 0.26.2", "windows-sys", ] @@ -373,17 +373,17 @@ dependencies = [ [[package]] name = "exr" -version = "1.5.3" +version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8af5ef47e2ed89d23d0ecbc1b681b30390069de70260937877514377fc24feb" +checksum = "bdd2162b720141a91a054640662d3edce3d50a944a50ffca5313cd951abb35b4" dependencies = [ "bit_field", "flume", "half", "lebe", "miniz_oxide", + "rayon-core", "smallvec", - "threadpool", "zune-inflate", ] @@ -412,15 +412,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" +checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" [[package]] name = "futures-sink" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" +checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" [[package]] name = "getrandom" @@ -718,10 +718,10 @@ dependencies = [ ] [[package]] -name = "mach2" -version = "0.4.1" +name = "mach" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" dependencies = [ "libc", ] @@ -765,6 +765,17 @@ dependencies = [ "getrandom", ] +[[package]] +name = "nix" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa52e972a9a719cecb6864fb88568781eb706bac2cd1d4f04a648542dbf78069" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "nix" version = "0.26.2" @@ -915,18 +926,18 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.51" +version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +checksum = "1d0e1ae9e836cc3beddd63db0df682593d7e2d3d891ae8c9083d2113e1744224" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.23" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ "proc-macro2", ] @@ -1017,6 +1028,7 @@ dependencies = [ "md5", "panic-message", "rust-ini", + "serial2", "serialport", ] @@ -1038,27 +1050,39 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1" +[[package]] +name = "serial2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa2370e12745a20d3dcd07438fc72f399c74f59537a35863a1a7a9ff2482040" +dependencies = [ + "cfg-if", + "libc", + "winapi", +] + [[package]] name = "serialport" -version = "4.2.1-alpha.0" -source = "git+https://github.com/serialport/serialport-rs?branch=main#e1f46eef5af7df2430f0a595681243e46f721b2a" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aab92efb5cf60ad310548bc3f16fa6b0d950019cb7ed8ff41968c3d03721cf12" dependencies = [ "CoreFoundation-sys", "IOKit-sys", "bitflags", "cfg-if", "libudev", - "mach2", - "nix", + "mach 0.3.2", + "nix 0.24.3", "regex", "winapi", ] [[package]] name = "simd-adler32" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14a5df39617d7c8558154693a1bb8157a4aab8179209540cc0b10e5dc24e0b18" +checksum = "238abfbb77c1915110ad968465608b68e869e0772622c9656714e73e5a1a522f" [[package]] name = "smallvec" @@ -1068,9 +1092,9 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "spin" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dccf47db1b41fa1573ed27ccf5e08e3ca771cb994f776668c5ebda893b248fc" +checksum = "b5d6e0250b93c8427a177b849d144a96d5acc57006149479403d7861ab721e34" dependencies = [ "lock_api", ] @@ -1107,15 +1131,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "tiff" version = "0.8.1" @@ -1270,9 +1285,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -1285,45 +1300,45 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" [[package]] name = "windows_aarch64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" [[package]] name = "windows_i686_gnu" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" [[package]] name = "windows_i686_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" [[package]] name = "windows_x86_64_gnu" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" [[package]] name = "windows_x86_64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" [[package]] name = "zune-inflate" diff --git a/sw/deployer/Cargo.toml b/sw/deployer/Cargo.toml index 7758404..a08ec17 100644 --- a/sw/deployer/Cargo.toml +++ b/sw/deployer/Cargo.toml @@ -20,7 +20,8 @@ include-flate = { version = "0.2.0", features = ["stable"] } md5 = "0.7.0" panic-message = "0.3.0" rust-ini = "0.18.0" -serialport = { git = "https://github.com/serialport/serialport-rs", branch = "main" } +serial2 = "0.1.7" +serialport = "4.2.0" [profile.release] lto = true diff --git a/sw/deployer/src/main.rs b/sw/deployer/src/main.rs index c632762..bb0d57f 100644 --- a/sw/deployer/src/main.rs +++ b/sw/deployer/src/main.rs @@ -16,8 +16,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - time::Duration, - {panic, process, thread}, + {panic, process}, }; #[derive(Parser)] @@ -483,11 +482,14 @@ fn handle_64dd_command(connection: Connection, args: &_64DDArgs) -> Result<(), s disk::Format::Development => sc64::DdDriveType::Development, }; - sc64.configure_64dd(sc64::DdMode::Full, drive_type)?; + let dd_mode = sc64::DdMode::Full; + println!("64DD mode set to [{dd_mode} / {drive_type}]"); + sc64.configure_64dd(dd_mode, drive_type)?; println!( - "{}", - "Press button on the SC64 device to cycle through provided disks".bold() + "{}: {}", + "[64DD]".bold(), + "Press button on the SC64 device to cycle through provided disks".bright_purple() ); let exit = setup_exit_flag(); @@ -499,30 +501,30 @@ fn handle_64dd_command(connection: Connection, args: &_64DDArgs) -> Result<(), s let head = packet.info.head; let block = packet.info.block; if let Some(ref mut disk) = selected_disk { - let reply_packet = match packet.kind { - sc64::DiskPacketKind::Read => { - print!("{}", "[R]".cyan()); + let (reply_packet, rw) = match packet.kind { + sc64::DiskPacketKind::Read => ( disk.read_block(track, head, block)?.map(|data| { packet.info.set_data(&data); packet - }) - } - sc64::DiskPacketKind::Write => { - print!("{}", "[W]".yellow()); - let data = &packet.info.data; - disk.write_block(track, head, block, data)?.map(|_| packet) - } + }), + "[R]".cyan(), + ), + sc64::DiskPacketKind::Write => ( + disk.write_block(track, head, block, &packet.info.data)? + .map(|_| packet), + "[W]".yellow(), + ), }; let lba = if let Some(lba) = disk.get_lba(track, head, block) { format!("{lba}") } else { "Invalid".to_string() }; - let message = format!(" {track:4}:{head}:{block} / LBA: {lba}"); + let message = format!("{track:4}:{head}:{block} / LBA: {lba}"); if reply_packet.is_some() { - println!("{}", message.green()); + println!("{}: {} {}", "[64DD]".bold(), rw, message.green()); } else { - println!("{}", message.red()); + println!("{}: {} {}", "[64DD]".bold(), rw, message.red()); } sc64.reply_disk_packet(reply_packet)?; } else { @@ -533,7 +535,11 @@ fn handle_64dd_command(connection: Connection, args: &_64DDArgs) -> Result<(), s if selected_disk.is_some() { sc64.set_64dd_disk_state(sc64::DdDiskState::Ejected)?; selected_disk = None; - println!("64DD disk ejected [{}]", disk_names[selected_disk_index]); + println!( + "{}: Disk ejected [{}]", + "[64DD]".bold(), + disk_names[selected_disk_index].green() + ); } else { selected_disk_index += 1; if selected_disk_index >= disks.len() { @@ -541,16 +547,20 @@ fn handle_64dd_command(connection: Connection, args: &_64DDArgs) -> Result<(), s } selected_disk = Some(&mut disks[selected_disk_index]); sc64.set_64dd_disk_state(sc64::DdDiskState::Inserted)?; - println!("64DD disk inserted [{}]", disk_names[selected_disk_index]); + println!( + "{}: Disk inserted [{}]", + "[64DD]".bold(), + disk_names[selected_disk_index].bright_green() + ); } } _ => {} } - } else { - thread::sleep(Duration::from_millis(1)); } } + sc64.reset_state()?; + Ok(()) } @@ -558,25 +568,25 @@ fn handle_debug_command(connection: Connection, args: &DebugArgs) -> Result<(), let mut sc64 = init_sc64(connection, true)?; let mut debug_handler = debug::new(args.gdb)?; - if let Some(port) = args.gdb { - println!("GDB TCP socket listening at [0.0.0.0:{port}]"); - } if args.isv.is_some() { sc64.configure_is_viewer_64(args.isv)?; println!( - "IS-Viewer 64 configured and listening at ROM offset [0x{:08X}]", - args.isv.unwrap() + "{}: Listening on ROM offset [{}]", + "[IS-Viewer 64]".bold(), + format!("0x{:08X}", args.isv.unwrap()) + .to_string() + .bright_blue() ); } - println!("{}", "Debug mode started".bold()); + println!("{}: Started", "[Debug]".bold()); let exit = setup_exit_flag(); while !exit.load(Ordering::Relaxed) { if let Some(data_packet) = sc64.receive_data_packet()? { match data_packet { - sc64::DataPacket::IsViewer(message) => { + sc64::DataPacket::IsViewer64(message) => { print!("{message}") } sc64::DataPacket::Debug(debug_packet) => { @@ -588,17 +598,16 @@ fn handle_debug_command(connection: Connection, args: &DebugArgs) -> Result<(), sc64.send_debug_packet(gdb_packet)?; } else if let Some(debug_packet) = debug_handler.process_user_input() { sc64.send_debug_packet(debug_packet)?; - } else { - thread::sleep(Duration::from_millis(1)); } } - println!("{}", "Debug mode ended".bold()); - if args.isv.is_some() { sc64.configure_is_viewer_64(None)?; + println!("{}: Stopped listening", "[IS-Viewer 64]".bold()); } + println!("{}: Stopped", "[Debug]".bold()); + Ok(()) } diff --git a/sw/deployer/src/sc64/error.rs b/sw/deployer/src/sc64/error.rs index f96444f..416bebb 100644 --- a/sw/deployer/src/sc64/error.rs +++ b/sw/deployer/src/sc64/error.rs @@ -17,7 +17,7 @@ impl std::error::Error for Error {} impl Display for Error { fn fmt(&self, f: &mut Formatter) -> Result { - write!(f, "SC64 error: {}", self.description.as_str()) + write!(f, "{}", self.description.as_str()) } } diff --git a/sw/deployer/src/sc64/link.rs b/sw/deployer/src/sc64/link.rs index a8fc6e4..2f00d98 100644 --- a/sw/deployer/src/sc64/link.rs +++ b/sw/deployer/src/sc64/link.rs @@ -1,8 +1,15 @@ use super::error::Error; +use serial2::SerialPort; use std::{ collections::VecDeque, io::{BufReader, BufWriter, ErrorKind, Read, Write}, net::{TcpListener, TcpStream}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Receiver, Sender}, + Arc, + }, + thread, time::{Duration, Instant}, }; @@ -63,20 +70,20 @@ trait Backend { ) -> Result, Error>; } -struct SerialBackend { - serial: Box, +struct Serial { + serial: SerialPort, } -impl SerialBackend { - fn reset(&mut self) -> Result<(), Error> { +impl Serial { + fn reset(&self) -> Result<(), Error> { const WAIT_DURATION: Duration = Duration::from_millis(10); const RETRY_COUNT: i32 = 100; - self.serial.write_data_terminal_ready(true)?; + self.serial.set_dtr(true)?; for n in 0..=RETRY_COUNT { - self.serial.clear(serialport::ClearBuffer::All)?; - std::thread::sleep(WAIT_DURATION); - if self.serial.read_data_set_ready()? { + self.serial.discard_buffers()?; + thread::sleep(WAIT_DURATION); + if self.serial.read_dsr()? { break; } if n == RETRY_COUNT { @@ -84,10 +91,10 @@ impl SerialBackend { } } - self.serial.write_data_terminal_ready(false)?; + self.serial.set_dtr(false)?; for n in 0..=RETRY_COUNT { - std::thread::sleep(WAIT_DURATION); - if !self.serial.read_data_set_ready()? { + thread::sleep(WAIT_DURATION); + if !self.serial.read_dsr()? { break; } if n == RETRY_COUNT { @@ -97,10 +104,44 @@ impl SerialBackend { Ok(()) } -} -impl Backend for SerialBackend { - fn send_command(&mut self, command: &Command) -> Result<(), Error> { + fn read_data(&self, buffer: &mut [u8], block: bool) -> Result, Error> { + let timeout = Instant::now(); + let mut position = 0; + let length = buffer.len(); + while position < length { + if timeout.elapsed() > Duration::from_secs(5) { + return Err(Error::new("Serial read timeout")); + } + match self.serial.read(&mut buffer[position..length]) { + Ok(0) => return Err(Error::new("Unexpected end of serial data")), + Ok(bytes) => position += bytes, + Err(error) => match error.kind() { + ErrorKind::Interrupted | ErrorKind::TimedOut => { + if !block && position == 0 { + return Ok(None); + } + } + _ => return Err(error.into()), + }, + } + } + Ok(Some(())) + } + + fn read_exact(&self, buffer: &mut [u8]) -> Result<(), Error> { + match self.read_data(buffer, true)? { + Some(()) => Ok(()), + None => Err(Error::new("Unexpected end of serial data")), + } + } + + fn read_header(&self, block: bool) -> Result, Error> { + let mut header = [0u8; 4]; + Ok(self.read_data(&mut header, block)?.map(|_| header)) + } + + fn send_command(&self, command: &Command) -> Result<(), Error> { self.serial.write_all(b"CMD")?; self.serial.write_all(&command.id.to_be_bytes())?; self.serial.write_all(&command.args[0].to_be_bytes())?; @@ -114,29 +155,27 @@ impl Backend for SerialBackend { } fn process_incoming_data( - &mut self, + &self, data_type: DataType, packets: &mut VecDeque, ) -> Result, Error> { - let mut buffer = [0u8; 4]; - - while matches!(data_type, DataType::Response) - || self.serial.bytes_to_read()? as usize >= buffer.len() - { - self.serial.read_exact(&mut buffer)?; - let (packet_token, error) = (match &buffer[0..3] { + let block = matches!(data_type, DataType::Response); + while let Some(header) = self.read_header(block)? { + let (packet_token, error) = (match &header[0..3] { b"CMP" => Ok((false, false)), b"PKT" => Ok((true, false)), b"ERR" => Ok((false, true)), _ => Err(Error::new("Unknown response token")), })?; - let id = buffer[3]; + let id = header[3]; - self.serial.read_exact(&mut buffer)?; + let mut buffer = [0u8; 4]; + + self.read_exact(&mut buffer)?; let length = u32::from_be_bytes(buffer) as usize; let mut data = vec![0u8; length]; - self.serial.read_exact(&mut data)?; + self.read_exact(&mut data)?; if packet_token { packets.push_back(Packet { id, data }); @@ -152,49 +191,81 @@ impl Backend for SerialBackend { } } -fn new_serial_backend(port: &str) -> Result { - let serial = serialport::new(port, 115_200) - .timeout(Duration::from_secs(10)) - .open()?; - let mut backend = SerialBackend { serial }; +fn new_serial(port: &str) -> Result { + let mut serial = SerialPort::open(port, 115_200)?; + serial.set_read_timeout(Duration::from_millis(10))?; + serial.set_write_timeout(Duration::from_secs(5))?; + let backend = Serial { serial }; backend.reset()?; Ok(backend) } +struct SerialBackend { + inner: Serial, +} + +impl Backend for SerialBackend { + fn send_command(&mut self, command: &Command) -> Result<(), Error> { + self.inner.send_command(command) + } + + fn process_incoming_data( + &mut self, + data_type: DataType, + packets: &mut VecDeque, + ) -> Result, Error> { + self.inner.process_incoming_data(data_type, packets) + } +} + +fn new_serial_backend(port: &str) -> Result { + let backend = SerialBackend { + inner: new_serial(port)?, + }; + Ok(backend) +} + struct TcpBackend { - stream: TcpStream, reader: BufReader, writer: BufWriter, - header_position: usize, - header: [u8; 4], } impl TcpBackend { - fn read_header(&mut self, block: bool) -> Result { - self.stream.set_nonblocking(!block)?; - while self.header_position != 4 { - self.header_position += - match self.reader.read(&mut self.header[self.header_position..4]) { - Ok(0) => return Err(Error::new("Unexpected end of stream")), - Ok(length) => length, - Err(error) => match error.kind() { - ErrorKind::Interrupted => 0, - ErrorKind::WouldBlock => 0, - _ => return Err(error.into()), - }, - }; - if !block { - break; + fn read_data(&mut self, buffer: &mut [u8], block: bool) -> Result, Error> { + let timeout = Instant::now(); + let mut position = 0; + let length = buffer.len(); + while position < length { + if timeout.elapsed() > Duration::from_secs(10) { + return Err(Error::new("Stream read timeout")); + } + match self.reader.read(&mut buffer[position..length]) { + Ok(0) => return Err(Error::new("Unexpected end of stream data")), + Ok(bytes) => position += bytes, + Err(error) => match error.kind() { + ErrorKind::Interrupted | ErrorKind::TimedOut => { + if !block && position == 0 { + return Ok(None); + } + } + _ => return Err(error.into()), + }, } } - self.stream.set_nonblocking(false)?; - if self.header_position == 4 { - self.header_position = 0; - return Ok(true); - } else { - return Ok(false); + Ok(Some(())) + } + + fn read_exact(&mut self, buffer: &mut [u8]) -> Result<(), Error> { + match self.read_data(buffer, true)? { + Some(()) => Ok(()), + None => Err(Error::new("Unexpected end of stream data")), } } + + fn read_header(&mut self, block: bool) -> Result, Error> { + let mut header = [0u8; 4]; + Ok(self.read_data(&mut header, block)?.map(|_| header)) + } } impl Backend for TcpBackend { @@ -220,20 +291,20 @@ impl Backend for TcpBackend { data_type: DataType, packets: &mut VecDeque, ) -> Result, Error> { - let mut buffer = [0u8; 4]; - - while self.read_header(matches!(data_type, DataType::Response))? { - let payload_data_type: DataType = u32::from_be_bytes(self.header).try_into()?; + let block = matches!(data_type, DataType::Response); + while let Some(header) = self.read_header(block)? { + let payload_data_type: DataType = u32::from_be_bytes(header).try_into()?; + let mut buffer = [0u8; 4]; match payload_data_type { DataType::Response => { let mut response_info = vec![0u8; 2]; - self.reader.read_exact(&mut response_info)?; + self.read_exact(&mut response_info)?; - self.reader.read_exact(&mut buffer)?; + self.read_exact(&mut buffer)?; let response_data_length = u32::from_be_bytes(buffer) as usize; let mut data = vec![0u8; response_data_length]; - self.reader.read_exact(&mut data)?; + self.read_exact(&mut data)?; return Ok(Some(Response { id: response_info[0], @@ -243,13 +314,13 @@ impl Backend for TcpBackend { } DataType::Packet => { let mut packet_info = vec![0u8; 1]; - self.reader.read_exact(&mut packet_info)?; + self.read_exact(&mut packet_info)?; - self.reader.read_exact(&mut buffer)?; + self.read_exact(&mut buffer)?; let packet_data_length = u32::from_be_bytes(buffer) as usize; let mut data = vec![0u8; packet_data_length]; - self.reader.read_exact(&mut data)?; + self.read_exact(&mut data)?; packets.push_back(Packet { id: packet_info[0], @@ -272,7 +343,7 @@ fn new_tcp_backend(address: &str) -> Result { let stream = match TcpStream::connect(address) { Ok(stream) => { stream.set_write_timeout(Some(Duration::from_secs(10)))?; - stream.set_read_timeout(Some(Duration::from_secs(10)))?; + stream.set_read_timeout(Some(Duration::from_millis(10)))?; stream } Err(error) => { @@ -283,13 +354,7 @@ fn new_tcp_backend(address: &str) -> Result { }; let reader = BufReader::new(stream.try_clone()?); let writer = BufWriter::new(stream.try_clone()?); - Ok(TcpBackend { - stream, - reader, - writer, - header_position: 0, - header: [0, 0, 0, 0], - }) + Ok(TcpBackend { reader, writer }) } pub struct Link { @@ -408,109 +473,225 @@ pub fn run_server( event_callback: fn(ServerEvent), ) -> Result<(), Error> { let listener = TcpListener::bind(address)?; - - event_callback(ServerEvent::Listening(listener.local_addr()?.to_string())); + let listening_address = listener.local_addr()?; + event_callback(ServerEvent::Listening(listening_address.to_string())); for stream in listener.incoming() { match stream { - Ok(mut stream) => match server_accept_connection(port, event_callback, &mut stream) { - Ok(()) => {} - Err(error) => event_callback(ServerEvent::Err(error.to_string())), + Ok(mut stream) => { + let peer = stream.peer_addr()?.to_string(); + event_callback(ServerEvent::Connected(peer.clone())); + match server_accept_connection(port, &mut stream) { + Ok(()) => event_callback(ServerEvent::Disconnected(peer.clone())), + Err(error) => event_callback(ServerEvent::Err(error.to_string())), + } + } + Err(error) => match error.kind() { + _ => return Err(error.into()), }, - Err(error) => return Err(error.into()), } } Ok(()) } -fn server_accept_connection( - port: &str, - event_callback: fn(ServerEvent), - stream: &mut TcpStream, +enum Event { + Command((u8, [u32; 2], Vec)), + Response(Response), + Packet(Packet), + KeepAlive, + Closed(Option), +} + +fn server_accept_connection(port: &str, stream: &mut TcpStream) -> Result<(), Error> { + let (event_sender, event_receiver) = channel::(); + let exit_flag = Arc::new(AtomicBool::new(false)); + + let mut stream_writer = BufWriter::new(stream.try_clone()?); + let mut stream_reader = stream.try_clone()?; + + let stream_event_sender = event_sender.clone(); + let stream_exit_flag = exit_flag.clone(); + let stream_thread = thread::spawn(move || { + let closed_sender = stream_event_sender.clone(); + match server_stream_thread(&mut stream_reader, stream_event_sender, stream_exit_flag) { + Ok(()) => closed_sender.send(Event::Closed(None)), + Err(error) => closed_sender.send(Event::Closed(Some(error))), + } + .ok(); + }); + + let serial = Arc::new(new_serial(port)?); + let serial_writer = serial.clone(); + let serial_reader = serial.clone(); + + let serial_event_sender = event_sender.clone(); + let serial_exit_flag = exit_flag.clone(); + let serial_thread = thread::spawn(move || { + let closed_sender = serial_event_sender.clone(); + match server_serial_thread(serial_reader, serial_event_sender, serial_exit_flag) { + Ok(()) => closed_sender.send(Event::Closed(None)), + Err(error) => closed_sender.send(Event::Closed(Some(error))), + } + .ok(); + }); + + let keepalive_event_sender = event_sender.clone(); + let keepalive_exit_flag = exit_flag.clone(); + let keepalive_thread = thread::spawn(move || { + server_keepalive_thread(keepalive_event_sender, keepalive_exit_flag); + }); + + let result = server_process_events(&mut stream_writer, serial_writer, event_receiver); + + exit_flag.store(true, Ordering::Relaxed); + stream_thread.join().ok(); + serial_thread.join().ok(); + keepalive_thread.join().ok(); + + result +} + +fn server_process_events( + stream_writer: &mut BufWriter, + serial_writer: Arc, + event_receiver: Receiver, ) -> Result<(), Error> { - let peer = stream.peer_addr()?.to_string(); - event_callback(ServerEvent::Connected(peer.clone())); + for event in event_receiver.into_iter() { + match event { + Event::Command((id, args, data)) => { + serial_writer.send_command(&Command { + id, + args, + data: &data, + })?; + } + Event::Response(response) => { + stream_writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?; + stream_writer.write_all(&[response.id])?; + stream_writer.write_all(&[response.error as u8])?; + stream_writer.write_all(&(response.data.len() as u32).to_be_bytes())?; + stream_writer.write_all(&response.data)?; + stream_writer.flush()?; + } + Event::Packet(packet) => { + stream_writer.write_all(&u32::to_be_bytes(DataType::Packet.into()))?; + stream_writer.write_all(&[packet.id])?; + stream_writer.write_all(&(packet.data.len() as u32).to_be_bytes())?; + stream_writer.write_all(&packet.data)?; + stream_writer.flush()?; + } + Event::KeepAlive => { + stream_writer.write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?; + stream_writer.flush()?; + } + Event::Closed(result) => match result { + Some(error) => return Err(error), + None => { + break; + } + }, + } + } - stream.set_write_timeout(Some(Duration::from_secs(10)))?; - stream.set_read_timeout(Some(Duration::from_secs(10)))?; + Ok(()) +} - let mut reader = BufReader::new(stream.try_clone()?); - let mut writer = BufWriter::new(stream.try_clone()?); - - let mut serial_backend = new_serial_backend(port)?; - serial_backend.reset()?; - - let mut packets: VecDeque = VecDeque::new(); - let mut keepalive = Instant::now(); - let mut header_position = 0; - let mut header = [0u8; 4]; +fn server_stream_thread( + stream: &mut TcpStream, + event_sender: Sender, + exit_flag: Arc, +) -> Result<(), Error> { + let mut stream_reader = BufReader::new(stream.try_clone()?); loop { - stream.set_nonblocking(true)?; - header_position += match stream.read(&mut header[header_position..4]) { - Ok(0) => { - event_callback(ServerEvent::Disconnected(peer.clone())); + let mut header = [0u8; 4]; + let header_length = header.len(); + let mut header_position = 0; + + stream.set_read_timeout(Some(Duration::from_millis(10)))?; + while header_position < header_length { + if exit_flag.load(Ordering::Relaxed) { return Ok(()); } - Ok(length) => length, - Err(error) => match error.kind() { - ErrorKind::WouldBlock => 0, - _ => return Err(error.into()), - }, - }; - stream.set_nonblocking(false)?; - - if header_position == 4 { - header_position = 0; - - let data_type: DataType = u32::from_be_bytes(header).try_into()?; - - if !matches!(data_type, DataType::Command) { - return Err(Error::new("Received data type wasn't a command data type")); + match stream_reader.read(&mut header[header_position..header_length]) { + Ok(0) => return Ok(()), + Ok(bytes) => header_position += bytes, + Err(error) => match error.kind() { + ErrorKind::Interrupted | ErrorKind::TimedOut => {} + _ => return Err(error.into()), + }, } + } + stream.set_read_timeout(None)?; - let mut buffer = [0u8; 4]; - let mut id_buffer = [0u8; 1]; - let mut args = [0u32; 2]; + let data_type: DataType = u32::from_be_bytes(header).try_into()?; + if !matches!(data_type, DataType::Command) { + return Err(Error::new("Received data type was not a command data type")); + } - reader.read_exact(&mut id_buffer)?; - reader.read_exact(&mut buffer)?; - args[0] = u32::from_be_bytes(buffer); - reader.read_exact(&mut buffer)?; - args[1] = u32::from_be_bytes(buffer); + let mut buffer = [0u8; 4]; + let mut id = [0u8; 1]; + let mut args = [0u32; 2]; - reader.read_exact(&mut buffer)?; - let command_data_length = u32::from_be_bytes(buffer) as usize; - let mut data = vec![0u8; command_data_length]; - reader.read_exact(&mut data)?; + stream_reader.read_exact(&mut id)?; + stream_reader.read_exact(&mut buffer)?; + args[0] = u32::from_be_bytes(buffer); + stream_reader.read_exact(&mut buffer)?; + args[1] = u32::from_be_bytes(buffer); - serial_backend.send_command(&Command { - id: id_buffer[0], - args, - data: &data, - })?; - } else if let Some(response) = - serial_backend.process_incoming_data(DataType::Packet, &mut packets)? - { - writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?; - writer.write_all(&[response.id])?; - writer.write_all(&[response.error as u8])?; - writer.write_all(&(response.data.len() as u32).to_be_bytes())?; - writer.write_all(&response.data)?; - writer.flush()?; - } else if let Some(packet) = packets.pop_front() { - writer.write_all(&u32::to_be_bytes(DataType::Packet.into()))?; - writer.write_all(&[packet.id])?; - writer.write_all(&(packet.data.len() as u32).to_be_bytes())?; - writer.write_all(&packet.data)?; - writer.flush()?; - } else if keepalive.elapsed() > Duration::from_secs(5) { + stream_reader.read_exact(&mut buffer)?; + let command_data_length = u32::from_be_bytes(buffer) as usize; + let mut data = vec![0u8; command_data_length]; + stream_reader.read_exact(&mut data)?; + + let event = Event::Command((id[0], args, data)); + if event_sender.send(event).is_err() { + break; + } + } + + Ok(()) +} + +fn server_serial_thread( + serial_reader: Arc, + event_sender: Sender, + exit_flag: Arc, +) -> Result<(), Error> { + let mut packets: VecDeque = VecDeque::new(); + + while !exit_flag.load(Ordering::Relaxed) { + let response = serial_reader.process_incoming_data(DataType::Packet, &mut packets)?; + if let Some(response) = response { + let event = Event::Response(response); + if event_sender.send(event).is_err() { + break; + } + } + if let Some(packet) = packets.pop_front() { + let event = Event::Packet(packet); + if event_sender.send(event).is_err() { + break; + } + } + } + + Ok(()) +} + +fn server_keepalive_thread(event_sender: Sender, exit_flag: Arc) { + let mut keepalive = Instant::now(); + + while !exit_flag.load(Ordering::Relaxed) { + if keepalive.elapsed() >= Duration::from_secs(5) { + let event = Event::KeepAlive; + if event_sender.send(event).is_err() { + break; + } keepalive = Instant::now(); - writer.write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?; - writer.flush()?; } else { - std::thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::from_millis(10)); } } } diff --git a/sw/deployer/src/sc64/mod.rs b/sw/deployer/src/sc64/mod.rs index c5363b7..b56c83d 100644 --- a/sw/deployer/src/sc64/mod.rs +++ b/sw/deployer/src/sc64/mod.rs @@ -617,13 +617,15 @@ impl SC64 { } pub fn check_firmware_version(&mut self) -> Result<(u16, u16, u32), Error> { + let unsupported_version_message = format!( + "Unsupported SC64 firmware version, minimum supported version: {}.{}.x", + SUPPORTED_MAJOR_VERSION, SUPPORTED_MINOR_VERSION + ); let (major, minor, revision) = self .command_version_get() - .map_err(|_| Error::new("Outdated SC64 firmware version, please update firmware"))?; + .map_err(|_| Error::new(unsupported_version_message.as_str()))?; if major != SUPPORTED_MAJOR_VERSION || minor < SUPPORTED_MINOR_VERSION { - return Err(Error::new( - "Unsupported SC64 firmware version, please update firmware", - )); + return Err(Error::new(unsupported_version_message.as_str())); } Ok((major, minor, revision)) } diff --git a/sw/deployer/src/sc64/types.rs b/sw/deployer/src/sc64/types.rs index 2b9e625..cd3cc56 100644 --- a/sw/deployer/src/sc64/types.rs +++ b/sw/deployer/src/sc64/types.rs @@ -579,7 +579,7 @@ pub enum DataPacket { Button, Debug(DebugPacket), Disk(DiskPacket), - IsViewer(String), + IsViewer64(String), UpdateStatus(UpdateStatus), } @@ -590,7 +590,7 @@ impl TryFrom for DataPacket { b'B' => Self::Button, b'U' => Self::Debug(value.data.try_into()?), b'D' => Self::Disk(value.data.try_into()?), - b'I' => Self::IsViewer(EUC_JP.decode(&value.data).0.into()), + b'I' => Self::IsViewer64(EUC_JP.decode(&value.data).0.into()), b'F' => { if value.data.len() != 4 { return Err(Error::new(