SummerCart64/sw/pc/lib/sc64_transport.py

172 lines
6.0 KiB
Python
Raw Normal View History

from serial.tools import list_ports
from threading import Thread
from typing import Optional
import queue
import serial
import time
class ConnectionException(Exception):
pass
class SC64Transport:
__disconnect = False
__serial = None
__thread_read = None
__thread_write = None
__queue_output = queue.Queue()
__queue_input = queue.Queue()
__queue_packet = queue.Queue()
def __del__(self) -> None:
self.__disconnect = True
if (self.__thread_read.is_alive()):
self.__thread_read.join(5)
if (self.__thread_write.is_alive()):
self.__thread_write.join(5)
if (self.__serial != None and self.__serial.is_open):
self.__serial.close()
def connect(self) -> None:
ports = list_ports.comports()
device_found = False
if (self.__serial != None and self.__serial.is_open):
raise ConnectionException("Serial port is already open")
for p in ports:
if (p.vid == 0x0403 and p.pid == 0x6014 and p.serial_number.startswith("SC64")):
try:
self.__serial = serial.Serial(
p.device, timeout=1.0, write_timeout=5.0)
self.__reset_link()
except (serial.SerialException, ConnectionException):
if (self.__serial):
self.__serial.close()
continue
device_found = True
break
if (not device_found):
raise ConnectionException("No SummerCart64 device was found")
self.__thread_read = Thread(
target=self.__serial_process_input, daemon=True)
self.__thread_write = Thread(
target=self.__serial_process_output, daemon=True)
self.__thread_read.start()
self.__thread_write.start()
def __reset_link(self) -> None:
self.__serial.reset_output_buffer()
retry_counter = 0
self.__serial.dtr = 1
while (self.__serial.dsr == 0):
time.sleep(0.1)
retry_counter += 1
if (retry_counter >= 10):
raise ConnectionException
self.__serial.reset_input_buffer()
retry_counter = 0
self.__serial.dtr = 0
while (self.__serial.dsr == 1):
time.sleep(0.1)
retry_counter += 1
if (retry_counter >= 10):
raise ConnectionException
def __write(self, data: bytes) -> None:
try:
if (self.__disconnect):
raise ConnectionException
self.__serial.write(data)
self.__serial.flush()
except serial.SerialTimeoutException:
raise ConnectionException
except serial.SerialException:
raise ConnectionException
def __read(self, length: int) -> bytes:
try:
data = b''
while (len(data) < length and not self.__disconnect):
data += self.__serial.read(length - len(data))
if (self.__disconnect):
raise ConnectionException
return data
except serial.SerialException:
raise ConnectionException
def __read_int(self) -> int:
return int.from_bytes(self.__read(4), byteorder="big")
def __serial_process_output(self) -> None:
while (not self.__disconnect and self.__serial != None and self.__serial.is_open):
try:
packet: bytes = self.__queue_output.get(timeout=0.1)
self.__write(packet)
self.__queue_output.task_done()
except queue.Empty:
continue
except ConnectionException:
break
def __serial_process_input(self) -> None:
while (not self.__disconnect and self.__serial != None and self.__serial.is_open):
try:
token = self.__read(4)
if (len(token) == 4):
identifier = token[0:3]
command = token[3:4]
if (identifier == b'PKT'):
lenss = self.__read_int()
data = self.__read(lenss)
self.__queue_packet.put((command, data))
elif (identifier == b'CMP' or identifier == b'ERR'):
data = self.__read(self.__read_int())
success = identifier == b'CMP'
self.__queue_input.put((command, data, success))
else:
raise Exception
except ConnectionException:
break
def __queue_cmd(self, command: bytes, args: list[int] = [], data: bytes = b''):
packet: bytes = b'CMD'
packet += command
for arg in args:
packet += arg.to_bytes(4, byteorder="big")
packet += data
self.__queue_output.put(packet)
def __pop_response(self, command: bytes) -> bytes:
try:
(response_command, data, success) = self.__queue_input.get(timeout=5)
if (command != response_command or success == False):
raise ConnectionException
return data
except queue.Empty:
raise ConnectionException
def execute_cmd(self, cmd: bytes, args: list[int] = [], data: bytes = b'', response: bool = True) -> Optional[bytes]:
if (len(cmd) != 1):
raise ValueError("Length of command is different than 1 byte")
command = cmd[0:1]
if (len(args) != 2):
raise ValueError("Number of arguments is different than 2")
self.__queue_cmd(command, args, data)
if (response):
return self.__pop_response(command)
return None
def get_packet(self) -> Optional[tuple[bytes, bytes]]:
try:
return self.__queue_packet.get(timeout=1)
except queue.Empty:
return None