mirror of
https://github.com/Polprzewodnikowy/SummerCart64.git
synced 2024-11-22 14:09:16 +01:00
server in separate file
This commit is contained in:
parent
9ab03c4fcf
commit
77ad2a5d2a
@ -3,17 +3,12 @@ use serial2::SerialPort;
|
|||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
io::{BufReader, BufWriter, ErrorKind, Read, Write},
|
io::{BufReader, BufWriter, ErrorKind, Read, Write},
|
||||||
net::{TcpListener, TcpStream},
|
net::TcpStream,
|
||||||
sync::{
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
mpsc::{channel, Receiver, Sender},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
thread,
|
thread,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
enum DataType {
|
pub enum DataType {
|
||||||
Command,
|
Command,
|
||||||
Response,
|
Response,
|
||||||
Packet,
|
Packet,
|
||||||
@ -61,16 +56,7 @@ pub struct Packet {
|
|||||||
pub data: Vec<u8>,
|
pub data: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
trait Backend {
|
pub struct Serial {
|
||||||
fn send_command(&mut self, command: &Command) -> Result<(), Error>;
|
|
||||||
fn process_incoming_data(
|
|
||||||
&mut self,
|
|
||||||
data_type: DataType,
|
|
||||||
packets: &mut VecDeque<Packet>,
|
|
||||||
) -> Result<Option<Response>, Error>;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Serial {
|
|
||||||
serial: SerialPort,
|
serial: SerialPort,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,7 +127,7 @@ impl Serial {
|
|||||||
Ok(self.read_data(&mut header, block)?.map(|_| header))
|
Ok(self.read_data(&mut header, block)?.map(|_| header))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_command(&self, command: &Command) -> Result<(), Error> {
|
pub fn send_command(&self, command: &Command) -> Result<(), Error> {
|
||||||
self.serial.write_all(b"CMD")?;
|
self.serial.write_all(b"CMD")?;
|
||||||
self.serial.write_all(&command.id.to_be_bytes())?;
|
self.serial.write_all(&command.id.to_be_bytes())?;
|
||||||
self.serial.write_all(&command.args[0].to_be_bytes())?;
|
self.serial.write_all(&command.args[0].to_be_bytes())?;
|
||||||
@ -154,7 +140,7 @@ impl Serial {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_incoming_data(
|
pub fn process_incoming_data(
|
||||||
&self,
|
&self,
|
||||||
data_type: DataType,
|
data_type: DataType,
|
||||||
packets: &mut VecDeque<Packet>,
|
packets: &mut VecDeque<Packet>,
|
||||||
@ -191,15 +177,24 @@ impl Serial {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_serial(port: &str) -> Result<Serial, Error> {
|
pub fn new_serial(port: &str) -> Result<Serial, Error> {
|
||||||
let mut serial = SerialPort::open(port, 115_200)?;
|
let mut serial = SerialPort::open(port, 115_200)?;
|
||||||
serial.set_read_timeout(Duration::from_millis(10))?;
|
|
||||||
serial.set_write_timeout(Duration::from_secs(5))?;
|
serial.set_write_timeout(Duration::from_secs(5))?;
|
||||||
|
serial.set_read_timeout(Duration::from_millis(10))?;
|
||||||
let backend = Serial { serial };
|
let backend = Serial { serial };
|
||||||
backend.reset()?;
|
backend.reset()?;
|
||||||
Ok(backend)
|
Ok(backend)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trait Backend {
|
||||||
|
fn send_command(&mut self, command: &Command) -> Result<(), Error>;
|
||||||
|
fn process_incoming_data(
|
||||||
|
&mut self,
|
||||||
|
data_type: DataType,
|
||||||
|
packets: &mut VecDeque<Packet>,
|
||||||
|
) -> Result<Option<Response>, Error>;
|
||||||
|
}
|
||||||
|
|
||||||
struct SerialBackend {
|
struct SerialBackend {
|
||||||
inner: Serial,
|
inner: Serial,
|
||||||
}
|
}
|
||||||
@ -459,239 +454,3 @@ pub fn list_local_devices() -> Result<Vec<LocalDevice>, Error> {
|
|||||||
|
|
||||||
return Ok(serial_devices);
|
return Ok(serial_devices);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum ServerEvent {
|
|
||||||
Listening(String),
|
|
||||||
Connected(String),
|
|
||||||
Disconnected(String),
|
|
||||||
Err(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run_server(
|
|
||||||
port: &str,
|
|
||||||
address: String,
|
|
||||||
event_callback: fn(ServerEvent),
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let listener = TcpListener::bind(address)?;
|
|
||||||
let listening_address = listener.local_addr()?;
|
|
||||||
event_callback(ServerEvent::Listening(listening_address.to_string()));
|
|
||||||
|
|
||||||
for stream in listener.incoming() {
|
|
||||||
match stream {
|
|
||||||
Ok(mut stream) => {
|
|
||||||
let peer = stream.peer_addr()?.to_string();
|
|
||||||
event_callback(ServerEvent::Connected(peer.clone()));
|
|
||||||
match server_accept_connection(port, &mut stream) {
|
|
||||||
Ok(()) => event_callback(ServerEvent::Disconnected(peer.clone())),
|
|
||||||
Err(error) => event_callback(ServerEvent::Err(error.to_string())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(error) => match error.kind() {
|
|
||||||
_ => return Err(error.into()),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
enum Event {
|
|
||||||
Command((u8, [u32; 2], Vec<u8>)),
|
|
||||||
Response(Response),
|
|
||||||
Packet(Packet),
|
|
||||||
KeepAlive,
|
|
||||||
Closed(Option<Error>),
|
|
||||||
}
|
|
||||||
|
|
||||||
fn server_accept_connection(port: &str, stream: &mut TcpStream) -> Result<(), Error> {
|
|
||||||
let (event_sender, event_receiver) = channel::<Event>();
|
|
||||||
let exit_flag = Arc::new(AtomicBool::new(false));
|
|
||||||
|
|
||||||
let mut stream_writer = BufWriter::new(stream.try_clone()?);
|
|
||||||
let mut stream_reader = stream.try_clone()?;
|
|
||||||
|
|
||||||
let stream_event_sender = event_sender.clone();
|
|
||||||
let stream_exit_flag = exit_flag.clone();
|
|
||||||
let stream_thread = thread::spawn(move || {
|
|
||||||
let closed_sender = stream_event_sender.clone();
|
|
||||||
match server_stream_thread(&mut stream_reader, stream_event_sender, stream_exit_flag) {
|
|
||||||
Ok(()) => closed_sender.send(Event::Closed(None)),
|
|
||||||
Err(error) => closed_sender.send(Event::Closed(Some(error))),
|
|
||||||
}
|
|
||||||
.ok();
|
|
||||||
});
|
|
||||||
|
|
||||||
let serial = Arc::new(new_serial(port)?);
|
|
||||||
let serial_writer = serial.clone();
|
|
||||||
let serial_reader = serial.clone();
|
|
||||||
|
|
||||||
let serial_event_sender = event_sender.clone();
|
|
||||||
let serial_exit_flag = exit_flag.clone();
|
|
||||||
let serial_thread = thread::spawn(move || {
|
|
||||||
let closed_sender = serial_event_sender.clone();
|
|
||||||
match server_serial_thread(serial_reader, serial_event_sender, serial_exit_flag) {
|
|
||||||
Ok(()) => closed_sender.send(Event::Closed(None)),
|
|
||||||
Err(error) => closed_sender.send(Event::Closed(Some(error))),
|
|
||||||
}
|
|
||||||
.ok();
|
|
||||||
});
|
|
||||||
|
|
||||||
let keepalive_event_sender = event_sender.clone();
|
|
||||||
let keepalive_exit_flag = exit_flag.clone();
|
|
||||||
let keepalive_thread = thread::spawn(move || {
|
|
||||||
server_keepalive_thread(keepalive_event_sender, keepalive_exit_flag);
|
|
||||||
});
|
|
||||||
|
|
||||||
let result = server_process_events(&mut stream_writer, serial_writer, event_receiver);
|
|
||||||
|
|
||||||
exit_flag.store(true, Ordering::Relaxed);
|
|
||||||
stream_thread.join().ok();
|
|
||||||
serial_thread.join().ok();
|
|
||||||
keepalive_thread.join().ok();
|
|
||||||
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
fn server_process_events(
|
|
||||||
stream_writer: &mut BufWriter<TcpStream>,
|
|
||||||
serial_writer: Arc<Serial>,
|
|
||||||
event_receiver: Receiver<Event>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
for event in event_receiver.into_iter() {
|
|
||||||
match event {
|
|
||||||
Event::Command((id, args, data)) => {
|
|
||||||
serial_writer.send_command(&Command {
|
|
||||||
id,
|
|
||||||
args,
|
|
||||||
data: &data,
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
Event::Response(response) => {
|
|
||||||
stream_writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?;
|
|
||||||
stream_writer.write_all(&[response.id])?;
|
|
||||||
stream_writer.write_all(&[response.error as u8])?;
|
|
||||||
stream_writer.write_all(&(response.data.len() as u32).to_be_bytes())?;
|
|
||||||
stream_writer.write_all(&response.data)?;
|
|
||||||
stream_writer.flush()?;
|
|
||||||
}
|
|
||||||
Event::Packet(packet) => {
|
|
||||||
stream_writer.write_all(&u32::to_be_bytes(DataType::Packet.into()))?;
|
|
||||||
stream_writer.write_all(&[packet.id])?;
|
|
||||||
stream_writer.write_all(&(packet.data.len() as u32).to_be_bytes())?;
|
|
||||||
stream_writer.write_all(&packet.data)?;
|
|
||||||
stream_writer.flush()?;
|
|
||||||
}
|
|
||||||
Event::KeepAlive => {
|
|
||||||
stream_writer.write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?;
|
|
||||||
stream_writer.flush()?;
|
|
||||||
}
|
|
||||||
Event::Closed(result) => match result {
|
|
||||||
Some(error) => return Err(error),
|
|
||||||
None => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn server_stream_thread(
|
|
||||||
stream: &mut TcpStream,
|
|
||||||
event_sender: Sender<Event>,
|
|
||||||
exit_flag: Arc<AtomicBool>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let mut stream_reader = BufReader::new(stream.try_clone()?);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let mut header = [0u8; 4];
|
|
||||||
let header_length = header.len();
|
|
||||||
let mut header_position = 0;
|
|
||||||
|
|
||||||
stream.set_read_timeout(Some(Duration::from_millis(10)))?;
|
|
||||||
while header_position < header_length {
|
|
||||||
if exit_flag.load(Ordering::Relaxed) {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
match stream_reader.read(&mut header[header_position..header_length]) {
|
|
||||||
Ok(0) => return Ok(()),
|
|
||||||
Ok(bytes) => header_position += bytes,
|
|
||||||
Err(error) => match error.kind() {
|
|
||||||
ErrorKind::Interrupted | ErrorKind::TimedOut | ErrorKind::WouldBlock => {}
|
|
||||||
_ => return Err(error.into()),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
stream.set_read_timeout(None)?;
|
|
||||||
|
|
||||||
let data_type: DataType = u32::from_be_bytes(header).try_into()?;
|
|
||||||
if !matches!(data_type, DataType::Command) {
|
|
||||||
return Err(Error::new("Received data type was not a command data type"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut buffer = [0u8; 4];
|
|
||||||
let mut id = [0u8; 1];
|
|
||||||
let mut args = [0u32; 2];
|
|
||||||
|
|
||||||
stream_reader.read_exact(&mut id)?;
|
|
||||||
stream_reader.read_exact(&mut buffer)?;
|
|
||||||
args[0] = u32::from_be_bytes(buffer);
|
|
||||||
stream_reader.read_exact(&mut buffer)?;
|
|
||||||
args[1] = u32::from_be_bytes(buffer);
|
|
||||||
|
|
||||||
stream_reader.read_exact(&mut buffer)?;
|
|
||||||
let command_data_length = u32::from_be_bytes(buffer) as usize;
|
|
||||||
let mut data = vec![0u8; command_data_length];
|
|
||||||
stream_reader.read_exact(&mut data)?;
|
|
||||||
|
|
||||||
let event = Event::Command((id[0], args, data));
|
|
||||||
if event_sender.send(event).is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn server_serial_thread(
|
|
||||||
serial_reader: Arc<Serial>,
|
|
||||||
event_sender: Sender<Event>,
|
|
||||||
exit_flag: Arc<AtomicBool>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let mut packets: VecDeque<Packet> = VecDeque::new();
|
|
||||||
|
|
||||||
while !exit_flag.load(Ordering::Relaxed) {
|
|
||||||
let response = serial_reader.process_incoming_data(DataType::Packet, &mut packets)?;
|
|
||||||
if let Some(response) = response {
|
|
||||||
let event = Event::Response(response);
|
|
||||||
if event_sender.send(event).is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(packet) = packets.pop_front() {
|
|
||||||
let event = Event::Packet(packet);
|
|
||||||
if event_sender.send(event).is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn server_keepalive_thread(event_sender: Sender<Event>, exit_flag: Arc<AtomicBool>) {
|
|
||||||
let mut keepalive = Instant::now();
|
|
||||||
|
|
||||||
while !exit_flag.load(Ordering::Relaxed) {
|
|
||||||
if keepalive.elapsed() >= Duration::from_secs(5) {
|
|
||||||
let event = Event::KeepAlive;
|
|
||||||
if event_sender.send(event).is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
keepalive = Instant::now();
|
|
||||||
} else {
|
|
||||||
thread::sleep(Duration::from_millis(10));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -2,12 +2,14 @@ mod cic;
|
|||||||
mod error;
|
mod error;
|
||||||
pub mod firmware;
|
pub mod firmware;
|
||||||
mod link;
|
mod link;
|
||||||
|
mod server;
|
||||||
mod types;
|
mod types;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
pub use self::{
|
pub use self::{
|
||||||
error::Error,
|
error::Error,
|
||||||
link::{list_local_devices, ServerEvent},
|
link::list_local_devices,
|
||||||
|
server::ServerEvent,
|
||||||
types::{
|
types::{
|
||||||
BootMode, ButtonMode, ButtonState, CicSeed, DataPacket, DdDiskState, DdDriveType, DdMode,
|
BootMode, ButtonMode, ButtonState, CicSeed, DataPacket, DdDiskState, DdDriveType, DdMode,
|
||||||
DebugPacket, DiskPacket, DiskPacketKind, FpgaDebugData, McuStackUsage, SaveType, Switch,
|
DebugPacket, DiskPacket, DiskPacketKind, FpgaDebugData, McuStackUsage, SaveType, Switch,
|
||||||
@ -786,5 +788,5 @@ pub fn run_server(
|
|||||||
} else {
|
} else {
|
||||||
list_local_devices()?[0].port.clone()
|
list_local_devices()?[0].port.clone()
|
||||||
};
|
};
|
||||||
link::run_server(&port, address, event_callback)
|
server::run_server(&port, address, event_callback)
|
||||||
}
|
}
|
||||||
|
256
sw/deployer/src/sc64/server.rs
Normal file
256
sw/deployer/src/sc64/server.rs
Normal file
@ -0,0 +1,256 @@
|
|||||||
|
use super::{
|
||||||
|
error::Error,
|
||||||
|
link::{new_serial, Command, DataType, Packet, Response, Serial},
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
collections::VecDeque,
|
||||||
|
io::{BufReader, BufWriter, ErrorKind, Read, Write},
|
||||||
|
net::{TcpListener, TcpStream},
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
mpsc::{channel, Receiver, Sender},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
thread,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub enum ServerEvent {
|
||||||
|
Listening(String),
|
||||||
|
Connected(String),
|
||||||
|
Disconnected(String),
|
||||||
|
Err(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_server(
|
||||||
|
port: &str,
|
||||||
|
address: String,
|
||||||
|
event_callback: fn(ServerEvent),
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let listener = TcpListener::bind(address)?;
|
||||||
|
let listening_address = listener.local_addr()?;
|
||||||
|
event_callback(ServerEvent::Listening(listening_address.to_string()));
|
||||||
|
|
||||||
|
for stream in listener.incoming() {
|
||||||
|
match stream {
|
||||||
|
Ok(mut stream) => {
|
||||||
|
let peer = stream.peer_addr()?.to_string();
|
||||||
|
event_callback(ServerEvent::Connected(peer.clone()));
|
||||||
|
match server_accept_connection(port, &mut stream) {
|
||||||
|
Ok(()) => event_callback(ServerEvent::Disconnected(peer.clone())),
|
||||||
|
Err(error) => event_callback(ServerEvent::Err(error.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(error) => match error.kind() {
|
||||||
|
_ => return Err(error.into()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Event {
|
||||||
|
Command((u8, [u32; 2], Vec<u8>)),
|
||||||
|
Response(Response),
|
||||||
|
Packet(Packet),
|
||||||
|
KeepAlive,
|
||||||
|
Closed(Option<Error>),
|
||||||
|
}
|
||||||
|
|
||||||
|
fn server_accept_connection(port: &str, stream: &mut TcpStream) -> Result<(), Error> {
|
||||||
|
let (event_sender, event_receiver) = channel::<Event>();
|
||||||
|
let exit_flag = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let mut stream_writer = BufWriter::new(stream.try_clone()?);
|
||||||
|
let mut stream_reader = stream.try_clone()?;
|
||||||
|
|
||||||
|
let serial = Arc::new(new_serial(port)?);
|
||||||
|
let serial_writer = serial.clone();
|
||||||
|
let serial_reader = serial.clone();
|
||||||
|
|
||||||
|
let stream_event_sender = event_sender.clone();
|
||||||
|
let stream_exit_flag = exit_flag.clone();
|
||||||
|
let stream_thread = thread::spawn(move || {
|
||||||
|
let closed_sender = stream_event_sender.clone();
|
||||||
|
match server_stream_thread(&mut stream_reader, stream_event_sender, stream_exit_flag) {
|
||||||
|
Ok(()) => closed_sender.send(Event::Closed(None)),
|
||||||
|
Err(error) => closed_sender.send(Event::Closed(Some(error))),
|
||||||
|
}
|
||||||
|
.ok();
|
||||||
|
});
|
||||||
|
|
||||||
|
let serial_event_sender = event_sender.clone();
|
||||||
|
let serial_exit_flag = exit_flag.clone();
|
||||||
|
let serial_thread = thread::spawn(move || {
|
||||||
|
let closed_sender = serial_event_sender.clone();
|
||||||
|
match server_serial_thread(serial_reader, serial_event_sender, serial_exit_flag) {
|
||||||
|
Ok(()) => closed_sender.send(Event::Closed(None)),
|
||||||
|
Err(error) => closed_sender.send(Event::Closed(Some(error))),
|
||||||
|
}
|
||||||
|
.ok();
|
||||||
|
});
|
||||||
|
|
||||||
|
let keepalive_event_sender = event_sender.clone();
|
||||||
|
let keepalive_exit_flag = exit_flag.clone();
|
||||||
|
let keepalive_thread = thread::spawn(move || {
|
||||||
|
server_keepalive_thread(keepalive_event_sender, keepalive_exit_flag);
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = server_process_events(&mut stream_writer, serial_writer, event_receiver);
|
||||||
|
|
||||||
|
exit_flag.store(true, Ordering::Relaxed);
|
||||||
|
stream_thread.join().ok();
|
||||||
|
serial_thread.join().ok();
|
||||||
|
keepalive_thread.join().ok();
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
fn server_process_events(
|
||||||
|
stream_writer: &mut BufWriter<TcpStream>,
|
||||||
|
serial_writer: Arc<Serial>,
|
||||||
|
event_receiver: Receiver<Event>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
for event in event_receiver.into_iter() {
|
||||||
|
match event {
|
||||||
|
Event::Command((id, args, data)) => {
|
||||||
|
serial_writer.send_command(&Command {
|
||||||
|
id,
|
||||||
|
args,
|
||||||
|
data: &data,
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
Event::Response(response) => {
|
||||||
|
stream_writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?;
|
||||||
|
stream_writer.write_all(&[response.id])?;
|
||||||
|
stream_writer.write_all(&[response.error as u8])?;
|
||||||
|
stream_writer.write_all(&(response.data.len() as u32).to_be_bytes())?;
|
||||||
|
stream_writer.write_all(&response.data)?;
|
||||||
|
stream_writer.flush()?;
|
||||||
|
}
|
||||||
|
Event::Packet(packet) => {
|
||||||
|
stream_writer.write_all(&u32::to_be_bytes(DataType::Packet.into()))?;
|
||||||
|
stream_writer.write_all(&[packet.id])?;
|
||||||
|
stream_writer.write_all(&(packet.data.len() as u32).to_be_bytes())?;
|
||||||
|
stream_writer.write_all(&packet.data)?;
|
||||||
|
stream_writer.flush()?;
|
||||||
|
}
|
||||||
|
Event::KeepAlive => {
|
||||||
|
stream_writer.write_all(&u32::to_be_bytes(DataType::KeepAlive.into()))?;
|
||||||
|
stream_writer.flush()?;
|
||||||
|
}
|
||||||
|
Event::Closed(result) => match result {
|
||||||
|
Some(error) => return Err(error),
|
||||||
|
None => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn server_stream_thread(
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
event_sender: Sender<Event>,
|
||||||
|
exit_flag: Arc<AtomicBool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut stream_reader = BufReader::new(stream.try_clone()?);
|
||||||
|
|
||||||
|
let mut header = [0u8; 4];
|
||||||
|
let header_length = header.len();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let mut header_position = 0;
|
||||||
|
|
||||||
|
let timeout = stream.read_timeout()?;
|
||||||
|
stream.set_read_timeout(Some(Duration::from_millis(10)))?;
|
||||||
|
while header_position < header_length {
|
||||||
|
if exit_flag.load(Ordering::Relaxed) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
match stream.read(&mut header[header_position..header_length]) {
|
||||||
|
Ok(0) => return Ok(()),
|
||||||
|
Ok(bytes) => header_position += bytes,
|
||||||
|
Err(error) => match error.kind() {
|
||||||
|
ErrorKind::Interrupted | ErrorKind::TimedOut | ErrorKind::WouldBlock => {}
|
||||||
|
_ => return Err(error.into()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stream.set_read_timeout(timeout)?;
|
||||||
|
|
||||||
|
let data_type: DataType = u32::from_be_bytes(header).try_into()?;
|
||||||
|
if !matches!(data_type, DataType::Command) {
|
||||||
|
return Err(Error::new("Received data type was not a command data type"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut buffer = [0u8; 4];
|
||||||
|
let mut id = [0u8; 1];
|
||||||
|
let mut args = [0u32; 2];
|
||||||
|
|
||||||
|
stream_reader.read_exact(&mut id)?;
|
||||||
|
stream_reader.read_exact(&mut buffer)?;
|
||||||
|
args[0] = u32::from_be_bytes(buffer);
|
||||||
|
stream_reader.read_exact(&mut buffer)?;
|
||||||
|
args[1] = u32::from_be_bytes(buffer);
|
||||||
|
|
||||||
|
stream_reader.read_exact(&mut buffer)?;
|
||||||
|
let command_data_length = u32::from_be_bytes(buffer) as usize;
|
||||||
|
let mut data = vec![0u8; command_data_length];
|
||||||
|
stream_reader.read_exact(&mut data)?;
|
||||||
|
|
||||||
|
let event = Event::Command((id[0], args, data));
|
||||||
|
if event_sender.send(event).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn server_serial_thread(
|
||||||
|
serial_reader: Arc<Serial>,
|
||||||
|
event_sender: Sender<Event>,
|
||||||
|
exit_flag: Arc<AtomicBool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut packets: VecDeque<Packet> = VecDeque::new();
|
||||||
|
|
||||||
|
while !exit_flag.load(Ordering::Relaxed) {
|
||||||
|
let response = serial_reader.process_incoming_data(DataType::Packet, &mut packets)?;
|
||||||
|
|
||||||
|
if let Some(response) = response {
|
||||||
|
let event = Event::Response(response);
|
||||||
|
if event_sender.send(event).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(packet) = packets.pop_front() {
|
||||||
|
let event = Event::Packet(packet);
|
||||||
|
if event_sender.send(event).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn server_keepalive_thread(event_sender: Sender<Event>, exit_flag: Arc<AtomicBool>) {
|
||||||
|
let mut keepalive = Instant::now();
|
||||||
|
|
||||||
|
while !exit_flag.load(Ordering::Relaxed) {
|
||||||
|
if keepalive.elapsed() >= Duration::from_secs(5) {
|
||||||
|
keepalive = Instant::now();
|
||||||
|
let event = Event::KeepAlive;
|
||||||
|
if event_sender.send(event).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
thread::sleep(Duration::from_millis(10));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user