stable remote connection

This commit is contained in:
Mateusz Faderewski 2023-03-13 13:35:39 +01:00
parent 066f3b0485
commit 2e011bcad6
2 changed files with 92 additions and 69 deletions

View File

@ -762,19 +762,31 @@ fn handle_server_command(connection: Connection, args: &ServerArgs) -> Result<()
sc64::run_server(port, args.address.clone(), |event| match event { sc64::run_server(port, args.address.clone(), |event| match event {
sc64::ServerEvent::Listening(address) => { sc64::ServerEvent::Listening(address) => {
println!("{}: Listening on address [{}]", "[Server]".bold(), address) println!(
"{}: Listening on address [{}]",
"[Server]".bold(),
address.bright_blue()
)
} }
sc64::ServerEvent::Connection(peer) => { sc64::ServerEvent::Connected(peer) => {
println!("{}: New connection from [{}]", "[Server]".bold(), peer); println!(
"{}: New connection from [{}]",
"[Server]".bold(),
peer.bright_green()
);
} }
sc64::ServerEvent::Disconnected(peer) => { sc64::ServerEvent::Disconnected(peer) => {
println!("{}: Client disconnected [{}]", "[Server]".bold(), peer); println!(
"{}: Client disconnected [{}]",
"[Server]".bold(),
peer.green()
);
} }
sc64::ServerEvent::Err(error) => { sc64::ServerEvent::Err(error) => {
println!( println!(
"{}: Client disconnected with error: {}", "{}: Client disconnected - server error: {}",
"[Server]".bold(), "[Server]".bold(),
error error.red()
); );
} }
})?; })?;

View File

@ -1,7 +1,7 @@
use super::error::Error; use super::error::Error;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
io::{BufRead, BufReader, BufWriter, ErrorKind, Read, Write}, io::{BufReader, BufWriter, ErrorKind, Read, Write},
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -165,24 +165,35 @@ struct TcpBackend {
stream: TcpStream, stream: TcpStream,
reader: BufReader<TcpStream>, reader: BufReader<TcpStream>,
writer: BufWriter<TcpStream>, writer: BufWriter<TcpStream>,
header_position: usize,
header: [u8; 4],
} }
impl TcpBackend { impl TcpBackend {
fn bytes_to_read(&mut self) -> Result<usize, Error> { fn read_header(&mut self, block: bool) -> Result<bool, Error> {
self.stream.set_nonblocking(true)?; self.stream.set_nonblocking(!block)?;
let result = self.reader.fill_buf(); while self.header_position != 4 {
let length = match result { self.header_position +=
Ok(buffer) => buffer.len(), match self.reader.read(&mut self.header[self.header_position..4]) {
Err(error) => { Ok(0) => return Err(Error::new("Unexpected end of stream")),
if error.kind() == ErrorKind::WouldBlock { Ok(length) => length,
0 Err(error) => match error.kind() {
} else { ErrorKind::Interrupted => 0,
return Err(error.into()); ErrorKind::WouldBlock => 0,
} _ => return Err(error.into()),
} },
}; };
if !block {
break;
}
}
self.stream.set_nonblocking(false)?; self.stream.set_nonblocking(false)?;
return Ok(length); if self.header_position == 4 {
self.header_position = 0;
return Ok(true);
} else {
return Ok(false);
}
} }
} }
@ -211,10 +222,8 @@ impl Backend for TcpBackend {
) -> Result<Option<Response>, Error> { ) -> Result<Option<Response>, Error> {
let mut buffer = [0u8; 4]; let mut buffer = [0u8; 4];
while matches!(data_type, DataType::Response) || self.bytes_to_read()? >= 4 { while self.read_header(matches!(data_type, DataType::Response))? {
self.reader.read_exact(&mut buffer)?; let payload_data_type: DataType = u32::from_be_bytes(self.header).try_into()?;
let payload_data_type: DataType = u32::from_be_bytes(buffer).try_into()?;
match payload_data_type { match payload_data_type {
DataType::Response => { DataType::Response => {
let mut response_info = vec![0u8; 2]; let mut response_info = vec![0u8; 2];
@ -278,6 +287,8 @@ fn new_tcp_backend(address: &str) -> Result<TcpBackend, Error> {
stream, stream,
reader, reader,
writer, writer,
header_position: 0,
header: [0, 0, 0, 0],
}) })
} }
@ -386,7 +397,7 @@ pub fn list_local_devices() -> Result<Vec<LocalDevice>, Error> {
pub enum ServerEvent { pub enum ServerEvent {
Listening(String), Listening(String),
Connection(String), Connected(String),
Disconnected(String), Disconnected(String),
Err(String), Err(String),
} }
@ -419,6 +430,7 @@ fn server_accept_connection(
stream: &mut TcpStream, stream: &mut TcpStream,
) -> Result<(), Error> { ) -> Result<(), Error> {
let peer = stream.peer_addr()?.to_string(); let peer = stream.peer_addr()?.to_string();
event_callback(ServerEvent::Connected(peer.clone()));
stream.set_write_timeout(Some(Duration::from_secs(10)))?; stream.set_write_timeout(Some(Duration::from_secs(10)))?;
stream.set_read_timeout(Some(Duration::from_secs(10)))?; stream.set_read_timeout(Some(Duration::from_secs(10)))?;
@ -430,25 +442,35 @@ fn server_accept_connection(
serial_backend.reset()?; serial_backend.reset()?;
let mut packets: VecDeque<Packet> = VecDeque::new(); let mut packets: VecDeque<Packet> = VecDeque::new();
let mut buffer = [0u8; 4];
let mut keepalive = Instant::now(); let mut keepalive = Instant::now();
let mut header_position = 0;
event_callback(ServerEvent::Connection(peer.clone())); let mut header = [0u8; 4];
loop { loop {
stream.set_nonblocking(true)?; stream.set_nonblocking(true)?;
match reader.read_exact(&mut buffer) { header_position += match stream.read(&mut header[header_position..4]) {
Ok(()) => { Ok(0) => {
event_callback(ServerEvent::Disconnected(peer.clone()));
return Ok(());
}
Ok(length) => length,
Err(error) => match error.kind() {
ErrorKind::WouldBlock => 0,
_ => return Err(error.into()),
},
};
stream.set_nonblocking(false)?; stream.set_nonblocking(false)?;
let data_type: DataType = u32::from_be_bytes(buffer).try_into()?; if header_position == 4 {
header_position = 0;
let data_type: DataType = u32::from_be_bytes(header).try_into()?;
if !matches!(data_type, DataType::Command) { if !matches!(data_type, DataType::Command) {
return Err(Error::new("Received data type wasn't a command data type")); return Err(Error::new("Received data type wasn't a command data type"));
} }
let mut buffer = [0u8; 4];
let mut id_buffer = [0u8; 1]; let mut id_buffer = [0u8; 1];
let mut args = [0u32; 2]; let mut args = [0u32; 2];
@ -468,18 +490,7 @@ fn server_accept_connection(
args, args,
data: &data, data: &data,
})?; })?;
} else if let Some(response) =
continue;
}
Err(error) => {
if error.kind() != ErrorKind::WouldBlock {
event_callback(ServerEvent::Disconnected(peer.clone()));
return Ok(());
}
stream.set_nonblocking(false)?;
}
}
if let Some(response) =
serial_backend.process_incoming_data(DataType::Packet, &mut packets)? serial_backend.process_incoming_data(DataType::Packet, &mut packets)?
{ {
writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?; writer.write_all(&u32::to_be_bytes(DataType::Response.into()))?;