mirror of
https://github.com/Polprzewodnikowy/SummerCart64.git
synced 2024-12-27 05:21:55 +01:00
pc software update
This commit is contained in:
parent
49185981a3
commit
0e79411740
1
build.sh
1
build.sh
@ -7,6 +7,7 @@ PACKAGE_FILE_NAME="SC64"
|
||||
FILES=(
|
||||
"./fw/ftdi/ft232h_config.xml"
|
||||
"./fw/project/lcmxo2/impl1/sc64_impl1.twr"
|
||||
"./sw/pc/dd64.py"
|
||||
"./sw/pc/sc64.py"
|
||||
"./sw/update/sc64.upd"
|
||||
"./LICENSE"
|
||||
|
@ -108,12 +108,12 @@ void display_init (uint32_t *background) {
|
||||
|
||||
if (background == NULL) {
|
||||
for (int i = 0; i < (SCREEN_WIDTH * SCREEN_HEIGHT); i += 1) {
|
||||
display_framebuffer[i] = BACKGROUND_COLOR;
|
||||
io_write(&display_framebuffer[i], BACKGROUND_COLOR);
|
||||
}
|
||||
} else {
|
||||
for (int i = 0; i < (SCREEN_WIDTH * SCREEN_HEIGHT); i += 2) {
|
||||
display_framebuffer[i] = *background;
|
||||
display_framebuffer[i + 1] = *background;
|
||||
io_write(&display_framebuffer[i], *background);
|
||||
io_write(&display_framebuffer[i + 1], *background);
|
||||
background++;
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ const uint8_t font_data[96][FONT_CHAR_BYTES] = {
|
||||
{ 0x00, 0x00, 0x00, 0x00, 0x7C, 0x00, 0x00, 0x00, },
|
||||
{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x18, 0x18, 0x00, },
|
||||
{ 0x60, 0x60, 0x30, 0x30, 0x18, 0x18, 0x0C, 0x0C, },
|
||||
{ 0x3C, 0x66, 0x6E, 0x7E, 0x76, 0x66, 0x3C, 0x00, },
|
||||
{ 0x3C, 0x66, 0x66, 0x66, 0x66, 0x66, 0x3C, 0x00, },
|
||||
{ 0x18, 0x1E, 0x18, 0x18, 0x18, 0x18, 0x7E, 0x00, },
|
||||
{ 0x3C, 0x66, 0x60, 0x30, 0x18, 0x0C, 0x7E, 0x00, },
|
||||
{ 0x3E, 0x60, 0x60, 0x3C, 0x60, 0x60, 0x3E, 0x00, },
|
||||
|
@ -3,11 +3,12 @@ from io import BufferedReader
|
||||
from typing import Optional
|
||||
|
||||
|
||||
|
||||
class BadBlockError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SixtyFourDiskDrive:
|
||||
class DD64Image:
|
||||
__DISK_HEADS = 2
|
||||
__DISK_TRACKS = 1175
|
||||
__DISK_BLOCKS_PER_TRACK = 2
|
||||
@ -42,12 +43,12 @@ class SixtyFourDiskDrive:
|
||||
[0, 1, 2, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10, 9, 8],
|
||||
]
|
||||
__DISK_DRIVE_TYPES = [(
|
||||
"development",
|
||||
'development',
|
||||
192,
|
||||
[11, 10, 3, 2],
|
||||
[0, 1, 8, 9, 16, 17, 18, 19, 20, 21, 22, 23],
|
||||
), (
|
||||
"retail",
|
||||
'retail',
|
||||
232,
|
||||
[9, 8, 1, 0],
|
||||
[2, 3, 10, 11, 12, 16, 17, 18, 19, 20, 21, 22, 23],
|
||||
@ -56,6 +57,7 @@ class SixtyFourDiskDrive:
|
||||
__file: Optional[BufferedReader]
|
||||
__drive_type: Optional[str]
|
||||
__block_info_table: list[tuple[int, int]]
|
||||
loaded: bool = False
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.__file = None
|
||||
@ -108,7 +110,7 @@ class SixtyFourDiskDrive:
|
||||
disk_bad_lbas.append(id_lba)
|
||||
|
||||
if not (disk_system_data and disk_id_data):
|
||||
raise ValueError("Provided 64DD disk file is not valid")
|
||||
raise ValueError('Provided 64DD disk file is not valid')
|
||||
|
||||
disk_zone_bad_tracks = []
|
||||
|
||||
@ -153,11 +155,11 @@ class SixtyFourDiskDrive:
|
||||
|
||||
def __check_track_head_block(self, track: int, head: int, block: int) -> None:
|
||||
if (track < 0 or track >= self.__DISK_TRACKS):
|
||||
raise ValueError("Track outside of possible range")
|
||||
raise ValueError('Track outside of possible range')
|
||||
if (head < 0 or head >= self.__DISK_HEADS):
|
||||
raise ValueError("Head outside of possible range")
|
||||
raise ValueError('Head outside of possible range')
|
||||
if (block < 0 or block >= self.__DISK_BLOCKS_PER_TRACK):
|
||||
raise ValueError("Block outside of possible range")
|
||||
raise ValueError('Block outside of possible range')
|
||||
|
||||
def __get_table_index(self, track: int, head: int, block: int) -> int:
|
||||
return (track << 2) | (head << 1) | (block)
|
||||
@ -173,8 +175,10 @@ class SixtyFourDiskDrive:
|
||||
self.unload()
|
||||
self.__file = open(path, 'rb+')
|
||||
self.__parse_disk()
|
||||
self.loaded = True
|
||||
|
||||
def unload(self) -> None:
|
||||
self.loaded = False
|
||||
if (self.__file != None and not self.__file.closed):
|
||||
self.__file.close()
|
||||
self.__drive_type = None
|
||||
@ -196,25 +200,26 @@ class SixtyFourDiskDrive:
|
||||
raise BadBlockError
|
||||
(offset, block_size) = info
|
||||
if (len(data) != block_size):
|
||||
raise ValueError(f"Provided data block size is different than expected ({len(data)} != {block_size})")
|
||||
raise ValueError(f'Provided data block size is different than expected ({len(data)} != {block_size})')
|
||||
self.__file.seek(offset)
|
||||
self.__file.write(data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
if __name__ == '__main__':
|
||||
id_lba_locations = [
|
||||
(7, 0, 1),
|
||||
(7, 0, 0)
|
||||
]
|
||||
if (len(sys.argv) >= 2):
|
||||
dd = SixtyFourDiskDrive()
|
||||
dd = DD64Image()
|
||||
dd.load(sys.argv[1])
|
||||
print(dd.get_drive_type())
|
||||
for (track, head, block) in id_lba_locations:
|
||||
try:
|
||||
print(dd.read_block(track, head, block)[:4])
|
||||
except BadBlockError:
|
||||
print(f"Bad ID block [track: {track}, head: {head}, block: {block}]")
|
||||
print(f'Bad ID block [track: {track}, head: {head}, block: {block}]')
|
||||
dd.unload()
|
||||
else:
|
||||
print(f"[{sys.argv[0]}]: Expected disk image path as first argument")
|
||||
print(f'[{sys.argv[0]}]: Expected disk image path as first argument')
|
@ -1,23 +0,0 @@
|
||||
from io import TextIOWrapper
|
||||
import contextlib
|
||||
import os.path
|
||||
import platform
|
||||
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def lock_volume(volume: TextIOWrapper):
|
||||
if (os.path.ismount(volume.name)):
|
||||
if (platform.system().startswith("Windows")):
|
||||
import msvcrt
|
||||
import win32file
|
||||
import winioctlcon
|
||||
handle = msvcrt.get_osfhandle(volume.fileno())
|
||||
win32file.DeviceIoControl(handle, winioctlcon.FSCTL_LOCK_VOLUME, None, None)
|
||||
try:
|
||||
yield volume
|
||||
finally:
|
||||
try:
|
||||
volume.flush()
|
||||
finally:
|
||||
win32file.DeviceIoControl(handle, winioctlcon.FSCTL_UNLOCK_VOLUME, None, None)
|
@ -1,254 +0,0 @@
|
||||
from io import BufferedReader
|
||||
from time import sleep
|
||||
from sc64_64dd import BadBlockError, SixtyFourDiskDrive
|
||||
from sc64_transport import SC64Transport
|
||||
|
||||
class SC64Exception(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SC64:
|
||||
__VERSION_V2 = b'SCv2'
|
||||
|
||||
__CFG_ID_BOOTLOADER_SWITCH = 0
|
||||
__CFG_ID_ROM_WRITE_ENABLE = 1
|
||||
__CFG_ID_ROM_SHADOW_ENABLE = 2
|
||||
__CFG_ID_DD_MODE = 3
|
||||
__CFG_ID_ISV_ENABLE = 4
|
||||
__CFG_ID_BOOT_MODE = 5
|
||||
__CFG_ID_SAVE_TYPE = 6
|
||||
__CFG_ID_CIC_SEED = 7
|
||||
__CFG_ID_TV_TYPE = 8
|
||||
__CFG_ID_FLASH_ERASE_BLOCK = 9
|
||||
__CFG_ID_DD_DRIVE_TYPE = 10
|
||||
__CFG_ID_DD_DISK_STATE = 11
|
||||
|
||||
|
||||
__SDRAM_ADDRESS = 0x00000000
|
||||
__SDRAM_LENGTH = (64 * 1024 * 1024)
|
||||
__DDIPL_ADDRESS = 0x03BC0000
|
||||
__DDIPL_LENGTH = (4 * 1024 * 1024)
|
||||
__SAVE_ADDRESS = 0x03FE0000
|
||||
__SAVE_LENGTH = (128 * 1024)
|
||||
__FLASH_ADDRESS = 0x04000000
|
||||
__FLASH_LENGTH = (16 * 1024 * 1024)
|
||||
__EXTENDED_ROM_ADDRESS = 0x04000000
|
||||
__EXTENDED_ROM_LENGTH = (14 * 1024 * 1024)
|
||||
__BOOTLOADER_ADDRESS = 0x04E00000
|
||||
__BOOTLOADER_LENGTH = (1920 * 1024)
|
||||
__SHADOW_ADDRESS = 0x04FE0000
|
||||
__SHADOW_LENGTH = (128 * 1024)
|
||||
__BUFFER_ADDRESS = 0x06000000
|
||||
__BUFFER_LENGTH = (8 * 1024)
|
||||
__EEPROM_ADDRESS = 0x06002000
|
||||
__EEPROM_LENGTH = (2 * 1024)
|
||||
__DD_SECTOR_ADDRESS = 0x06006000
|
||||
__DD_SECTOR_LENGTH = (2 * 1024)
|
||||
|
||||
__FLASH_ERASE_SIZE = (128 * 1024)
|
||||
|
||||
__CHUNK_SIZE = (128 * 1024)
|
||||
|
||||
__MAX_ROM_SIZE = __SDRAM_LENGTH + __EXTENDED_ROM_LENGTH
|
||||
|
||||
__BOOT_MODE = [
|
||||
'sd',
|
||||
'usb',
|
||||
'rom',
|
||||
'ddipl',
|
||||
'direct',
|
||||
]
|
||||
__SAVE_TYPE = [
|
||||
'none',
|
||||
'eeprom4k',
|
||||
'eeprom16k',
|
||||
'sram',
|
||||
'flashram',
|
||||
'sram3x',
|
||||
]
|
||||
__SAVE_LENGTH = [
|
||||
0,
|
||||
512,
|
||||
(2 * 1024),
|
||||
(32 * 1024),
|
||||
(128 * 1024),
|
||||
(3 * 32 * 1024),
|
||||
]
|
||||
__SAVE_ADDRESS = [
|
||||
0,
|
||||
__EEPROM_ADDRESS,
|
||||
__EEPROM_ADDRESS,
|
||||
__SAVE_ADDRESS,
|
||||
__SAVE_ADDRESS,
|
||||
__SAVE_ADDRESS,
|
||||
]
|
||||
__DD_MODE = [
|
||||
'none',
|
||||
'full',
|
||||
'ddipl',
|
||||
]
|
||||
__DD_DRIVE_TYPE = [
|
||||
'retail',
|
||||
'development',
|
||||
]
|
||||
__DD_DISK_STATE = [
|
||||
'ejected',
|
||||
'inserted',
|
||||
'changed',
|
||||
]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.__transport = SC64Transport()
|
||||
self.__transport.connect()
|
||||
version = self.__transport.execute_cmd(cmd=b'v', args=[0, 0])
|
||||
if (version != self.__VERSION_V2):
|
||||
raise SC64Exception("SC64 version different than expected")
|
||||
|
||||
def __get_int(self, data: bytes) -> int:
|
||||
return int.from_bytes(data[:4], byteorder="big")
|
||||
|
||||
def __set_config(self, config: int, value: int) -> None:
|
||||
self.__transport.execute_cmd(b'C', [config, value])
|
||||
|
||||
def __get_config(self, config: int) -> int:
|
||||
return int.from_bytes(self.__transport.execute_cmd(b'c', args=[config, 0]), byteorder="big")
|
||||
|
||||
def __write_memory(self, address: int, data: bytes) -> None:
|
||||
self.__transport.execute_cmd(
|
||||
cmd=b'M', args=[address, len(data)], data=data)
|
||||
|
||||
def __read_memory(self, address: int, length: int):
|
||||
return self.__transport.execute_cmd(cmd=b'm', args=[address, length])
|
||||
|
||||
def __erase_flash_block(self, address: int):
|
||||
self.__set_config(self.__CFG_ID_FLASH_ERASE_BLOCK, address)
|
||||
|
||||
def __erase_flash(self, address: int, length: int):
|
||||
if (address < self.__FLASH_ADDRESS):
|
||||
raise ValueError
|
||||
if (address + length >= self.__FLASH_ADDRESS + self.__FLASH_LENGTH):
|
||||
raise ValueError
|
||||
if (address % self.__FLASH_ERASE_SIZE != 0):
|
||||
raise ValueError
|
||||
if (length % self.__FLASH_ERASE_SIZE != 0):
|
||||
raise ValueError
|
||||
for offset in range(address, address + length, self.__FLASH_ERASE_SIZE):
|
||||
self.__erase_flash_block(offset)
|
||||
|
||||
def set_boot_mode(self, boot_mode: str) -> None:
|
||||
value = self.__BOOT_MODE.index(boot_mode)
|
||||
self.__set_config(self.__CFG_ID_BOOT_MODE, value)
|
||||
|
||||
def set_save_type(self, save_type: str) -> None:
|
||||
value = self.__SAVE_TYPE.index(save_type)
|
||||
self.__set_config(self.__CFG_ID_SAVE_TYPE, value)
|
||||
|
||||
def set_dd_mode(self, dd_mode: str) -> None:
|
||||
value = self.__DD_MODE.index(dd_mode)
|
||||
self.__set_config(self.__CFG_ID_DD_MODE, value)
|
||||
|
||||
def set_dd_drive_type(self, dd_drive_type: str) -> None:
|
||||
value = self.__DD_DRIVE_TYPE.index(dd_drive_type)
|
||||
self.__set_config(self.__CFG_ID_DD_DRIVE_TYPE, value)
|
||||
|
||||
def set_dd_disk_state(self, dd_disk_state: str) -> None:
|
||||
value = self.__DD_DISK_STATE.index(dd_disk_state)
|
||||
self.__set_config(self.__CFG_ID_DD_DISK_STATE, value)
|
||||
|
||||
def upload_rom(self, data: bytes):
|
||||
if (len(data) > self.__SDRAM_LENGTH):
|
||||
raise ValueError
|
||||
self.__write_memory(self.__SDRAM_ADDRESS, data)
|
||||
|
||||
def upload_ddipl(self, data: bytes) -> None:
|
||||
if (len(data) > self.__DDIPL_LENGTH):
|
||||
raise ValueError
|
||||
self.__write_memory(self.__DDIPL_ADDRESS, data)
|
||||
|
||||
def upload_save(self, data: bytes) -> None:
|
||||
save_type = self.__get_config(self.__CFG_ID_SAVE_TYPE)
|
||||
if (save_type < 0 or save_type >= len(self.__SAVE_TYPE)):
|
||||
raise ValueError
|
||||
if (len(data) != self.__SAVE_LENGTH[save_type]):
|
||||
raise ValueError
|
||||
self.__write_memory(self.__SAVE_ADDRESS[save_type], data)
|
||||
|
||||
def upload_bootloader(self, data: bytes) -> None:
|
||||
if (len(data) > self.__BOOTLOADER_LENGTH):
|
||||
raise ValueError
|
||||
self.__erase_flash(self.__BOOTLOADER_ADDRESS, self.__BOOTLOADER_LENGTH)
|
||||
self.__write_memory(self.__BOOTLOADER_ADDRESS, data)
|
||||
|
||||
def __handle_dd_packet(self, dd: SixtyFourDiskDrive, data: bytes) -> None:
|
||||
CMD_READ_BLOCK = 1
|
||||
CMD_WRITE_BLOCK = 2
|
||||
cmd = self.__get_int(data[0:])
|
||||
address = self.__get_int(data[4:])
|
||||
track_head_block = self.__get_int(data[8:])
|
||||
track = (track_head_block >> 2) & 0xFFF
|
||||
head = (track_head_block >> 1) & 0x1
|
||||
block = track_head_block & 0x1
|
||||
try:
|
||||
if (cmd == CMD_READ_BLOCK):
|
||||
block_data = dd.read_block(track, head, block)
|
||||
self.__transport.execute_cmd(cmd=b'D', args=[address, len(block_data)], data=block_data)
|
||||
elif (cmd == CMD_WRITE_BLOCK):
|
||||
dd.write_block(track, head, block, data[12:])
|
||||
self.__transport.execute_cmd(cmd=b'd', args=[0, 0])
|
||||
else:
|
||||
self.__transport.execute_cmd(cmd=b'd', args=[-1, 0])
|
||||
except BadBlockError:
|
||||
self.__transport.execute_cmd(cmd=b'd', args=[1, 0])
|
||||
|
||||
def __handle_isv_packet(self, data: bytes) -> None:
|
||||
print(data.decode("EUC-JP", errors="backslashreplace"), end="")
|
||||
|
||||
def debug_server(self, disk_path: str) -> None:
|
||||
dd = SixtyFourDiskDrive()
|
||||
dd.load(disk_path)
|
||||
self.set_dd_drive_type(dd.get_drive_type())
|
||||
self.set_dd_disk_state('inserted')
|
||||
|
||||
self.__set_config(self.__CFG_ID_ROM_WRITE_ENABLE, 1)
|
||||
self.__set_config(self.__CFG_ID_ISV_ENABLE, 1)
|
||||
self.__set_config(self.__CFG_ID_TV_TYPE, 1)
|
||||
|
||||
while (True):
|
||||
packet = self.__transport.get_packet()
|
||||
if (packet != None):
|
||||
(cmd, data) = packet
|
||||
if (cmd == b'D'):
|
||||
self.__handle_dd_packet(dd, data)
|
||||
if (cmd == b'I'):
|
||||
self.__handle_isv_packet(data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sc64 = SC64()
|
||||
|
||||
sc64.set_boot_mode('rom')
|
||||
print('set boot mode')
|
||||
|
||||
sc64.set_dd_mode('none')
|
||||
print('set dd mode')
|
||||
|
||||
with open('S:/n64/roms/ZELOOTD.z64', 'rb') as f:
|
||||
sc64.upload_rom(f.read())
|
||||
print('uploaded rom')
|
||||
|
||||
# with open('S:/n64/64dd/ipl/NDDJ2.n64', 'rb') as f:
|
||||
# sc64.upload_ddipl(f.read())
|
||||
# print('uploaded ddipl')
|
||||
|
||||
sc64.set_save_type('sram')
|
||||
print('set save type')
|
||||
|
||||
# with open('S:/n64/saves/SM64.eep', 'rb') as f:
|
||||
# sc64.upload_save(f.read())
|
||||
# print('uploaded save')
|
||||
|
||||
try:
|
||||
print('starting debug server')
|
||||
sc64.debug_server(disk_path='S:/n64/64dd/rtl/Mario Artist Polygon Studio.ndd')
|
||||
except KeyboardInterrupt:
|
||||
pass
|
@ -1,171 +0,0 @@
|
||||
from serial.tools import list_ports
|
||||
from threading import Thread
|
||||
from typing import Optional
|
||||
import queue
|
||||
import serial
|
||||
import time
|
||||
|
||||
|
||||
class ConnectionException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SC64Transport:
|
||||
__disconnect = False
|
||||
__serial = None
|
||||
__thread_read = None
|
||||
__thread_write = None
|
||||
__queue_output = queue.Queue()
|
||||
__queue_input = queue.Queue()
|
||||
__queue_packet = queue.Queue()
|
||||
|
||||
def __del__(self) -> None:
|
||||
self.__disconnect = True
|
||||
if (self.__thread_read.is_alive()):
|
||||
self.__thread_read.join(5)
|
||||
if (self.__thread_write.is_alive()):
|
||||
self.__thread_write.join(5)
|
||||
if (self.__serial != None and self.__serial.is_open):
|
||||
self.__serial.close()
|
||||
|
||||
def connect(self) -> None:
|
||||
ports = list_ports.comports()
|
||||
device_found = False
|
||||
|
||||
if (self.__serial != None and self.__serial.is_open):
|
||||
raise ConnectionException("Serial port is already open")
|
||||
|
||||
for p in ports:
|
||||
if (p.vid == 0x0403 and p.pid == 0x6014 and p.serial_number.startswith("SC64")):
|
||||
try:
|
||||
self.__serial = serial.Serial(
|
||||
p.device, timeout=1.0, write_timeout=5.0)
|
||||
self.__reset_link()
|
||||
except (serial.SerialException, ConnectionException):
|
||||
if (self.__serial):
|
||||
self.__serial.close()
|
||||
continue
|
||||
device_found = True
|
||||
break
|
||||
|
||||
if (not device_found):
|
||||
raise ConnectionException("No SummerCart64 device was found")
|
||||
|
||||
self.__thread_read = Thread(
|
||||
target=self.__serial_process_input, daemon=True)
|
||||
self.__thread_write = Thread(
|
||||
target=self.__serial_process_output, daemon=True)
|
||||
|
||||
self.__thread_read.start()
|
||||
self.__thread_write.start()
|
||||
|
||||
def __reset_link(self) -> None:
|
||||
self.__serial.reset_output_buffer()
|
||||
|
||||
retry_counter = 0
|
||||
self.__serial.dtr = 1
|
||||
while (self.__serial.dsr == 0):
|
||||
time.sleep(0.1)
|
||||
retry_counter += 1
|
||||
if (retry_counter >= 10):
|
||||
raise ConnectionException
|
||||
|
||||
self.__serial.reset_input_buffer()
|
||||
|
||||
retry_counter = 0
|
||||
self.__serial.dtr = 0
|
||||
while (self.__serial.dsr == 1):
|
||||
time.sleep(0.1)
|
||||
retry_counter += 1
|
||||
if (retry_counter >= 10):
|
||||
raise ConnectionException
|
||||
|
||||
def __write(self, data: bytes) -> None:
|
||||
try:
|
||||
if (self.__disconnect):
|
||||
raise ConnectionException
|
||||
self.__serial.write(data)
|
||||
self.__serial.flush()
|
||||
except serial.SerialTimeoutException:
|
||||
raise ConnectionException
|
||||
except serial.SerialException:
|
||||
raise ConnectionException
|
||||
|
||||
def __read(self, length: int) -> bytes:
|
||||
try:
|
||||
data = b''
|
||||
while (len(data) < length and not self.__disconnect):
|
||||
data += self.__serial.read(length - len(data))
|
||||
if (self.__disconnect):
|
||||
raise ConnectionException
|
||||
return data
|
||||
except serial.SerialException:
|
||||
raise ConnectionException
|
||||
|
||||
def __read_int(self) -> int:
|
||||
return int.from_bytes(self.__read(4), byteorder="big")
|
||||
|
||||
def __serial_process_output(self) -> None:
|
||||
while (not self.__disconnect and self.__serial != None and self.__serial.is_open):
|
||||
try:
|
||||
packet: bytes = self.__queue_output.get(timeout=0.1)
|
||||
self.__write(packet)
|
||||
self.__queue_output.task_done()
|
||||
except queue.Empty:
|
||||
continue
|
||||
except ConnectionException:
|
||||
break
|
||||
|
||||
def __serial_process_input(self) -> None:
|
||||
while (not self.__disconnect and self.__serial != None and self.__serial.is_open):
|
||||
try:
|
||||
token = self.__read(4)
|
||||
if (len(token) == 4):
|
||||
identifier = token[0:3]
|
||||
command = token[3:4]
|
||||
if (identifier == b'PKT'):
|
||||
lenss = self.__read_int()
|
||||
data = self.__read(lenss)
|
||||
self.__queue_packet.put((command, data))
|
||||
elif (identifier == b'CMP' or identifier == b'ERR'):
|
||||
data = self.__read(self.__read_int())
|
||||
success = identifier == b'CMP'
|
||||
self.__queue_input.put((command, data, success))
|
||||
else:
|
||||
raise Exception
|
||||
except ConnectionException:
|
||||
break
|
||||
|
||||
def __queue_cmd(self, command: bytes, args: list[int] = [], data: bytes = b''):
|
||||
packet: bytes = b'CMD'
|
||||
packet += command
|
||||
for arg in args:
|
||||
packet += arg.to_bytes(4, byteorder="big")
|
||||
packet += data
|
||||
self.__queue_output.put(packet)
|
||||
|
||||
def __pop_response(self, command: bytes) -> bytes:
|
||||
try:
|
||||
(response_command, data, success) = self.__queue_input.get(timeout=5)
|
||||
if (command != response_command or success == False):
|
||||
raise ConnectionException
|
||||
return data
|
||||
except queue.Empty:
|
||||
raise ConnectionException
|
||||
|
||||
def execute_cmd(self, cmd: bytes, args: list[int] = [], data: bytes = b'', response: bool = True) -> Optional[bytes]:
|
||||
if (len(cmd) != 1):
|
||||
raise ValueError("Length of command is different than 1 byte")
|
||||
command = cmd[0:1]
|
||||
if (len(args) != 2):
|
||||
raise ValueError("Number of arguments is different than 2")
|
||||
self.__queue_cmd(command, args, data)
|
||||
if (response):
|
||||
return self.__pop_response(command)
|
||||
return None
|
||||
|
||||
def get_packet(self) -> Optional[tuple[bytes, bytes]]:
|
||||
try:
|
||||
return self.__queue_packet.get(timeout=1)
|
||||
except queue.Empty:
|
||||
return None
|
@ -1,3 +0,0 @@
|
||||
progressbar2==3.55.0
|
||||
pyft232==0.12
|
||||
pywin32==303; sys_platform == 'win32'
|
1402
sw/pc/sc64.py
1402
sw/pc/sc64.py
File diff suppressed because it is too large
Load Diff
@ -196,10 +196,10 @@ if __name__ == "__main__":
|
||||
update.create_update_data()
|
||||
|
||||
hostname = platform.node()
|
||||
creation_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
|
||||
creation_datetime = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
|
||||
info = [
|
||||
f'build system: [{hostname}]',
|
||||
f'creation time: [{creation_time}]',
|
||||
f'creation datetime: [{creation_datetime}]',
|
||||
]
|
||||
if (args.git):
|
||||
info.append(args.git)
|
||||
|
Loading…
Reference in New Issue
Block a user