diff --git a/sw/bootloader/src/init.c b/sw/bootloader/src/init.c index fbe80fe..a519255 100644 --- a/sw/bootloader/src/init.c +++ b/sw/bootloader/src/init.c @@ -18,12 +18,12 @@ void init (void) { exception_enable_watchdog(); exception_enable_interrupts(); - sc64_init(); - if (test_check()) { exception_disable_watchdog(); test_execute(); } + + sc64_set_config(CFG_ID_BOOTLOADER_SWITCH, false); } void deinit (void) { diff --git a/sw/bootloader/src/sc64.c b/sw/bootloader/src/sc64.c index 06ee528..cd3c779 100644 --- a/sw/bootloader/src/sc64.c +++ b/sw/bootloader/src/sc64.c @@ -6,15 +6,15 @@ typedef enum { - SC64_CMD_GET_VERSION = 'v', - SC64_CMD_CONFIG_QUERY = 'c', - SC64_CMD_CONFIG_CHANGE = 'C', - SC64_CMD_TIME_GET = 't', + SC64_CMD_VERSION_GET = 'v', + SC64_CMD_CONFIG_SET = 'C', + SC64_CMD_CONFIG_GET = 'c', SC64_CMD_TIME_SET = 'T', - SC64_CMD_USB_READ = 'm', + SC64_CMD_TIME_GET = 't', + SC64_CMD_USB_WRITE_STATUS = 'U', SC64_CMD_USB_WRITE = 'M', SC64_CMD_USB_READ_STATUS = 'u', - SC64_CMD_USB_WRITE_STATUS = 'U', + SC64_CMD_USB_READ = 'm', } cmd_id_t; @@ -45,26 +45,37 @@ bool sc64_check_presence (void) { return (version == SC64_VERSION_2); } -void sc64_init (void) { - sc64_change_config(CFG_ID_BOOTLOADER_SWITCH, false); +cmd_error_t sc64_get_error (void) { + if (pi_io_read(&SC64_REGS->SR_CMD) & SC64_SR_CMD_ERROR) { + return (cmd_error_t) pi_io_read(&SC64_REGS->DATA[0]); + } + return CMD_OK; } -uint32_t sc64_query_config (cfg_id_t id) { +void sc64_set_config (cfg_id_t id, uint32_t value) { + uint32_t args[2] = { id, value }; + sc64_execute_cmd(SC64_CMD_CONFIG_SET, args, NULL); +} + +uint32_t sc64_get_config (cfg_id_t id) { uint32_t args[2] = { id, 0 }; uint32_t result[2]; - sc64_execute_cmd(SC64_CMD_CONFIG_QUERY, args, result); + sc64_execute_cmd(SC64_CMD_CONFIG_GET, args, result); return result[1]; } -void sc64_change_config (cfg_id_t id, uint32_t value) { - uint32_t args[2] = { id, value }; - sc64_execute_cmd(SC64_CMD_CONFIG_CHANGE, args, NULL); +void sc64_get_boot_info (sc64_boot_info_t *info) { + info->cic_seed = (uint16_t) sc64_get_config(CFG_ID_CIC_SEED); + info->tv_type = (tv_type_t) sc64_get_config(CFG_ID_TV_TYPE); + info->boot_mode = (boot_mode_t) sc64_get_config(CFG_ID_BOOT_MODE); } -void sc64_get_boot_info (sc64_boot_info_t *info) { - info->cic_seed = (uint16_t) sc64_query_config(CFG_ID_CIC_SEED); - info->tv_type = (tv_type_t) sc64_query_config(CFG_ID_TV_TYPE); - info->boot_mode = (boot_mode_t) sc64_query_config(CFG_ID_BOOT_MODE); +void sc64_set_time (rtc_time_t *t) { + uint32_t args[2] = { + ((t->hour << 16) | (t->minute << 8) | t->second), + ((t->weekday << 24) | (t->year << 16) | (t->month << 8) | t->day), + }; + sc64_execute_cmd(SC64_CMD_TIME_SET, args, NULL); } void sc64_get_time (rtc_time_t *t) { @@ -79,14 +90,6 @@ void sc64_get_time (rtc_time_t *t) { t->year = ((result[1] >> 16) & 0xFF); } -void sc64_set_time (rtc_time_t *t) { - uint32_t args[2] = { - ((t->hour << 16) | (t->minute << 8) | t->second), - ((t->weekday << 24) | (t->year << 16) | (t->month << 8) | t->day), - }; - sc64_execute_cmd(SC64_CMD_TIME_SET, args, NULL); -} - bool sc64_usb_write_ready (void) { uint32_t result[2]; sc64_execute_cmd(SC64_CMD_USB_WRITE_STATUS, NULL, result); diff --git a/sw/bootloader/src/sc64.h b/sw/bootloader/src/sc64.h index 704f3dd..f9c4c8c 100644 --- a/sw/bootloader/src/sc64.h +++ b/sw/bootloader/src/sc64.h @@ -6,6 +6,14 @@ #include +typedef enum { + CMD_OK = 0, + CMD_ERROR_BAD_ADDRESS = 1, + CMD_ERROR_BAD_CONFIG_ID = 2, + CMD_ERROR_TIMEOUT = 3, + CMD_ERROR_UNKNOWN_CMD = -1, +} cmd_error_t; + typedef enum { CFG_ID_BOOTLOADER_SWITCH, CFG_ID_ROM_WRITE_ENABLE, @@ -84,14 +92,14 @@ typedef struct { bool sc64_check_presence (void); -void sc64_init (void); -uint32_t sc64_query_config (cfg_id_t id); -void sc64_change_config (cfg_id_t id, uint32_t value); +cmd_error_t sc64_get_error (void); +void sc64_set_config (cfg_id_t id, uint32_t value); +uint32_t sc64_get_config (cfg_id_t id); void sc64_get_boot_info (sc64_boot_info_t *info); -void sc64_get_time (rtc_time_t *t); void sc64_set_time (rtc_time_t *t); -bool sc64_write_usb_ready (void); -bool sc64_write_usb (uint32_t *address, uint32_t length); +void sc64_get_time (rtc_time_t *t); +bool sc64_usb_write_ready (void); +bool sc64_usb_write (uint32_t *address, uint32_t length); bool sc64_usb_read_ready (uint8_t *type, uint32_t *length); bool sc64_usb_read (uint32_t *address, uint32_t length); diff --git a/sw/bootloader/src/test.c b/sw/bootloader/src/test.c index e616be0..75d07ea 100644 --- a/sw/bootloader/src/test.c +++ b/sw/bootloader/src/test.c @@ -5,7 +5,7 @@ bool test_check (void) { - if (sc64_query_config(CFG_ID_BUTTON_STATE)) { + if (sc64_get_config(CFG_ID_BUTTON_STATE)) { return true; } return false; diff --git a/sw/pc/dd64.py b/sw/pc/dd64.py old mode 100644 new mode 100755 index 89d5eb0..2af1d6c --- a/sw/pc/dd64.py +++ b/sw/pc/dd64.py @@ -1,227 +1,227 @@ -#!/usr/bin/env python3 - -import sys -from io import BufferedReader -from typing import Optional - - - -class BadBlockError(Exception): - pass - - -class DD64Image: - __DISK_HEADS = 2 - __DISK_TRACKS = 1175 - __DISK_BLOCKS_PER_TRACK = 2 - __DISK_SECTORS_PER_BLOCK = 85 - __DISK_BAD_TRACKS_PER_ZONE = 12 - __DISK_SYSTEM_SECTOR_SIZE = 232 - __DISK_ZONES = [ - (0, 232, 158, 0), - (0, 216, 158, 158), - (0, 208, 149, 316), - (0, 192, 149, 465), - (0, 176, 149, 614), - (0, 160, 149, 763), - (0, 144, 149, 912), - (0, 128, 114, 1061), - (1, 216, 158, 157), - (1, 208, 158, 315), - (1, 192, 149, 464), - (1, 176, 149, 613), - (1, 160, 149, 762), - (1, 144, 149, 911), - (1, 128, 149, 1060), - (1, 112, 114, 1174), - ] - __DISK_VZONE_TO_PZONE = [ - [0, 1, 2, 9, 8, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10], - [0, 1, 2, 3, 10, 9, 8, 4, 5, 6, 7, 15, 14, 13, 12, 11], - [0, 1, 2, 3, 4, 11, 10, 9, 8, 5, 6, 7, 15, 14, 13, 12], - [0, 1, 2, 3, 4, 5, 12, 11, 10, 9, 8, 6, 7, 15, 14, 13], - [0, 1, 2, 3, 4, 5, 6, 13, 12, 11, 10, 9, 8, 7, 15, 14], - [0, 1, 2, 3, 4, 5, 6, 7, 14, 13, 12, 11, 10, 9, 8, 15], - [0, 1, 2, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10, 9, 8], - ] - __DISK_DRIVE_TYPES = [( - 'development', - 192, - [11, 10, 3, 2], - [0, 1, 8, 9, 16, 17, 18, 19, 20, 21, 22, 23], - ), ( - 'retail', - 232, - [9, 8, 1, 0], - [2, 3, 10, 11, 12, 16, 17, 18, 19, 20, 21, 22, 23], - )] - - __file: Optional[BufferedReader] - __drive_type: Optional[str] - __block_info_table: list[tuple[int, int]] - loaded: bool = False - - def __init__(self) -> None: - self.__file = None - self.__drive_type = None - block_info_table_length = self.__DISK_HEADS * self.__DISK_TRACKS * self.__DISK_BLOCKS_PER_TRACK - self.__block_info_table = [None] * block_info_table_length - - def __del__(self) -> None: - self.unload() - - def __check_system_block(self, lba: int, sector_size: int, check_disk_type: bool) -> tuple[bool, bytes]: - self.__file.seek(lba * self.__DISK_SYSTEM_SECTOR_SIZE * self.__DISK_SECTORS_PER_BLOCK) - system_block_data = self.__file.read(sector_size * self.__DISK_SECTORS_PER_BLOCK) - system_data = system_block_data[:sector_size] - for sector in range(1, self.__DISK_SECTORS_PER_BLOCK): - sector_data = system_block_data[(sector * sector_size):][:sector_size] - if (system_data != sector_data): - return (False, None) - if (check_disk_type): - if (system_data[4] != 0x10): - return (False, None) - if ((system_data[5] & 0xF0) != 0x10): - return (False, None) - return (True, system_data) - - def __parse_disk(self) -> None: - disk_system_data = None - disk_id_data = None - disk_bad_lbas = [] - - drive_index = 0 - while (disk_system_data == None) and (drive_index < len(self.__DISK_DRIVE_TYPES)): - (drive_type, system_sector_size, system_data_lbas, bad_lbas) = self.__DISK_DRIVE_TYPES[drive_index] - disk_bad_lbas.clear() - disk_bad_lbas.extend(bad_lbas) - for system_lba in system_data_lbas: - (valid, system_data) = self.__check_system_block(system_lba, system_sector_size, check_disk_type=True) - if (valid): - self.__drive_type = drive_type - disk_system_data = system_data - else: - disk_bad_lbas.append(system_lba) - drive_index += 1 - - for id_lba in [15, 14]: - (valid, id_data) = self.__check_system_block(id_lba, self.__DISK_SYSTEM_SECTOR_SIZE, check_disk_type=False) - if (valid): - disk_id_data = id_data - else: - disk_bad_lbas.append(id_lba) - - if not (disk_system_data and disk_id_data): - raise ValueError('Provided 64DD disk file is not valid') - - disk_zone_bad_tracks = [] - - for zone in range(len(self.__DISK_ZONES)): - zone_bad_tracks = [] - start = 0 if zone == 0 else system_data[0x07 + zone] - stop = system_data[0x07 + zone + 1] - for offset in range(start, stop): - zone_bad_tracks.append(system_data[0x20 + offset]) - for ignored_track in range(self.__DISK_BAD_TRACKS_PER_ZONE - len(zone_bad_tracks)): - zone_bad_tracks.append(self.__DISK_ZONES[zone][2] - ignored_track - 1) - disk_zone_bad_tracks.append(zone_bad_tracks) - - disk_type = disk_system_data[5] & 0x0F - - current_lba = 0 - starting_block = 0 - disk_file_offset = 0 - - for zone in self.__DISK_VZONE_TO_PZONE[disk_type]: - (head, sector_size, tracks, track) = self.__DISK_ZONES[zone] - - for zone_track in range(tracks): - current_zone_track = ( - (tracks - 1) - zone_track) if head else zone_track - - if (current_zone_track in disk_zone_bad_tracks[zone]): - track += (-1) if head else 1 - continue - - for block in range(self.__DISK_BLOCKS_PER_TRACK): - index = (track << 2) | (head << 1) | (starting_block ^ block) - if (current_lba not in disk_bad_lbas): - self.__block_info_table[index] = (disk_file_offset, sector_size * self.__DISK_SECTORS_PER_BLOCK) - else: - self.__block_info_table[index] = None - disk_file_offset += sector_size * self.__DISK_SECTORS_PER_BLOCK - current_lba += 1 - - track += (-1) if head else 1 - starting_block ^= 1 - - def __check_track_head_block(self, track: int, head: int, block: int) -> None: - if (track < 0 or track >= self.__DISK_TRACKS): - raise ValueError('Track outside of possible range') - if (head < 0 or head >= self.__DISK_HEADS): - raise ValueError('Head outside of possible range') - if (block < 0 or block >= self.__DISK_BLOCKS_PER_TRACK): - raise ValueError('Block outside of possible range') - - def __get_table_index(self, track: int, head: int, block: int) -> int: - return (track << 2) | (head << 1) | (block) - - def __get_block_info(self, track: int, head: int, block: int) -> Optional[tuple[int, int]]: - if (self.__file.closed): - return None - self.__check_track_head_block(track, head, block) - index = self.__get_table_index(track, head, block) - return self.__block_info_table[index] - - def load(self, path: str) -> None: - self.unload() - self.__file = open(path, 'rb+') - self.__parse_disk() - self.loaded = True - - def unload(self) -> None: - self.loaded = False - if (self.__file != None and not self.__file.closed): - self.__file.close() - self.__drive_type = None - - def get_drive_type(self) -> str: - return self.__drive_type - - def read_block(self, track: int, head: int, block: int) -> bytes: - info = self.__get_block_info(track, head, block) - if (info == None): - raise BadBlockError - (offset, block_size) = info - self.__file.seek(offset) - return self.__file.read(block_size) - - def write_block(self, track: int, head: int, block: int, data: bytes) -> None: - info = self.__get_block_info(track, head, block) - if (info == None): - raise BadBlockError - (offset, block_size) = info - if (len(data) != block_size): - raise ValueError(f'Provided data block size is different than expected ({len(data)} != {block_size})') - self.__file.seek(offset) - self.__file.write(data) - - - -if __name__ == '__main__': - id_lba_locations = [ - (7, 0, 1), - (7, 0, 0) - ] - if (len(sys.argv) >= 2): - dd = DD64Image() - dd.load(sys.argv[1]) - print(dd.get_drive_type()) - for (track, head, block) in id_lba_locations: - try: - print(dd.read_block(track, head, block)[:4]) - except BadBlockError: - print(f'Bad ID block [track: {track}, head: {head}, block: {block}]') - dd.unload() - else: - print(f'[{sys.argv[0]}]: Expected disk image path as first argument') +#!/usr/bin/env python3 + +import sys +from io import BufferedReader +from typing import Optional + + + +class BadBlockError(Exception): + pass + + +class DD64Image: + __DISK_HEADS = 2 + __DISK_TRACKS = 1175 + __DISK_BLOCKS_PER_TRACK = 2 + __DISK_SECTORS_PER_BLOCK = 85 + __DISK_BAD_TRACKS_PER_ZONE = 12 + __DISK_SYSTEM_SECTOR_SIZE = 232 + __DISK_ZONES = [ + (0, 232, 158, 0), + (0, 216, 158, 158), + (0, 208, 149, 316), + (0, 192, 149, 465), + (0, 176, 149, 614), + (0, 160, 149, 763), + (0, 144, 149, 912), + (0, 128, 114, 1061), + (1, 216, 158, 157), + (1, 208, 158, 315), + (1, 192, 149, 464), + (1, 176, 149, 613), + (1, 160, 149, 762), + (1, 144, 149, 911), + (1, 128, 149, 1060), + (1, 112, 114, 1174), + ] + __DISK_VZONE_TO_PZONE = [ + [0, 1, 2, 9, 8, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10], + [0, 1, 2, 3, 10, 9, 8, 4, 5, 6, 7, 15, 14, 13, 12, 11], + [0, 1, 2, 3, 4, 11, 10, 9, 8, 5, 6, 7, 15, 14, 13, 12], + [0, 1, 2, 3, 4, 5, 12, 11, 10, 9, 8, 6, 7, 15, 14, 13], + [0, 1, 2, 3, 4, 5, 6, 13, 12, 11, 10, 9, 8, 7, 15, 14], + [0, 1, 2, 3, 4, 5, 6, 7, 14, 13, 12, 11, 10, 9, 8, 15], + [0, 1, 2, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10, 9, 8], + ] + __DISK_DRIVE_TYPES = [( + 'development', + 192, + [11, 10, 3, 2], + [0, 1, 8, 9, 16, 17, 18, 19, 20, 21, 22, 23], + ), ( + 'retail', + 232, + [9, 8, 1, 0], + [2, 3, 10, 11, 12, 16, 17, 18, 19, 20, 21, 22, 23], + )] + + __file: Optional[BufferedReader] + __drive_type: Optional[str] + __block_info_table: list[tuple[int, int]] + loaded: bool = False + + def __init__(self) -> None: + self.__file = None + self.__drive_type = None + block_info_table_length = self.__DISK_HEADS * self.__DISK_TRACKS * self.__DISK_BLOCKS_PER_TRACK + self.__block_info_table = [None] * block_info_table_length + + def __del__(self) -> None: + self.unload() + + def __check_system_block(self, lba: int, sector_size: int, check_disk_type: bool) -> tuple[bool, bytes]: + self.__file.seek(lba * self.__DISK_SYSTEM_SECTOR_SIZE * self.__DISK_SECTORS_PER_BLOCK) + system_block_data = self.__file.read(sector_size * self.__DISK_SECTORS_PER_BLOCK) + system_data = system_block_data[:sector_size] + for sector in range(1, self.__DISK_SECTORS_PER_BLOCK): + sector_data = system_block_data[(sector * sector_size):][:sector_size] + if (system_data != sector_data): + return (False, None) + if (check_disk_type): + if (system_data[4] != 0x10): + return (False, None) + if ((system_data[5] & 0xF0) != 0x10): + return (False, None) + return (True, system_data) + + def __parse_disk(self) -> None: + disk_system_data = None + disk_id_data = None + disk_bad_lbas = [] + + drive_index = 0 + while (disk_system_data == None) and (drive_index < len(self.__DISK_DRIVE_TYPES)): + (drive_type, system_sector_size, system_data_lbas, bad_lbas) = self.__DISK_DRIVE_TYPES[drive_index] + disk_bad_lbas.clear() + disk_bad_lbas.extend(bad_lbas) + for system_lba in system_data_lbas: + (valid, system_data) = self.__check_system_block(system_lba, system_sector_size, check_disk_type=True) + if (valid): + self.__drive_type = drive_type + disk_system_data = system_data + else: + disk_bad_lbas.append(system_lba) + drive_index += 1 + + for id_lba in [15, 14]: + (valid, id_data) = self.__check_system_block(id_lba, self.__DISK_SYSTEM_SECTOR_SIZE, check_disk_type=False) + if (valid): + disk_id_data = id_data + else: + disk_bad_lbas.append(id_lba) + + if not (disk_system_data and disk_id_data): + raise ValueError('Provided 64DD disk file is not valid') + + disk_zone_bad_tracks = [] + + for zone in range(len(self.__DISK_ZONES)): + zone_bad_tracks = [] + start = 0 if zone == 0 else system_data[0x07 + zone] + stop = system_data[0x07 + zone + 1] + for offset in range(start, stop): + zone_bad_tracks.append(system_data[0x20 + offset]) + for ignored_track in range(self.__DISK_BAD_TRACKS_PER_ZONE - len(zone_bad_tracks)): + zone_bad_tracks.append(self.__DISK_ZONES[zone][2] - ignored_track - 1) + disk_zone_bad_tracks.append(zone_bad_tracks) + + disk_type = disk_system_data[5] & 0x0F + + current_lba = 0 + starting_block = 0 + disk_file_offset = 0 + + for zone in self.__DISK_VZONE_TO_PZONE[disk_type]: + (head, sector_size, tracks, track) = self.__DISK_ZONES[zone] + + for zone_track in range(tracks): + current_zone_track = ( + (tracks - 1) - zone_track) if head else zone_track + + if (current_zone_track in disk_zone_bad_tracks[zone]): + track += (-1) if head else 1 + continue + + for block in range(self.__DISK_BLOCKS_PER_TRACK): + index = (track << 2) | (head << 1) | (starting_block ^ block) + if (current_lba not in disk_bad_lbas): + self.__block_info_table[index] = (disk_file_offset, sector_size * self.__DISK_SECTORS_PER_BLOCK) + else: + self.__block_info_table[index] = None + disk_file_offset += sector_size * self.__DISK_SECTORS_PER_BLOCK + current_lba += 1 + + track += (-1) if head else 1 + starting_block ^= 1 + + def __check_track_head_block(self, track: int, head: int, block: int) -> None: + if (track < 0 or track >= self.__DISK_TRACKS): + raise ValueError('Track outside of possible range') + if (head < 0 or head >= self.__DISK_HEADS): + raise ValueError('Head outside of possible range') + if (block < 0 or block >= self.__DISK_BLOCKS_PER_TRACK): + raise ValueError('Block outside of possible range') + + def __get_table_index(self, track: int, head: int, block: int) -> int: + return (track << 2) | (head << 1) | (block) + + def __get_block_info(self, track: int, head: int, block: int) -> Optional[tuple[int, int]]: + if (self.__file.closed): + return None + self.__check_track_head_block(track, head, block) + index = self.__get_table_index(track, head, block) + return self.__block_info_table[index] + + def load(self, path: str) -> None: + self.unload() + self.__file = open(path, 'rb+') + self.__parse_disk() + self.loaded = True + + def unload(self) -> None: + self.loaded = False + if (self.__file != None and not self.__file.closed): + self.__file.close() + self.__drive_type = None + + def get_drive_type(self) -> str: + return self.__drive_type + + def read_block(self, track: int, head: int, block: int) -> bytes: + info = self.__get_block_info(track, head, block) + if (info == None): + raise BadBlockError + (offset, block_size) = info + self.__file.seek(offset) + return self.__file.read(block_size) + + def write_block(self, track: int, head: int, block: int, data: bytes) -> None: + info = self.__get_block_info(track, head, block) + if (info == None): + raise BadBlockError + (offset, block_size) = info + if (len(data) != block_size): + raise ValueError(f'Provided data block size is different than expected ({len(data)} != {block_size})') + self.__file.seek(offset) + self.__file.write(data) + + + +if __name__ == '__main__': + id_lba_locations = [ + (7, 0, 1), + (7, 0, 0) + ] + if (len(sys.argv) >= 2): + dd = DD64Image() + dd.load(sys.argv[1]) + print(dd.get_drive_type()) + for (track, head, block) in id_lba_locations: + try: + print(dd.read_block(track, head, block)[:4]) + except BadBlockError: + print(f'Bad ID block [track: {track}, head: {head}, block: {block}]') + dd.unload() + else: + print(f'[{sys.argv[0]}]: Expected disk image path as first argument') diff --git a/sw/pc/sc64.py b/sw/pc/sc64.py old mode 100644 new mode 100755 index 783b421..67fe227 --- a/sw/pc/sc64.py +++ b/sw/pc/sc64.py @@ -1,711 +1,714 @@ -#!/usr/bin/env python3 - -import argparse -import os -import queue -import serial -import sys -import time -from datetime import datetime -from dd64 import BadBlockError, DD64Image -from enum import Enum, IntEnum -from serial.tools import list_ports -from threading import Thread -from typing import Optional - - - -class ConnectionException(Exception): - pass - - -class SC64Serial: - __disconnect = False - __serial: Optional[serial.Serial] = None - __thread_read = None - __thread_write = None - __queue_output = queue.Queue() - __queue_input = queue.Queue() - __queue_packet = queue.Queue() - - __VID = 0x0403 - __PID = 0x6014 - - def __init__(self) -> None: - ports = list_ports.comports() - device_found = False - - if (self.__serial != None and self.__serial.is_open): - raise ConnectionException('Serial port is already open') - - for p in ports: - if (p.vid == self.__VID and p.pid == self.__PID and p.serial_number.startswith('SC64')): - try: - self.__serial = serial.Serial(p.device, timeout=0.1, write_timeout=5.0) - self.__reset_link() - except (serial.SerialException, ConnectionException): - if (self.__serial): - self.__serial.close() - continue - device_found = True - break - - if (not device_found): - raise ConnectionException('No SC64 device was found') - - self.__thread_read = Thread(target=self.__serial_process_input, daemon=True) - self.__thread_write = Thread(target=self.__serial_process_output, daemon=True) - - self.__thread_read.start() - self.__thread_write.start() - - def __del__(self) -> None: - self.__disconnect = True - if (self.__thread_read != None and self.__thread_read.is_alive()): - self.__thread_read.join(1) - if (self.__thread_write != None and self.__thread_write.is_alive()): - self.__thread_write.join(6) - if (self.__serial != None and self.__serial.is_open): - self.__serial.close() - - def __reset_link(self) -> None: - self.__serial.reset_output_buffer() - - retry_counter = 0 - self.__serial.dtr = 1 - while (self.__serial.dsr == 0): - time.sleep(0.1) - retry_counter += 1 - if (retry_counter >= 10): - raise ConnectionException('Could not reset SC64 device') - - self.__serial.reset_input_buffer() - - retry_counter = 0 - self.__serial.dtr = 0 - while (self.__serial.dsr == 1): - time.sleep(0.1) - retry_counter += 1 - if (retry_counter >= 10): - raise ConnectionException('Could not reset SC64 device') - - def __write(self, data: bytes) -> None: - try: - if (self.__disconnect): - raise ConnectionException - self.__serial.write(data) - self.__serial.flush() - except (serial.SerialException, serial.SerialTimeoutException): - raise ConnectionException - - def __read(self, length: int) -> bytes: - try: - data = b'' - while (len(data) < length and not self.__disconnect): - data += self.__serial.read(length - len(data)) - if (self.__disconnect): - raise ConnectionException - return data - except serial.SerialException: - raise ConnectionException - - def __read_int(self) -> int: - return int.from_bytes(self.__read(4), byteorder='big') - - def __serial_process_output(self) -> None: - while (not self.__disconnect and self.__serial != None and self.__serial.is_open): - try: - packet: bytes = self.__queue_output.get(timeout=0.1) - self.__write(packet) - self.__queue_output.task_done() - except queue.Empty: - continue - except ConnectionException: - break - - def __serial_process_input(self) -> None: - while (not self.__disconnect and self.__serial != None and self.__serial.is_open): - try: - token = self.__read(4) - if (len(token) == 4): - identifier = token[0:3] - command = token[3:4] - if (identifier == b'PKT'): - data = self.__read(self.__read_int()) - self.__queue_packet.put((command, data)) - elif (identifier == b'CMP' or identifier == b'ERR'): - data = self.__read(self.__read_int()) - success = identifier == b'CMP' - self.__queue_input.put((command, data, success)) - else: - raise ConnectionException - except ConnectionException: - break - - def __check_threads(self) -> None: - if (not (self.__thread_write.is_alive() and self.__thread_read.is_alive())): - raise ConnectionException('Serial link is closed') - - def __queue_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'') -> None: - if (len(cmd) != 1): - raise ValueError('Length of command is different than 1 byte') - if (len(args) != 2): - raise ValueError('Number of arguments is different than 2') - packet: bytes = b'CMD' - packet += cmd[0:1] - for arg in args: - packet += arg.to_bytes(4, byteorder='big') - packet += data - self.__queue_output.put(packet) - - def __pop_response(self, cmd: bytes, timeout: float) -> bytes: - try: - (response_cmd, data, success) = self.__queue_input.get(timeout=timeout) - self.__queue_input.task_done() - if (cmd != response_cmd): - raise ConnectionException('CMD wrong command response') - if (success == False): - raise ConnectionException('CMD response error') - return data - except queue.Empty: - raise ConnectionException('CMD response timeout') - - def execute_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'', response: bool=True, timeout: float=5.0) -> Optional[bytes]: - self.__check_threads() - self.__queue_cmd(cmd, args, data) - if (response): - return self.__pop_response(cmd, timeout) - return None - - def get_packet(self, timeout: float=0.1) -> Optional[tuple[bytes, bytes]]: - self.__check_threads() - try: - packet = self.__queue_packet.get(timeout=timeout) - self.__queue_packet.task_done() - return packet - except queue.Empty: - return None - - -class SC64: - class __Address(IntEnum): - SDRAM = 0x0000_0000 - FLASH = 0x0400_0000 - BUFFER = 0x0500_0000 - EEPROM = 0x0500_2000 - FIRMWARE = 0x0200_0000 - DDIPL = 0x03BC_0000 - SAVE = 0x03FE_0000 - SHADOW = 0x04FE_0000 - - class __Length(IntEnum): - SDRAM = (64 * 1024 * 1024) - FLASH = (16 * 1024 * 1024) - BUFFER = (8 * 1024) - EEPROM = (2 * 1024) - DDIPL = (4 * 1024 * 1024) - SAVE = (128 * 1024) - SHADOW = (128 * 1024) - - class __SaveLength(IntEnum): - NONE = 0 - EEPROM_4K = 512 - EEPROM_16K = (2 * 1024) - SRAM = (32 * 1024) - FLASHRAM = (128 * 1024) - SRAM_3X = (3 * 32 * 1024) - - class __CfgId(IntEnum): - BOOTLOADER_SWITCH = 0 - ROM_WRITE_ENABLE = 1 - ROM_SHADOW_ENABLE = 2 - DD_MODE = 3 - ISV_ENABLE = 4 - BOOT_MODE = 5 - SAVE_TYPE = 6 - CIC_SEED = 7 - TV_TYPE = 8 - FLASH_ERASE_BLOCK = 9 - DD_DRIVE_TYPE = 10 - DD_DISK_STATE = 11 - BUTTON_STATE = 12 - BUTTON_MODE = 13 - - class __UpdateError(IntEnum): - OK = 0 - TOKEN = 1 - CHECKSUM = 2 - SIZE = 3 - UNKNOWN_CHUNK = 4 - READ = 5 - - class __UpdateStatus(IntEnum): - MCU = 1 - FPGA = 2 - BOOTLOADER = 3 - DONE = 0x80 - ERROR = 0xFF - - class __DDMode(IntEnum): - NONE = 0 - REGS = 1 - DDIPL = 2 - FULL = 3 - - class __DDDriveType(IntEnum): - RETAIL = 0 - DEVELOPMENT = 1 - - class __DDDiskState(IntEnum): - EJECTED = 0 - INSERTED = 1 - CHANGED = 2 - - class __ButtonMode(IntEnum): - NONE = 0 - N64_IRQ = 1 - USB_PACKET = 2 - DD_DISK_SWAP = 3 - - class BootMode(IntEnum): - SD = 0 - USB = 1 - ROM = 2 - DDIPL = 3 - DIRECT = 4 - - class SaveType(IntEnum): - NONE = 0 - EEPROM_4K = 1 - EEPROM_16K = 2 - SRAM = 3 - FLASHRAM = 4 - SRAM_X3 = 5 - - class CICSeed(IntEnum): - ALECK = 0xAC - X101 = 0x3F - X102 = 0x3F - X103 = 0x78 - X105 = 0x91 - X106 = 0x85 - DD_JP = 0xDD - DD_US = 0xDE - AUTO = 0xFFFF - - class TVType(IntEnum): - PAL = 0 - NTSC = 1 - MPAL = 2 - AUTO = 3 - - def __init__(self) -> None: - self.__link = SC64Serial() - version = self.__link.execute_cmd(cmd=b'v') - if (version != b'SCv2'): - raise ConnectionException('Unknown SC64 API version') - - def __get_int(self, data: bytes) -> int: - return int.from_bytes(data[:4], byteorder='big') - - def __set_config(self, config: __CfgId, value: int) -> None: - self.__link.execute_cmd(cmd=b'C', args=[config, value]) - - def __get_config(self, config: __CfgId) -> int: - data = self.__link.execute_cmd(cmd=b'c', args=[config, 0]) - return self.__get_int(data) - - def __write_memory(self, address: int, data: bytes) -> None: - if (len(data) > 0): - self.__link.execute_cmd(cmd=b'M', args=[address, len(data)], data=data, timeout=10.0) - - def __read_memory(self, address: int, length: int) -> bytes: - if (length > 0): - return self.__link.execute_cmd(cmd=b'm', args=[address, length], timeout=10.0) - return bytes([]) - - def __erase_flash_region(self, address: int, length: int) -> None: - if (address < self.__Address.FLASH): - raise ValueError('Flash erase address or length outside of possible range') - if ((address + length) > (self.__Address.FLASH + self.__Length.FLASH)): - raise ValueError('Flash erase address or length outside of possible range') - erase_block_size = self.__get_config(self.__CfgId.FLASH_ERASE_BLOCK) - if ((address % erase_block_size != 0) or (length % erase_block_size != 0)): - raise ValueError('Flash erase address or length not aligned to block size') - for offset in range(address, address + length, erase_block_size): - self.__set_config(self.__CfgId.FLASH_ERASE_BLOCK, offset) - - def reset_state(self) -> None: - self.__set_config(self.__CfgId.ROM_WRITE_ENABLE, False) - self.__set_config(self.__CfgId.ROM_SHADOW_ENABLE, False) - self.__set_config(self.__CfgId.DD_MODE, self.__DDMode.NONE) - self.__set_config(self.__CfgId.ISV_ENABLE, False) - self.__set_config(self.__CfgId.BOOT_MODE, self.BootMode.USB) - self.__set_config(self.__CfgId.SAVE_TYPE, self.SaveType.NONE) - self.__set_config(self.__CfgId.CIC_SEED, self.CICSeed.AUTO) - self.__set_config(self.__CfgId.TV_TYPE, self.TVType.AUTO) - self.__set_config(self.__CfgId.DD_DRIVE_TYPE, self.__DDDriveType.RETAIL) - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) - self.__set_config(self.__CfgId.BUTTON_MODE, self.__ButtonMode.NONE) - self.set_cic_parameters() - - def get_state(self): - return { - 'bootloader_switch': bool(self.__get_config(self.__CfgId.BOOTLOADER_SWITCH)), - 'rom_write_enable': bool(self.__get_config(self.__CfgId.ROM_WRITE_ENABLE)), - 'rom_shadow_enable': bool(self.__get_config(self.__CfgId.ROM_SHADOW_ENABLE)), - 'dd_mode': self.__DDMode(self.__get_config(self.__CfgId.DD_MODE)), - 'isv_enable': bool(self.__get_config(self.__CfgId.ISV_ENABLE)), - 'boot_mode': self.BootMode(self.__get_config(self.__CfgId.BOOT_MODE)), - 'save_type': self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)), - 'cic_seed': self.CICSeed(self.__get_config(self.__CfgId.CIC_SEED)), - 'tv_type': self.TVType(self.__get_config(self.__CfgId.TV_TYPE)), - 'flash_erase_block': self.__get_config(self.__CfgId.FLASH_ERASE_BLOCK), - 'dd_drive_type': self.__DDDriveType(self.__get_config(self.__CfgId.DD_DRIVE_TYPE)), - 'dd_disk_state': self.__DDDiskState(self.__get_config(self.__CfgId.DD_DISK_STATE)), - 'button_state': bool(self.__get_config(self.__CfgId.BUTTON_STATE)), - 'button_mode': self.__ButtonMode(self.__get_config(self.__CfgId.BUTTON_MODE)), - } - - def upload_rom(self, data: bytes, use_shadow: bool=True): - rom_length = len(data) - if (rom_length > self.__Length.SDRAM): - raise ValueError('ROM size too big') - sdram_length = rom_length - shadow_enabled = use_shadow and rom_length > (self.__Length.SDRAM - self.__Length.SHADOW) - if (shadow_enabled): - sdram_length = (self.__Length.SDRAM - self.__Length.SHADOW) - shadow_length = rom_length - sdram_length - if (self.__read_memory(self.__Address.SHADOW, shadow_length) != data[sdram_length:]): - self.__erase_flash_region(self.__Address.SHADOW, self.__Length.SHADOW) - self.__write_memory(self.__Address.SHADOW, data[sdram_length:]) - if (self.__read_memory(self.__Address.SHADOW, shadow_length) != data[sdram_length:]): - raise ConnectionException('Shadow ROM program failure') - self.__write_memory(self.__Address.SDRAM, data[:sdram_length]) - self.__set_config(self.__CfgId.ROM_SHADOW_ENABLE, shadow_enabled) - - def upload_ddipl(self, data: bytes) -> None: - if (len(data) > self.__Length.DDIPL): - raise ValueError('DDIPL size too big') - self.__write_memory(self.__Address.DDIPL, data) - - def upload_save(self, data: bytes) -> None: - save_type = self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)) - if (save_type not in self.SaveType): - raise ConnectionError('Unknown save type fetched from SC64 device') - if (save_type == self.SaveType.NONE): - raise ValueError('No save type set inside SC64 device') - if (len(data) != self.__SaveLength[save_type.name]): - raise ValueError('Wrong save data length') - address = self.__Address.SAVE - if (save_type == self.SaveType.EEPROM_4K or save_type == self.SaveType.EEPROM_16K): - address = self.__Address.EEPROM - self.__write_memory(address, data) - - def download_save(self) -> bytes: - save_type = self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)) - if (save_type not in self.SaveType): - raise ConnectionError('Unknown save type fetched from SC64 device') - if (save_type == self.SaveType.NONE): - raise ValueError('No save type set inside SC64 device') - address = self.__Address.SAVE - length = self.__SaveLength[save_type.name] - if (save_type == self.SaveType.EEPROM_4K or save_type == self.SaveType.EEPROM_16K): - address = self.__Address.EEPROM - return self.__read_memory(address, length) - - def set_rtc(self, t: datetime) -> None: - to_bcd = lambda v: ((int((v / 10) % 10) << 4) | int(int(v) % 10)) - data = bytes([ - to_bcd(t.weekday() + 1), - to_bcd(t.hour), - to_bcd(t.minute), - to_bcd(t.second), - 0, - to_bcd(t.year), - to_bcd(t.month), - to_bcd(t.day), - ]) - self.__link.execute_cmd(cmd=b'T', args=[self.__get_int(data[0:4]), self.__get_int(data[4:8])]) - - def set_boot_mode(self, mode: BootMode) -> None: - if (mode not in self.BootMode): - raise ValueError('Boot mode outside of allowed values') - self.__set_config(self.__CfgId.BOOT_MODE, mode) - - def set_cic_seed(self, seed: int) -> None: - if (seed != self.CICSeed.AUTO): - if (seed < 0 or seed > 0xFF): - raise ValueError('CIC seed outside of allowed values') - self.__set_config(self.__CfgId.CIC_SEED, seed) - - def set_tv_type(self, type: TVType) -> None: - if (type not in self.TVType): - raise ValueError('TV type outside of allowed values') - self.__set_config(self.__CfgId.TV_TYPE, type) - - def set_save_type(self, type: SaveType) -> None: - if (type not in self.SaveType): - raise ValueError('Save type outside of allowed values') - self.__set_config(self.__CfgId.SAVE_TYPE, type) - - def set_cic_parameters(self, dd_mode: bool=False, seed: int=0x3F, checksum: bytes=bytes([0xA5, 0x36, 0xC0, 0xF1, 0xD8, 0x59])) -> None: - if (seed < 0 or seed > 0xFF): - raise ValueError('CIC seed outside of allowed values') - if (len(checksum) != 6): - raise ValueError('CIC checksum length outside of allowed values') - data = bytes([1 if dd_mode else 0, seed]) - data = [*data, *checksum] - self.__link.execute_cmd(cmd=b'B', args=[self.__get_int(data[0:4]), self.__get_int(data[4:8])]) - - def update_firmware(self, data: bytes) -> None: - address = self.__Address.FIRMWARE - self.__write_memory(address, data) - response = self.__link.execute_cmd(cmd=b'F', args=[address, len(data)]) - error = self.__UpdateError(self.__get_int(response[0:4])) - if (error != self.__UpdateError.OK): - raise ConnectionException(f'Bad update image [{error.name}]') - status = None - while status != self.__UpdateStatus.DONE: - packet = self.__link.get_packet(timeout=60.0) - if (packet == None): - raise ConnectionException('Update timeout') - (cmd, data) = packet - if (cmd != b'F'): - raise ConnectionException('Wrong update status packet') - status = self.__UpdateStatus(self.__get_int(data)) - print(f'Update status [{status.name}]') - if (status == self.__UpdateStatus.ERROR): - raise ConnectionException('Update error, device is most likely bricked') - time.sleep(2) - - def backup_firmware(self) -> bytes: - address = self.__Address.FIRMWARE - info = self.__link.execute_cmd(cmd=b'f', args=[address, 0], timeout=60.0) - error = self.__UpdateError(self.__get_int(info[0:4])) - length = self.__get_int(info[4:8]) - if (error != self.__UpdateError.OK): - raise ConnectionException('Error while getting firmware backup') - return self.__read_memory(address, length) - - def __handle_dd_packet(self, dd: Optional[DD64Image], data: bytes) -> None: - CMD_READ_BLOCK = 1 - CMD_WRITE_BLOCK = 2 - cmd = self.__get_int(data[0:]) - address = self.__get_int(data[4:]) - track_head_block = self.__get_int(data[8:]) - track = (track_head_block >> 2) & 0xFFF - head = (track_head_block >> 1) & 0x1 - block = track_head_block & 0x1 - try: - if (not dd or not dd.loaded): - raise BadBlockError - if (cmd == CMD_READ_BLOCK): - block_data = dd.read_block(track, head, block) - self.__link.execute_cmd(cmd=b'D', args=[address, len(block_data)], data=block_data) - elif (cmd == CMD_WRITE_BLOCK): - dd.write_block(track, head, block, data[12:]) - self.__link.execute_cmd(cmd=b'd', args=[0, 0]) - else: - self.__link.execute_cmd(cmd=b'd', args=[-1, 0]) - except BadBlockError: - self.__link.execute_cmd(cmd=b'd', args=[1, 0]) - - def __handle_isv_packet(self, data: bytes) -> None: - print(data.decode('EUC-JP', errors='backslashreplace'), end='') - - def __handle_usb_packet(self, data: bytes) -> None: - print(data) - - def debug_loop(self, isv: bool=False, disks: Optional[list[str]]=None) -> None: - dd = None - current_image = 0 - next_image = 0 - - self.__set_config(self.__CfgId.ROM_WRITE_ENABLE, isv) - self.__set_config(self.__CfgId.ISV_ENABLE, isv) - if (isv): - print('IS-Viewer64 support set to [ENABLED]') - if (self.__get_config(self.__CfgId.ROM_SHADOW_ENABLE)): - print('ROM shadow enabled - ISV support will NOT work (use --no-shadow option to disable it)') - - if (disks): - dd = DD64Image() - drive_type = None - for path in disks: - try: - dd.load(path) - if (drive_type == None): - drive_type = dd.get_drive_type() - elif (drive_type != dd.get_drive_type()): - raise ValueError('Disk drive type mismatch') - dd.unload() - except ValueError as e: - dd = None - print(f'64DD disabled, incorrect disk images provided: {e}') - break - if (dd): - self.__set_config(self.__CfgId.DD_MODE, self.__DDMode.FULL) - self.__set_config(self.__CfgId.DD_DRIVE_TYPE, { - 'retail': self.__DDDriveType.RETAIL, - 'development': self.__DDDriveType.DEVELOPMENT - }[drive_type]) - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) - self.__set_config(self.__CfgId.BUTTON_MODE, self.__ButtonMode.USB_PACKET) - print('64DD enabled, loaded disks:') - for disk in disks: - print(f' - {os.path.basename(disk)}') - print('Press button on SC64 device to cycle through provided disks') - - print('Debug loop started, use Ctrl-C to exit') - - try: - while (True): - packet = self.__link.get_packet() - if (packet != None): - (cmd, data) = packet - if (cmd == b'D'): - self.__handle_dd_packet(dd, data) - if (cmd == b'I'): - self.__handle_isv_packet(data) - if (cmd == b'U'): - self.__handle_usb_packet(data) - if (cmd == b'B'): - if (not dd.loaded): - dd.load(disks[next_image]) - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.INSERTED) - current_image = next_image - next_image += 1 - if (next_image >= len(disks)): - next_image = 0 - print(f'64DD disk inserted - {os.path.basename(disks[current_image])}') - else: - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) - dd.unload() - print(f'64DD disk ejected - {os.path.basename(disks[current_image])}') - except KeyboardInterrupt: - if (dd and dd.loaded): - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) - - -class EnumAction(argparse.Action): - def __init__(self, **kwargs): - type = kwargs.pop('type', None) - if type is None: - raise ValueError('No type was provided') - if not issubclass(type, Enum): - raise TypeError('Provided type is not an Enum subclass') - items = (choice.lower().replace('_', '-') for (choice, _) in type.__members__.items()) - kwargs.setdefault('choices', tuple(items)) - super(EnumAction, self).__init__(**kwargs) - self.__enum = type - - def __call__(self, parser, namespace, values, option_string): - key = str(values).upper().replace('-', '_') - value = self.__enum[key] - setattr(namespace, self.dest, value) - - - -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='SC64 control software') - parser.add_argument('--backup', help='backup SC64 firmware and write it to specified file') - parser.add_argument('--update', help='update SC64 firmware from specified file') - parser.add_argument('--reset-state', action='store_true', help='reset SC64 internal state') - parser.add_argument('--print-state', action='store_true', help='print SC64 internal state') - parser.add_argument('--boot', type=SC64.BootMode, action=EnumAction, help='set boot mode') - parser.add_argument('--tv', type=SC64.TVType, action=EnumAction, help='force TV type to set value') - parser.add_argument('--cic', type=SC64.CICSeed, action=EnumAction, help='force CIC seed to set value') - parser.add_argument('--rtc', action='store_true', help='update clock in SC64 to system time') - parser.add_argument('--rom', help='upload ROM from specified file') - parser.add_argument('--no-shadow', action='store_false', help='do not put last 128 kB of ROM inside flash memory (can corrupt non EEPROM saves)') - parser.add_argument('--save-type', type=SC64.SaveType, action=EnumAction, help='set save type') - parser.add_argument('--save', help='upload save from specified file') - parser.add_argument('--backup-save', help='download save and write it to specified file') - parser.add_argument('--ddipl', help='upload 64DD IPL from specified file') - parser.add_argument('--disk', action='append', help='path to 64DD disk (.ndd format), can be specified multiple times') - parser.add_argument('--isv', action='store_true', help='enable IS-Viewer64 support') - parser.add_argument('--debug', action='store_true', help='run debug loop (required for IS-Viewer64 and 64DD)') - - if (len(sys.argv) <= 1): - parser.print_help() - parser.exit() - - args = parser.parse_args() - - try: - sc64 = SC64() - - if (args.backup): - with open(args.backup, 'wb+') as f: - print('Generating backup, this might take a while... ', end='', flush=True) - f.write(sc64.backup_firmware()) - print('done') - - if (args.update): - with open(args.update, 'rb+') as f: - print('Updating firmware, this might take a while... ', end='', flush=True) - sc64.update_firmware(f.read()) - print('done') - - if (args.reset_state): - sc64.reset_state() - print('SC64 internal state reset') - - if (args.print_state): - state = sc64.get_state() - print('Current SC64 internal state:') - for key, value in state.items(): - if (hasattr(value, 'name')): - value = getattr(value, 'name') - print(f' {key}: {value}') - - if (args.boot != None): - sc64.set_boot_mode(args.boot) - print(f'Boot mode set to [{args.boot.name}]') - - if (args.tv != None): - sc64.set_tv_type(args.tv) - print(f'TV type set to [{args.tv.name}]') - - if (args.cic != None): - sc64.set_cic_seed(args.cic) - print(f'CIC seed set to [0x{args.cic:X}]') - - if (args.rtc): - sc64.set_rtc(datetime.now()) - - if (args.rom): - with open(args.rom, 'rb+') as f: - print('Uploading ROM... ', end='', flush=True) - sc64.upload_rom(f.read(), use_shadow=args.no_shadow) - print('done') - - if (args.save_type != None): - sc64.set_save_type(args.save_type) - print(f'Save type set to [{args.save_type.name}]') - - if (args.save): - with open(args.save, 'rb+') as f: - print('Uploading save... ', end='', flush=True) - sc64.upload_save(f.read()) - print('done') - - if (args.ddipl): - with open(args.ddipl, 'rb+') as f: - print('Uploading 64DD IPL... ', end='', flush=True) - sc64.upload_ddipl(f.read()) - print('done') - - if (args.debug): - sc64.debug_loop(isv=args.isv, disks=args.disk) - - if (args.backup_save): - with open(args.backup_save, 'wb+') as f: - print('Downloading save... ', end='', flush=True) - f.write(sc64.download_save()) - print('done') - except ValueError as e: - print(f'\nValue error: {e}') - except ConnectionException as e: - print(f'\nSC64 error: {e}') +#!/usr/bin/env python3 + +import argparse +import os +import queue +import serial +import sys +import time +from datetime import datetime +from dd64 import BadBlockError, DD64Image +from enum import Enum, IntEnum +from serial.tools import list_ports +from threading import Thread +from typing import Optional + + + +class ConnectionException(Exception): + pass + + +class SC64Serial: + __disconnect = False + __serial: Optional[serial.Serial] = None + __thread_read = None + __thread_write = None + __queue_output = queue.Queue() + __queue_input = queue.Queue() + __queue_packet = queue.Queue() + + __VID = 0x0403 + __PID = 0x6014 + + __CHUNK_SIZE = (64 * 1024) + + def __init__(self) -> None: + ports = list_ports.comports() + device_found = False + + if (self.__serial != None and self.__serial.is_open): + raise ConnectionException('Serial port is already open') + + for p in ports: + if (p.vid == self.__VID and p.pid == self.__PID and p.serial_number.startswith('SC64')): + try: + self.__serial = serial.Serial(p.device, timeout=1.0, write_timeout=1.0) + self.__reset_link() + except (serial.SerialException, ConnectionException): + if (self.__serial): + self.__serial.close() + continue + device_found = True + break + + if (not device_found): + raise ConnectionException('No SC64 device was found') + + self.__thread_read = Thread(target=self.__serial_process_input, daemon=True) + self.__thread_write = Thread(target=self.__serial_process_output, daemon=True) + + self.__thread_read.start() + self.__thread_write.start() + + def __del__(self) -> None: + self.__disconnect = True + if (self.__thread_read != None and self.__thread_read.is_alive()): + self.__thread_read.join(1) + if (self.__thread_write != None and self.__thread_write.is_alive()): + self.__thread_write.join(1) + if (self.__serial != None and self.__serial.is_open): + self.__serial.close() + + def __reset_link(self) -> None: + self.__serial.reset_output_buffer() + + retry_counter = 0 + self.__serial.dtr = 1 + while (self.__serial.dsr == 0): + time.sleep(0.1) + retry_counter += 1 + if (retry_counter >= 10): + raise ConnectionException('Could not reset SC64 device') + + self.__serial.reset_input_buffer() + + retry_counter = 0 + self.__serial.dtr = 0 + while (self.__serial.dsr == 1): + time.sleep(0.1) + retry_counter += 1 + if (retry_counter >= 10): + raise ConnectionException('Could not reset SC64 device') + + def __write(self, data: bytes) -> None: + try: + if (self.__disconnect): + raise ConnectionException + for offset in range(0, len(data), self.__CHUNK_SIZE): + self.__serial.write(data[offset:offset + self.__CHUNK_SIZE]) + self.__serial.flush() + except (serial.SerialException, serial.SerialTimeoutException): + raise ConnectionException + + def __read(self, length: int) -> bytes: + try: + data = b'' + while (len(data) < length and not self.__disconnect): + data += self.__serial.read(length - len(data)) + if (self.__disconnect): + raise ConnectionException + return data + except serial.SerialException: + raise ConnectionException + + def __read_int(self) -> int: + return int.from_bytes(self.__read(4), byteorder='big') + + def __serial_process_output(self) -> None: + while (not self.__disconnect and self.__serial != None and self.__serial.is_open): + try: + packet: bytes = self.__queue_output.get(timeout=0.1) + self.__write(packet) + self.__queue_output.task_done() + except queue.Empty: + continue + except ConnectionException: + break + + def __serial_process_input(self) -> None: + while (not self.__disconnect and self.__serial != None and self.__serial.is_open): + try: + token = self.__read(4) + if (len(token) == 4): + identifier = token[0:3] + command = token[3:4] + if (identifier == b'PKT'): + data = self.__read(self.__read_int()) + self.__queue_packet.put((command, data)) + elif (identifier == b'CMP' or identifier == b'ERR'): + data = self.__read(self.__read_int()) + success = identifier == b'CMP' + self.__queue_input.put((command, data, success)) + else: + raise ConnectionException + except ConnectionException: + break + + def __check_threads(self) -> None: + if (not (self.__thread_write.is_alive() and self.__thread_read.is_alive())): + raise ConnectionException('Serial link is closed') + + def __queue_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'') -> None: + if (len(cmd) != 1): + raise ValueError('Length of command is different than 1 byte') + if (len(args) != 2): + raise ValueError('Number of arguments is different than 2') + packet: bytes = b'CMD' + packet += cmd[0:1] + for arg in args: + packet += arg.to_bytes(4, byteorder='big') + packet += data + self.__queue_output.put(packet) + + def __pop_response(self, cmd: bytes, timeout: float) -> bytes: + try: + (response_cmd, data, success) = self.__queue_input.get(timeout=timeout) + self.__queue_input.task_done() + if (cmd != response_cmd): + raise ConnectionException('CMD wrong command response') + if (success == False): + raise ConnectionException('CMD response error') + return data + except queue.Empty: + raise ConnectionException('CMD response timeout') + + def execute_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'', response: bool=True, timeout: float=5.0) -> Optional[bytes]: + self.__check_threads() + self.__queue_cmd(cmd, args, data) + if (response): + return self.__pop_response(cmd, timeout) + return None + + def get_packet(self, timeout: float=0.1) -> Optional[tuple[bytes, bytes]]: + self.__check_threads() + try: + packet = self.__queue_packet.get(timeout=timeout) + self.__queue_packet.task_done() + return packet + except queue.Empty: + return None + + +class SC64: + class __Address(IntEnum): + SDRAM = 0x0000_0000 + FLASH = 0x0400_0000 + BUFFER = 0x0500_0000 + EEPROM = 0x0500_2000 + FIRMWARE = 0x0200_0000 + DDIPL = 0x03BC_0000 + SAVE = 0x03FE_0000 + SHADOW = 0x04FE_0000 + + class __Length(IntEnum): + SDRAM = (64 * 1024 * 1024) + FLASH = (16 * 1024 * 1024) + BUFFER = (8 * 1024) + EEPROM = (2 * 1024) + DDIPL = (4 * 1024 * 1024) + SAVE = (128 * 1024) + SHADOW = (128 * 1024) + + class __SaveLength(IntEnum): + NONE = 0 + EEPROM_4K = 512 + EEPROM_16K = (2 * 1024) + SRAM = (32 * 1024) + FLASHRAM = (128 * 1024) + SRAM_3X = (3 * 32 * 1024) + + class __CfgId(IntEnum): + BOOTLOADER_SWITCH = 0 + ROM_WRITE_ENABLE = 1 + ROM_SHADOW_ENABLE = 2 + DD_MODE = 3 + ISV_ENABLE = 4 + BOOT_MODE = 5 + SAVE_TYPE = 6 + CIC_SEED = 7 + TV_TYPE = 8 + FLASH_ERASE_BLOCK = 9 + DD_DRIVE_TYPE = 10 + DD_DISK_STATE = 11 + BUTTON_STATE = 12 + BUTTON_MODE = 13 + + class __UpdateError(IntEnum): + OK = 0 + TOKEN = 1 + CHECKSUM = 2 + SIZE = 3 + UNKNOWN_CHUNK = 4 + READ = 5 + + class __UpdateStatus(IntEnum): + MCU = 1 + FPGA = 2 + BOOTLOADER = 3 + DONE = 0x80 + ERROR = 0xFF + + class __DDMode(IntEnum): + NONE = 0 + REGS = 1 + DDIPL = 2 + FULL = 3 + + class __DDDriveType(IntEnum): + RETAIL = 0 + DEVELOPMENT = 1 + + class __DDDiskState(IntEnum): + EJECTED = 0 + INSERTED = 1 + CHANGED = 2 + + class __ButtonMode(IntEnum): + NONE = 0 + N64_IRQ = 1 + USB_PACKET = 2 + DD_DISK_SWAP = 3 + + class BootMode(IntEnum): + SD = 0 + USB = 1 + ROM = 2 + DDIPL = 3 + DIRECT = 4 + + class SaveType(IntEnum): + NONE = 0 + EEPROM_4K = 1 + EEPROM_16K = 2 + SRAM = 3 + FLASHRAM = 4 + SRAM_X3 = 5 + + class CICSeed(IntEnum): + ALECK = 0xAC + X101 = 0x13F + X102 = 0x3F + X103 = 0x78 + X105 = 0x91 + X106 = 0x85 + DD_JP = 0xDD + DD_US = 0xDE + AUTO = 0xFFFF + + class TVType(IntEnum): + PAL = 0 + NTSC = 1 + MPAL = 2 + AUTO = 3 + + def __init__(self) -> None: + self.__link = SC64Serial() + version = self.__link.execute_cmd(cmd=b'v') + if (version != b'SCv2'): + raise ConnectionException('Unknown SC64 API version') + + def __get_int(self, data: bytes) -> int: + return int.from_bytes(data[:4], byteorder='big') + + def __set_config(self, config: __CfgId, value: int) -> None: + self.__link.execute_cmd(cmd=b'C', args=[config, value]) + + def __get_config(self, config: __CfgId) -> int: + data = self.__link.execute_cmd(cmd=b'c', args=[config, 0]) + return self.__get_int(data) + + def __write_memory(self, address: int, data: bytes) -> None: + if (len(data) > 0): + self.__link.execute_cmd(cmd=b'M', args=[address, len(data)], data=data, timeout=20.0) + + def __read_memory(self, address: int, length: int) -> bytes: + if (length > 0): + return self.__link.execute_cmd(cmd=b'm', args=[address, length], timeout=20.0) + return bytes([]) + + def __erase_flash_region(self, address: int, length: int) -> None: + if (address < self.__Address.FLASH): + raise ValueError('Flash erase address or length outside of possible range') + if ((address + length) > (self.__Address.FLASH + self.__Length.FLASH)): + raise ValueError('Flash erase address or length outside of possible range') + erase_block_size = self.__get_config(self.__CfgId.FLASH_ERASE_BLOCK) + if ((address % erase_block_size != 0) or (length % erase_block_size != 0)): + raise ValueError('Flash erase address or length not aligned to block size') + for offset in range(address, address + length, erase_block_size): + self.__set_config(self.__CfgId.FLASH_ERASE_BLOCK, offset) + + def reset_state(self) -> None: + self.__set_config(self.__CfgId.ROM_WRITE_ENABLE, False) + self.__set_config(self.__CfgId.ROM_SHADOW_ENABLE, False) + self.__set_config(self.__CfgId.DD_MODE, self.__DDMode.NONE) + self.__set_config(self.__CfgId.ISV_ENABLE, False) + self.__set_config(self.__CfgId.BOOT_MODE, self.BootMode.USB) + self.__set_config(self.__CfgId.SAVE_TYPE, self.SaveType.NONE) + self.__set_config(self.__CfgId.CIC_SEED, self.CICSeed.AUTO) + self.__set_config(self.__CfgId.TV_TYPE, self.TVType.AUTO) + self.__set_config(self.__CfgId.DD_DRIVE_TYPE, self.__DDDriveType.RETAIL) + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) + self.__set_config(self.__CfgId.BUTTON_MODE, self.__ButtonMode.NONE) + self.set_cic_parameters() + + def get_state(self): + return { + 'bootloader_switch': bool(self.__get_config(self.__CfgId.BOOTLOADER_SWITCH)), + 'rom_write_enable': bool(self.__get_config(self.__CfgId.ROM_WRITE_ENABLE)), + 'rom_shadow_enable': bool(self.__get_config(self.__CfgId.ROM_SHADOW_ENABLE)), + 'dd_mode': self.__DDMode(self.__get_config(self.__CfgId.DD_MODE)), + 'isv_enable': bool(self.__get_config(self.__CfgId.ISV_ENABLE)), + 'boot_mode': self.BootMode(self.__get_config(self.__CfgId.BOOT_MODE)), + 'save_type': self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)), + 'cic_seed': self.CICSeed(self.__get_config(self.__CfgId.CIC_SEED)), + 'tv_type': self.TVType(self.__get_config(self.__CfgId.TV_TYPE)), + 'flash_erase_block': self.__get_config(self.__CfgId.FLASH_ERASE_BLOCK), + 'dd_drive_type': self.__DDDriveType(self.__get_config(self.__CfgId.DD_DRIVE_TYPE)), + 'dd_disk_state': self.__DDDiskState(self.__get_config(self.__CfgId.DD_DISK_STATE)), + 'button_state': bool(self.__get_config(self.__CfgId.BUTTON_STATE)), + 'button_mode': self.__ButtonMode(self.__get_config(self.__CfgId.BUTTON_MODE)), + } + + def upload_rom(self, data: bytes, use_shadow: bool=True): + rom_length = len(data) + if (rom_length > self.__Length.SDRAM): + raise ValueError('ROM size too big') + sdram_length = rom_length + shadow_enabled = use_shadow and rom_length > (self.__Length.SDRAM - self.__Length.SHADOW) + if (shadow_enabled): + sdram_length = (self.__Length.SDRAM - self.__Length.SHADOW) + shadow_length = rom_length - sdram_length + if (self.__read_memory(self.__Address.SHADOW, shadow_length) != data[sdram_length:]): + self.__erase_flash_region(self.__Address.SHADOW, self.__Length.SHADOW) + self.__write_memory(self.__Address.SHADOW, data[sdram_length:]) + if (self.__read_memory(self.__Address.SHADOW, shadow_length) != data[sdram_length:]): + raise ConnectionException('Shadow ROM program failure') + self.__write_memory(self.__Address.SDRAM, data[:sdram_length]) + self.__set_config(self.__CfgId.ROM_SHADOW_ENABLE, shadow_enabled) + + def upload_ddipl(self, data: bytes) -> None: + if (len(data) > self.__Length.DDIPL): + raise ValueError('DDIPL size too big') + self.__write_memory(self.__Address.DDIPL, data) + + def upload_save(self, data: bytes) -> None: + save_type = self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)) + if (save_type not in self.SaveType): + raise ConnectionError('Unknown save type fetched from SC64 device') + if (save_type == self.SaveType.NONE): + raise ValueError('No save type set inside SC64 device') + if (len(data) != self.__SaveLength[save_type.name]): + raise ValueError('Wrong save data length') + address = self.__Address.SAVE + if (save_type == self.SaveType.EEPROM_4K or save_type == self.SaveType.EEPROM_16K): + address = self.__Address.EEPROM + self.__write_memory(address, data) + + def download_save(self) -> bytes: + save_type = self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)) + if (save_type not in self.SaveType): + raise ConnectionError('Unknown save type fetched from SC64 device') + if (save_type == self.SaveType.NONE): + raise ValueError('No save type set inside SC64 device') + address = self.__Address.SAVE + length = self.__SaveLength[save_type.name] + if (save_type == self.SaveType.EEPROM_4K or save_type == self.SaveType.EEPROM_16K): + address = self.__Address.EEPROM + return self.__read_memory(address, length) + + def set_rtc(self, t: datetime) -> None: + to_bcd = lambda v: ((int((v / 10) % 10) << 4) | int(int(v) % 10)) + data = bytes([ + to_bcd(t.weekday() + 1), + to_bcd(t.hour), + to_bcd(t.minute), + to_bcd(t.second), + 0, + to_bcd(t.year), + to_bcd(t.month), + to_bcd(t.day), + ]) + self.__link.execute_cmd(cmd=b'T', args=[self.__get_int(data[0:4]), self.__get_int(data[4:8])]) + + def set_boot_mode(self, mode: BootMode) -> None: + if (mode not in self.BootMode): + raise ValueError('Boot mode outside of allowed values') + self.__set_config(self.__CfgId.BOOT_MODE, mode) + + def set_cic_seed(self, seed: int) -> None: + if (seed != self.CICSeed.AUTO): + if (seed < 0 or seed > 0x1FF): + raise ValueError('CIC seed outside of allowed values') + self.__set_config(self.__CfgId.CIC_SEED, seed) + + def set_tv_type(self, type: TVType) -> None: + if (type not in self.TVType): + raise ValueError('TV type outside of allowed values') + self.__set_config(self.__CfgId.TV_TYPE, type) + + def set_save_type(self, type: SaveType) -> None: + if (type not in self.SaveType): + raise ValueError('Save type outside of allowed values') + self.__set_config(self.__CfgId.SAVE_TYPE, type) + + def set_cic_parameters(self, dd_mode: bool=False, seed: int=0x3F, checksum: bytes=bytes([0xA5, 0x36, 0xC0, 0xF1, 0xD8, 0x59])) -> None: + if (seed < 0 or seed > 0xFF): + raise ValueError('CIC seed outside of allowed values') + if (len(checksum) != 6): + raise ValueError('CIC checksum length outside of allowed values') + data = bytes([1 if dd_mode else 0, seed]) + data = [*data, *checksum] + self.__link.execute_cmd(cmd=b'B', args=[self.__get_int(data[0:4]), self.__get_int(data[4:8])]) + + def update_firmware(self, data: bytes) -> None: + address = self.__Address.FIRMWARE + self.__write_memory(address, data) + response = self.__link.execute_cmd(cmd=b'F', args=[address, len(data)]) + error = self.__UpdateError(self.__get_int(response[0:4])) + if (error != self.__UpdateError.OK): + raise ConnectionException(f'Bad update image [{error.name}]') + status = None + while status != self.__UpdateStatus.DONE: + packet = self.__link.get_packet(timeout=60.0) + if (packet == None): + raise ConnectionException('Update timeout') + (cmd, data) = packet + if (cmd != b'F'): + raise ConnectionException('Wrong update status packet') + status = self.__UpdateStatus(self.__get_int(data)) + print(f'Update status [{status.name}]') + if (status == self.__UpdateStatus.ERROR): + raise ConnectionException('Update error, device is most likely bricked') + time.sleep(2) + + def backup_firmware(self) -> bytes: + address = self.__Address.FIRMWARE + info = self.__link.execute_cmd(cmd=b'f', args=[address, 0], timeout=60.0) + error = self.__UpdateError(self.__get_int(info[0:4])) + length = self.__get_int(info[4:8]) + if (error != self.__UpdateError.OK): + raise ConnectionException('Error while getting firmware backup') + return self.__read_memory(address, length) + + def __handle_dd_packet(self, dd: Optional[DD64Image], data: bytes) -> None: + CMD_READ_BLOCK = 1 + CMD_WRITE_BLOCK = 2 + cmd = self.__get_int(data[0:]) + address = self.__get_int(data[4:]) + track_head_block = self.__get_int(data[8:]) + track = (track_head_block >> 2) & 0xFFF + head = (track_head_block >> 1) & 0x1 + block = track_head_block & 0x1 + try: + if (not dd or not dd.loaded): + raise BadBlockError + if (cmd == CMD_READ_BLOCK): + block_data = dd.read_block(track, head, block) + self.__link.execute_cmd(cmd=b'D', args=[address, len(block_data)], data=block_data) + elif (cmd == CMD_WRITE_BLOCK): + dd.write_block(track, head, block, data[12:]) + self.__link.execute_cmd(cmd=b'd', args=[0, 0]) + else: + self.__link.execute_cmd(cmd=b'd', args=[-1, 0]) + except BadBlockError: + self.__link.execute_cmd(cmd=b'd', args=[1, 0]) + + def __handle_isv_packet(self, data: bytes) -> None: + print(data.decode('EUC-JP', errors='backslashreplace'), end='') + + def __handle_usb_packet(self, data: bytes) -> None: + print(data) + + def debug_loop(self, isv: bool=False, disks: Optional[list[str]]=None) -> None: + dd = None + current_image = 0 + next_image = 0 + + self.__set_config(self.__CfgId.ROM_WRITE_ENABLE, isv) + self.__set_config(self.__CfgId.ISV_ENABLE, isv) + if (isv): + print('IS-Viewer64 support set to [ENABLED]') + if (self.__get_config(self.__CfgId.ROM_SHADOW_ENABLE)): + print('ROM shadow enabled - ISV support will NOT work (use --no-shadow option to disable it)') + + if (disks): + dd = DD64Image() + drive_type = None + for path in disks: + try: + dd.load(path) + if (drive_type == None): + drive_type = dd.get_drive_type() + elif (drive_type != dd.get_drive_type()): + raise ValueError('Disk drive type mismatch') + dd.unload() + except ValueError as e: + dd = None + print(f'64DD disabled, incorrect disk images provided: {e}') + break + if (dd): + self.__set_config(self.__CfgId.DD_MODE, self.__DDMode.FULL) + self.__set_config(self.__CfgId.DD_DRIVE_TYPE, { + 'retail': self.__DDDriveType.RETAIL, + 'development': self.__DDDriveType.DEVELOPMENT + }[drive_type]) + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) + self.__set_config(self.__CfgId.BUTTON_MODE, self.__ButtonMode.USB_PACKET) + print('64DD enabled, loaded disks:') + for disk in disks: + print(f' - {os.path.basename(disk)}') + print('Press button on SC64 device to cycle through provided disks') + + print('Debug loop started, use Ctrl-C to exit') + + try: + while (True): + packet = self.__link.get_packet() + if (packet != None): + (cmd, data) = packet + if (cmd == b'D'): + self.__handle_dd_packet(dd, data) + if (cmd == b'I'): + self.__handle_isv_packet(data) + if (cmd == b'U'): + self.__handle_usb_packet(data) + if (cmd == b'B'): + if (not dd.loaded): + dd.load(disks[next_image]) + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.INSERTED) + current_image = next_image + next_image += 1 + if (next_image >= len(disks)): + next_image = 0 + print(f'64DD disk inserted - {os.path.basename(disks[current_image])}') + else: + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) + dd.unload() + print(f'64DD disk ejected - {os.path.basename(disks[current_image])}') + except KeyboardInterrupt: + if (dd and dd.loaded): + self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) + + +class EnumAction(argparse.Action): + def __init__(self, **kwargs): + type = kwargs.pop('type', None) + if type is None: + raise ValueError('No type was provided') + if not issubclass(type, Enum): + raise TypeError('Provided type is not an Enum subclass') + items = (choice.lower().replace('_', '-') for (choice, _) in type.__members__.items()) + kwargs.setdefault('choices', tuple(items)) + super(EnumAction, self).__init__(**kwargs) + self.__enum = type + + def __call__(self, parser, namespace, values, option_string): + key = str(values).upper().replace('-', '_') + value = self.__enum[key] + setattr(namespace, self.dest, value) + + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='SC64 control software') + parser.add_argument('--backup', help='backup SC64 firmware and write it to specified file') + parser.add_argument('--update', help='update SC64 firmware from specified file') + parser.add_argument('--reset-state', action='store_true', help='reset SC64 internal state') + parser.add_argument('--print-state', action='store_true', help='print SC64 internal state') + parser.add_argument('--boot', type=SC64.BootMode, action=EnumAction, help='set boot mode') + parser.add_argument('--tv', type=SC64.TVType, action=EnumAction, help='force TV type to set value') + parser.add_argument('--cic', type=SC64.CICSeed, action=EnumAction, help='force CIC seed to set value') + parser.add_argument('--rtc', action='store_true', help='update clock in SC64 to system time') + parser.add_argument('--rom', help='upload ROM from specified file') + parser.add_argument('--no-shadow', action='store_false', help='do not put last 128 kB of ROM inside flash memory (can corrupt non EEPROM saves)') + parser.add_argument('--save-type', type=SC64.SaveType, action=EnumAction, help='set save type') + parser.add_argument('--save', help='upload save from specified file') + parser.add_argument('--backup-save', help='download save and write it to specified file') + parser.add_argument('--ddipl', help='upload 64DD IPL from specified file') + parser.add_argument('--disk', action='append', help='path to 64DD disk (.ndd format), can be specified multiple times') + parser.add_argument('--isv', action='store_true', help='enable IS-Viewer64 support') + parser.add_argument('--debug', action='store_true', help='run debug loop (required for IS-Viewer64 and 64DD)') + + if (len(sys.argv) <= 1): + parser.print_help() + parser.exit() + + args = parser.parse_args() + + try: + sc64 = SC64() + + if (args.backup): + with open(args.backup, 'wb+') as f: + print('Generating backup, this might take a while... ', end='', flush=True) + f.write(sc64.backup_firmware()) + print('done') + + if (args.update): + with open(args.update, 'rb+') as f: + print('Updating firmware, this might take a while... ', end='', flush=True) + sc64.update_firmware(f.read()) + print('done') + + if (args.reset_state): + sc64.reset_state() + print('SC64 internal state reset') + + if (args.print_state): + state = sc64.get_state() + print('Current SC64 internal state:') + for key, value in state.items(): + if (hasattr(value, 'name')): + value = getattr(value, 'name') + print(f' {key}: {value}') + + if (args.boot != None): + sc64.set_boot_mode(args.boot) + print(f'Boot mode set to [{args.boot.name}]') + + if (args.tv != None): + sc64.set_tv_type(args.tv) + print(f'TV type set to [{args.tv.name}]') + + if (args.cic != None): + sc64.set_cic_seed(args.cic) + print(f'CIC seed set to [0x{args.cic:X}]') + + if (args.rtc): + sc64.set_rtc(datetime.now()) + + if (args.rom): + with open(args.rom, 'rb+') as f: + print('Uploading ROM... ', end='', flush=True) + sc64.upload_rom(f.read(), use_shadow=args.no_shadow) + print('done') + + if (args.save_type != None): + sc64.set_save_type(args.save_type) + print(f'Save type set to [{args.save_type.name}]') + + if (args.save): + with open(args.save, 'rb+') as f: + print('Uploading save... ', end='', flush=True) + sc64.upload_save(f.read()) + print('done') + + if (args.ddipl): + with open(args.ddipl, 'rb+') as f: + print('Uploading 64DD IPL... ', end='', flush=True) + sc64.upload_ddipl(f.read()) + print('done') + + if (args.debug): + sc64.debug_loop(isv=args.isv, disks=args.disk) + + if (args.backup_save): + with open(args.backup_save, 'wb+') as f: + print('Downloading save... ', end='', flush=True) + f.write(sc64.download_save()) + print('done') + except ValueError as e: + print(f'\nValue error: {e}') + except ConnectionException as e: + print(f'\nSC64 error: {e}') diff --git a/sw/update/update.py b/sw/update/update.py old mode 100644 new mode 100755 index 3ac476a..5bd7c5c --- a/sw/update/update.py +++ b/sw/update/update.py @@ -1,228 +1,228 @@ -#!/usr/bin/env python3 - -import argparse -import math -import os -import platform -import sys -from binascii import crc32 -from datetime import datetime -from io import BufferedRandom - - - -class JedecError(Exception): - pass - - -class JedecFile: - __fuse_length: int = 0 - __fuse_offset: int = 0 - __fuse_data: bytes = b'' - __byte_buffer: int = 0 - - def __handle_q_field(self, f: BufferedRandom) -> None: - type = f.read(1) - if (type == b'F'): - value = b'' - while (True): - data = f.read(1) - if (data == b'*'): - value = value.decode('ascii', errors='backslashreplace') - if (not value.isdecimal()): - raise JedecError('Invalid Q field data') - self.__fuse_length = int(value) - break - else: - value += data - else: - self.__ignore_field(f) - - def __handle_l_field(self, f: BufferedRandom) -> None: - if (self.__fuse_length <= 0): - raise JedecError('Found fuse data before declaring fuse count') - - offset = b'' - while (True): - data = f.read(1) - if (data >= b'0' and data <= b'9'): - offset += data - elif (data == b'\r' or data == b'\n'): - offset = offset.decode('ascii', errors='backslashreplace') - if (not offset.isdecimal()): - raise JedecError('Invalid L field offset data') - offset = int(offset) - if (offset != self.__fuse_offset): - raise JedecError('Fuse data is not continuous') - break - else: - raise JedecError('Unexpected byte inside L field offset data') - - data = b'' - while (True): - data = f.read(1) - if (data == b'0' or data == b'1'): - shift = (7 - (self.__fuse_offset % 8)) - self.__byte_buffer |= (1 if data == b'1' else 0) << shift - if (((self.__fuse_offset % 8) == 7) or (self.__fuse_offset == (self.__fuse_length - 1))): - self.__fuse_data += int.to_bytes(self.__byte_buffer, 1, byteorder='little') - self.__byte_buffer = 0 - self.__fuse_offset += 1 - elif (data == b'\r' or data == b'\n'): - pass - elif (data == b'*'): - break - elif (data == b''): - raise JedecError('Unexpected end of file') - else: - raise JedecError('Unexpected byte inside L field fuse data') - - def __ignore_field(self, f: BufferedRandom) -> None: - data = None - while (data != b'*'): - data = f.read(1) - if (data == b''): - raise JedecError('Unexpected end of file') - - def parse(self, path: str) -> bytes: - self.__fuse_length = 0 - self.__fuse_offset = 0 - self.__fuse_data = b'' - self.__byte_buffer = 0 - - field = None - with open(path, 'rb+') as f: - while (True): - field = f.read(1) - if (field == b'\x02'): - f.seek(-1, os.SEEK_CUR) - break - elif (field == b''): - raise JedecError('Unexpected end of file') - - while (True): - field = f.read(1) - if (field == b'Q'): - self.__handle_q_field(f) - elif (field == b'L'): - self.__handle_l_field(f) - elif (field == b'\r' or field == b'\n'): - pass - elif (field == b'\x03'): - break - elif (field == b''): - raise JedecError('Unexpected end of file') - else: - self.__ignore_field(f) - - if (self.__fuse_length <= 0): - raise JedecError('No fuse data found') - - if (self.__fuse_offset != self.__fuse_length): - raise JedecError('Missing fuse data inside JEDEC file') - - if (len(self.__fuse_data) != math.ceil(self.__fuse_length / 8)): - raise JedecError('Missing fuse data inside JEDEC file') - - return self.__fuse_data - - -class SC64UpdateData: - __UPDATE_TOKEN = b'SC64 Update v2.0' - - __CHUNK_ID_UPDATE_INFO = 1 - __CHUNK_ID_MCU_DATA = 2 - __CHUNK_ID_FPGA_DATA = 3 - __CHUNK_ID_BOOTLOADER_DATA = 4 - - __data = b'' - - def __int_to_bytes(self, value: int) -> bytes: - return value.to_bytes(4, byteorder='little') - - def __align(self, value: int) -> int: - if (value % 16 != 0): - value += (16 - (value % 16)) - return value - - def __add_chunk(self, id: int, data: bytes) -> None: - chunk = b'' - chunk_length = (16 + len(data)) - aligned_length = self.__align(chunk_length) - chunk += self.__int_to_bytes(id) - chunk += self.__int_to_bytes(aligned_length - 8) - chunk += self.__int_to_bytes(crc32(data)) - chunk += self.__int_to_bytes(len(data)) - chunk += data - chunk += bytes([0] * (aligned_length - chunk_length)) - self.__data += chunk - - def create_update_data(self) -> None: - self.__data = self.__UPDATE_TOKEN - - def add_update_info(self, data: bytes) -> None: - self.__add_chunk(self.__CHUNK_ID_UPDATE_INFO, data) - - def add_mcu_data(self, data: bytes) -> None: - self.__add_chunk(self.__CHUNK_ID_MCU_DATA, data) - - def add_fpga_data(self, data: bytes) -> None: - self.__add_chunk(self.__CHUNK_ID_FPGA_DATA, data) - - def add_bootloader_data(self, data: bytes) -> None: - self.__add_chunk(self.__CHUNK_ID_BOOTLOADER_DATA, data) - - def get_update_data(self) -> bytes: - return self.__data - - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='SC64 update file generator') - parser.add_argument('--git', metavar='git', required=False, help='git text to embed in update info') - parser.add_argument('--mcu', metavar='mcu_path', required=False, help='path to MCU update data') - parser.add_argument('--fpga', metavar='fpga_path', required=False, help='path to FPGA update data') - parser.add_argument('--boot', metavar='bootloader_path', required=False, help='path to N64 bootloader update data') - parser.add_argument('output', metavar='output_path', help='path to final update data') - - if (len(sys.argv) <= 1): - parser.print_help() - parser.exit() - - args = parser.parse_args() - - try: - update = SC64UpdateData() - update.create_update_data() - - hostname = platform.node() - creation_datetime = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') - info = [ - f'build system: [{hostname}]', - f'creation datetime: [{creation_datetime}]', - ] - if (args.git): - info.append(args.git) - update_info = ' '.join(info) - print(update_info) - update.add_update_info(update_info.encode()) - - if (args.mcu): - with open(args.mcu, 'rb+') as f: - update.add_mcu_data(f.read()) - - if (args.fpga): - update.add_fpga_data(JedecFile().parse(args.fpga)) - - if (args.boot): - with open(args.boot, 'rb+') as f: - update.add_bootloader_data(f.read()) - - with open(args.output, 'wb+') as f: - f.write(update.get_update_data()) - except JedecError as e: - print(f'Error while parsing FPGA update data: {e}') - exit(-1) - except IOError as e: - print(f'IOError: {e}') - exit(-1) +#!/usr/bin/env python3 + +import argparse +import math +import os +import platform +import sys +from binascii import crc32 +from datetime import datetime +from io import BufferedRandom + + + +class JedecError(Exception): + pass + + +class JedecFile: + __fuse_length: int = 0 + __fuse_offset: int = 0 + __fuse_data: bytes = b'' + __byte_buffer: int = 0 + + def __handle_q_field(self, f: BufferedRandom) -> None: + type = f.read(1) + if (type == b'F'): + value = b'' + while (True): + data = f.read(1) + if (data == b'*'): + value = value.decode('ascii', errors='backslashreplace') + if (not value.isdecimal()): + raise JedecError('Invalid Q field data') + self.__fuse_length = int(value) + break + else: + value += data + else: + self.__ignore_field(f) + + def __handle_l_field(self, f: BufferedRandom) -> None: + if (self.__fuse_length <= 0): + raise JedecError('Found fuse data before declaring fuse count') + + offset = b'' + while (True): + data = f.read(1) + if (data >= b'0' and data <= b'9'): + offset += data + elif (data == b'\r' or data == b'\n'): + offset = offset.decode('ascii', errors='backslashreplace') + if (not offset.isdecimal()): + raise JedecError('Invalid L field offset data') + offset = int(offset) + if (offset != self.__fuse_offset): + raise JedecError('Fuse data is not continuous') + break + else: + raise JedecError('Unexpected byte inside L field offset data') + + data = b'' + while (True): + data = f.read(1) + if (data == b'0' or data == b'1'): + shift = (7 - (self.__fuse_offset % 8)) + self.__byte_buffer |= (1 if data == b'1' else 0) << shift + if (((self.__fuse_offset % 8) == 7) or (self.__fuse_offset == (self.__fuse_length - 1))): + self.__fuse_data += int.to_bytes(self.__byte_buffer, 1, byteorder='little') + self.__byte_buffer = 0 + self.__fuse_offset += 1 + elif (data == b'\r' or data == b'\n'): + pass + elif (data == b'*'): + break + elif (data == b''): + raise JedecError('Unexpected end of file') + else: + raise JedecError('Unexpected byte inside L field fuse data') + + def __ignore_field(self, f: BufferedRandom) -> None: + data = None + while (data != b'*'): + data = f.read(1) + if (data == b''): + raise JedecError('Unexpected end of file') + + def parse(self, path: str) -> bytes: + self.__fuse_length = 0 + self.__fuse_offset = 0 + self.__fuse_data = b'' + self.__byte_buffer = 0 + + field = None + with open(path, 'rb+') as f: + while (True): + field = f.read(1) + if (field == b'\x02'): + f.seek(-1, os.SEEK_CUR) + break + elif (field == b''): + raise JedecError('Unexpected end of file') + + while (True): + field = f.read(1) + if (field == b'Q'): + self.__handle_q_field(f) + elif (field == b'L'): + self.__handle_l_field(f) + elif (field == b'\r' or field == b'\n'): + pass + elif (field == b'\x03'): + break + elif (field == b''): + raise JedecError('Unexpected end of file') + else: + self.__ignore_field(f) + + if (self.__fuse_length <= 0): + raise JedecError('No fuse data found') + + if (self.__fuse_offset != self.__fuse_length): + raise JedecError('Missing fuse data inside JEDEC file') + + if (len(self.__fuse_data) != math.ceil(self.__fuse_length / 8)): + raise JedecError('Missing fuse data inside JEDEC file') + + return self.__fuse_data + + +class SC64UpdateData: + __UPDATE_TOKEN = b'SC64 Update v2.0' + + __CHUNK_ID_UPDATE_INFO = 1 + __CHUNK_ID_MCU_DATA = 2 + __CHUNK_ID_FPGA_DATA = 3 + __CHUNK_ID_BOOTLOADER_DATA = 4 + + __data = b'' + + def __int_to_bytes(self, value: int) -> bytes: + return value.to_bytes(4, byteorder='little') + + def __align(self, value: int) -> int: + if (value % 16 != 0): + value += (16 - (value % 16)) + return value + + def __add_chunk(self, id: int, data: bytes) -> None: + chunk = b'' + chunk_length = (16 + len(data)) + aligned_length = self.__align(chunk_length) + chunk += self.__int_to_bytes(id) + chunk += self.__int_to_bytes(aligned_length - 8) + chunk += self.__int_to_bytes(crc32(data)) + chunk += self.__int_to_bytes(len(data)) + chunk += data + chunk += bytes([0] * (aligned_length - chunk_length)) + self.__data += chunk + + def create_update_data(self) -> None: + self.__data = self.__UPDATE_TOKEN + + def add_update_info(self, data: bytes) -> None: + self.__add_chunk(self.__CHUNK_ID_UPDATE_INFO, data) + + def add_mcu_data(self, data: bytes) -> None: + self.__add_chunk(self.__CHUNK_ID_MCU_DATA, data) + + def add_fpga_data(self, data: bytes) -> None: + self.__add_chunk(self.__CHUNK_ID_FPGA_DATA, data) + + def add_bootloader_data(self, data: bytes) -> None: + self.__add_chunk(self.__CHUNK_ID_BOOTLOADER_DATA, data) + + def get_update_data(self) -> bytes: + return self.__data + + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='SC64 update file generator') + parser.add_argument('--git', metavar='git', required=False, help='git text to embed in update info') + parser.add_argument('--mcu', metavar='mcu_path', required=False, help='path to MCU update data') + parser.add_argument('--fpga', metavar='fpga_path', required=False, help='path to FPGA update data') + parser.add_argument('--boot', metavar='bootloader_path', required=False, help='path to N64 bootloader update data') + parser.add_argument('output', metavar='output_path', help='path to final update data') + + if (len(sys.argv) <= 1): + parser.print_help() + parser.exit() + + args = parser.parse_args() + + try: + update = SC64UpdateData() + update.create_update_data() + + hostname = platform.node() + creation_datetime = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + info = [ + f'build system: [{hostname}]', + f'creation datetime: [{creation_datetime}]', + ] + if (args.git): + info.append(args.git) + update_info = ' '.join(info) + print(update_info) + update.add_update_info(update_info.encode()) + + if (args.mcu): + with open(args.mcu, 'rb+') as f: + update.add_mcu_data(f.read()) + + if (args.fpga): + update.add_fpga_data(JedecFile().parse(args.fpga)) + + if (args.boot): + with open(args.boot, 'rb+') as f: + update.add_bootloader_data(f.read()) + + with open(args.output, 'wb+') as f: + f.write(update.get_update_data()) + except JedecError as e: + print(f'Error while parsing FPGA update data: {e}') + exit(-1) + except IOError as e: + print(f'IOError: {e}') + exit(-1)