From 0e7941174086ec6932ee691960d310f0a03c510c Mon Sep 17 00:00:00 2001 From: Polprzewodnikowy Date: Mon, 22 Aug 2022 01:00:14 +0200 Subject: [PATCH] pc software update --- build.sh | 1 + sw/bootloader/src/display.c | 6 +- sw/bootloader/src/font.c | 2 +- sw/pc/{lib/sc64_64dd.py => dd64.py} | 29 +- sw/pc/helpers.py | 23 - sw/pc/lib/sc64.py | 254 ----- sw/pc/lib/sc64_transport.py | 171 ---- sw/pc/requirements.txt | 3 - sw/pc/sc64.py | 1402 ++++++++++----------------- sw/update/update.py | 4 +- 10 files changed, 556 insertions(+), 1339 deletions(-) rename sw/pc/{lib/sc64_64dd.py => dd64.py} (88%) delete mode 100644 sw/pc/helpers.py delete mode 100644 sw/pc/lib/sc64.py delete mode 100644 sw/pc/lib/sc64_transport.py delete mode 100644 sw/pc/requirements.txt diff --git a/build.sh b/build.sh index 4187689..f1a4882 100755 --- a/build.sh +++ b/build.sh @@ -7,6 +7,7 @@ PACKAGE_FILE_NAME="SC64" FILES=( "./fw/ftdi/ft232h_config.xml" "./fw/project/lcmxo2/impl1/sc64_impl1.twr" + "./sw/pc/dd64.py" "./sw/pc/sc64.py" "./sw/update/sc64.upd" "./LICENSE" diff --git a/sw/bootloader/src/display.c b/sw/bootloader/src/display.c index a9255e2..7d1be8d 100644 --- a/sw/bootloader/src/display.c +++ b/sw/bootloader/src/display.c @@ -108,12 +108,12 @@ void display_init (uint32_t *background) { if (background == NULL) { for (int i = 0; i < (SCREEN_WIDTH * SCREEN_HEIGHT); i += 1) { - display_framebuffer[i] = BACKGROUND_COLOR; + io_write(&display_framebuffer[i], BACKGROUND_COLOR); } } else { for (int i = 0; i < (SCREEN_WIDTH * SCREEN_HEIGHT); i += 2) { - display_framebuffer[i] = *background; - display_framebuffer[i + 1] = *background; + io_write(&display_framebuffer[i], *background); + io_write(&display_framebuffer[i + 1], *background); background++; } } diff --git a/sw/bootloader/src/font.c b/sw/bootloader/src/font.c index bd349a6..0732a3f 100644 --- a/sw/bootloader/src/font.c +++ b/sw/bootloader/src/font.c @@ -20,7 +20,7 @@ const uint8_t font_data[96][FONT_CHAR_BYTES] = { { 0x00, 0x00, 0x00, 0x00, 0x7C, 0x00, 0x00, 0x00, }, { 0x00, 0x00, 0x00, 0x00, 0x00, 0x18, 0x18, 0x00, }, { 0x60, 0x60, 0x30, 0x30, 0x18, 0x18, 0x0C, 0x0C, }, - { 0x3C, 0x66, 0x6E, 0x7E, 0x76, 0x66, 0x3C, 0x00, }, + { 0x3C, 0x66, 0x66, 0x66, 0x66, 0x66, 0x3C, 0x00, }, { 0x18, 0x1E, 0x18, 0x18, 0x18, 0x18, 0x7E, 0x00, }, { 0x3C, 0x66, 0x60, 0x30, 0x18, 0x0C, 0x7E, 0x00, }, { 0x3E, 0x60, 0x60, 0x3C, 0x60, 0x60, 0x3E, 0x00, }, diff --git a/sw/pc/lib/sc64_64dd.py b/sw/pc/dd64.py similarity index 88% rename from sw/pc/lib/sc64_64dd.py rename to sw/pc/dd64.py index e2eb2b8..48f7b25 100644 --- a/sw/pc/lib/sc64_64dd.py +++ b/sw/pc/dd64.py @@ -3,11 +3,12 @@ from io import BufferedReader from typing import Optional + class BadBlockError(Exception): pass -class SixtyFourDiskDrive: +class DD64Image: __DISK_HEADS = 2 __DISK_TRACKS = 1175 __DISK_BLOCKS_PER_TRACK = 2 @@ -42,12 +43,12 @@ class SixtyFourDiskDrive: [0, 1, 2, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10, 9, 8], ] __DISK_DRIVE_TYPES = [( - "development", + 'development', 192, [11, 10, 3, 2], [0, 1, 8, 9, 16, 17, 18, 19, 20, 21, 22, 23], ), ( - "retail", + 'retail', 232, [9, 8, 1, 0], [2, 3, 10, 11, 12, 16, 17, 18, 19, 20, 21, 22, 23], @@ -56,6 +57,7 @@ class SixtyFourDiskDrive: __file: Optional[BufferedReader] __drive_type: Optional[str] __block_info_table: list[tuple[int, int]] + loaded: bool = False def __init__(self) -> None: self.__file = None @@ -108,7 +110,7 @@ class SixtyFourDiskDrive: disk_bad_lbas.append(id_lba) if not (disk_system_data and disk_id_data): - raise ValueError("Provided 64DD disk file is not valid") + raise ValueError('Provided 64DD disk file is not valid') disk_zone_bad_tracks = [] @@ -153,11 +155,11 @@ class SixtyFourDiskDrive: def __check_track_head_block(self, track: int, head: int, block: int) -> None: if (track < 0 or track >= self.__DISK_TRACKS): - raise ValueError("Track outside of possible range") + raise ValueError('Track outside of possible range') if (head < 0 or head >= self.__DISK_HEADS): - raise ValueError("Head outside of possible range") + raise ValueError('Head outside of possible range') if (block < 0 or block >= self.__DISK_BLOCKS_PER_TRACK): - raise ValueError("Block outside of possible range") + raise ValueError('Block outside of possible range') def __get_table_index(self, track: int, head: int, block: int) -> int: return (track << 2) | (head << 1) | (block) @@ -173,8 +175,10 @@ class SixtyFourDiskDrive: self.unload() self.__file = open(path, 'rb+') self.__parse_disk() + self.loaded = True def unload(self) -> None: + self.loaded = False if (self.__file != None and not self.__file.closed): self.__file.close() self.__drive_type = None @@ -196,25 +200,26 @@ class SixtyFourDiskDrive: raise BadBlockError (offset, block_size) = info if (len(data) != block_size): - raise ValueError(f"Provided data block size is different than expected ({len(data)} != {block_size})") + raise ValueError(f'Provided data block size is different than expected ({len(data)} != {block_size})') self.__file.seek(offset) self.__file.write(data) -if __name__ == "__main__": + +if __name__ == '__main__': id_lba_locations = [ (7, 0, 1), (7, 0, 0) ] if (len(sys.argv) >= 2): - dd = SixtyFourDiskDrive() + dd = DD64Image() dd.load(sys.argv[1]) print(dd.get_drive_type()) for (track, head, block) in id_lba_locations: try: print(dd.read_block(track, head, block)[:4]) except BadBlockError: - print(f"Bad ID block [track: {track}, head: {head}, block: {block}]") + print(f'Bad ID block [track: {track}, head: {head}, block: {block}]') dd.unload() else: - print(f"[{sys.argv[0]}]: Expected disk image path as first argument") + print(f'[{sys.argv[0]}]: Expected disk image path as first argument') diff --git a/sw/pc/helpers.py b/sw/pc/helpers.py deleted file mode 100644 index 09d9718..0000000 --- a/sw/pc/helpers.py +++ /dev/null @@ -1,23 +0,0 @@ -from io import TextIOWrapper -import contextlib -import os.path -import platform - - - -@contextlib.contextmanager -def lock_volume(volume: TextIOWrapper): - if (os.path.ismount(volume.name)): - if (platform.system().startswith("Windows")): - import msvcrt - import win32file - import winioctlcon - handle = msvcrt.get_osfhandle(volume.fileno()) - win32file.DeviceIoControl(handle, winioctlcon.FSCTL_LOCK_VOLUME, None, None) - try: - yield volume - finally: - try: - volume.flush() - finally: - win32file.DeviceIoControl(handle, winioctlcon.FSCTL_UNLOCK_VOLUME, None, None) diff --git a/sw/pc/lib/sc64.py b/sw/pc/lib/sc64.py deleted file mode 100644 index 4935e9c..0000000 --- a/sw/pc/lib/sc64.py +++ /dev/null @@ -1,254 +0,0 @@ -from io import BufferedReader -from time import sleep -from sc64_64dd import BadBlockError, SixtyFourDiskDrive -from sc64_transport import SC64Transport - -class SC64Exception(Exception): - pass - - -class SC64: - __VERSION_V2 = b'SCv2' - - __CFG_ID_BOOTLOADER_SWITCH = 0 - __CFG_ID_ROM_WRITE_ENABLE = 1 - __CFG_ID_ROM_SHADOW_ENABLE = 2 - __CFG_ID_DD_MODE = 3 - __CFG_ID_ISV_ENABLE = 4 - __CFG_ID_BOOT_MODE = 5 - __CFG_ID_SAVE_TYPE = 6 - __CFG_ID_CIC_SEED = 7 - __CFG_ID_TV_TYPE = 8 - __CFG_ID_FLASH_ERASE_BLOCK = 9 - __CFG_ID_DD_DRIVE_TYPE = 10 - __CFG_ID_DD_DISK_STATE = 11 - - - __SDRAM_ADDRESS = 0x00000000 - __SDRAM_LENGTH = (64 * 1024 * 1024) - __DDIPL_ADDRESS = 0x03BC0000 - __DDIPL_LENGTH = (4 * 1024 * 1024) - __SAVE_ADDRESS = 0x03FE0000 - __SAVE_LENGTH = (128 * 1024) - __FLASH_ADDRESS = 0x04000000 - __FLASH_LENGTH = (16 * 1024 * 1024) - __EXTENDED_ROM_ADDRESS = 0x04000000 - __EXTENDED_ROM_LENGTH = (14 * 1024 * 1024) - __BOOTLOADER_ADDRESS = 0x04E00000 - __BOOTLOADER_LENGTH = (1920 * 1024) - __SHADOW_ADDRESS = 0x04FE0000 - __SHADOW_LENGTH = (128 * 1024) - __BUFFER_ADDRESS = 0x06000000 - __BUFFER_LENGTH = (8 * 1024) - __EEPROM_ADDRESS = 0x06002000 - __EEPROM_LENGTH = (2 * 1024) - __DD_SECTOR_ADDRESS = 0x06006000 - __DD_SECTOR_LENGTH = (2 * 1024) - - __FLASH_ERASE_SIZE = (128 * 1024) - - __CHUNK_SIZE = (128 * 1024) - - __MAX_ROM_SIZE = __SDRAM_LENGTH + __EXTENDED_ROM_LENGTH - - __BOOT_MODE = [ - 'sd', - 'usb', - 'rom', - 'ddipl', - 'direct', - ] - __SAVE_TYPE = [ - 'none', - 'eeprom4k', - 'eeprom16k', - 'sram', - 'flashram', - 'sram3x', - ] - __SAVE_LENGTH = [ - 0, - 512, - (2 * 1024), - (32 * 1024), - (128 * 1024), - (3 * 32 * 1024), - ] - __SAVE_ADDRESS = [ - 0, - __EEPROM_ADDRESS, - __EEPROM_ADDRESS, - __SAVE_ADDRESS, - __SAVE_ADDRESS, - __SAVE_ADDRESS, - ] - __DD_MODE = [ - 'none', - 'full', - 'ddipl', - ] - __DD_DRIVE_TYPE = [ - 'retail', - 'development', - ] - __DD_DISK_STATE = [ - 'ejected', - 'inserted', - 'changed', - ] - - def __init__(self) -> None: - self.__transport = SC64Transport() - self.__transport.connect() - version = self.__transport.execute_cmd(cmd=b'v', args=[0, 0]) - if (version != self.__VERSION_V2): - raise SC64Exception("SC64 version different than expected") - - def __get_int(self, data: bytes) -> int: - return int.from_bytes(data[:4], byteorder="big") - - def __set_config(self, config: int, value: int) -> None: - self.__transport.execute_cmd(b'C', [config, value]) - - def __get_config(self, config: int) -> int: - return int.from_bytes(self.__transport.execute_cmd(b'c', args=[config, 0]), byteorder="big") - - def __write_memory(self, address: int, data: bytes) -> None: - self.__transport.execute_cmd( - cmd=b'M', args=[address, len(data)], data=data) - - def __read_memory(self, address: int, length: int): - return self.__transport.execute_cmd(cmd=b'm', args=[address, length]) - - def __erase_flash_block(self, address: int): - self.__set_config(self.__CFG_ID_FLASH_ERASE_BLOCK, address) - - def __erase_flash(self, address: int, length: int): - if (address < self.__FLASH_ADDRESS): - raise ValueError - if (address + length >= self.__FLASH_ADDRESS + self.__FLASH_LENGTH): - raise ValueError - if (address % self.__FLASH_ERASE_SIZE != 0): - raise ValueError - if (length % self.__FLASH_ERASE_SIZE != 0): - raise ValueError - for offset in range(address, address + length, self.__FLASH_ERASE_SIZE): - self.__erase_flash_block(offset) - - def set_boot_mode(self, boot_mode: str) -> None: - value = self.__BOOT_MODE.index(boot_mode) - self.__set_config(self.__CFG_ID_BOOT_MODE, value) - - def set_save_type(self, save_type: str) -> None: - value = self.__SAVE_TYPE.index(save_type) - self.__set_config(self.__CFG_ID_SAVE_TYPE, value) - - def set_dd_mode(self, dd_mode: str) -> None: - value = self.__DD_MODE.index(dd_mode) - self.__set_config(self.__CFG_ID_DD_MODE, value) - - def set_dd_drive_type(self, dd_drive_type: str) -> None: - value = self.__DD_DRIVE_TYPE.index(dd_drive_type) - self.__set_config(self.__CFG_ID_DD_DRIVE_TYPE, value) - - def set_dd_disk_state(self, dd_disk_state: str) -> None: - value = self.__DD_DISK_STATE.index(dd_disk_state) - self.__set_config(self.__CFG_ID_DD_DISK_STATE, value) - - def upload_rom(self, data: bytes): - if (len(data) > self.__SDRAM_LENGTH): - raise ValueError - self.__write_memory(self.__SDRAM_ADDRESS, data) - - def upload_ddipl(self, data: bytes) -> None: - if (len(data) > self.__DDIPL_LENGTH): - raise ValueError - self.__write_memory(self.__DDIPL_ADDRESS, data) - - def upload_save(self, data: bytes) -> None: - save_type = self.__get_config(self.__CFG_ID_SAVE_TYPE) - if (save_type < 0 or save_type >= len(self.__SAVE_TYPE)): - raise ValueError - if (len(data) != self.__SAVE_LENGTH[save_type]): - raise ValueError - self.__write_memory(self.__SAVE_ADDRESS[save_type], data) - - def upload_bootloader(self, data: bytes) -> None: - if (len(data) > self.__BOOTLOADER_LENGTH): - raise ValueError - self.__erase_flash(self.__BOOTLOADER_ADDRESS, self.__BOOTLOADER_LENGTH) - self.__write_memory(self.__BOOTLOADER_ADDRESS, data) - - def __handle_dd_packet(self, dd: SixtyFourDiskDrive, data: bytes) -> None: - CMD_READ_BLOCK = 1 - CMD_WRITE_BLOCK = 2 - cmd = self.__get_int(data[0:]) - address = self.__get_int(data[4:]) - track_head_block = self.__get_int(data[8:]) - track = (track_head_block >> 2) & 0xFFF - head = (track_head_block >> 1) & 0x1 - block = track_head_block & 0x1 - try: - if (cmd == CMD_READ_BLOCK): - block_data = dd.read_block(track, head, block) - self.__transport.execute_cmd(cmd=b'D', args=[address, len(block_data)], data=block_data) - elif (cmd == CMD_WRITE_BLOCK): - dd.write_block(track, head, block, data[12:]) - self.__transport.execute_cmd(cmd=b'd', args=[0, 0]) - else: - self.__transport.execute_cmd(cmd=b'd', args=[-1, 0]) - except BadBlockError: - self.__transport.execute_cmd(cmd=b'd', args=[1, 0]) - - def __handle_isv_packet(self, data: bytes) -> None: - print(data.decode("EUC-JP", errors="backslashreplace"), end="") - - def debug_server(self, disk_path: str) -> None: - dd = SixtyFourDiskDrive() - dd.load(disk_path) - self.set_dd_drive_type(dd.get_drive_type()) - self.set_dd_disk_state('inserted') - - self.__set_config(self.__CFG_ID_ROM_WRITE_ENABLE, 1) - self.__set_config(self.__CFG_ID_ISV_ENABLE, 1) - self.__set_config(self.__CFG_ID_TV_TYPE, 1) - - while (True): - packet = self.__transport.get_packet() - if (packet != None): - (cmd, data) = packet - if (cmd == b'D'): - self.__handle_dd_packet(dd, data) - if (cmd == b'I'): - self.__handle_isv_packet(data) - - -if __name__ == "__main__": - sc64 = SC64() - - sc64.set_boot_mode('rom') - print('set boot mode') - - sc64.set_dd_mode('none') - print('set dd mode') - - with open('S:/n64/roms/ZELOOTD.z64', 'rb') as f: - sc64.upload_rom(f.read()) - print('uploaded rom') - - # with open('S:/n64/64dd/ipl/NDDJ2.n64', 'rb') as f: - # sc64.upload_ddipl(f.read()) - # print('uploaded ddipl') - - sc64.set_save_type('sram') - print('set save type') - - # with open('S:/n64/saves/SM64.eep', 'rb') as f: - # sc64.upload_save(f.read()) - # print('uploaded save') - - try: - print('starting debug server') - sc64.debug_server(disk_path='S:/n64/64dd/rtl/Mario Artist Polygon Studio.ndd') - except KeyboardInterrupt: - pass diff --git a/sw/pc/lib/sc64_transport.py b/sw/pc/lib/sc64_transport.py deleted file mode 100644 index d1e2246..0000000 --- a/sw/pc/lib/sc64_transport.py +++ /dev/null @@ -1,171 +0,0 @@ -from serial.tools import list_ports -from threading import Thread -from typing import Optional -import queue -import serial -import time - - -class ConnectionException(Exception): - pass - - -class SC64Transport: - __disconnect = False - __serial = None - __thread_read = None - __thread_write = None - __queue_output = queue.Queue() - __queue_input = queue.Queue() - __queue_packet = queue.Queue() - - def __del__(self) -> None: - self.__disconnect = True - if (self.__thread_read.is_alive()): - self.__thread_read.join(5) - if (self.__thread_write.is_alive()): - self.__thread_write.join(5) - if (self.__serial != None and self.__serial.is_open): - self.__serial.close() - - def connect(self) -> None: - ports = list_ports.comports() - device_found = False - - if (self.__serial != None and self.__serial.is_open): - raise ConnectionException("Serial port is already open") - - for p in ports: - if (p.vid == 0x0403 and p.pid == 0x6014 and p.serial_number.startswith("SC64")): - try: - self.__serial = serial.Serial( - p.device, timeout=1.0, write_timeout=5.0) - self.__reset_link() - except (serial.SerialException, ConnectionException): - if (self.__serial): - self.__serial.close() - continue - device_found = True - break - - if (not device_found): - raise ConnectionException("No SummerCart64 device was found") - - self.__thread_read = Thread( - target=self.__serial_process_input, daemon=True) - self.__thread_write = Thread( - target=self.__serial_process_output, daemon=True) - - self.__thread_read.start() - self.__thread_write.start() - - def __reset_link(self) -> None: - self.__serial.reset_output_buffer() - - retry_counter = 0 - self.__serial.dtr = 1 - while (self.__serial.dsr == 0): - time.sleep(0.1) - retry_counter += 1 - if (retry_counter >= 10): - raise ConnectionException - - self.__serial.reset_input_buffer() - - retry_counter = 0 - self.__serial.dtr = 0 - while (self.__serial.dsr == 1): - time.sleep(0.1) - retry_counter += 1 - if (retry_counter >= 10): - raise ConnectionException - - def __write(self, data: bytes) -> None: - try: - if (self.__disconnect): - raise ConnectionException - self.__serial.write(data) - self.__serial.flush() - except serial.SerialTimeoutException: - raise ConnectionException - except serial.SerialException: - raise ConnectionException - - def __read(self, length: int) -> bytes: - try: - data = b'' - while (len(data) < length and not self.__disconnect): - data += self.__serial.read(length - len(data)) - if (self.__disconnect): - raise ConnectionException - return data - except serial.SerialException: - raise ConnectionException - - def __read_int(self) -> int: - return int.from_bytes(self.__read(4), byteorder="big") - - def __serial_process_output(self) -> None: - while (not self.__disconnect and self.__serial != None and self.__serial.is_open): - try: - packet: bytes = self.__queue_output.get(timeout=0.1) - self.__write(packet) - self.__queue_output.task_done() - except queue.Empty: - continue - except ConnectionException: - break - - def __serial_process_input(self) -> None: - while (not self.__disconnect and self.__serial != None and self.__serial.is_open): - try: - token = self.__read(4) - if (len(token) == 4): - identifier = token[0:3] - command = token[3:4] - if (identifier == b'PKT'): - lenss = self.__read_int() - data = self.__read(lenss) - self.__queue_packet.put((command, data)) - elif (identifier == b'CMP' or identifier == b'ERR'): - data = self.__read(self.__read_int()) - success = identifier == b'CMP' - self.__queue_input.put((command, data, success)) - else: - raise Exception - except ConnectionException: - break - - def __queue_cmd(self, command: bytes, args: list[int] = [], data: bytes = b''): - packet: bytes = b'CMD' - packet += command - for arg in args: - packet += arg.to_bytes(4, byteorder="big") - packet += data - self.__queue_output.put(packet) - - def __pop_response(self, command: bytes) -> bytes: - try: - (response_command, data, success) = self.__queue_input.get(timeout=5) - if (command != response_command or success == False): - raise ConnectionException - return data - except queue.Empty: - raise ConnectionException - - def execute_cmd(self, cmd: bytes, args: list[int] = [], data: bytes = b'', response: bool = True) -> Optional[bytes]: - if (len(cmd) != 1): - raise ValueError("Length of command is different than 1 byte") - command = cmd[0:1] - if (len(args) != 2): - raise ValueError("Number of arguments is different than 2") - self.__queue_cmd(command, args, data) - if (response): - return self.__pop_response(command) - return None - - def get_packet(self) -> Optional[tuple[bytes, bytes]]: - try: - return self.__queue_packet.get(timeout=1) - except queue.Empty: - return None diff --git a/sw/pc/requirements.txt b/sw/pc/requirements.txt deleted file mode 100644 index a9dad9b..0000000 --- a/sw/pc/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -progressbar2==3.55.0 -pyft232==0.12 -pywin32==303; sys_platform == 'win32' diff --git a/sw/pc/sc64.py b/sw/pc/sc64.py index 224e595..5624371 100644 --- a/sw/pc/sc64.py +++ b/sw/pc/sc64.py @@ -1,870 +1,532 @@ -#!/usr/bin/env python3 - -from datetime import datetime -from serial import Serial, SerialException -from serial.tools import list_ports -from io import TextIOWrapper -from typing import Union -import argparse -import filecmp -import helpers -import os -import progressbar -import struct -import sys -import time - - - -class SC64Exception(Exception): - pass - - - -class SC64: - __CFG_ID_BOOTLOADER_SWITCH = 0 - __CFG_ID_ROM_WRITE_ENABLE = 1 - __CFG_ID_ROM_SHADOW_ENABLE = 2 - __CFG_ID_DD_ENABLE = 3 - __CFG_ID_ISV_ENABLE = 4 - __CFG_ID_BOOT_MODE = 5 - __CFG_ID_SAVE_TYPE = 6 - __CFG_ID_CIC_SEED = 7 - __CFG_ID_TV_TYPE = 8 - __CFG_ID_FLASH_ERASE_BLOCK = 9 - - __FLASH_ERASE_BLOCK_SIZE = 64 * 1024 - - __SC64_VERSION_V2 = 0x53437632 - - __BOOTLOADER_OFFSET = (64 + 14) * 1024 * 1024 - __SAVE_OFFSET = (64 * 1024 * 1024) - (128 * 1024) - __EEPROM_OFFSET = (96 * 1024 * 1024) + 8192 - __DDIPL_OFFSET = 0x03BC0000 - - __CHUNK_SIZE = 256 * 1024 - - __MIN_ROM_LENGTH = 0x101000 - __DDIPL_ROM_LENGTH = 0x400000 - - __DEBUG_ID_TEXT = 0x01 - __EVENT_ID_FSD_READ = 0xF0 - __EVENT_ID_FSD_WRITE = 0xF1 - __EVENT_ID_DD_BLOCK = 0xF4 - __EVENT_ID_IS_VIEWER = 0xF5 - - __DD_DRIVE_ID_RETAIL = 3 - __DD_DRIVE_ID_DEVELOPMENT = 4 - __DD_DISK_STATE_EJECTED = 0 - __DD_DISK_STATE_INSERTED = 1 - __DD_DISK_STATE_CHANGED = 2 - - - def __init__(self) -> None: - self.__usb = None - self.__progress_init = None - self.__progress_value = None - self.__progress_finish = None - self.__fsd_file = None - self.__disk_file = None - self.__disk_lba_table = [] - self.__dd_turbo = False - self.__find_sc64() - - - def __del__(self) -> None: - if (self.__usb): - self.__usb.close() - if (self.__fsd_file): - self.__fsd_file.close() - - - def __set_progress_init(self, value: int, label: str) -> None: - if (self.__progress_init != None): - self.__progress_init(value, label) - - - def __set_progress_value(self, value: int) -> None: - if (self.__progress_value != None): - self.__progress_value(value) - - - def __set_progress_finish(self) -> None: - if (self.__progress_finish != None): - self.__progress_finish() - - - def __align(self, value: int, align: int) -> int: - return (value + (align - 1)) & ~(align - 1) - - - def reset_link(self) -> None: - if (self.__usb): - self.__usb.flush() - self.__usb.setDTR(1) - while (self.__usb.getDSR() == 0): - pass - self.__usb.setDTR(0) - while (self.__usb.getDSR() == 1): - pass - - - def __read(self, bytes: int) -> bytes: - return self.__usb.read(bytes) - - - def __write(self, data: bytes) -> None: - self.__usb.write(data) - - - def __read_long(self, length: int) -> bytes: - data = bytearray() - while (len(data) < length): - data += self.__read(length - len(data)) - return bytes(data) - - - def __write_dummy(self, length: int) -> None: - self.__write(bytes(length)) - - - def __read_int(self) -> int: - return int.from_bytes(self.__read(4), byteorder="big") - - - def __write_int(self, value: int) -> None: - self.__write(value.to_bytes(4, byteorder="big")) - - - def __read_cmd_status(self, cmd: Union[str, int]) -> None: - token = b"CMP" + (cmd.encode() if isinstance(cmd, str) else int.to_bytes(cmd, 1, byteorder="big")) - if (self.__read(4) != token): - raise SC64Exception("Wrong command response") - - - def __write_cmd(self, cmd: Union[str, int], arg1: int, arg2: int) -> None: - token = b"CMD" + (cmd.encode() if isinstance(cmd, str) else int.to_bytes(cmd, 1, byteorder="big")) - self.__write(token) - self.__write_int(arg1) - self.__write_int(arg2) - - - def __find_sc64(self) -> None: - ports = list_ports.comports() - device_found = False - - if (self.__usb != None and self.__usb.isOpen()): - self.__usb.close() - - for p in ports: - if (p.vid == 0x0403 and p.pid == 0x6014 and p.serial_number.startswith("SC64")): - try: - self.__usb = Serial(p.device, timeout=10.0, write_timeout=10.0) - self.reset_link() - self.__probe_device() - except (SerialException, SC64Exception): - if (self.__usb): - self.__usb.close() - continue - device_found = True - break - - if (not device_found): - raise SC64Exception("No SummerCart64 device was found") - - - def __probe_device(self) -> None: - self.__write_cmd("v", 0, 0) - version = self.__read_int() - self.__read_cmd_status("v") - if (version != self.__SC64_VERSION_V2): - raise SC64Exception(f"Unknown hardware version: {hex(version)}") - - - def __query_config(self, id: int) -> int: - self.__write_cmd("c", id, 0) - value = self.__read_int() - self.__read_cmd_status("c") - return value - - - def __change_config(self, id: int, arg: int = 0, ignore_response: bool = False) -> None: - self.__write_cmd("C", id, arg) - if (not ignore_response): - self.__read_cmd_status("C") - - - def __read_file_from_sdram(self, file: str, offset: int, length: int) -> None: - with open(file, "wb") as f: - self.__set_progress_init(length, os.path.basename(f.name)) - self.__write_cmd("m", offset, length) - while (f.tell() < length): - chunk_size = min(self.__CHUNK_SIZE, length - f.tell()) - f.write(self.__read(chunk_size)) - self.__set_progress_value(f.tell()) - self.__read_cmd_status("m") - self.__set_progress_finish() - - - def __write_file_to_sdram(self, file: str, offset: int, min_length: int = 0) -> None: - with open(file, "rb") as f: - length = os.fstat(f.fileno()).st_size - transfer_size = max(length, min_length) - self.__set_progress_init(transfer_size, os.path.basename(f.name)) - self.__write_cmd("M", offset, transfer_size) - while (f.tell() < length): - self.__write(f.read(min(self.__CHUNK_SIZE, length - f.tell()))) - self.__set_progress_value(f.tell()) - if (transfer_size != length): - self.__write_dummy(transfer_size - length) - self.__read_cmd_status("M") - self.__set_progress_finish() - - - def upload_bootloader(self, file: str, min_length: int = 1028 * 1024) -> None: - offset = self.__BOOTLOADER_OFFSET - with open(file, "rb") as f: - length = os.fstat(f.fileno()).st_size - transfer_size = max(length, min_length) - self.__set_progress_init(transfer_size, "Erase...") - for block in range(offset, offset + transfer_size, self.__FLASH_ERASE_BLOCK_SIZE): - self.__change_config(self.__CFG_ID_FLASH_ERASE_BLOCK, block) - self.__set_progress_value(block - offset) - self.__set_progress_finish() - - self.__set_progress_init(transfer_size, os.path.basename(f.name)) - self.__write_cmd("M", offset, transfer_size) - while (f.tell() < length): - self.__write(f.read(min(self.__CHUNK_SIZE, length - f.tell()))) - self.__set_progress_value(f.tell()) - if (transfer_size != length): - self.__write_dummy(transfer_size - length) - self.__read_cmd_status("M") - self.__set_progress_finish() - - - def __get_save_length(self) -> None: - save_type = self.__query_config(self.__CFG_ID_SAVE_TYPE) - return { - 0: 0, - 1: 512, - 2: 2048, - 3: (32 * 1024), - 4: (128 * 1024), - 5: (3 * 32 * 1024) - }[save_type] - - - def __reconfigure(self) -> None: - pass - # magic = self.__query_config(self.__CFG_ID_RECONFIGURE) - # self.__change_config(self.__CFG_ID_RECONFIGURE, magic, ignore_response=True) - # time.sleep(0.5) - - - def backup_firmware(self, file: str) -> None: - pass - # length = self.__query_config(self.__CFG_ID_FLASH_SIZE) - # self.__change_config(self.__CFG_ID_FLASH_READ, self.__UPDATE_OFFSET) - # self.__read_file_from_sdram(file, self.__UPDATE_OFFSET, length) - - - def update_firmware(self, file: str) -> None: - pass - # self.__write_file_to_sdram(file, self.__UPDATE_OFFSET) - # saved_timeout = self.__usb.timeout - # self.__usb.timeout = 20.0 - # self.__change_config(self.__CFG_ID_FLASH_PROGRAM, self.__UPDATE_OFFSET) - # self.__usb.timeout = saved_timeout - # self.__reconfigure() - - - def set_rtc(self, t: datetime) -> None: - to_bcd = lambda v : ((int((v / 10) % 10) << 4) | int(int(v) % 10)) - args = [ - (to_bcd(t.weekday() + 1) << 24) | (to_bcd(t.hour) << 16) | (to_bcd(t.minute) << 8) | to_bcd(t.second), - (to_bcd(t.year) << 16) | (to_bcd(t.month) << 8) | to_bcd(t.day), - ] - self.__write_cmd("T", args[0], args[1]) - self.__read_cmd_status("T") - - - def set_boot_mode(self, mode: int) -> None: - if (mode >= 0 and mode <= 4): - self.__change_config(self.__CFG_ID_BOOT_MODE, mode) - else: - raise SC64Exception("Boot mode outside of supported values") - - - def get_boot_mode_label(self, mode: int) -> None: - if (mode < 0 or mode > 4): - return "Unknown" - return { - 0: "Load menu from SD card", - 1: "Load menu from USB", - 2: "Load ROM from SDRAM", - 3: "Load DDIPL from SDRAM", - 4: "Load ROM from SDRAM directly without bootloader", - }[mode] - - - def set_tv_type(self, type: int = None) -> None: - if (type == None or type == 3): - self.__change_config(self.__CFG_ID_TV_TYPE, 3) - elif (type >= 0 and type <= 2): - self.__change_config(self.__CFG_ID_TV_TYPE, type) - else: - raise SC64Exception("TV type outside of supported values") - - - def get_tv_type_label(self, type: int) -> None: - if (type < 0 or type > 2): - return "Unknown" - return { - 0: "PAL", - 1: "NTSC", - 2: "MPAL" - }[type] - - - def set_cic_seed(self, seed: int = None) -> None: - if (seed == None or seed == 0xFFFF): - self.__change_config(self.__CFG_ID_CIC_SEED, 0xFFFF) - elif (seed >= 0 and seed <= 0x1FF): - self.__change_config(self.__CFG_ID_CIC_SEED, seed) - else: - raise SC64Exception("CIC seed outside of supported values") - - - def download_rom(self, file: str, length: int, offset: int = 0) -> None: - self.__read_file_from_sdram(file, offset, length) - - - def upload_rom(self, file: str, offset: int = 0) -> None: - self.__write_file_to_sdram(file, offset, min_length=self.__MIN_ROM_LENGTH) - - - def set_save_type(self, type: int) -> None: - if (type >= 0 and type <= 5): - self.__change_config(self.__CFG_ID_SAVE_TYPE, type) - else: - raise SC64Exception("Save type outside of supported values") - - - def get_save_type_label(self, type: int) -> None: - if (type < 0 or type > 5): - return "Unknown" - return { - 0: "No save", - 1: "EEPROM 4Kb", - 2: "EEPROM 16Kb", - 3: "SRAM 256Kb", - 4: "FlashRAM 1Mb", - 5: "SRAM 768Kb" - }[type] - - - def download_save(self, file: str) -> None: - length = self.__get_save_length() - if (length > 0): - self.__read_file_from_sdram(file, self.__SAVE_OFFSET if length > 2048 else self.__EEPROM_OFFSET, length) - else: - raise SC64Exception("Can't read save data - no save type is set") - - - def upload_save(self, file: str) -> None: - length = self.__get_save_length() - save_length = os.path.getsize(file) - if (length <= 0): - raise SC64Exception("Can't write save data - no save type is set") - elif (length != save_length): - raise SC64Exception("Can't write save data - save file size is different than expected") - else: - self.__write_file_to_sdram(file, self.__SAVE_OFFSET if length > 2048 else self.__EEPROM_OFFSET) - - - def set_dd_enable(self, enable: bool) -> None: - self.__change_config(self.__CFG_ID_DD_ENABLE, 1 if enable else 0) - - - def download_dd_ipl(self, file: str) -> None: - self.__read_file_from_sdram(file, self.__DDIPL_OFFSET, length=self.__DDIPL_ROM_LENGTH) - - - def upload_dd_ipl(self, file: str) -> None: - self.__write_file_to_sdram(file, self.__DDIPL_OFFSET, min_length=self.__DDIPL_ROM_LENGTH) - - - # def set_dd_disk_state(self, state: str) -> None: - # state_mapping = { - # "ejected": self.__DD_DISK_STATE_EJECTED, - # "inserted": self.__DD_DISK_STATE_INSERTED, - # "changed": self.__DD_DISK_STATE_CHANGED, - # } - # if (state in state_mapping): - # self.__change_config(self.__CFG_ID_DD_DISK_STATE, state_mapping[state]) - # else: - # raise SC64Exception("DD disk state outside of supported values") - - - # def __dd_create_configuration(self, handle: TextIOWrapper) -> tuple[str, int, list[tuple[int, int]]]: - # DISK_HEADS = 2 - # DISK_TRACKS = 1175 - # DISK_BLOCKS_PER_TRACK = 2 - # DISK_SECTORS_PER_BLOCK = 85 - # DISK_BAD_TRACKS_PER_ZONE = 12 - # DISK_SYSTEM_SECTOR_SIZE = 232 - - # DISK_ZONES = [ - # (0, 232, 158, 0), - # (0, 216, 158, 158), - # (0, 208, 149, 316), - # (0, 192, 149, 465), - # (0, 176, 149, 614), - # (0, 160, 149, 763), - # (0, 144, 149, 912), - # (0, 128, 114, 1061), - # (1, 216, 158, 157), - # (1, 208, 158, 315), - # (1, 192, 149, 464), - # (1, 176, 149, 613), - # (1, 160, 149, 762), - # (1, 144, 149, 911), - # (1, 128, 149, 1060), - # (1, 112, 114, 1174), - # ] - - # DISK_VZONE_TO_PZONE = [ - # [0, 1, 2, 9, 8, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10], - # [0, 1, 2, 3, 10, 9, 8, 4, 5, 6, 7, 15, 14, 13, 12, 11], - # [0, 1, 2, 3, 4, 11, 10, 9, 8, 5, 6, 7, 15, 14, 13, 12], - # [0, 1, 2, 3, 4, 5, 12, 11, 10, 9, 8, 6, 7, 15, 14, 13], - # [0, 1, 2, 3, 4, 5, 6, 13, 12, 11, 10, 9, 8, 7, 15, 14], - # [0, 1, 2, 3, 4, 5, 6, 7, 14, 13, 12, 11, 10, 9, 8, 15], - # [0, 1, 2, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10, 9, 8], - # ] - - # DISK_DRIVE_TYPES = [( - # "development", - # 192, - # [11, 10, 3, 2], - # [0, 1, 8, 9, 16, 17, 18, 19, 20, 21, 22, 23], - # ), ( - # "retail", - # 232, - # [9, 8, 1, 0], - # [2, 3, 10, 11, 12, 16, 17, 18, 19, 20, 21, 22, 23], - # )] - - # def __check_system_block(lba: int, sector_size: int, check_disk_type: bool) -> tuple[bool, bytes]: - # handle.seek(lba * DISK_SYSTEM_SECTOR_SIZE * DISK_SECTORS_PER_BLOCK) - # system_block_data = handle.read(sector_size * DISK_SECTORS_PER_BLOCK) - # system_data = system_block_data[:sector_size] - # for sector in range(1, DISK_SECTORS_PER_BLOCK): - # sector_data = system_block_data[(sector * sector_size):][:sector_size] - # if (system_data != sector_data): - # return (False, None) - # if (check_disk_type): - # if (system_data[4] != 0x10): - # return (False, None) - # if ((system_data[5] & 0xF0) != 0x10): - # return (False, None) - # return (True, system_data) - - # disk_drive_type = None - # disk_system_data = None - # disk_id_data = None - # disk_bad_lbas = [] - - # drive_index = 0 - # while (disk_system_data == None) and (drive_index < len(DISK_DRIVE_TYPES)): - # (drive_type, system_sector_size, system_data_lbas, bad_lbas) = DISK_DRIVE_TYPES[drive_index] - # disk_bad_lbas.clear() - # disk_bad_lbas.extend(bad_lbas) - # for system_lba in system_data_lbas: - # (valid, system_data) = __check_system_block(system_lba, system_sector_size, check_disk_type=True) - # if (valid): - # disk_drive_type = drive_type - # disk_system_data = system_data - # else: - # disk_bad_lbas.append(system_lba) - # drive_index += 1 - - # for id_lba in [15, 14]: - # (valid, id_data) = __check_system_block(id_lba, DISK_SYSTEM_SECTOR_SIZE, check_disk_type=False) - # if (valid): - # disk_id_data = id_data - # else: - # disk_bad_lbas.append(id_lba) - - # if not (disk_system_data and disk_id_data): - # raise SC64Exception("Provided 64DD disk file is not valid") - - # disk_zone_bad_tracks = [] - - # for zone in range(len(DISK_ZONES)): - # zone_bad_tracks = [] - # start = 0 if zone == 0 else system_data[0x07 + zone] - # stop = system_data[0x07 + zone + 1] - # for offset in range(start, stop): - # zone_bad_tracks.append(system_data[0x20 + offset]) - # for ignored_track in range(DISK_BAD_TRACKS_PER_ZONE - len(zone_bad_tracks)): - # zone_bad_tracks.append(DISK_ZONES[zone][2] - ignored_track - 1) - # disk_zone_bad_tracks.append(zone_bad_tracks) - - # thb_lba_table = [(0xFFFFFFFF, -1)] * (DISK_HEADS * DISK_TRACKS * DISK_BLOCKS_PER_TRACK) - - # disk_type = disk_system_data[5] & 0x0F - - # current_lba = 0 - # starting_block = 0 - # disk_file_offset = 0 - - # for zone in DISK_VZONE_TO_PZONE[disk_type]: - # (head, sector_size, tracks, track) = DISK_ZONES[zone] - - # for zone_track in range(tracks): - # current_zone_track = ((tracks - 1) - zone_track) if head else zone_track - - # if (current_zone_track in disk_zone_bad_tracks[zone]): - # track += (-1) if head else 1 - # continue - - # for block in range(DISK_BLOCKS_PER_TRACK): - # if (current_lba not in disk_bad_lbas): - # index = (track << 2) | (head << 1) | (starting_block ^ block) - # thb_lba_table[index] = (disk_file_offset, current_lba) - # disk_file_offset += sector_size * DISK_SECTORS_PER_BLOCK - # current_lba += 1 - - # track += (-1) if head else 1 - # starting_block ^= 1 - - # return (disk_drive_type, thb_lba_table) - - - # def set_dd_configuration_for_disk(self, file: str = None) -> None: - # if (file): - # with open(file, "rb+") as handle: - # (disk_drive_type, thb_lba_table) = self.__dd_create_configuration(handle) - # thb_table_offset = self.__query_config(self.__CFG_ID_DD_THB_TABLE_OFFSET) - # data = bytearray() - # self.__disk_lba_table = [0xFFFFFFFF] * len(thb_lba_table) - # for (offset, lba) in thb_lba_table: - # data += struct.pack(">I", offset) - # self.__disk_lba_table[lba] = offset - # self.__write_cmd("W", thb_table_offset, len(data)) - # self.__write(bytes(data)) - # self.__read_cmd_status("W") - # id_mapping = { - # "retail": self.__DD_DRIVE_ID_RETAIL, - # "development": self.__DD_DRIVE_ID_DEVELOPMENT, - # } - # if (disk_drive_type in id_mapping): - # self.__change_config(self.__CFG_ID_DD_DRIVE_ID, id_mapping[disk_drive_type]) - # else: - # raise SC64Exception("DD drive type outside of supported values") - # else: - # raise SC64Exception("No DD disk file provided for disk info creation") - - - # def __debug_process_fsd_read(self, data: bytes) -> None: - # sector = int.from_bytes(data[0:4], byteorder="little") - # offset = int.from_bytes(data[4:8], byteorder="little") - # count = int.from_bytes(data[8:12], byteorder="little") - - # if (self.__fsd_file): - # self.__fsd_file.seek(sector * 512) - # self.__write_cmd("S", offset, count * 512) - # self.__write(self.__fsd_file.read(count * 512)) - # else: - # self.__write_cmd("S", offset, 0) - - - # def __debug_process_fsd_write(self, data: bytes) -> None: - # sector = int.from_bytes(data[0:4], byteorder="little") - # offset = int.from_bytes(data[4:8], byteorder="little") - # count = int.from_bytes(data[8:12], byteorder="little") - - # if (self.__fsd_file): - # with helpers.lock_volume(self.__fsd_file): - # self.__fsd_file.seek(sector * 512) - # self.__write_cmd("L", offset, count * 512) - # self.__fsd_file.write(self.__read(count * 512)) - # else: - # self.__write_cmd("L", offset, 0) - - - # def __debug_process_dd_block(self, data: bytes) -> None: - # transfer_mode = int.from_bytes(data[0:4], byteorder="little") - # sdram_offset = int.from_bytes(data[4:8], byteorder="little") - # disk_file_offset = int.from_bytes(data[8:12], byteorder="big") - # block_length = int.from_bytes(data[12:16], byteorder="little") - # print(f"DD BLOCK {disk_file_offset:08X} {block_length}") - # if (self.__disk_file): - # self.__disk_file.seek(disk_file_offset) - # if (transfer_mode): - # if (not self.__dd_turbo): - # # Fixes weird bug in Mario Artist Paint Studio prototype minigame - # time.sleep(0.016) - # self.__write_cmd("S", sdram_offset, block_length) - # self.__write(self.__disk_file.read(block_length)) - # else: - # self.__write_cmd("L", sdram_offset, block_length) - # self.__disk_file.write(self.__read(block_length)) - - - # def __debug_process_is_viewer(self, data: bytes) -> None: - # length = int.from_bytes(data[0:4], byteorder="little") - # address = int.from_bytes(data[4:8], byteorder="little") - # self.__write_cmd("L", address, length) - # text = self.__read(length) - # print(text.decode("EUC-JP", errors="backslashreplace"), end="") - - - # def debug_init(self, fsd_file: str = None, disk_file: str = None, dd_turbo: bool = False) -> None: - # if (fsd_file): - # self.__fsd_file = open(fsd_file, "rb+") - # if (disk_file): - # self.__disk_file = open(disk_file, "rb+") - # self.__dd_turbo = dd_turbo - - - # def debug_loop(self, is_viewer_enabled: bool = False) -> None: - # self.__change_config(self.__CFG_ID_IS_VIEWER_ENABLE, is_viewer_enabled) - - # print("\r\n\033[34m --- Debug server started --- \033[0m\r\n") - - # start_indicator = bytearray() - # dropped_bytes = 0 - - # while (True): - # while (start_indicator != b"DMA@"): - # start_indicator.append(self.__read_long(1)[0]) - # if (len(start_indicator) > 4): - # dropped_bytes += 1 - # start_indicator.pop(0) - # start_indicator.clear() - - # if (dropped_bytes): - # print(f"\033[35mWarning - dropped {dropped_bytes} bytes from stream\033[0m", file=sys.stderr) - # dropped_bytes = 0 - - # header = self.__read_long(4) - # id = int(header[0]) - # length = int.from_bytes(header[1:4], byteorder="big") - # data = self.__read_long(length) - # self.__read_long(self.__align(length, 4) - length) - # end_indicator = self.__read_long(4) - - # if (end_indicator != b"CMPH"): - # print(f"\033[35mGot unknown end indicator: {end_indicator.decode(encoding='ascii', errors='backslashreplace')}\033[0m", file=sys.stderr) - # else: - # if (id == self.__DEBUG_ID_TEXT): - # print(data.decode(encoding="ascii", errors="backslashreplace"), end="") - # elif (id == self.__EVENT_ID_FSD_READ): - # self.__debug_process_fsd_read(data) - # elif (id == self.__EVENT_ID_FSD_WRITE): - # self.__debug_process_fsd_write(data) - # elif (id == self.__EVENT_ID_DD_BLOCK): - # self.__debug_process_dd_block(data) - # elif (id == self.__EVENT_ID_IS_VIEWER): - # self.__debug_process_is_viewer(data) - # else: - # print(f"\033[35mGot unknown id: {id}, length: {length}\033[0m", file=sys.stderr) - - - -class SC64ProgressBar: - __LABEL_LENGTH = 30 - - - def __init__(self, sc64: SC64) -> None: - self.__sc64 = sc64 - self.__widgets = [ - "[ ", - progressbar.FormatLabel("{variables.label}", new_style=True), - " | ", - progressbar.Bar(left="", right=""), - " ", - progressbar.Percentage(), - " | ", - progressbar.DataSize(prefixes=(" ", "Ki", "Mi")), - " | ", - progressbar.AdaptiveTransferSpeed(), - " ]" - ] - self.__variables = {"label": ""} - self.__bar = None - - - def __enter__(self) -> None: - if (self.__sc64): - self.__sc64._SC64__progress_init = self.__progress_init - self.__sc64._SC64__progress_value = self.__progress_value - self.__sc64._SC64__progress_finish = self.__progress_finish - - - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - if (self.__sc64): - self.__sc64._SC64__progress_init = None - self.__sc64._SC64__progress_value = None - self.__sc64._SC64__progress_finish = None - - - def __progress_init(self, value: int, label: str) -> None: - if (self.__bar == None): - self.__bar = progressbar.ProgressBar(widgets=self.__widgets, variables=self.__variables) - justified_label = label.ljust(self.__LABEL_LENGTH) - if (len(justified_label) > self.__LABEL_LENGTH): - justified_label = f"{justified_label[:self.__LABEL_LENGTH - 3]}..." - self.__bar.variables.label = justified_label - self.__bar.start(max_value=value) - - - def __progress_value(self, value: int) -> None: - self.__bar.update(value) - - - def __progress_finish(self) -> None: - self.__bar.finish(end="") - print() - - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="SummerCart64 one stop control center") - parser.add_argument("-rtc", default=False, action="store_true", required=False, help="update RTC to system time") - parser.add_argument("-b", metavar="boot_mode", default="2", required=False, help="set boot mode (0 - 4)") - parser.add_argument("-t", metavar="tv_type", default="3", required=False, help="set TV type (0 - 2)") - parser.add_argument("-c", metavar="cic_seed", default="0xFFFF", required=False, help="set CIC seed") - parser.add_argument("-s", metavar="save_type", default="0", required=False, help="set save type (0 - 6)") - parser.add_argument("-d", default=False, action="store_true", required=False, help="enable 64DD emulation") - parser.add_argument("-df", default=False, action="store_true", required=False, help="set 64DD emulation to turbo mode") - parser.add_argument("-r", default=False, action="store_true", required=False, help="perform reading operation instead of writing") - parser.add_argument("-l", metavar="length", default="0x101000", required=False, help="specify ROM length to read") - parser.add_argument("-u", metavar="update_path", default=None, required=False, help="path to update file") - parser.add_argument("-e", metavar="save_path", default=None, required=False, help="path to save file") - parser.add_argument("-i", metavar="ddipl_path", default=None, required=False, help="path to DDIPL file") - parser.add_argument("-q", default=None, action="store_true", required=False, help="start debug server") - parser.add_argument("-v", default=False, action="store_true", required=False, help="enable IS-Viewer64 support") - parser.add_argument("-f", metavar="sd_path", default=None, required=False, help="path to disk or file for fake SD card emulation") - parser.add_argument("-k", metavar="disk_path", default=None, required=False, help="path to 64DD disk file") - parser.add_argument("rom", metavar="rom_path", default=None, help="path to ROM file", nargs="?") - - if (len(sys.argv) <= 1): - parser.print_help() - parser.exit() - - args = parser.parse_args() - - sc64 = None - - try: - sc64 = SC64() - - rtc = args.rtc - boot_mode = int(args.b) - save_type = int(args.s) - tv_type = int(args.t) - cic_seed = int(args.c, 0) - dd_enable = args.d - dd_turbo = args.df - is_read = args.r - rom_length = int(args.l, 0) - update_file = args.u - save_file = args.e - dd_ipl_file = args.i - rom_file = args.rom - debug_server = args.q - is_viewer_enabled = args.v - sd_file = args.f - disk_file = args.k - - firmware_backup_file = "sc64firmware.bin.bak" - - with SC64ProgressBar(sc64): - if (update_file): - sc64.upload_bootloader(update_file) - # if (is_read): - # sc64.backup_firmware(update_file) - # else: - # sc64.backup_firmware(firmware_backup_file) - # if (not filecmp.cmp(update_file, firmware_backup_file)): - # print("Update file different than contents of flash - updating SummerCart64") - # sc64.update_firmware(update_file) - # else: - # print("SummerCart64 is already updated to latest version") - # os.remove(firmware_backup_file) - - if (rtc): - now = datetime.now() - print(f"Setting RTC to [{now}]") - sc64.set_rtc(now) - - if (not is_read): - if (boot_mode != 2): - print(f"Setting boot mode to [{sc64.get_boot_mode_label(boot_mode)}]") - sc64.set_boot_mode(boot_mode) - - if (save_type): - print(f"Setting save type to [{sc64.get_save_type_label(save_type)}]") - sc64.set_save_type(save_type) - - if (tv_type != 3): - print(f"Setting TV type to [{sc64.get_tv_type_label(tv_type)}]") - sc64.set_tv_type(tv_type) - - if (cic_seed != 0xFFFF): - print(f"Setting CIC seed to [{hex(cic_seed) if cic_seed != 0xFFFF else 'Unknown'}]") - sc64.set_cic_seed(cic_seed) - - if (dd_enable): - print(f"Setting 64DD emulation to [{'Enabled' if dd_enable else 'Disabled'}]") - sc64.set_dd_enable(dd_enable) - - if (rom_file): - if (is_read): - if (rom_length > 0): - sc64.download_rom(rom_file, rom_length) - else: - raise SC64Exception("ROM length should be larger than 0") - else: - sc64.upload_rom(rom_file) - - if (dd_ipl_file): - if (is_read): - sc64.download_dd_ipl(dd_ipl_file) - else: - sc64.upload_dd_ipl(dd_ipl_file) - - if (save_file): - if (is_read): - sc64.download_save(save_file) - else: - sc64.upload_save(save_file) - - # if (debug_server): - # sc64.debug_init(sd_file, disk_file, dd_turbo) - # if (is_viewer_enabled): - # print(f"Setting IS-Viewer 64 emulation to [Enabled]") - # if (sd_file): - # print(f"Using fake SD emulation file [{sd_file}]") - # if (disk_file): - # print(f"Using 64DD disk image file [{disk_file}]") - # sc64.set_dd_configuration_for_disk(disk_file) - # print(f"Setting 64DD disk state to [Inserted]") - # sc64.set_dd_disk_state("inserted" if disk_file else "ejected") - # sc64.debug_loop(is_viewer_enabled) - - except SC64Exception as e: - print(f"Error: {e}") - parser.exit(1) - except KeyboardInterrupt: - pass - # finally: - # if (sc64): - # if (disk_file): - # print(f"Setting 64DD disk state to [Ejected]") - # sc64.set_dd_disk_state("ejected") - # sys.stdout.write("\033[0m") +import queue +import serial +import time +from dd64 import BadBlockError, DD64Image +from enum import IntEnum +from serial.tools import list_ports +from threading import Thread +from typing import Optional + + + +class ConnectionException(Exception): + pass + + +class SC64Serial: + __disconnect = False + __serial = None + __thread_read = None + __thread_write = None + __queue_output = queue.Queue() + __queue_input = queue.Queue() + __queue_packet = queue.Queue() + + def __init__(self) -> None: + ports = list_ports.comports() + device_found = False + + if (self.__serial != None and self.__serial.is_open): + raise ConnectionException('Serial port is already open') + + for p in ports: + if (p.vid == 0x0403 and p.pid == 0x6014 and p.serial_number.startswith('SC64')): + try: + self.__serial = serial.Serial(p.device, timeout=0.1, write_timeout=5.0) + self.__reset_link() + except (serial.SerialException, ConnectionException): + if (self.__serial): + self.__serial.close() + continue + device_found = True + break + + if (not device_found): + raise ConnectionException('No SC64 device was found') + + self.__thread_read = Thread(target=self.__serial_process_input, daemon=True) + self.__thread_write = Thread(target=self.__serial_process_output, daemon=True) + + self.__thread_read.start() + self.__thread_write.start() + + def __del__(self) -> None: + self.__disconnect = True + if (self.__thread_read.is_alive()): + self.__thread_read.join(1) + if (self.__thread_write.is_alive()): + self.__thread_write.join(6) + if (self.__serial != None and self.__serial.is_open): + self.__serial.close() + + def __reset_link(self) -> None: + self.__serial.reset_output_buffer() + + retry_counter = 0 + self.__serial.dtr = 1 + while (self.__serial.dsr == 0): + time.sleep(0.1) + retry_counter += 1 + if (retry_counter >= 10): + raise ConnectionException('Could not reset SC64 device') + + self.__serial.reset_input_buffer() + + retry_counter = 0 + self.__serial.dtr = 0 + while (self.__serial.dsr == 1): + time.sleep(0.1) + retry_counter += 1 + if (retry_counter >= 10): + raise ConnectionException('Could not reset SC64 device') + + def __write(self, data: bytes) -> None: + try: + if (self.__disconnect): + raise ConnectionException + self.__serial.write(data) + self.__serial.flush() + except (serial.SerialException, serial.SerialTimeoutException): + raise ConnectionException + + def __read(self, length: int) -> bytes: + try: + data = b'' + while (len(data) < length and not self.__disconnect): + data += self.__serial.read(length - len(data)) + if (self.__disconnect): + raise ConnectionException + return data + except serial.SerialException: + raise ConnectionException + + def __read_int(self) -> int: + return int.from_bytes(self.__read(4), byteorder='big') + + def __serial_process_output(self) -> None: + while (not self.__disconnect and self.__serial != None and self.__serial.is_open): + try: + packet: bytes = self.__queue_output.get(timeout=0.1) + self.__write(packet) + self.__queue_output.task_done() + except queue.Empty: + continue + except ConnectionException: + break + + def __serial_process_input(self) -> None: + while (not self.__disconnect and self.__serial != None and self.__serial.is_open): + try: + token = self.__read(4) + if (len(token) == 4): + identifier = token[0:3] + command = token[3:4] + if (identifier == b'PKT'): + data = self.__read(self.__read_int()) + self.__queue_packet.put((command, data)) + elif (identifier == b'CMP' or identifier == b'ERR'): + data = self.__read(self.__read_int()) + success = identifier == b'CMP' + self.__queue_input.put((command, data, success)) + else: + raise ConnectionException + except ConnectionException: + break + + def __check_threads(self) -> None: + if (not (self.__thread_write.is_alive() and self.__thread_read.is_alive())): + raise ConnectionException('Serial link is closed') + + def __queue_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'') -> None: + if (len(cmd) != 1): + raise ValueError('Length of command is different than 1 byte') + if (len(args) != 2): + raise ValueError('Number of arguments is different than 2') + packet: bytes = b'CMD' + packet += cmd[0:1] + for arg in args: + packet += arg.to_bytes(4, byteorder='big') + packet += data + self.__queue_output.put(packet) + + def __pop_response(self, cmd: bytes, timeout: float) -> bytes: + try: + (response_cmd, data, success) = self.__queue_input.get(timeout=timeout) + self.__queue_input.task_done() + if (cmd != response_cmd): + raise ConnectionException('CMD wrong command response') + if (success == False): + raise ConnectionException('CMD response error') + return data + except queue.Empty: + raise ConnectionException('CMD response timeout') + + def execute_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'', response: bool=True, timeout: float=5.0) -> Optional[bytes]: + self.__check_threads() + self.__queue_cmd(cmd, args, data) + if (response): + return self.__pop_response(cmd, timeout) + return None + + def get_packet(self, timeout: float=0.1) -> Optional[tuple[bytes, bytes]]: + self.__check_threads() + try: + packet = self.__queue_packet.get(timeout=timeout) + self.__queue_packet.task_done() + return packet + except queue.Empty: + return None + + +class SC64: + class __Address(IntEnum): + ROM = 0x0000_0000 + FIRMWARE = 0x0200_0000 + DDIPL = 0x03BC_0000 + SAVE = 0x03FE_0000 + SHADOW = 0x04FE_0000 + BUFFER = 0x0500_0000 + EEPROM = 0x0500_2000 + + class __Length(IntEnum): + ROM = (64 * 1024 * 1024) + FIRMWARE = (2 * 1024 * 1024) + DDIPL = (4 * 1024 * 1024) + SAVE = (128 * 1024) + SHADOW = (128 * 1024) + BUFFER = (8 * 1024) + EEPROM = (2 * 1024) + + class __SaveLength(IntEnum): + NONE = 0 + EEPROM_4K = 512 + EEPROM_16K = (2 * 1024) + SRAM = (32 * 1024) + FLASHRAM = (128 * 1024) + SRAM_X3 = (3 * 32 * 1024) + + class __CfgId(IntEnum): + BOOTLOADER_SWITCH = 0 + ROM_WRITE_ENABLE = 1 + ROM_SHADOW_ENABLE = 2 + DD_MODE = 3 + ISV_ENABLE = 4 + BOOT_MODE = 5 + SAVE_TYPE = 6 + CIC_SEED = 7 + TV_TYPE = 8 + FLASH_ERASE_BLOCK = 9 + DD_DRIVE_TYPE = 10 + DD_DISK_STATE = 11 + BUTTON_STATE = 12 + BUTTON_MODE = 13 + + class __UpdateError(IntEnum): + OK = 0 + TOKEN = 1 + CHECKSUM = 2 + SIZE = 3 + UNKNOWN_CHUNK = 4 + READ = 5 + + class __UpdateStatus(IntEnum): + MCU = 1 + FPGA = 2 + BOOTLOADER = 3 + DONE = 0x80 + ERROR = 0xFF + + class __DDMode(IntEnum): + NONE = 0 + REGS = 1 + DDIPL = 2 + FULL = 3 + + class __DDDriveType(IntEnum): + RETAIL = 0 + DEVELOPMENT = 1 + + class __DDDiskState(IntEnum): + EJECTED = 0 + INSERTED = 1 + CHANGED = 2 + + class __ButtonMode(IntEnum): + NONE = 0 + N64_IRQ = 1 + USB_PACKET = 2 + DD_DISK_SWAP = 3 + + class BootMode(IntEnum): + SD = 0 + USB = 1 + ROM = 2 + DDIPL = 3 + DIRECT = 4 + + class SaveType(IntEnum): + NONE = 0 + EEPROM_4K = 1 + EEPROM_16K = 2 + SRAM = 3 + FLASHRAM = 4 + SRAM_X3 = 5 + + class CICSeed(IntEnum): + ALECK = 0xAC + X101 = 0x3F + X102 = 0x3F + X103 = 0x78 + X105 = 0x91 + X106 = 0x85 + DD_JP = 0xDD + DD_US = 0xDE + AUTO = 0xFFFF + + class TVType(IntEnum): + PAL = 0 + NTSC = 1 + MPAL = 2 + AUTO = 3 + + def __init__(self) -> None: + self.__link = SC64Serial() + version = self.__link.execute_cmd(cmd=b'v') + if (version != b'SCv2'): + raise ConnectionException('Unknown SC64 API version') + + def __get_int(self, data: bytes) -> int: + return int.from_bytes(data[:4], byteorder='big') + + def __set_config(self, config: __CfgId, value: int) -> None: + self.__link.execute_cmd(cmd=b'C', args=[config, value]) + + def __get_config(self, config: __CfgId) -> int: + data = self.__link.execute_cmd(cmd=b'c', args=[config, 0]) + return self.__get_int(data) + + def __write_memory(self, address: int, data: bytes) -> None: + self.__link.execute_cmd(cmd=b'M', args=[address, len(data)], data=data) + + def __read_memory(self, address: int, length: int) -> bytes: + return self.__link.execute_cmd(cmd=b'm', args=[address, length]) + + def reset_state(self) -> None: + self.__set_config(self.__CfgId.BOOTLOADER_SWITCH, False) + self.__set_config(self.__CfgId.ROM_WRITE_ENABLE, False) + self.__set_config(self.__CfgId.ROM_SHADOW_ENABLE, False) + self.__set_config(self.__CfgId.DD_MODE, self.__DDMode.NONE) + self.__set_config(self.__CfgId.ISV_ENABLE, False) + self.__set_config(self.__CfgId.BOOT_MODE, self.BootMode.USB) + self.__set_config(self.__CfgId.SAVE_TYPE, self.SaveType.NONE) + self.__set_config(self.__CfgId.CIC_SEED, self.CICSeed.AUTO) + self.__set_config(self.__CfgId.TV_TYPE, self.TVType.AUTO) + self.__set_config(self.__CfgId.DD_DRIVE_TYPE, self.__DDDriveType.RETAIL) + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) + self.__set_config(self.__CfgId.BUTTON_MODE, self.__ButtonMode.NONE) + self.set_cic_parameters() + + def upload_rom(self, data: bytes): + if (len(data) > self.__Length.ROM): + raise ValueError('ROM size too big') + self.__write_memory(self.__Address.ROM, data) + + def upload_ddipl(self, data: bytes) -> None: + if (len(data) > self.__Length.DDIPL): + raise ValueError('DDIPL size too big') + self.__write_memory(self.__Address.DDIPL, data) + + def upload_save(self, data: bytes) -> None: + save_type = self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)) + if (save_type not in self.SaveType): + raise ConnectionError('Unknown save type fetched from SC64 device') + if (len(data) != self.__SaveLength[save_type.name]): + raise ValueError('Wrong save data length') + address = self.__Address.SAVE + if (save_type == self.SaveType.EEPROM_4K or save_type == self.SaveType.EEPROM_16K): + address = self.__Address.EEPROM + self.__write_memory(address, data) + + def set_boot_mode(self, mode: BootMode) -> None: + if (mode not in self.BootMode): + raise ValueError('Boot mode outside of allowed values') + self.__set_config(self.__CfgId.BOOT_MODE, mode) + + def set_cic_seed(self, seed: int) -> None: + if (seed != self.CICSeed.AUTO): + if (seed < 0 or seed > 0xFF): + raise ValueError('CIC seed outside of allowed values') + self.__set_config(self.__CfgId.CIC_SEED, seed) + + def set_tv_type(self, type: TVType) -> None: + if (type not in self.TVType): + raise ValueError('TV type outside of allowed values') + self.__set_config(self.__CfgId.TV_TYPE, type) + + def set_save_type(self, type: SaveType) -> None: + if (type not in self.SaveType): + raise ValueError('Save type outside of allowed values') + self.__set_config(self.__CfgId.SAVE_TYPE, type) + + def set_cic_parameters(self, dd_mode: bool=False, seed: int=0x3F, checksum: bytes=[0xA5, 0x36, 0xC0, 0xF1, 0xD8, 0x59]) -> None: + if (seed < 0 or seed > 0xFF): + raise ValueError('CIC seed outside of allowed values') + if (len(checksum) != 6): + raise ValueError('CIC checksum length outside of allowed values') + args = [0, 0] + if (dd_mode): + args[0] |= (1 << 24) + args[0] |= (seed << 16) + args[0] |= (checksum[0] << 8) | checksum[1] + args[1] |= ( + (checksum[2] << 24) | + (checksum[3] << 16) | + (checksum[4] << 8) | + (checksum[5]) + ) + self.__link.execute_cmd(cmd=b'B', args=args) + + def update_firmware(self, data: bytes) -> None: + address = self.__Address.FIRMWARE + self.__write_memory(address, data) + response = self.__link.execute_cmd(cmd=b'F', args=[address, len(data)]) + error = self.__UpdateError(self.__get_int(response[0:4])) + if (error != self.__UpdateError.OK): + raise ConnectionException(f'Bad update image [{error.name}]') + status = None + while status != self.__UpdateStatus.DONE: + packet = self.__link.get_packet(timeout=60.0) + if (packet == None): + raise ConnectionException('Update timeout') + (cmd, data) = packet + status = self.__UpdateStatus(self.__get_int(data)) + if (cmd != b'F'): + raise ConnectionException('Wrong update status packet') + print(f'Update status [{status.name}]') + if (status == self.__UpdateStatus.ERROR): + raise ConnectionException('Update error, device is most likely bricked') + time.sleep(2) + + def backup_firmware(self) -> bytes: + address = self.__Address.FIRMWARE + info = self.__link.execute_cmd(cmd=b'f', args=[address, 0], timeout=60.0) + error = self.__UpdateError(self.__get_int(info[0:4])) + length = self.__get_int(info[4:8]) + print(f'Backup info - error: {error.name}, length: {hex(length)}') + if (error != self.__UpdateError.OK): + raise ConnectionException('Error while getting firmware backup') + return self.__read_memory(address, length) + + def __handle_dd_packet(self, dd: DD64Image, data: bytes) -> None: + CMD_READ_BLOCK = 1 + CMD_WRITE_BLOCK = 2 + cmd = self.__get_int(data[0:]) + address = self.__get_int(data[4:]) + track_head_block = self.__get_int(data[8:]) + track = (track_head_block >> 2) & 0xFFF + head = (track_head_block >> 1) & 0x1 + block = track_head_block & 0x1 + try: + if (not dd.loaded): + raise BadBlockError + if (cmd == CMD_READ_BLOCK): + block_data = dd.read_block(track, head, block) + self.__link.execute_cmd(cmd=b'D', args=[address, len(block_data)], data=block_data) + elif (cmd == CMD_WRITE_BLOCK): + dd.write_block(track, head, block, data[12:]) + self.__link.execute_cmd(cmd=b'd', args=[0, 0]) + else: + self.__link.execute_cmd(cmd=b'd', args=[-1, 0]) + except BadBlockError: + self.__link.execute_cmd(cmd=b'd', args=[1, 0]) + + def __handle_isv_packet(self, data: bytes) -> None: + print(data.decode('EUC-JP', errors='backslashreplace'), end='') + + def __handle_usb_packet(self, data: bytes) -> None: + print(data) + + def debug_loop(self, isv: bool=False, disks: Optional[list[str]]=None) -> None: + dd = None + drive_type = None + current_image = 0 + next_image = 0 + + self.__set_config(self.__CfgId.ROM_WRITE_ENABLE, isv) + self.__set_config(self.__CfgId.ISV_ENABLE, isv) + + if (disks): + dd = DD64Image() + for path in disks: + try: + dd.load(path) + if (drive_type == None): + drive_type = dd.get_drive_type() + elif (drive_type != dd.get_drive_type()): + raise ValueError('Disk drive type mismatch') + dd.unload() + except ValueError as e: + dd = None + print(f'64DD disabled, incorrect disk images provided: {e}') + break + + drive_type = self.__DDDriveType.RETAIL if drive_type == 'retail' else self.__DDDriveType.DEVELOPMENT + + if (dd): + self.__set_config(self.__CfgId.DD_MODE, self.__DDMode.FULL) + self.__set_config(self.__CfgId.DD_DRIVE_TYPE, drive_type) + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) + self.__set_config(self.__CfgId.BUTTON_MODE, self.__ButtonMode.USB_PACKET) + + while (True): + packet = self.__link.get_packet() + if (packet != None): + (cmd, data) = packet + if (cmd == b'D'): + self.__handle_dd_packet(dd, data) + if (cmd == b'I'): + self.__handle_isv_packet(data) + if (cmd == b'U'): + self.__handle_usb_packet(data) + if (cmd == b'B'): + if (dd.loaded): + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) + dd.unload() + print(f'64DD disk ejected [{disks[current_image]}]') + else: + dd.load(disks[next_image]) + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.INSERTED) + current_image = next_image + next_image += 1 + if (next_image >= len(disks)): + next_image = 0 + print(f'64DD disk inserted [{disks[current_image]}]') + + + +if __name__ == '__main__': + try: + sc64 = SC64() + sc64.reset_state() + + # sc64.set_save_type(SC64.SaveType.EEPROM_4K) + # with open('S:/n64/saves/SM64.eep', 'rb+') as f: + # sc64.upload_save(f.read()) + # with open('S:/n64/roms/baserom.us.z64', 'rb+') as f: + # sc64.upload_rom(f.read()) + # sc64.set_boot_mode(SC64.BootMode.ROM) + + # with open('S:/n64/64dd/ipl/NDDJ2.n64', 'rb+') as f: + # sc64.upload_ddipl(f.read()) + # sc64.set_boot_mode(SC64.BootMode.DDIPL) + + # try: + # disks = [ + # 'S:/n64/64dd/rtl/NUD-DMPJ-JPN (sealed).ndd', + # 'S:/n64/64dd/rtl/NUD-DMBJ-JPN.ndd' + # ] + # sc64.debug_loop(disks=disks) + # except KeyboardInterrupt: + # pass + except ConnectionException as e: + print(f'SC64 error: {e}') diff --git a/sw/update/update.py b/sw/update/update.py index 9ffcc78..3ac476a 100644 --- a/sw/update/update.py +++ b/sw/update/update.py @@ -196,10 +196,10 @@ if __name__ == "__main__": update.create_update_data() hostname = platform.node() - creation_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + creation_datetime = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') info = [ f'build system: [{hostname}]', - f'creation time: [{creation_time}]', + f'creation datetime: [{creation_datetime}]', ] if (args.git): info.append(args.git)