From 3c2128b811a4066894e330f47c4a41a2a0c46439 Mon Sep 17 00:00:00 2001 From: Mateusz Faderewski Date: Thu, 9 Mar 2023 22:13:15 +0100 Subject: [PATCH] sc64.py is gone --- build.sh | 13 +- docs/06_manufacturing_guidelines.md | 28 +- sw/controller/src/cfg.c | 4 +- sw/controller/src/cfg.h | 2 +- sw/controller/src/usb.c | 19 +- sw/deployer/src/n64.rs | 2 +- sw/pc/.gitignore | 11 - sw/pc/sc64.py | 1422 --------------------------- sw/{update => tools}/.gitignore | 2 +- sw/{pc => tools}/primer.py | 1229 +++++++++++++---------- sw/{pc => tools}/requirements.txt | 1 - sw/{update => tools}/update.py | 12 +- 12 files changed, 737 insertions(+), 2008 deletions(-) delete mode 100644 sw/pc/.gitignore delete mode 100755 sw/pc/sc64.py rename sw/{update => tools}/.gitignore (85%) rename sw/{pc => tools}/primer.py (73%) rename sw/{pc => tools}/requirements.txt (50%) rename sw/{update => tools}/update.py (97%) mode change 100755 => 100644 diff --git a/build.sh b/build.sh index 8be8fd2..6a0c7c9 100755 --- a/build.sh +++ b/build.sh @@ -5,16 +5,15 @@ set -e PACKAGE_FILE_NAME="sc64-extra" TOP_FILES=( - "./sw/pc/primer.py" - "./sw/pc/requirements.txt" - "./sw/pc/sc64.py" - "./sw/update/sc64-firmware.bin" + "./fw/ftdi/ft232h_config.xml" + "./sw/tools/primer.py" + "./sw/tools/requirements.txt" + "./sw/tools/sc64-firmware.bin" ) FILES=( "./assets/*" "./docs/*" - "./fw/ftdi/ft232h_config.xml" "./hw/pcb/sc64_hw_v2.0a_bom.html" "./hw/pcb/sc64v2.kicad_pcb" "./hw/pcb/sc64v2.kicad_pro" @@ -84,7 +83,7 @@ build_update () { build_controller build_fpga - pushd sw/update > /dev/null + pushd sw/tools > /dev/null if [ "$FORCE_CLEAN" = true ]; then rm -f ./sc64-firmware.bin fi @@ -117,7 +116,7 @@ build_release () { zip -j -r $PACKAGE ${TOP_FILES[@]} zip -r $PACKAGE ${FILES[@]} - cp sw/update/sc64-firmware.bin ./sc64-firmware${SC64_VERSION}.bin + cp sw/tools/sc64-firmware.bin ./sc64-firmware${SC64_VERSION}.bin BUILT_RELEASE=true } diff --git a/docs/06_manufacturing_guidelines.md b/docs/06_manufacturing_guidelines.md index b287ee6..4880f60 100644 --- a/docs/06_manufacturing_guidelines.md +++ b/docs/06_manufacturing_guidelines.md @@ -11,7 +11,8 @@ ## Step by step guide how to make SC64 -All necessary manufacturing files are packaged in every `sc64-extra-{version}.zip` file in GitHub releases. Please download latest release before proceeding with the instructions. +All necessary manufacturing files are packaged in every `sc64-extra-{version}.zip` file in GitHub releases. +Please download latest release before proceeding with the instructions. --- @@ -45,13 +46,16 @@ All necessary manufacturing files are packaged in every `sc64-extra-{version}.zi 1. Locate `.stl` files inside `hw/shell` folder 2. Use these files in the slicer and 3D printer of your choice or order ready made prints from 3D printing company - 3. Find matching screws, dimensions are the same as on retail N64 cartridges + 3. Find matching screws, go to discussions tab for community recommendations --- ### **Putting it together** -There are no special requirements for soldering components to board. All chips and connectors can be soldered with standard manual soldering iron, although hot air station is recommended. Interactive BOM has every component and its value highlighted on PCB drawings and it's strongly recommended to use during assembly process. You can skip this step if PCB assembly service was used in previous steps. +There are no special requirements for soldering components to board. +All chips and connectors can be soldered with standard manual soldering iron, although hot air station is recommended. +Interactive BOM has every component and its value highlighted on PCB drawings and it's strongly recommended to use during assembly process. +You can skip this step if PCB assembly service was used in previous steps. --- @@ -59,17 +63,18 @@ There are no special requirements for soldering components to board. All chips a **Please read the following instructions carefully before proceeding with programming.** -For initial programming you are going to need a PC and a USB to UART (serial) adapter (3.3V signaling is required). These steps assume you are using modern Windows OS (version 10 or higher). +For initial programming you are going to need a PC and a USB to UART (serial) adapter (3.3V signaling is required). +These steps assume you are using modern Windows OS (version 10 or higher). As for software here's list of required applications: - [FT_PROG](https://ftdichip.com/utilities/#ft_prog) - FTDI FT232H EEPROM programming software - - [Python 3](https://www.python.org/downloads/) with `pip3` - necessary for initial programming script, `primer.py` + - [Python 3](https://www.python.org/downloads/) with `pip3` - necessary for initial programming script: `primer.py` Programming must be done in specific order for `primer.py` script to work correctly. First, program FT232H EEPROM: 1. Connect SC64 board to the PC with USB-C cable - 2. Locate FT232H EEPROM template in `fw/ftdi` folder + 2. Locate FT232H EEPROM template `ft232h_config.xml` 3. Launch `FT_PROG` software 4. Click on `Scan and parse` if no device has shown up 5. Right click on SC64 device and choose `Apply Template -> From File` @@ -78,18 +83,18 @@ First, program FT232H EEPROM: Your SC64 should be ready for next programming step. -Second, program FPGA, microcontroller and Flash memory: +Second, program FPGA, microcontroller and bootloader: 1. Disconnect SC64 board from power (unplug USB-C cable) 2. Connect serial adapter to `TX/RX/GND` pads marked on the PCB 3. Connect serial adapter to the PC 4. Check in device manager which port number `COMx` is assigned to serial adapter 5. Connect SC64 board to the PC with USB-C cable (***IMPORTANT:*** connect it to the same computer as serial adapter) 6. Locate `primer.py` script in root folder - 7. Make sure these files are located in the same folder as `primer.py` script: `requirements.txt`, `sc64.py`, `sc64-firmware.bin` + 7. Make sure these files are located in the same folder as `primer.py` script: `requirements.txt`, `sc64-firmware.bin` 8. Run `pip3 install -r requirements.txt` to install required python packages 9. Run `python3 primer.py COMx sc64-firmware.bin` (replace `COMx` with port located in step **4**) 10. Follow the instructions on the screen - 11. Wait until programming process has finished (**DO NOT STOP PROGRAMMING PROCESS OR DISCONNECT SC64 BOARD FROM PC**, doing so might irrecoverably break programming through UART header and you would need to program microcontroller, FPGA and bootloader with separate dedicated programming interfaces through *Tag-Connect* connector on the PCB) + 11. Wait until programming process has finished (**DO NOT STOP PROGRAMMING PROCESS OR DISCONNECT SC64 BOARD FROM PC**, doing so might irrecoverably break programming through UART header and you would need to program FPGA and/or microcontroller with separate dedicated programming interfaces through *Tag-Connect* connector on the PCB) Congratulations! Your SC64 flashcart should be ready for use! @@ -99,4 +104,7 @@ Congratulations! Your SC64 flashcart should be ready for use! *`primer.py` threw error on `Bootloader -> SC64 FLASH` step* -This issue can be attributed to incorrectly programmed FT232H EEPROM in the first programming step. Check again in `FT_PROG` program if device was configured properly. Once FPGA and microcontroller has been programmed successfully `primer.py` script needs to be run in special mode. Please use command `python3 primer.py COMx sc64-firmware.bin --only-bootloader` to try programming bootloader again. +This issue can be attributed to incorrectly programmed FT232H EEPROM in the first programming step. +Check again in `FT_PROG` application if device was configured properly. +Once FPGA and microcontroller has been programmed successfully `primer.py` script needs to be run in special mode. +Please use command `python3 primer.py COMx sc64-firmware.bin --bootloader-only` to try programming bootloader again. diff --git a/sw/controller/src/cfg.c b/sw/controller/src/cfg.c index 1e1c8e8..359ac6d 100644 --- a/sw/controller/src/cfg.c +++ b/sw/controller/src/cfg.c @@ -397,10 +397,8 @@ bool cfg_update_setting (uint32_t *args) { return false; } -bool cfg_set_rom_write_enable (bool value) { - uint32_t scr = fpga_reg_get(REG_CFG_SCR); +void cfg_set_rom_write_enable (bool value) { cfg_change_scr_bits(CFG_SCR_ROM_WRITE_ENABLED, value); - return (scr & CFG_SCR_ROM_WRITE_ENABLED); } save_type_t cfg_get_save_type (void) { diff --git a/sw/controller/src/cfg.h b/sw/controller/src/cfg.h index cebc36f..bd67d20 100644 --- a/sw/controller/src/cfg.h +++ b/sw/controller/src/cfg.h @@ -21,7 +21,7 @@ bool cfg_query (uint32_t *args); bool cfg_update (uint32_t *args); bool cfg_query_setting (uint32_t *args); bool cfg_update_setting (uint32_t *args); -bool cfg_set_rom_write_enable (bool value); +void cfg_set_rom_write_enable (bool value); save_type_t cfg_get_save_type (void); void cfg_get_time (uint32_t *args); void cfg_set_time (uint32_t *args); diff --git a/sw/controller/src/usb.c b/sw/controller/src/usb.c index 2ea6a3b..c0a8e3f 100644 --- a/sw/controller/src/usb.c +++ b/sw/controller/src/usb.c @@ -281,7 +281,9 @@ static void usb_rx_process (void) { break; case 'U': - if ((p.read_length > 0) && usb_dma_ready()) { + if (p.rx_args[1] == 0) { + p.rx_state = RX_STATE_IDLE; + } else if ((p.read_length > 0) && usb_dma_ready()) { uint32_t length = (p.read_length > p.rx_args[1]) ? p.rx_args[1] : p.read_length; if (!p.rx_dma_running) { fpga_reg_set(REG_USB_DMA_ADDRESS, p.read_address); @@ -295,9 +297,6 @@ static void usb_rx_process (void) { p.read_length -= length; p.read_address += length; p.read_ready = true; - if (p.rx_args[1] == 0) { - p.rx_state = RX_STATE_IDLE; - } } } break; @@ -328,19 +327,17 @@ static void usb_rx_process (void) { p.response_pending = true; break; - case 'f': { - bool rom_write_enable_restore = cfg_set_rom_write_enable(false); + case 'f': + cfg_set_rom_write_enable(false); p.response_info.data[0] = update_backup(p.rx_args[0], &p.response_info.data[1]); p.rx_state = RX_STATE_IDLE; p.response_pending = true; p.response_error = (p.response_info.data[0] != UPDATE_OK); p.response_info.data_length = 8; - cfg_set_rom_write_enable(rom_write_enable_restore); break; - } - case 'F': { - bool rom_write_enable_restore = cfg_set_rom_write_enable(false); + case 'F': + cfg_set_rom_write_enable(false); p.response_info.data[0] = update_prepare(p.rx_args[0], p.rx_args[1]); p.rx_state = RX_STATE_IDLE; p.response_pending = true; @@ -349,10 +346,8 @@ static void usb_rx_process (void) { p.response_info.done_callback = update_start; } else { p.response_error = true; - cfg_set_rom_write_enable(rom_write_enable_restore); } break; - } case '?': p.rx_state = RX_STATE_IDLE; diff --git a/sw/deployer/src/n64.rs b/sw/deployer/src/n64.rs index f6b25ae..0f681ff 100644 --- a/sw/deployer/src/n64.rs +++ b/sw/deployer/src/n64.rs @@ -10,7 +10,7 @@ pub enum SaveType { Sram128kB, } -const HASH_CHUNK_LENGTH: usize = 256 * 1024; +const HASH_CHUNK_LENGTH: usize = 1 * 1024 * 1024; pub fn guess_save_type( reader: &mut T, diff --git a/sw/pc/.gitignore b/sw/pc/.gitignore deleted file mode 100644 index 0ad4956..0000000 --- a/sw/pc/.gitignore +++ /dev/null @@ -1,11 +0,0 @@ -/__pycache__ -/build -/dist -*.bin -*.eep -*.fla -*.n64 -*.spec -*.srm -*.v64 -*.z64 diff --git a/sw/pc/sc64.py b/sw/pc/sc64.py deleted file mode 100755 index adc091d..0000000 --- a/sw/pc/sc64.py +++ /dev/null @@ -1,1422 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import os -import queue -import serial -import socket -import sys -import time -from binascii import crc32 -from datetime import datetime -from enum import Enum, IntEnum -from io import BufferedReader -from serial.tools import list_ports -from threading import Thread -from typing import Callable, Optional -from PIL import Image - - - -class BadBlockError(Exception): - pass - - -class DD64Image: - __DISK_HEADS = 2 - __DISK_TRACKS = 1175 - __DISK_BLOCKS_PER_TRACK = 2 - __DISK_SECTORS_PER_BLOCK = 85 - __DISK_BAD_TRACKS_PER_ZONE = 12 - __DISK_SYSTEM_SECTOR_SIZE = 232 - __DISK_ZONES = [ - (0, 232, 158, 0), - (0, 216, 158, 158), - (0, 208, 149, 316), - (0, 192, 149, 465), - (0, 176, 149, 614), - (0, 160, 149, 763), - (0, 144, 149, 912), - (0, 128, 114, 1061), - (1, 216, 158, 157), - (1, 208, 158, 315), - (1, 192, 149, 464), - (1, 176, 149, 613), - (1, 160, 149, 762), - (1, 144, 149, 911), - (1, 128, 149, 1060), - (1, 112, 114, 1174), - ] - __DISK_VZONE_TO_PZONE = [ - [0, 1, 2, 9, 8, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10], - [0, 1, 2, 3, 10, 9, 8, 4, 5, 6, 7, 15, 14, 13, 12, 11], - [0, 1, 2, 3, 4, 11, 10, 9, 8, 5, 6, 7, 15, 14, 13, 12], - [0, 1, 2, 3, 4, 5, 12, 11, 10, 9, 8, 6, 7, 15, 14, 13], - [0, 1, 2, 3, 4, 5, 6, 13, 12, 11, 10, 9, 8, 7, 15, 14], - [0, 1, 2, 3, 4, 5, 6, 7, 14, 13, 12, 11, 10, 9, 8, 15], - [0, 1, 2, 3, 4, 5, 6, 7, 15, 14, 13, 12, 11, 10, 9, 8], - ] - __DISK_DRIVE_TYPES = [( - 'development', - 192, - [11, 10, 3, 2], - [0, 1, 8, 9, 16, 17, 18, 19, 20, 21, 22, 23], - ), ( - 'retail', - 232, - [9, 8, 1, 0], - [2, 3, 10, 11, 12, 16, 17, 18, 19, 20, 21, 22, 23], - )] - - __file: Optional[BufferedReader] - __drive_type: Optional[str] - __block_info_table: list[tuple[int, int]] - loaded: bool = False - - def __init__(self) -> None: - self.__file = None - self.__drive_type = None - block_info_table_length = self.__DISK_HEADS * self.__DISK_TRACKS * self.__DISK_BLOCKS_PER_TRACK - self.__block_info_table = [None] * block_info_table_length - - def __del__(self) -> None: - self.unload() - - def __check_system_block(self, lba: int, sector_size: int, check_disk_type: bool) -> tuple[bool, bytes]: - self.__file.seek(lba * self.__DISK_SYSTEM_SECTOR_SIZE * self.__DISK_SECTORS_PER_BLOCK) - system_block_data = self.__file.read(sector_size * self.__DISK_SECTORS_PER_BLOCK) - system_data = system_block_data[:sector_size] - for sector in range(1, self.__DISK_SECTORS_PER_BLOCK): - sector_data = system_block_data[(sector * sector_size):][:sector_size] - if (system_data != sector_data): - return (False, None) - if (check_disk_type): - if (system_data[4] != 0x10): - return (False, None) - if ((system_data[5] & 0xF0) != 0x10): - return (False, None) - return (True, system_data) - - def __parse_disk(self) -> None: - disk_system_data = None - disk_id_data = None - disk_bad_lbas = [] - - drive_index = 0 - while (disk_system_data == None) and (drive_index < len(self.__DISK_DRIVE_TYPES)): - (drive_type, system_sector_size, system_data_lbas, bad_lbas) = self.__DISK_DRIVE_TYPES[drive_index] - disk_bad_lbas.clear() - disk_bad_lbas.extend(bad_lbas) - for system_lba in system_data_lbas: - (valid, system_data) = self.__check_system_block(system_lba, system_sector_size, check_disk_type=True) - if (valid): - self.__drive_type = drive_type - disk_system_data = system_data - else: - disk_bad_lbas.append(system_lba) - drive_index += 1 - - for id_lba in [15, 14]: - (valid, id_data) = self.__check_system_block(id_lba, self.__DISK_SYSTEM_SECTOR_SIZE, check_disk_type=False) - if (valid): - disk_id_data = id_data - else: - disk_bad_lbas.append(id_lba) - - if not (disk_system_data and disk_id_data): - raise ValueError('Provided 64DD disk file is not valid') - - disk_zone_bad_tracks = [] - - for zone in range(len(self.__DISK_ZONES)): - zone_bad_tracks = [] - start = 0 if zone == 0 else system_data[0x07 + zone] - stop = system_data[0x07 + zone + 1] - for offset in range(start, stop): - zone_bad_tracks.append(system_data[0x20 + offset]) - for ignored_track in range(self.__DISK_BAD_TRACKS_PER_ZONE - len(zone_bad_tracks)): - zone_bad_tracks.append(self.__DISK_ZONES[zone][2] - ignored_track - 1) - disk_zone_bad_tracks.append(zone_bad_tracks) - - disk_type = disk_system_data[5] & 0x0F - - current_lba = 0 - starting_block = 0 - disk_file_offset = 0 - - for zone in self.__DISK_VZONE_TO_PZONE[disk_type]: - (head, sector_size, tracks, track) = self.__DISK_ZONES[zone] - - for zone_track in range(tracks): - current_zone_track = ( - (tracks - 1) - zone_track) if head else zone_track - - if (current_zone_track in disk_zone_bad_tracks[zone]): - track += (-1) if head else 1 - continue - - for block in range(self.__DISK_BLOCKS_PER_TRACK): - index = (track << 2) | (head << 1) | (starting_block ^ block) - if (current_lba not in disk_bad_lbas): - self.__block_info_table[index] = (disk_file_offset, sector_size * self.__DISK_SECTORS_PER_BLOCK) - else: - self.__block_info_table[index] = None - disk_file_offset += sector_size * self.__DISK_SECTORS_PER_BLOCK - current_lba += 1 - - track += (-1) if head else 1 - starting_block ^= 1 - - def __check_track_head_block(self, track: int, head: int, block: int) -> None: - if (track < 0 or track >= self.__DISK_TRACKS): - raise ValueError('Track outside of possible range') - if (head < 0 or head >= self.__DISK_HEADS): - raise ValueError('Head outside of possible range') - if (block < 0 or block >= self.__DISK_BLOCKS_PER_TRACK): - raise ValueError('Block outside of possible range') - - def __get_table_index(self, track: int, head: int, block: int) -> int: - return (track << 2) | (head << 1) | (block) - - def __get_block_info(self, track: int, head: int, block: int) -> Optional[tuple[int, int]]: - if (self.__file.closed): - return None - self.__check_track_head_block(track, head, block) - index = self.__get_table_index(track, head, block) - return self.__block_info_table[index] - - def load(self, path: str) -> None: - self.unload() - self.__file = open(path, 'rb+') - self.__parse_disk() - self.loaded = True - - def unload(self) -> None: - self.loaded = False - if (self.__file != None and not self.__file.closed): - self.__file.close() - self.__drive_type = None - - def get_block_info_table(self) -> list[tuple[int, int]]: - return self.__block_info_table - - def get_drive_type(self) -> str: - return self.__drive_type - - def read_block(self, track: int, head: int, block: int) -> bytes: - info = self.__get_block_info(track, head, block) - if (info == None): - raise BadBlockError - (offset, block_size) = info - self.__file.seek(offset) - return self.__file.read(block_size) - - def write_block(self, track: int, head: int, block: int, data: bytes) -> None: - info = self.__get_block_info(track, head, block) - if (info == None): - raise BadBlockError - (offset, block_size) = info - if (len(data) != block_size): - raise ValueError(f'Provided data block size is different than expected ({len(data)} != {block_size})') - self.__file.seek(offset) - self.__file.write(data) - - -class ConnectionException(Exception): - pass - - -class SC64Serial: - __disconnect = False - __serial: Optional[serial.Serial] = None - __thread_read = None - __thread_write = None - __queue_output = queue.Queue() - __queue_input = queue.Queue() - __queue_packet = queue.Queue() - - __VID = 0x0403 - __PID = 0x6014 - - __CHUNK_SIZE = (64 * 1024) - - def __init__(self) -> None: - ports = list_ports.comports() - device_found = False - - if (self.__serial != None and self.__serial.is_open): - raise ConnectionException('Serial port is already open') - - for p in ports: - if (p.vid == self.__VID and p.pid == self.__PID and p.serial_number.startswith('SC64')): - try: - self.__serial = serial.Serial(p.device, timeout=1.0, write_timeout=1.0) - self.__reset_link() - except (serial.SerialException, ConnectionException): - if (self.__serial): - self.__serial.close() - continue - device_found = True - break - - if (not device_found): - raise ConnectionException('No SC64 device was found') - - self.__thread_read = Thread(target=self.__serial_process_input, daemon=True) - self.__thread_write = Thread(target=self.__serial_process_output, daemon=True) - - self.__thread_read.start() - self.__thread_write.start() - - def __del__(self) -> None: - self.__disconnect = True - if (self.__thread_read != None and self.__thread_read.is_alive()): - self.__thread_read.join(1) - if (self.__thread_write != None and self.__thread_write.is_alive()): - self.__thread_write.join(1) - if (self.__serial != None and self.__serial.is_open): - self.__serial.close() - - def __reset_link(self) -> None: - self.__serial.reset_output_buffer() - - retry_counter = 0 - self.__serial.dtr = 1 - while (self.__serial.dsr == 0): - time.sleep(0.1) - retry_counter += 1 - if (retry_counter >= 10): - raise ConnectionException('Could not reset SC64 device') - - self.__serial.reset_input_buffer() - - retry_counter = 0 - self.__serial.dtr = 0 - while (self.__serial.dsr == 1): - time.sleep(0.1) - retry_counter += 1 - if (retry_counter >= 10): - raise ConnectionException('Could not reset SC64 device') - - def __write(self, data: bytes) -> None: - try: - if (self.__disconnect): - raise ConnectionException - for offset in range(0, len(data), self.__CHUNK_SIZE): - self.__serial.write(data[offset:offset + self.__CHUNK_SIZE]) - self.__serial.flush() - except (serial.SerialException, serial.SerialTimeoutException): - raise ConnectionException - - def __read(self, length: int) -> bytes: - try: - data = b'' - while (len(data) < length and not self.__disconnect): - data += self.__serial.read(length - len(data)) - if (self.__disconnect): - raise ConnectionException - return data - except serial.SerialException: - raise ConnectionException - - def __read_int(self) -> int: - return int.from_bytes(self.__read(4), byteorder='big') - - def __serial_process_output(self) -> None: - while (not self.__disconnect and self.__serial != None and self.__serial.is_open): - try: - packet: bytes = self.__queue_output.get(timeout=0.1) - self.__write(packet) - self.__queue_output.task_done() - except queue.Empty: - continue - except ConnectionException: - break - - def __serial_process_input(self) -> None: - while (not self.__disconnect and self.__serial != None and self.__serial.is_open): - try: - token = self.__read(4) - if (len(token) == 4): - identifier = token[0:3] - command = token[3:4] - if (identifier == b'PKT'): - data = self.__read(self.__read_int()) - self.__queue_packet.put((command, data)) - elif (identifier == b'CMP' or identifier == b'ERR'): - data = self.__read(self.__read_int()) - success = identifier == b'CMP' - self.__queue_input.put((command, data, success)) - else: - raise ConnectionException - except ConnectionException: - break - - def __check_threads(self) -> None: - if (not (self.__thread_write.is_alive() and self.__thread_read.is_alive())): - raise ConnectionException('Serial link is closed') - - def __queue_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'') -> None: - if (len(cmd) != 1): - raise ValueError('Length of command is different than 1 byte') - if (len(args) != 2): - raise ValueError('Number of arguments is different than 2') - packet: bytes = b'CMD' - packet += cmd[0:1] - for arg in args: - packet += arg.to_bytes(4, byteorder='big') - packet += data - self.__queue_output.put(packet) - - def __pop_response(self, cmd: bytes, timeout: float, raise_on_err: bool) -> bytes: - try: - (response_cmd, data, success) = self.__queue_input.get(timeout=timeout) - self.__queue_input.task_done() - if (cmd != response_cmd): - raise ConnectionException('CMD wrong command response') - if (raise_on_err and success == False): - raise ConnectionException('CMD response error') - return data - except queue.Empty: - raise ConnectionException('CMD response timeout') - - def execute_cmd(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'', response: bool=True, timeout: float=5.0, raise_on_err: bool=True) -> Optional[bytes]: - self.__check_threads() - self.__queue_cmd(cmd, args, data) - if (response): - return self.__pop_response(cmd, timeout, raise_on_err) - return None - - def get_packet(self, timeout: float=0.1) -> Optional[tuple[bytes, bytes]]: - self.__check_threads() - try: - packet = self.__queue_packet.get(timeout=timeout) - self.__queue_packet.task_done() - return packet - except queue.Empty: - return None - - -class SC64: - class __Address(IntEnum): - MEMORY = 0x0000_0000 - SDRAM = 0x0000_0000 - FLASH = 0x0400_0000 - BUFFER = 0x0500_0000 - EEPROM = 0x0500_2000 - END = 0x0500_297F - FIRMWARE = 0x0200_0000 - DDIPL = 0x03BC_0000 - SAVE = 0x03FE_0000 - EXTENDED = 0x0400_0000 - BOOTLOADER = 0x04E0_0000 - SHADOW = 0x04FE_0000 - - class __Length(IntEnum): - MEMORY = 0x0500_2980 - SDRAM = (64 * 1024 * 1024) - FLASH = (16 * 1024 * 1024) - BUFFER = (8 * 1024) - EEPROM = (2 * 1024) - DDIPL = (4 * 1024 * 1024) - SAVE = (128 * 1024) - EXTENDED = (14 * 1024 * 1024) - BOOTLOADER = (1920 * 1024) - SHADOW = (128 * 1024) - - class __SaveLength(IntEnum): - NONE = 0 - EEPROM_4K = 512 - EEPROM_16K = (2 * 1024) - SRAM = (32 * 1024) - FLASHRAM = (128 * 1024) - SRAM_BANKED = (3 * 32 * 1024) - - class __CfgId(IntEnum): - BOOTLOADER_SWITCH = 0 - ROM_WRITE_ENABLE = 1 - ROM_SHADOW_ENABLE = 2 - DD_MODE = 3 - ISV_ADDRESS = 4 - BOOT_MODE = 5 - SAVE_TYPE = 6 - CIC_SEED = 7 - TV_TYPE = 8 - DD_SD_ENABLE = 9 - DD_DRIVE_TYPE = 10 - DD_DISK_STATE = 11 - BUTTON_STATE = 12 - BUTTON_MODE = 13 - ROM_EXTENDED_ENABLE = 14 - - class __SettingId(IntEnum): - LED_ENABLE = 0 - - class __UpdateError(IntEnum): - OK = 0 - TOKEN = 1 - CHECKSUM = 2 - SIZE = 3 - UNKNOWN_CHUNK = 4 - READ = 5 - - class __UpdateStatus(IntEnum): - MCU = 1 - FPGA = 2 - BOOTLOADER = 3 - DONE = 0x80 - ERROR = 0xFF - - class __DDMode(IntEnum): - NONE = 0 - REGS = 1 - DDIPL = 2 - FULL = 3 - - class __DDDriveType(IntEnum): - RETAIL = 0 - DEVELOPMENT = 1 - - class __DDDiskState(IntEnum): - EJECTED = 0 - INSERTED = 1 - CHANGED = 2 - - class __ButtonMode(IntEnum): - NONE = 0 - N64_IRQ = 1 - USB_PACKET = 2 - DD_DISK_SWAP = 3 - - class BootMode(IntEnum): - MENU = 0 - ROM = 1 - DDIPL = 2 - DIRECT_ROM = 3 - DIRECT_DDIPL = 4 - - class SaveType(IntEnum): - NONE = 0 - EEPROM_4K = 1 - EEPROM_16K = 2 - SRAM = 3 - FLASHRAM = 4 - SRAM_BANKED = 5 - - class CICSeed(IntEnum): - DEFAULT = 0x3F - X103 = 0x78 - X105 = 0x91 - X106 = 0x85 - ALECK = 0xAC - DD_JP = 0xDD - DD_US = 0xDE - AUTO = 0xFFFF - - class TVType(IntEnum): - PAL = 0 - NTSC = 1 - MPAL = 2 - AUTO = 3 - - class __DebugDatatype(IntEnum): - TEXT = 1 - RAWBINARY = 2 - HEADER = 3 - SCREENSHOT = 4 - GDB = 0xDB - - __SUPPORTED_MAJOR_VERSION = 2 - __SUPPORTED_MINOR_VERSION = 12 - - __isv_line_buffer: bytes = b'' - __debug_header: Optional[bytes] = None - __gdb_client: Optional[socket.socket] = None - - def __init__(self) -> None: - self.__link = SC64Serial() - identifier = self.__link.execute_cmd(cmd=b'v') - if (identifier != b'SCv2'): - raise ConnectionException('Unknown SC64 v2 identifier') - - def __get_int(self, data: bytes) -> int: - return int.from_bytes(data[:4], byteorder='big') - - def check_firmware_version(self) -> tuple[str, bool]: - try: - version = self.__link.execute_cmd(cmd=b'V') - major = self.__get_int(version[0:2]) - minor = self.__get_int(version[2:4]) - if (major != self.__SUPPORTED_MAJOR_VERSION): - raise ConnectionException() - if (minor < self.__SUPPORTED_MINOR_VERSION): - raise ConnectionException() - return (f'{major}.{minor}', minor > self.__SUPPORTED_MINOR_VERSION) - except ConnectionException: - raise ConnectionException(f'Unsupported SC64 version [{major}.{minor}], please update firmware') - - def __set_config(self, config: __CfgId, value: int) -> None: - try: - self.__link.execute_cmd(cmd=b'C', args=[config, value]) - except ConnectionException: - raise ValueError(f'Could not set config {config.name} to {value:08X}') - - def __get_config(self, config: __CfgId) -> int: - try: - data = self.__link.execute_cmd(cmd=b'c', args=[config, 0]) - except ConnectionException: - raise ValueError(f'Could not get config {config.name}') - return self.__get_int(data) - - def __set_setting(self, setting: __SettingId, value: int) -> None: - try: - self.__link.execute_cmd(cmd=b'A', args=[setting, value]) - except ConnectionException: - raise ValueError(f'Could not set setting {setting.name} to {value:08X}') - - def __get_setting(self, setting: __SettingId) -> int: - try: - data = self.__link.execute_cmd(cmd=b'a', args=[setting, 0]) - except ConnectionException: - raise ValueError(f'Could not get setting {setting.name}') - return self.__get_int(data) - - def __write_memory(self, address: int, data: bytes) -> None: - if (len(data) > 0): - self.__link.execute_cmd(cmd=b'M', args=[address, len(data)], data=data, timeout=20.0) - - def __read_memory(self, address: int, length: int) -> bytes: - if (length > 0): - return self.__link.execute_cmd(cmd=b'm', args=[address, length], timeout=20.0) - return bytes([]) - - def __dd_set_block_ready(self, error: int) -> None: - self.__link.execute_cmd(cmd=b'D', args=[error, 0]) - - def __flash_wait_busy(self) -> None: - self.__link.execute_cmd(cmd=b'p', args=[True, 0]) - - def __flash_get_erase_block_size(self) -> int: - data = self.__link.execute_cmd(cmd=b'p', args=[False, 0]) - return self.__get_int(data[0:4]) - - def __flash_erase_block(self, address: int) -> None: - self.__link.execute_cmd(cmd=b'P', args=[address, 0]) - - def __erase_flash_region(self, address: int, length: int) -> None: - if (address < self.__Address.FLASH): - raise ValueError('Flash erase address or length outside of possible range') - if ((address + length) > (self.__Address.FLASH + self.__Length.FLASH)): - raise ValueError('Flash erase address or length outside of possible range') - erase_block_size = self.__flash_get_erase_block_size() - if (address % erase_block_size != 0): - raise ValueError('Flash erase address not aligned to block size') - for offset in range(address, address + length, erase_block_size): - self.__flash_erase_block(offset) - - def __program_flash(self, address: int, data: bytes): - program_chunk_size = (128 * 1024) - if (self.__read_memory(address, len(data)) != data): - self.__erase_flash_region(address, len(data)) - for offset in range(0, len(data), program_chunk_size): - self.__write_memory(address + offset, data[offset:offset + program_chunk_size]) - self.__flash_wait_busy() - if (self.__read_memory(address, len(data)) != data): - raise ConnectionException('Flash memory program failure') - - def autodetect_save_type(self, data: bytes) -> SaveType: - if (len(data) < 0x40): - return self.SaveType.NONE - - if (data[0x3C:0x3E] == b'ED'): - save = (data[0x3F] >> 4) & 0x0F - if (save < 0 or save > 6): - return self.SaveType.NONE - save_type_mapping = [ - self.SaveType.NONE, - self.SaveType.EEPROM_4K, - self.SaveType.EEPROM_16K, - self.SaveType.SRAM, - self.SaveType.SRAM_BANKED, - self.SaveType.FLASHRAM, - self.SaveType.SRAM, - ] - return save_type_mapping[save] - - # Original ROM database sourced from ares emulator: https://github.com/ares-emulator/ares/blob/master/mia/medium/nintendo-64.cpp - - rom_id = data[0x3B:0x3E] - region = data[0x3E] - revision = data[0x3F] - - save_type_mapping = [ - ([ - b'NTW', b'NHF', b'NOS', b'NTC', b'NER', b'NAG', b'NAB', b'NS3', b'NTN', b'NBN', b'NBK', b'NFH', - b'NMU', b'NBC', b'NBH', b'NHA', b'NBM', b'NBV', b'NBD', b'NCT', b'NCH', b'NCG', b'NP2', b'NXO', - b'NCU', b'NCX', b'NDY', b'NDQ', b'NDR', b'NN6', b'NDU', b'NJM', b'NFW', b'NF2', b'NKA', b'NFG', - b'NGL', b'NGV', b'NGE', b'NHP', b'NPG', b'NIJ', b'NIC', b'NFY', b'NKI', b'NLL', b'NLR', b'NKT', - b'CLB', b'NLB', b'NMW', b'NML', b'NTM', b'NMI', b'NMG', b'NMO', b'NMS', b'NMR', b'NCR', b'NEA', - b'NPW', b'NPY', b'NPT', b'NRA', b'NWQ', b'NSU', b'NSN', b'NK2', b'NSV', b'NFX', b'NFP', b'NS6', - b'NNA', b'NRS', b'NSW', b'NSC', b'NSA', b'NB6', b'NSS', b'NTX', b'NT6', b'NTP', b'NTJ', b'NRC', - b'NTR', b'NTB', b'NGU', b'NIR', b'NVL', b'NVY', b'NWC', b'NAD', b'NWU', b'NYK', b'NMZ', b'NSM', - b'NWR', - ], self.SaveType.EEPROM_4K), - ([ - b'NB7', b'NGT', b'NFU', b'NCW', b'NCZ', b'ND6', b'NDO', b'ND2', b'N3D', b'NMX', b'NGC', b'NIM', - b'NNB', b'NMV', b'NM8', b'NEV', b'NPP', b'NUB', b'NPD', b'NRZ', b'NR7', b'NEP', b'NYS', - ], self.SaveType.EEPROM_16K), - ([ - b'NTE', b'NVB', b'NB5', b'CFZ', b'NFZ', b'NSI', b'NG6', b'NGP', b'NYW', b'NHY', b'NIB', b'NPS', - b'NPA', b'NP4', b'NJ5', b'NP6', b'NPE', b'NJG', b'CZL', b'NZL', b'NKG', b'NMF', b'NRI', b'NUT', - b'NUM', b'NOB', b'CPS', b'NPM', b'NRE', b'NAL', b'NT3', b'NS4', b'NA2', b'NVP', b'NWL', b'NW2', - b'NWX', - ], self.SaveType.SRAM), - ([ - b'CDZ', - ], self.SaveType.SRAM_BANKED), - ([ - b'NCC', b'NDA', b'NAF', b'NJF', b'NKJ', b'NZS', b'NM6', b'NCK', b'NMQ', b'NPN', b'NPF', b'NPO', - b'CP2', b'NP3', b'NRH', b'NSQ', b'NT9', b'NW4', b'NDP', - ], self.SaveType.FLASHRAM), - ] - - for (rom_id_table, save) in save_type_mapping: - if (rom_id in rom_id_table): - return save - - special_mapping = [ - (b'NKD', b'J', None, self.SaveType.EEPROM_4K), - (b'NWT', b'J', None, self.SaveType.EEPROM_4K), - (b'ND3', b'J', None, self.SaveType.EEPROM_16K), - (b'ND4', b'J', None, self.SaveType.EEPROM_16K), - (b'N3H', b'J', None, self.SaveType.SRAM), - (b'NK4', b'J', 2, self.SaveType.SRAM), - ] - - for (special_rom_id, special_region, special_revision, save) in special_mapping: - if (rom_id != special_rom_id): - continue - if (region != special_region): - continue - if (special_revision != None and revision >= special_revision): - continue - return save - - return self.SaveType.NONE - - def reset_state(self) -> None: - self.__link.execute_cmd(cmd=b'R') - - def get_state(self): - return { - 'bootloader_switch': bool(self.__get_config(self.__CfgId.BOOTLOADER_SWITCH)), - 'rom_write_enable': bool(self.__get_config(self.__CfgId.ROM_WRITE_ENABLE)), - 'rom_shadow_enable': bool(self.__get_config(self.__CfgId.ROM_SHADOW_ENABLE)), - 'dd_mode': self.__DDMode(self.__get_config(self.__CfgId.DD_MODE)), - 'isv_address': self.__get_config(self.__CfgId.ISV_ADDRESS), - 'boot_mode': self.BootMode(self.__get_config(self.__CfgId.BOOT_MODE)), - 'save_type': self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)), - 'cic_seed': self.CICSeed(self.__get_config(self.__CfgId.CIC_SEED)), - 'tv_type': self.TVType(self.__get_config(self.__CfgId.TV_TYPE)), - 'dd_sd_enable': bool(self.__get_config(self.__CfgId.DD_SD_ENABLE)), - 'dd_drive_type': self.__DDDriveType(self.__get_config(self.__CfgId.DD_DRIVE_TYPE)), - 'dd_disk_state': self.__DDDiskState(self.__get_config(self.__CfgId.DD_DISK_STATE)), - 'button_state': bool(self.__get_config(self.__CfgId.BUTTON_STATE)), - 'button_mode': self.__ButtonMode(self.__get_config(self.__CfgId.BUTTON_MODE)), - 'rom_extended_enable': bool(self.__get_config(self.__CfgId.ROM_EXTENDED_ENABLE)), - 'led_enable': bool(self.__get_setting(self.__SettingId.LED_ENABLE)), - } - - def debug_send(self, datatype: __DebugDatatype, data: bytes) -> None: - if (len(data) > (8 * 1024 * 1024)): - raise ValueError('Debug data size too big') - self.__link.execute_cmd(cmd=b'U', args=[datatype, len(data)], data=data, response=False) - - def download_memory(self, address: int, length: int) -> bytes: - if ((address < 0) or (length < 0) or ((address + length) > self.__Address.END)): - raise ValueError('Invalid address or length') - return self.__read_memory(address, length) - - def upload_rom(self, data: bytes, use_shadow: bool=True) -> None: - rom_length = len(data) - if (rom_length > (self.__Length.SDRAM + self.__Length.EXTENDED)): - raise ValueError('ROM size too big') - sdram_length = self.__Length.SDRAM - shadow_enabled = use_shadow and rom_length > (self.__Length.SDRAM - self.__Length.SHADOW) - extended_enabled = rom_length > self.__Length.SDRAM - if (shadow_enabled): - sdram_length = (self.__Length.SDRAM - self.__Length.SHADOW) - shadow_data = data[sdram_length:sdram_length + self.__Length.SHADOW] - self.__program_flash(self.__Address.SHADOW, shadow_data) - self.__set_config(self.__CfgId.ROM_SHADOW_ENABLE, shadow_enabled) - if (extended_enabled): - extended_data = data[self.__Length.SDRAM:] - self.__program_flash(self.__Address.EXTENDED, extended_data) - self.__set_config(self.__CfgId.ROM_EXTENDED_ENABLE, extended_enabled) - self.__write_memory(self.__Address.SDRAM, data[:sdram_length]) - - def upload_ddipl(self, data: bytes) -> None: - if (len(data) > self.__Length.DDIPL): - raise ValueError('DDIPL size too big') - self.__write_memory(self.__Address.DDIPL, data) - - def upload_save(self, data: bytes) -> None: - save_type = self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)) - if (save_type == self.SaveType.NONE): - raise ValueError('No save type set inside SC64 device') - if (len(data) != self.__SaveLength[save_type.name]): - raise ValueError('Wrong save data length') - address = self.__Address.SAVE - if (save_type == self.SaveType.EEPROM_4K or save_type == self.SaveType.EEPROM_16K): - address = self.__Address.EEPROM - self.__write_memory(address, data) - - def download_save(self) -> bytes: - save_type = self.SaveType(self.__get_config(self.__CfgId.SAVE_TYPE)) - if (save_type == self.SaveType.NONE): - raise ValueError('No save type set inside SC64 device') - address = self.__Address.SAVE - length = self.__SaveLength[save_type.name] - if (save_type == self.SaveType.EEPROM_4K or save_type == self.SaveType.EEPROM_16K): - address = self.__Address.EEPROM - return self.__read_memory(address, length) - - def upload_bootloader(self, data: bytes) -> None: - if (len(data) > self.__Length.BOOTLOADER): - raise ValueError('Bootloader size too big') - padded_data = data + (b'\xFF' * (self.__Length.BOOTLOADER - len(data))) - if (self.__read_memory(self.__Address.BOOTLOADER, self.__Length.BOOTLOADER) != padded_data): - self.__erase_flash_region(self.__Address.BOOTLOADER, self.__Length.BOOTLOADER) - self.__write_memory(self.__Address.BOOTLOADER, data) - self.__flash_wait_busy() - if (self.__read_memory(self.__Address.BOOTLOADER, self.__Length.BOOTLOADER) != padded_data): - raise ConnectionException('Bootloader program failure') - - def set_rtc(self, t: datetime) -> None: - to_bcd = lambda v: ((int((v / 10) % 10) << 4) | int(int(v) % 10)) - data = bytes([ - to_bcd(t.weekday() + 1), - to_bcd(t.hour), - to_bcd(t.minute), - to_bcd(t.second), - 0, - to_bcd(t.year), - to_bcd(t.month), - to_bcd(t.day), - ]) - self.__link.execute_cmd(cmd=b'T', args=[self.__get_int(data[0:4]), self.__get_int(data[4:8])]) - - def set_boot_mode(self, mode: BootMode) -> None: - self.__set_config(self.__CfgId.BOOT_MODE, mode) - - def set_cic_seed(self, seed: int) -> None: - if (seed != self.CICSeed.AUTO): - if (seed < 0 or seed > 0xFF): - raise ValueError('CIC seed outside of allowed values') - self.__set_config(self.__CfgId.CIC_SEED, seed) - - def set_tv_type(self, type: TVType) -> bool: - self.__set_config(self.__CfgId.TV_TYPE, type) - boot_mode = self.__get_config(self.__CfgId.BOOT_MODE) - direct = (boot_mode == self.BootMode.DIRECT_ROM) or (boot_mode == self.BootMode.DIRECT_DDIPL) - return direct - - def set_save_type(self, type: SaveType) -> None: - self.__set_config(self.__CfgId.SAVE_TYPE, type) - - def set_led_enable(self, enabled: bool) -> None: - self.__set_setting(self.__SettingId.LED_ENABLE, enabled) - - def update_firmware(self, data: bytes, status_callback: Optional[Callable[[str], None]]=None) -> None: - address = self.__Address.FIRMWARE - self.__write_memory(address, data) - response = self.__link.execute_cmd(cmd=b'F', args=[address, len(data)], raise_on_err=False) - error = self.__UpdateError(self.__get_int(response[0:4])) - if (error != self.__UpdateError.OK): - raise ConnectionException(f'Bad update image [{error.name}]') - status = None - while status != self.__UpdateStatus.DONE: - packet = self.__link.get_packet(timeout=60.0) - if (packet == None): - raise ConnectionException('Update timeout') - (cmd, data) = packet - if (cmd != b'F'): - raise ConnectionException('Wrong update status packet') - status = self.__UpdateStatus(self.__get_int(data)) - if (status_callback): - status_callback(status.name) - if (status == self.__UpdateStatus.ERROR): - raise ConnectionException('Update error, device is most likely bricked') - time.sleep(2) - - def backup_firmware(self) -> bytes: - address = self.__Address.FIRMWARE - info = self.__link.execute_cmd(cmd=b'f', args=[address, 0], timeout=60.0, raise_on_err=False) - error = self.__UpdateError(self.__get_int(info[0:4])) - length = self.__get_int(info[4:8]) - if (error != self.__UpdateError.OK): - raise ConnectionException('Error while getting firmware backup') - return self.__read_memory(address, length) - - def update_cic_parameters(self, seed: Optional[int]=None, disabled: Optional[bool]=False) -> tuple[int, int, bool, bool]: - if ((seed != None) and (seed < 0 or seed > 0xFF)): - raise ValueError('CIC seed outside of allowed values') - boot_mode = self.__get_config(self.__CfgId.BOOT_MODE) - address = self.__Address.BOOTLOADER - if (boot_mode == self.BootMode.DIRECT_ROM): - address = self.__Address.SDRAM - elif (boot_mode == self.BootMode.DIRECT_DDIPL): - address = self.__Address.DDIPL - ipl3 = self.__read_memory(address, 4096)[0x40:0x1000] - seed = seed if (seed != None) else self.__guess_ipl3_seed(ipl3) - checksum = self.__calculate_ipl3_checksum(ipl3, seed) - data = [(1 << 0) if disabled else 0, seed, *checksum.to_bytes(6, byteorder='big')] - self.__link.execute_cmd(cmd=b'B', args=[self.__get_int(data[0:4]), self.__get_int(data[4:8])]) - direct = (boot_mode == self.BootMode.DIRECT_ROM) or (boot_mode == self.BootMode.DIRECT_DDIPL) - return (seed, checksum, boot_mode == self.BootMode.DIRECT_DDIPL, direct) - - def __guess_ipl3_seed(self, ipl3: bytes) -> int: - checksum = crc32(ipl3) - seed_mapping = { - 0x587BD543: 0xAC, # 5101 - 0x6170A4A1: 0x3F, # 6101 - 0x009E9EA3: 0x3F, # 7102 - 0x90BB6CB5: 0x3F, # 6102/7101 - 0x0B050EE0: 0x78, # x103 - 0x98BC2C86: 0x91, # x105 - 0xACC8580A: 0x85, # x106 - 0x0E018159: 0xDD, # 5167 - 0x10C68B18: 0xDD, # NDXJ0 - 0xBC605D0A: 0xDD, # NDDJ0 - 0x502C4466: 0xDD, # NDDJ1 - 0x0C965795: 0xDD, # NDDJ2 - 0x8FEBA21E: 0xDE, # NDDE0 - } - return seed_mapping[checksum] if checksum in seed_mapping else 0x3F - - def __calculate_ipl3_checksum(self, ipl3: bytes, seed: int) -> int: - _CHECKSUM_MAGIC = 0x6C078965 - - _add = lambda a1, a2: ((a1 + a2) & 0xFFFFFFFF) - _sub = lambda a1, a2: ((a1 - a2) & 0xFFFFFFFF) - _mul = lambda a1, a2: ((a1 * a2) & 0xFFFFFFFF) - _xor = lambda a1, a2: ((a1 ^ a2) & 0xFFFFFFFF) - _lsh = lambda a, s: (((a & 0xFFFFFFFF) << s) & 0xFFFFFFFF) - _rsh = lambda a, s: (((a & 0xFFFFFFFF) >> s) & 0xFFFFFFFF) - - def _get(offset: int) -> int: - offset *= 4 - return int.from_bytes(ipl3[offset:(offset + 4)], byteorder='big') - - def _checksum(a0: int, a1: int, a2: int) -> int: - prod = (a0 * (a2 if (a1 == 0) else a1)) - hi = ((prod >> 32) & 0xFFFFFFFF) - lo = (prod & 0xFFFFFFFF) - diff = ((hi - lo) & 0xFFFFFFFF) - return ((a0 if (diff == 0) else diff) & 0xFFFFFFFF) - - if (seed < 0x00 or seed > 0xFF): - raise ValueError('Invalid seed') - - buffer = [_xor(_add(_mul(_CHECKSUM_MAGIC, seed), 1), _get(0))] * 16 - - for i in range(1, 1009): - data_prev = data_curr if (i > 1) else _get(0) - data_curr = _get(i - 1) - data_next = _get(i) - - buffer[0] = _add(buffer[0], _checksum(_sub(1007, i), data_curr, i)) - buffer[1] = _checksum(buffer[1], data_curr, i) - buffer[2] = _xor(buffer[2], data_curr) - buffer[3] = _add(buffer[3], _checksum(_add(data_curr, 5), _CHECKSUM_MAGIC, i)) - - shift = (data_prev & 0x1F) - data_left = _lsh(data_curr, (32 - shift)) - data_right = _rsh(data_curr, shift) - b4_shifted = (data_left | data_right) - buffer[4] = _add(buffer[4], b4_shifted) - - shift = _rsh(data_prev, 27) - data_left = _lsh(data_curr, shift) - data_right = _rsh(data_curr, (32 - shift)) - b5_shifted = (data_left | data_right) - buffer[5] = _add(buffer[5], b5_shifted) - - if (data_curr < buffer[6]): - buffer[6] = _xor(_add(buffer[3], buffer[6]), _add(data_curr, i)) - else: - buffer[6] = _xor(_add(buffer[4], data_curr), buffer[6]) - - shift = (data_prev & 0x1F) - data_left = _lsh(data_curr, shift) - data_right = _rsh(data_curr, (32 - shift)) - buffer[7] = _checksum(buffer[7], (data_left | data_right), i) - - shift = _rsh(data_prev, 27) - data_left = _lsh(data_curr, (32 - shift)) - data_right = _rsh(data_curr, shift) - buffer[8] = _checksum(buffer[8], (data_left | data_right), i) - - if (data_prev < data_curr): - buffer[9] = _checksum(buffer[9], data_curr, i) - else: - buffer[9] = _add(buffer[9], data_curr) - - if (i == 1008): - break - - buffer[10] = _checksum(_add(buffer[10], data_curr), data_next, i) - buffer[11] = _checksum(_xor(buffer[11], data_curr), data_next, i) - buffer[12] = _add(buffer[12], _xor(buffer[8], data_curr)) - - shift = (data_curr & 0x1F) - data_left = _lsh(data_curr, (32 - shift)) - data_right = _rsh(data_curr, shift) - tmp = (data_left | data_right) - shift = (data_next & 0x1F) - data_left = _lsh(data_next, (32 - shift)) - data_right = _rsh(data_next, shift) - buffer[13] = _add(buffer[13], _add(tmp, (data_left | data_right))) - - shift = (data_curr & 0x1F) - data_left = _lsh(data_next, (32 - shift)) - data_right = _rsh(data_next, shift) - sum = _checksum(buffer[14], b4_shifted, i) - buffer[14] = _checksum(sum, (data_left | data_right), i) - - shift = _rsh(data_curr, 27) - data_left = _lsh(data_next, shift) - data_right = _rsh(data_next, (32 - shift)) - sum = _checksum(buffer[15], b5_shifted, i) - buffer[15] = _checksum(sum, (data_left | data_right), i) - - final_buffer = [buffer[0]] * 4 - - for i in range(16): - data = buffer[i] - - shift = (data & 0x1F) - data_left = _lsh(data, (32 - shift)) - data_right = _rsh(data, shift) - b0_shifted = _add(final_buffer[0], (data_left | data_right)) - final_buffer[0] = b0_shifted - - if (data < b0_shifted): - final_buffer[1] = _add(final_buffer[1], data) - else: - final_buffer[1] = _checksum(final_buffer[1], data, i) - - if (_rsh((data & 0x02), 1) == (data & 0x01)): - final_buffer[2] = _add(final_buffer[2], data) - else: - final_buffer[2] = _checksum(final_buffer[2], data, i) - - if (data & 0x01): - final_buffer[3] = _xor(final_buffer[3], data) - else: - final_buffer[3] = _checksum(final_buffer[3], data, i) - - sum = _checksum(final_buffer[0], final_buffer[1], 16) - xor = _xor(final_buffer[3], final_buffer[2]) - - return ((sum << 32) | xor) & 0xFFFF_FFFFFFFF - - def __generate_filename(self, prefix: str, extension: str) -> str: - return f'{prefix}-{datetime.now().strftime("%y%m%d%H%M%S.%f")}.{extension}' - - def __handle_dd_packet(self, dd: Optional[DD64Image], data: bytes) -> None: - CMD_READ_BLOCK = 1 - CMD_WRITE_BLOCK = 2 - cmd = self.__get_int(data[0:]) - address = self.__get_int(data[4:]) - track_head_block = self.__get_int(data[8:]) - track = (track_head_block >> 2) & 0xFFF - head = (track_head_block >> 1) & 0x1 - block = track_head_block & 0x1 - try: - if (not dd or not dd.loaded): - raise BadBlockError - if (cmd == CMD_READ_BLOCK): - block_data = dd.read_block(track, head, block) - self.__write_memory(address, block_data) - self.__dd_set_block_ready(0) - elif (cmd == CMD_WRITE_BLOCK): - block_data = data[12:] - dd.write_block(track, head, block, block_data) - self.__dd_set_block_ready(0) - else: - self.__dd_set_block_ready(1) - except BadBlockError: - self.__dd_set_block_ready(1) - - def __handle_isv_packet(self, data: bytes) -> None: - self.__isv_line_buffer += data - while (b'\n' in self.__isv_line_buffer): - (line, self.__isv_line_buffer) = self.__isv_line_buffer.split(b'\n', 1) - print(line.decode('EUC-JP', errors='backslashreplace')) - - def __handle_usb_packet(self, data: bytes) -> None: - header = self.__get_int(data[0:4]) - datatype = self.__DebugDatatype((header >> 24) & 0xFF) - length = (header & 0xFFFFFF) - packet = data[4:] - if (len(packet) != length): - print(f'Debug packet length and data length did not match') - elif (datatype == self.__DebugDatatype.TEXT): - print(packet.decode('UTF-8', errors='backslashreplace'), end='') - elif (datatype == self.__DebugDatatype.RAWBINARY): - filename = self.__generate_filename('binaryout', 'bin') - with open(filename, 'wb') as f: - f.write(packet) - print(f'Wrote {len(packet)} bytes to {filename}') - elif (datatype == self.__DebugDatatype.HEADER): - if (len(packet) == 16): - self.__debug_header = packet - else: - print(f'Size of header packet is invalid: {len(packet)}') - elif (datatype == self.__DebugDatatype.SCREENSHOT): - filename = self.__generate_filename('screenshot', 'png') - if (self.__debug_header != None): - header_datatype = self.__get_int(self.__debug_header[0:4]) - pixel_format = self.__get_int(self.__debug_header[4:8]) - image_w = self.__get_int(self.__debug_header[8:12]) - image_h = self.__get_int(self.__debug_header[12:16]) - if (header_datatype == 0x04 and pixel_format != 0 and image_w != 0 and image_h != 0): - mode = 'RGBA' if pixel_format == 4 else 'I;16' - screenshot = Image.frombytes(mode, (image_w, image_h), packet) - screenshot.save(filename) - print(f'Wrote {image_w}x{image_h} pixels to {filename}') - else: - print('Screenshot header data is invalid') - else: - print('Got screenshot packet without header data') - elif (datatype == self.__DebugDatatype.GDB): - self.__handle_gdb_datatype(packet) - - def __handle_gdb_socket(self, gdb_port: int) -> None: - MAX_PACKET_SIZE = 65536 - gdb_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - gdb_socket.bind(('localhost', gdb_port)) - gdb_socket.listen() - while (True): - (self.__gdb_client, address) = gdb_socket.accept() - client_address = f'{address[0]}:{address[1]}' - print(f'[GDB]: New connection ({client_address})') - try: - connected = True - while (connected): - data = self.__gdb_client.recv(MAX_PACKET_SIZE) - if (data): - self.debug_send(self.__DebugDatatype.GDB, data) - else: - connected = False - except: - pass - finally: - self.__gdb_client.close() - print(f'[GDB]: Connection closed ({client_address})') - - def __handle_gdb_datatype(self, data: bytes) -> None: - if (self.__gdb_client): - try: - self.__gdb_client.sendall(data) - except: - pass - - def __handle_debug_input(self) -> None: - running = True - while (running): - try: - command = input() - if (len(command) > 0): - data = b'' - datatype = self.__DebugDatatype.TEXT - if (command.count('@') != 2): - data += f'{command}\0'.encode() - else: - start = command.index('@') - end = command.rindex('@') - filename = command[(start + 1):end] - if (len(filename) == 0): - raise FileNotFoundError - with open(filename, 'rb') as f: - file_data = f.read() - if (command.startswith('@') and command.endswith('@')): - datatype = self.__DebugDatatype.RAWBINARY - data += file_data - else: - data += f'{command[:start]}@{len(file_data)}@'.encode() - data += file_data - data += b'\0' - self.debug_send(datatype, data) - except ValueError as e: - print(f'Error: {e}') - except FileNotFoundError as e: - print(f'Error: Cannot open file {e.filename}') - except EOFError: - running = False - - def debug_loop(self, isv: int=0, disks: Optional[list[str]]=None, gdb_port: Optional[int]=None) -> None: - dd = None - current_image = 0 - next_image = 0 - - self.__set_config(self.__CfgId.ROM_WRITE_ENABLE, 1 if (isv != 0) else 0) - self.__set_config(self.__CfgId.ISV_ADDRESS, isv) - if (isv != 0): - print(f'IS-Viewer64 support set to [ENABLED] at ROM offset [0x{(isv):08X}]') - if (self.__get_config(self.__CfgId.ROM_SHADOW_ENABLE)): - print('ROM shadow enabled - IS-Viewer64 will NOT work (use --no-shadow option while uploading ROM to disable it)') - - if (disks): - dd = DD64Image() - drive_type = None - for path in disks: - try: - dd.load(path) - if (drive_type == None): - drive_type = dd.get_drive_type() - elif (drive_type != dd.get_drive_type()): - raise ValueError('Disk drive type mismatch') - dd.unload() - except ValueError as e: - dd = None - print(f'64DD disabled, incorrect disk images provided: {e}') - break - if (dd): - self.__set_config(self.__CfgId.DD_MODE, self.__DDMode.FULL) - self.__set_config(self.__CfgId.DD_SD_ENABLE, False) - self.__set_config(self.__CfgId.DD_DRIVE_TYPE, { - 'retail': self.__DDDriveType.RETAIL, - 'development': self.__DDDriveType.DEVELOPMENT - }[drive_type]) - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) - self.__set_config(self.__CfgId.BUTTON_MODE, self.__ButtonMode.USB_PACKET) - print('64DD enabled, loaded disks:') - for disk in disks: - print(f' - {os.path.basename(disk)}') - print('Press button on SC64 device to cycle through provided disks') - - if (gdb_port): - gdb_thread = Thread(target=lambda: self.__handle_gdb_socket(gdb_port), daemon=True) - gdb_thread.start() - print(f'GDB server started and listening on port [{gdb_port}]') - - print('\nDebug loop started, press Ctrl-C to exit\n') - - try: - thread_input = Thread(target=self.__handle_debug_input, daemon=True) - thread_input.start() - while (thread_input.is_alive()): - packet = self.__link.get_packet() - if (packet != None): - (cmd, data) = packet - if (cmd == b'D'): - self.__handle_dd_packet(dd, data) - if (cmd == b'I'): - self.__handle_isv_packet(data) - if (cmd == b'U'): - self.__handle_usb_packet(data) - if (cmd == b'B'): - if (not dd.loaded): - dd.load(disks[next_image]) - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.INSERTED) - current_image = next_image - next_image += 1 - if (next_image >= len(disks)): - next_image = 0 - print(f'64DD disk inserted - {os.path.basename(disks[current_image])}') - else: - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) - dd.unload() - print(f'64DD disk ejected - {os.path.basename(disks[current_image])}') - except KeyboardInterrupt: - pass - finally: - print('\nDebug loop stopped\n') - - if (dd and dd.loaded): - self.__set_config(self.__CfgId.DD_DISK_STATE, self.__DDDiskState.EJECTED) - if (isv != 0): - self.__set_config(self.__CfgId.ISV_ADDRESS, 0) - - -class EnumAction(argparse.Action): - def __init__(self, **kwargs): - type = kwargs.pop('type', None) - if type is None: - raise ValueError('No type was provided') - if not issubclass(type, Enum): - raise TypeError('Provided type is not an Enum subclass') - items = (choice.lower().replace('_', '-') for (choice, _) in type.__members__.items()) - kwargs.setdefault('choices', tuple(items)) - super(EnumAction, self).__init__(**kwargs) - self.__enum = type - - def __call__(self, parser, namespace, values, option_string): - key = str(values).upper().replace('-', '_') - value = self.__enum[key] - setattr(namespace, self.dest, value) - - - -if __name__ == '__main__': - def download_memory_type(argument: str): - params = argument.split(',') - if (len(params) < 2 or len(params) > 3): - raise argparse.ArgumentError() - address = int(params[0], 0) - length = int(params[1], 0) - file = params[2] if len(params) >= 3 else 'sc64dump.bin' - return (address, length, file) - - parser = argparse.ArgumentParser(description='SC64 control software') - parser.add_argument('rom', nargs='?', help='upload ROM from specified file') - parser.add_argument('--backup-firmware', metavar='file', help='backup SC64 firmware and write it to specified file') - parser.add_argument('--update-firmware', metavar='file', help='update SC64 firmware from specified file') - parser.add_argument('--reset-state', action='store_true', help='reset SC64 internal state') - parser.add_argument('--print-state', action='store_true', help='print SC64 internal state') - parser.add_argument('--led-blink', metavar='{yes,no}', help='enable or disable LED I/O activity blinking') - parser.add_argument('--rtc', action='store_true', help='update clock in SC64 to system time') - parser.add_argument('--boot', type=SC64.BootMode, action=EnumAction, help='set boot mode') - parser.add_argument('--tv', type=SC64.TVType, action=EnumAction, help='force TV type to set value, ignored when one of direct boot modes are selected') - parser.add_argument('--no-shadow', action='store_false', help='do not put last 128 kB of ROM inside flash memory (can corrupt non EEPROM saves)') - parser.add_argument('--save-type', type=SC64.SaveType, action=EnumAction, help='set save type') - parser.add_argument('--save', metavar='file', help='upload save from specified file') - parser.add_argument('--backup-save', metavar='file', help='download save and write it to specified file') - parser.add_argument('--ddipl', metavar='file', help='upload 64DD IPL from specified file') - parser.add_argument('--disk', metavar='file', action='append', help='path to 64DD disk (.ndd format), can be specified multiple times') - parser.add_argument('--isv', metavar='offset', type=lambda x: int(x, 0), default=0, help='enable IS-Viewer64 support at provided ROM offset') - parser.add_argument('--gdb', metavar='port', type=int, help='expose TCP socket port for GDB debugging') - parser.add_argument('--debug', action='store_true', help='run debug loop') - parser.add_argument('--download-memory', metavar='address,length,[file]', type=download_memory_type, help='download specified memory region and write it to file') - - if (len(sys.argv) <= 1): - parser.print_help() - parser.exit() - - args = parser.parse_args() - - def fix_rom_endianness(rom: bytes) -> bytes: - data = bytearray(rom) - pi_config = int.from_bytes(rom[0:4], byteorder='big') - if (pi_config == 0x37804012): - data[0::2], data[1::2] = data[1::2], data[0::2] - elif (pi_config == 0x40123780): - data[0::4], data[1::4], data[2::4], data[3::4] = data[3::4], data[2::4], data[1::4], data[0::4] - return bytes(data) - - try: - sc64 = SC64() - autodetected_save_type = None - - if (args.backup_firmware): - with open(args.backup_firmware, 'wb') as f: - print('Generating backup, this might take a while... ', end='', flush=True) - f.write(sc64.backup_firmware()) - print('done') - - if (args.update_firmware): - with open(args.update_firmware, 'rb') as f: - print('Updating firmware, this might take a while... ', end='', flush=True) - status_callback = lambda status: print(f'{status} ', end='', flush=True) - sc64.update_firmware(f.read(), status_callback) - print('done') - - (version, script_outdated) = sc64.check_firmware_version() - - print(f'\x1b[32mSC64 firmware version: [{version}]\x1b[0m') - if (script_outdated): - print('\x1b[33m') - print('[ SC64 firmware is newer than last known version ]') - print('[ Consider downloading latest sc64 executable from ]') - print('[ https://github.com/Polprzewodnikowy/SummerCart64/releases ]') - print('\x1b[0m') - - if (args.reset_state): - sc64.reset_state() - print('SC64 internal state reset') - - if (args.led_blink): - blink = (args.led_blink == 'yes') - sc64.set_led_enable(blink) - print(f'LED blinking set to [{"ENABLED" if blink else "DISABLED"}]') - - if (args.rtc): - value = datetime.now() - sc64.set_rtc(value) - print(f'RTC set to [{value.strftime("%Y-%m-%d %H:%M:%S")}]') - - if (args.rom): - with open(args.rom, 'rb') as f: - rom_data = fix_rom_endianness(f.read()) - print(f'Uploading ROM ({len(rom_data) / (1 * 1024 * 1024):.2f} MiB)... ', end='', flush=True) - sc64.upload_rom(rom_data, use_shadow=args.no_shadow) - autodetected_save_type = sc64.autodetect_save_type(rom_data) - print('done') - - if (args.ddipl): - with open(args.ddipl, 'rb') as f: - print('Uploading 64DD IPL... ', end='', flush=True) - sc64.upload_ddipl(f.read()) - print('done') - - if (args.rom or args.ddipl or args.boot != None): - mode = args.boot - if (mode == None): - mode = SC64.BootMode.ROM if args.rom else SC64.BootMode.DDIPL - sc64.set_boot_mode(mode) - print(f'Boot mode set to [{mode.name}]') - (seed, checksum, dd_mode, direct) = sc64.update_cic_parameters() - if (direct): - print('CIC parameters set to [', end='') - print(f'{"DDIPL" if dd_mode else "ROM"}, ', end='') - print(f'seed: 0x{seed:02X}, checksum: 0x{checksum:012X}', end='') - print(']') - - if (args.rom or args.ddipl or args.tv != None): - tv = args.tv if args.tv else SC64.TVType.AUTO - direct = sc64.set_tv_type(tv) - if (args.tv != None): - print(f'TV type set to [{args.tv.name}]{" (ignored)" if direct else ""}') - - if (args.save_type != None or autodetected_save_type != None): - save_type = args.save_type if args.save_type != None else autodetected_save_type - sc64.set_save_type(save_type) - print(f'Save type set to [{save_type.name}]{" (autodetected)" if autodetected_save_type != None else ""}') - - if (args.save): - with open(args.save, 'rb') as f: - print('Uploading save... ', end='', flush=True) - sc64.upload_save(f.read()) - print('done') - - if (args.print_state): - state = sc64.get_state() - print('Current SC64 internal state:') - for key, value in state.items(): - if (hasattr(value, 'name')): - value = getattr(value, 'name') - print(f' {key}: {value}') - - if (args.debug or args.isv or args.disk or args.gdb): - sc64.debug_loop(isv=args.isv, disks=args.disk, gdb_port=args.gdb) - - if (args.backup_save): - with open(args.backup_save, 'wb') as f: - print('Downloading save... ', end='', flush=True) - f.write(sc64.download_save()) - print('done') - - if (args.download_memory != None): - (address, length, file) = args.download_memory - with open(file, 'wb') as f: - print('Downloading memory... ', end='', flush=True) - f.write(sc64.download_memory(address, length)) - print('done') - except ValueError as e: - print(f'\n\x1b[31mValue error: {e}\x1b[0m\n') - except ConnectionException as e: - print(f'\n\x1b[31mSC64 error: {e}\x1b[0m\n') - except Exception as e: - print(f'\n\x1b[31mUnhandled error "{e.__class__.__name__}": {e}\x1b[0m\n') diff --git a/sw/update/.gitignore b/sw/tools/.gitignore similarity index 85% rename from sw/update/.gitignore rename to sw/tools/.gitignore index 4cfd978..a8a0dce 100644 --- a/sw/update/.gitignore +++ b/sw/tools/.gitignore @@ -1 +1 @@ -*.bin +*.bin diff --git a/sw/pc/primer.py b/sw/tools/primer.py similarity index 73% rename from sw/pc/primer.py rename to sw/tools/primer.py index e2be968..495413b 100644 --- a/sw/pc/primer.py +++ b/sw/tools/primer.py @@ -1,533 +1,696 @@ -#!/usr/bin/env python3 - -import io -import os -import serial -import signal -import sys -import time -from binascii import crc32 -from sc64 import SC64 -from sys import exit -from typing import Callable, Optional - - - -class Utils: - __progress_active = False - - @staticmethod - def log(message: str='') -> None: - print(message) - - @staticmethod - def log_no_end(message: str='') -> None: - print(message, end='', flush=True) - - @staticmethod - def info(message: str='') -> None: - print(f'\033[92m{message}\033[0m') - - @staticmethod - def warning(message: str='') -> None: - print(f'\033[93m{message}\033[0m') - - @staticmethod - def die(reason: str) -> None: - print(f'\033[91m{reason}\033[0m') - exit(-1) - - @property - def get_progress_active(self): - return self.__progress_active - - def progress(self, length: int, position: int, description: str) -> None: - value = ((position / length) * 100.0) - if (position == 0): - self.__progress_active = True - Utils.log_no_end(f'\r{value:5.1f}%: [{description}]') - if (position == length): - Utils.log() - self.__progress_active = False - - def exit_warning(self): - if (self.__progress_active): - Utils.log() - Utils.warning('Ctrl-C is prohibited during bring-up procedure') - - -class SC64UpdateDataException(Exception): - pass - -class SC64UpdateData: - __UPDATE_TOKEN = b'SC64 Update v2.0' - __CHUNK_ID_UPDATE_INFO = 1 - __CHUNK_ID_MCU_DATA = 2 - __CHUNK_ID_FPGA_DATA = 3 - __CHUNK_ID_BOOTLOADER_DATA = 4 - __CHUNK_ID_PRIMER_DATA = 5 - - __update_info: Optional[str] - __mcu_data: Optional[bytes] - __fpga_data: Optional[bytes] - __bootloader_data: Optional[bytes] - __primer_data: Optional[bytes] - - def __load_int(self, f: io.BufferedReader) -> int: - try: - data = f.read(4) - if (len(data) != 4): - raise ValueError('Read size did not match requested amount') - value = int.from_bytes(data, byteorder='little') - except ValueError as e: - raise SC64UpdateDataException(f'Error while reading chunk header: {e}') - return value - - def __load_chunk(self, f: io.BufferedReader) -> tuple[int, bytes]: - id = self.__load_int(f) - aligned_length = self.__load_int(f) - checksum = self.__load_int(f) - data_length = self.__load_int(f) - - data = f.read(data_length) - - align = (aligned_length - 4 - 4 - data_length) - f.seek(align, io.SEEK_CUR) - - if (crc32(data) != checksum): - raise SC64UpdateDataException(f'Invalid checksum for chunk id [{id}] inside update file') - - return (id, data) - - def load(self, path: str, require_all: bool=False) -> None: - self.__update_info = None - self.__mcu_data = None - self.__fpga_data = None - self.__bootloader_data = None - self.__primer_data = None - - try: - with open(path, 'rb') as f: - if (f.read(len(self.__UPDATE_TOKEN)) != self.__UPDATE_TOKEN): - raise SC64UpdateDataException('Invalid update file header') - - while (f.peek(1) != b''): - (id, data) = self.__load_chunk(f) - if (id == self.__CHUNK_ID_UPDATE_INFO): - self.__update_info = data.decode('ascii') - elif (id == self.__CHUNK_ID_MCU_DATA): - self.__mcu_data = data - elif (id == self.__CHUNK_ID_FPGA_DATA): - self.__fpga_data = data - elif (id == self.__CHUNK_ID_BOOTLOADER_DATA): - self.__bootloader_data = data - elif (id == self.__CHUNK_ID_PRIMER_DATA): - self.__primer_data = data - else: - raise SC64UpdateDataException('Unknown chunk inside update file') - - if (require_all): - if (not self.__update_info): - raise SC64UpdateDataException('No update info inside update file') - if (not self.__mcu_data): - raise SC64UpdateDataException('No MCU data inside update file') - if (not self.__fpga_data): - raise SC64UpdateDataException('No FPGA data inside update file') - if (not self.__bootloader_data): - raise SC64UpdateDataException('No bootloader data inside update file') - if (not self.__primer_data): - raise SC64UpdateDataException('No primer data inside update file') - - except IOError as e: - raise SC64UpdateDataException(f'IO error while loading update data: {e}') - - def get_update_info(self) -> Optional[str]: - return self.__update_info - - def get_mcu_data(self) -> Optional[bytes]: - return self.__mcu_data - - def get_fpga_data(self) -> Optional[bytes]: - return self.__fpga_data - - def get_bootloader_data(self) -> Optional[bytes]: - return self.__bootloader_data - - def get_primer_data(self) -> Optional[bytes]: - return self.__primer_data - - -class STM32BootloaderException(Exception): - pass - -class STM32Bootloader: - __INIT = b'\x7F' - __ACK = b'\x79' - __NACK = b'\x1F' - - __MEMORY_RW_MAX_SIZE = 256 - __FLASH_LOAD_ADDRESS = 0x08000000 - __FLASH_MAX_LOAD_SIZE = 0x8000 - __RAM_LOAD_ADDRESS = 0x20001000 - __RAM_MAX_LOAD_SIZE = 0x1000 - - DEV_ID_STM32G030XX = b'\x04\x66' - - __connected = False - - def __init__(self, write: Callable[[bytes], None], read: Callable[[int], bytes], progress: Callable[[int, int, str], None]): - self.__write = write - self.__read = read - self.__progress = progress - - def __append_xor(self, data: bytes) -> bytes: - xor = (0xFF if (len(data) == 1) else 0x00) - for b in data: - xor ^= b - return bytes([*data, xor]) - - def __check_ack(self) -> None: - response = self.__read(1) - if (response == None): - raise STM32BootloaderException('No ACK/NACK byte received') - if (response == self.__NACK): - raise STM32BootloaderException('NACK byte received') - if (response != self.__ACK): - raise STM32BootloaderException('Unknown ACK/NACK byte received') - - def __cmd_send(self, cmd: bytes) -> None: - if (len(cmd) != 1): - raise ValueError('Command must contain only one byte') - self.__write(self.__append_xor(cmd)) - self.__check_ack() - - def __data_write(self, data: bytes) -> None: - self.__write(self.__append_xor(data)) - self.__check_ack() - - def __data_read(self) -> bytes: - length = self.__read(1) - if (len(length) != 1): - raise STM32BootloaderException('Did not receive length byte') - length = length[0] - data = self.__read(length + 1) - self.__check_ack() - return data - - def __get_id(self) -> bytes: - self.__cmd_send(b'\x02') - return self.__data_read() - - def __read_memory(self, address: int, length: int) -> bytes: - if (length == 0 or length > self.__MEMORY_RW_MAX_SIZE): - raise ValueError('Wrong data size for read memory command') - self.__cmd_send(b'\x11') - self.__data_write(address.to_bytes(4, byteorder='big')) - self.__data_write(bytes([length - 1])) - return self.__read(length) - - def __go(self, address: int) -> None: - self.__cmd_send(b'\x21') - self.__data_write(address.to_bytes(4, byteorder='big')) - self.__connected = False - - def __write_memory(self, address: int, data: bytes) -> None: - length = len(data) - if (length == 0 or length > self.__MEMORY_RW_MAX_SIZE): - raise ValueError('Wrong data size for write memory command') - if (((address % 4) != 0) or ((length % 4) != 0)): - raise ValueError('Write memory command requires 4 byte alignment') - self.__cmd_send(b'\x31') - self.__data_write(address.to_bytes(4, byteorder='big')) - self.__data_write(bytes([length - 1, *data])) - - def __mass_erase(self) -> None: - self.__cmd_send(b'\x44') - self.__data_write(b'\xFF\xFF') - - def __load_memory(self, address: int, data: bytes, description: str='') -> None: - length = len(data) - self.__progress(length, 0, description) - for offset in range(0, length, self.__MEMORY_RW_MAX_SIZE): - chunk = data[offset:offset + self.__MEMORY_RW_MAX_SIZE] - self.__write_memory(address + offset, chunk) - verify = self.__read_memory(address + offset, len(chunk)) - if (chunk != verify): - raise STM32BootloaderException('Memory verify failed') - self.__progress(length, offset, description) - self.__progress(length, length, description) - - def connect(self, id: int) -> None: - if (not self.__connected): - self.__write(self.__INIT) - self.__check_ack() - self.__connected = True - dev_id = self.__get_id() - if (dev_id != id): - raise STM32BootloaderException('Unknown chip detected') - - def load_ram_and_run(self, data: bytes, description: str='') -> None: - if (len(data) > self.__RAM_MAX_LOAD_SIZE): - raise STM32BootloaderException('RAM image too big') - self.__load_memory(self.__RAM_LOAD_ADDRESS, data, description) - self.__go(self.__RAM_LOAD_ADDRESS) - - def load_flash_and_run(self, data: bytes, description: str='') -> None: - if (len(data) > self.__FLASH_MAX_LOAD_SIZE): - raise STM32BootloaderException('Flash image too big') - self.__mass_erase() - try: - self.__load_memory(self.__FLASH_LOAD_ADDRESS, data, description) - self.__go(self.__FLASH_LOAD_ADDRESS) - except STM32BootloaderException as e: - self.__mass_erase() - raise STM32BootloaderException(e) - - -class LCMXO2PrimerException(Exception): - pass - -class LCMXO2Primer: - __PRIMER_ID_LCMXO2 = b'MXO2' - - __CMD_GET_PRIMER_ID = b'?' - __CMD_PROBE_FPGA = b'#' - __CMD_RESTART = b'$' - __CMD_GET_DEVICE_ID = b'I' - __CMD_ENABLE_FLASH = b'E' - __CMD_ERASE_FLASH = b'X' - __CMD_RESET_ADDRESS = b'A' - __CMD_WRITE_PAGE = b'W' - __CMD_READ_PAGE = b'R' - __CMD_PROGRAM_DONE = b'F' - __CMD_INIT_FEATBITS = b'Q' - __CMD_REFRESH = b'B' - - __FLASH_PAGE_SIZE = 16 - __FLASH_NUM_PAGES = 11260 - - __FPGA_PROBE_VALUE = b'\x64' - - DEV_ID_LCMXO2_7000HC = b'\x01\x2B\xD0\x43' - - def __init__(self, write: Callable[[bytes], None], read: Callable[[int], bytes], progress: Callable[[int, int, str], None]): - self.__write = write - self.__read = read - self.__progress = progress - - def __cmd_execute(self, cmd: bytes, data: bytes=b'') -> bytes: - if (len(cmd) != 1): - raise ValueError('Command must contain only one byte') - if (len(data) >= 256): - raise ValueError('Data size too big') - - packet = b'CMD' + cmd - packet += len(data).to_bytes(1, byteorder='little') - packet += data - packet += crc32(packet).to_bytes(4, byteorder='little') - self.__write(packet) - - response = self.__read(5) - if (len(response) != 5): - raise LCMXO2PrimerException(f'No response received [{cmd}]') - length = int.from_bytes(response[4:5], byteorder='little') - response += self.__read(length) - calculated_checksum = crc32(response) - received_checksum = int.from_bytes(self.__read(4), byteorder='little') - - if (response[0:3] != b'RSP'): - raise LCMXO2PrimerException(f'Invalid response token [{response[0:3]} / {cmd}]') - if (response[3:4] != cmd): - raise LCMXO2PrimerException(f'Invalid response command [{cmd} / {response[3]}]') - if (calculated_checksum != received_checksum): - raise LCMXO2PrimerException(f'Invalid response checksum [{cmd}]') - - return response[5:] - - def connect(self, id: bytes) -> None: - primer_id = self.__cmd_execute(self.__CMD_GET_PRIMER_ID) - if (primer_id != self.__PRIMER_ID_LCMXO2): - raise LCMXO2PrimerException('Invalid primer ID received') - - dev_id = self.__cmd_execute(self.__CMD_GET_DEVICE_ID) - if (dev_id != id): - raise LCMXO2PrimerException('Invalid FPGA device id received') - - def load_flash_and_run(self, data: bytes, description: str) -> None: - erase_description = f'{description} / Erase' - program_description = f'{description} / Program' - verify_description = f'{description} / Verify' - - length = len(data) - if (length > (self.__FLASH_PAGE_SIZE * self.__FLASH_NUM_PAGES)): - raise LCMXO2PrimerException('FPGA data size too big') - if ((length % self.__FLASH_PAGE_SIZE) != 0): - raise LCMXO2PrimerException('FPGA data size not aligned to page size') - - self.__cmd_execute(self.__CMD_ENABLE_FLASH) - - self.__progress(length, 0, erase_description) - self.__cmd_execute(self.__CMD_ERASE_FLASH) - self.__progress(length, length, erase_description) - - try: - self.__cmd_execute(self.__CMD_RESET_ADDRESS) - self.__progress(length, 0, program_description) - for offset in range(0, length, self.__FLASH_PAGE_SIZE): - page_data = data[offset:(offset + self.__FLASH_PAGE_SIZE)] - self.__cmd_execute(self.__CMD_WRITE_PAGE, page_data) - self.__progress(length, offset, program_description) - self.__progress(length, length, program_description) - - self.__cmd_execute(self.__CMD_RESET_ADDRESS) - self.__progress(length, 0, verify_description) - for offset in range(0, length, self.__FLASH_PAGE_SIZE): - page_data = data[offset:(offset + self.__FLASH_PAGE_SIZE)] - verify_data = self.__cmd_execute(self.__CMD_READ_PAGE) - self.__progress(length, offset, verify_description) - if (page_data != verify_data): - raise LCMXO2PrimerException('FPGA verification error') - self.__progress(length, length, verify_description) - - self.__cmd_execute(self.__CMD_INIT_FEATBITS) - - self.__cmd_execute(self.__CMD_PROGRAM_DONE) - - self.__cmd_execute(self.__CMD_REFRESH) - - if (self.__cmd_execute(self.__CMD_PROBE_FPGA) != self.__FPGA_PROBE_VALUE): - raise LCMXO2PrimerException('Invalid FPGA ID value received') - - except LCMXO2PrimerException as e: - self.__cmd_execute(self.__CMD_ENABLE_FLASH) - self.__cmd_execute(self.__CMD_ERASE_FLASH) - self.__cmd_execute(self.__CMD_REFRESH) - self.__cmd_execute(self.__CMD_RESTART) - raise LCMXO2PrimerException(e) - - self.__cmd_execute(self.__CMD_RESTART) - - -class SC64BringUp: - __SERIAL_BAUD: int = 115200 - __SERIAL_TIMEOUT: float = 6.0 - __INTERVAL_TIME: float = 0.5 - - def __init__(self, progress: Callable[[int, int, str], None]) -> None: - self.__progress = progress - - def load_update_data(self, path: str) -> None: - self.__sc64_update_data = SC64UpdateData() - self.__sc64_update_data.load(path, require_all=True) - - def get_update_info(self) -> str: - return self.__sc64_update_data.get_update_info() - - def start_bring_up(self, port: str, only_bootloader: bool=False) -> None: - link = None - try: - if (not only_bootloader): - link = serial.Serial( - port, - baudrate=self.__SERIAL_BAUD, - parity=serial.PARITY_EVEN, - timeout=self.__SERIAL_TIMEOUT, - write_timeout=self.__SERIAL_TIMEOUT - ) - - stm32_bootloader = STM32Bootloader(link.write, link.read, self.__progress) - lcmxo2_primer = LCMXO2Primer(link.write, link.read, self.__progress) - - stm32_bootloader.connect(stm32_bootloader.DEV_ID_STM32G030XX) - stm32_bootloader.load_ram_and_run(self.__sc64_update_data.get_primer_data(), 'FPGA primer -> STM32 RAM') - time.sleep(self.__INTERVAL_TIME) - link.read_all() - - lcmxo2_primer.connect(lcmxo2_primer.DEV_ID_LCMXO2_7000HC) - lcmxo2_primer.load_flash_and_run(self.__sc64_update_data.get_fpga_data(), 'FPGA configuration -> LCMXO2 FLASH') - time.sleep(self.__INTERVAL_TIME) - link.read_all() - - stm32_bootloader.connect(stm32_bootloader.DEV_ID_STM32G030XX) - stm32_bootloader.load_flash_and_run(self.__sc64_update_data.get_mcu_data(), 'MCU software -> STM32 FLASH') - time.sleep(self.__INTERVAL_TIME) - link.read_all() - - bootloader_description = 'Bootloader -> SC64 FLASH' - bootloader_data = self.__sc64_update_data.get_bootloader_data() - bootloader_length = len(bootloader_data) - self.__progress(bootloader_length, 0, bootloader_description) - SC64().upload_bootloader(bootloader_data) - self.__progress(bootloader_length, bootloader_length, bootloader_description) - finally: - if (link and link.is_open): - link.close() - - -if __name__ == '__main__': - nargs = len(sys.argv) - if (nargs < 3 or nargs > 4): - Utils.die(f'Usage: {sys.argv[0]} serial_port update_file [--only-bootloader]') - - port = sys.argv[1] - update_data_path = sys.argv[2] - only_bootloader = False - if (nargs == 4): - if (sys.argv[3] == '--only-bootloader'): - only_bootloader = True - else: - Utils.die(f'Unknown argument: {sys.argv[3]}') - - utils = Utils() - sc64_bring_up = SC64BringUp(progress=utils.progress) - - Utils.log() - Utils.info('[ Welcome to SC64 flashcart board bring-up! ]') - Utils.log() - - Utils.log(f'Serial port: {port}') - Utils.log(f'Update data path: {os.path.abspath(update_data_path)}') - try: - sc64_bring_up.load_update_data(update_data_path) - except SC64UpdateDataException as e: - Utils.die(f'Provided \'{update_data_path}\' file is invalid: {e}') - Utils.log_no_end('Update info: ') - Utils.log(sc64_bring_up.get_update_info()) - Utils.log() - - if (only_bootloader): - Utils.log('Running in "only bootloader" mode') - Utils.log() - - Utils.warning('[ CAUTION ]') - Utils.warning('Configure FTDI chip with provided ft232h_config.xml before continuing') - Utils.warning('Connect SC64 USB port to the same computer you\'re running this script') - Utils.warning('Make sure SC64 USB port is recognized in system before continuing') - Utils.log() - - Utils.warning('[ IMPORTANT ]') - Utils.warning('Unplug SC64 board from power and reconnect it before proceeding') - Utils.log() - - try: - if (input('Type YES to continue: ') != 'YES'): - Utils.die('No confirmation received. Exiting') - Utils.log() - except KeyboardInterrupt: - Utils.log() - Utils.die('Aborted') - - original_sigint_handler = signal.getsignal(signal.SIGINT) - try: - signal.signal(signal.SIGINT, lambda *kwargs: utils.exit_warning()) - sc64_bring_up.start_bring_up(port, only_bootloader) - except (serial.SerialException, STM32BootloaderException, LCMXO2PrimerException) as e: - if (utils.get_progress_active): - Utils.log() - Utils.die(f'Error while running bring-up: {e}') - finally: - signal.signal(signal.SIGINT, original_sigint_handler) - - Utils.log() - Utils.info('[ SC64 flashcart board bring-up finished successfully! ]') - Utils.log() +#!/usr/bin/env python3 + +import io +import os +import queue +import serial +import signal +import sys +import time +from binascii import crc32 +from enum import IntEnum +from serial.tools import list_ports +from sys import exit +from typing import Callable, Optional + + + +class Utils: + __progress_active = False + + @staticmethod + def log(message: str='') -> None: + print(message) + + @staticmethod + def log_no_end(message: str='') -> None: + print(message, end='', flush=True) + + @staticmethod + def info(message: str='') -> None: + print(f'\033[92m{message}\033[0m') + + @staticmethod + def warning(message: str='') -> None: + print(f'\033[93m{message}\033[0m') + + @staticmethod + def die(reason: str) -> None: + print(f'\033[91m{reason}\033[0m') + exit(-1) + + @property + def get_progress_active(self): + return self.__progress_active + + def progress(self, length: int, position: int, description: str) -> None: + value = ((position / length) * 100.0) + if (position == 0): + self.__progress_active = True + Utils.log_no_end(f'\r{value:5.1f}%: [{description}]') + if (position == length): + Utils.log() + self.__progress_active = False + + def exit_warning(self): + if (self.__progress_active): + Utils.log() + Utils.warning('Ctrl-C is prohibited during bring-up procedure') + + +class SC64UpdateDataException(Exception): + pass + +class SC64UpdateData: + __UPDATE_TOKEN = b'SC64 Update v2.0' + + __CHUNK_ID_UPDATE_INFO = 1 + __CHUNK_ID_MCU_DATA = 2 + __CHUNK_ID_FPGA_DATA = 3 + __CHUNK_ID_BOOTLOADER_DATA = 4 + __CHUNK_ID_PRIMER_DATA = 5 + + __update_info: Optional[str] + __mcu_data: Optional[bytes] + __fpga_data: Optional[bytes] + __bootloader_data: Optional[bytes] + __primer_data: Optional[bytes] + + def __int_to_bytes(self, value: int) -> bytes: + return value.to_bytes(4, byteorder='little') + + def __align(self, value: int) -> int: + if (value % 16 != 0): + value += (16 - (value % 16)) + return value + + def __load_int(self, f: io.BufferedReader) -> int: + try: + data = f.read(4) + if (len(data) != 4): + raise ValueError('Read size did not match requested amount') + value = int.from_bytes(data, byteorder='little') + except ValueError as e: + raise SC64UpdateDataException(f'Error while reading chunk header: {e}') + return value + + def __load_chunk(self, f: io.BufferedReader) -> tuple[int, bytes]: + id = self.__load_int(f) + aligned_length = self.__load_int(f) + checksum = self.__load_int(f) + data_length = self.__load_int(f) + + data = f.read(data_length) + + align = (aligned_length - 4 - 4 - data_length) + f.seek(align, io.SEEK_CUR) + + if (crc32(data) != checksum): + raise SC64UpdateDataException(f'Invalid checksum for chunk id [{id}] inside update file') + + return (id, data) + + def load(self, path: str, require_all: bool=False) -> None: + self.__update_info = None + self.__mcu_data = None + self.__fpga_data = None + self.__bootloader_data = None + self.__primer_data = None + + try: + with open(path, 'rb') as f: + if (f.read(len(self.__UPDATE_TOKEN)) != self.__UPDATE_TOKEN): + raise SC64UpdateDataException('Invalid update file header') + + while (f.peek(1) != b''): + (id, data) = self.__load_chunk(f) + if (id == self.__CHUNK_ID_UPDATE_INFO): + self.__update_info = data.decode('ascii') + elif (id == self.__CHUNK_ID_MCU_DATA): + self.__mcu_data = data + elif (id == self.__CHUNK_ID_FPGA_DATA): + self.__fpga_data = data + elif (id == self.__CHUNK_ID_BOOTLOADER_DATA): + self.__bootloader_data = data + elif (id == self.__CHUNK_ID_PRIMER_DATA): + self.__primer_data = data + else: + raise SC64UpdateDataException('Unknown chunk inside update file') + + if (require_all): + if (not self.__update_info): + raise SC64UpdateDataException('No update info inside update file') + if (not self.__mcu_data): + raise SC64UpdateDataException('No MCU data inside update file') + if (not self.__fpga_data): + raise SC64UpdateDataException('No FPGA data inside update file') + if (not self.__bootloader_data): + raise SC64UpdateDataException('No bootloader data inside update file') + if (not self.__primer_data): + raise SC64UpdateDataException('No primer data inside update file') + + except IOError as e: + raise SC64UpdateDataException(f'IO error while loading update data: {e}') + + def get_update_info(self) -> Optional[str]: + return self.__update_info + + def get_mcu_data(self) -> Optional[bytes]: + return self.__mcu_data + + def get_fpga_data(self) -> Optional[bytes]: + return self.__fpga_data + + def get_bootloader_data(self) -> Optional[bytes]: + return self.__bootloader_data + + def get_primer_data(self) -> Optional[bytes]: + return self.__primer_data + + def create_bootloader_only_firmware(self): + if (self.__bootloader_data == None): + raise SC64UpdateDataException('No bootloader data available for firmware creation') + + chunk = b'' + chunk += self.__int_to_bytes(self.__CHUNK_ID_BOOTLOADER_DATA) + chunk += self.__int_to_bytes(8 + self.__align(len(self.__bootloader_data))) + chunk += self.__int_to_bytes(crc32(self.__bootloader_data)) + chunk += self.__int_to_bytes(len(self.__bootloader_data)) + chunk += self.__bootloader_data + chunk += bytes([0] * (self.__align(len(chunk)) - len(chunk))) + + data = b'' + data += self.__UPDATE_TOKEN + data += chunk + + return data + + +class STM32BootloaderException(Exception): + pass + +class STM32Bootloader: + __INIT = b'\x7F' + __ACK = b'\x79' + __NACK = b'\x1F' + + __MEMORY_RW_MAX_SIZE = 256 + __FLASH_LOAD_ADDRESS = 0x08000000 + __FLASH_MAX_LOAD_SIZE = 0x8000 + __RAM_LOAD_ADDRESS = 0x20001000 + __RAM_MAX_LOAD_SIZE = 0x1000 + + DEV_ID_STM32G030XX = b'\x04\x66' + + __connected = False + + def __init__(self, write: Callable[[bytes], None], read: Callable[[int], bytes], progress: Callable[[int, int, str], None]): + self.__write = write + self.__read = read + self.__progress = progress + + def __append_xor(self, data: bytes) -> bytes: + xor = (0xFF if (len(data) == 1) else 0x00) + for b in data: + xor ^= b + return bytes([*data, xor]) + + def __check_ack(self) -> None: + response = self.__read(1) + if (response == None): + raise STM32BootloaderException('No ACK/NACK byte received') + if (response == self.__NACK): + raise STM32BootloaderException('NACK byte received') + if (response != self.__ACK): + raise STM32BootloaderException('Unknown ACK/NACK byte received') + + def __cmd_send(self, cmd: bytes) -> None: + if (len(cmd) != 1): + raise ValueError('Command must contain only one byte') + self.__write(self.__append_xor(cmd)) + self.__check_ack() + + def __data_write(self, data: bytes) -> None: + self.__write(self.__append_xor(data)) + self.__check_ack() + + def __data_read(self) -> bytes: + length = self.__read(1) + if (len(length) != 1): + raise STM32BootloaderException('Did not receive length byte') + length = length[0] + data = self.__read(length + 1) + self.__check_ack() + return data + + def __get_id(self) -> bytes: + self.__cmd_send(b'\x02') + return self.__data_read() + + def __read_memory(self, address: int, length: int) -> bytes: + if (length == 0 or length > self.__MEMORY_RW_MAX_SIZE): + raise ValueError('Wrong data size for read memory command') + self.__cmd_send(b'\x11') + self.__data_write(address.to_bytes(4, byteorder='big')) + self.__data_write(bytes([length - 1])) + return self.__read(length) + + def __go(self, address: int) -> None: + self.__cmd_send(b'\x21') + self.__data_write(address.to_bytes(4, byteorder='big')) + self.__connected = False + + def __write_memory(self, address: int, data: bytes) -> None: + length = len(data) + if (length == 0 or length > self.__MEMORY_RW_MAX_SIZE): + raise ValueError('Wrong data size for write memory command') + if (((address % 4) != 0) or ((length % 4) != 0)): + raise ValueError('Write memory command requires 4 byte alignment') + self.__cmd_send(b'\x31') + self.__data_write(address.to_bytes(4, byteorder='big')) + self.__data_write(bytes([length - 1, *data])) + + def __mass_erase(self) -> None: + self.__cmd_send(b'\x44') + self.__data_write(b'\xFF\xFF') + + def __load_memory(self, address: int, data: bytes, description: str='') -> None: + length = len(data) + self.__progress(length, 0, description) + for offset in range(0, length, self.__MEMORY_RW_MAX_SIZE): + chunk = data[offset:offset + self.__MEMORY_RW_MAX_SIZE] + self.__write_memory(address + offset, chunk) + verify = self.__read_memory(address + offset, len(chunk)) + if (chunk != verify): + raise STM32BootloaderException('Memory verify failed') + self.__progress(length, offset, description) + self.__progress(length, length, description) + + def connect(self, id: int) -> None: + if (not self.__connected): + self.__write(self.__INIT) + self.__check_ack() + self.__connected = True + dev_id = self.__get_id() + if (dev_id != id): + raise STM32BootloaderException('Unknown chip detected') + + def load_ram_and_run(self, data: bytes, description: str='') -> None: + if (len(data) > self.__RAM_MAX_LOAD_SIZE): + raise STM32BootloaderException('RAM image too big') + self.__load_memory(self.__RAM_LOAD_ADDRESS, data, description) + self.__go(self.__RAM_LOAD_ADDRESS) + + def load_flash_and_run(self, data: bytes, description: str='') -> None: + if (len(data) > self.__FLASH_MAX_LOAD_SIZE): + raise STM32BootloaderException('Flash image too big') + self.__mass_erase() + try: + self.__load_memory(self.__FLASH_LOAD_ADDRESS, data, description) + self.__go(self.__FLASH_LOAD_ADDRESS) + except STM32BootloaderException as e: + self.__mass_erase() + raise STM32BootloaderException(e) + + +class LCMXO2PrimerException(Exception): + pass + +class LCMXO2Primer: + __PRIMER_ID_LCMXO2 = b'MXO2' + + __CMD_GET_PRIMER_ID = b'?' + __CMD_PROBE_FPGA = b'#' + __CMD_RESTART = b'$' + __CMD_GET_DEVICE_ID = b'I' + __CMD_ENABLE_FLASH = b'E' + __CMD_ERASE_FLASH = b'X' + __CMD_RESET_ADDRESS = b'A' + __CMD_WRITE_PAGE = b'W' + __CMD_READ_PAGE = b'R' + __CMD_PROGRAM_DONE = b'F' + __CMD_INIT_FEATBITS = b'Q' + __CMD_REFRESH = b'B' + + __FLASH_PAGE_SIZE = 16 + __FLASH_NUM_PAGES = 11260 + + __FPGA_PROBE_VALUE = b'\x64' + + DEV_ID_LCMXO2_7000HC = b'\x01\x2B\xD0\x43' + + def __init__(self, write: Callable[[bytes], None], read: Callable[[int], bytes], progress: Callable[[int, int, str], None]): + self.__write = write + self.__read = read + self.__progress = progress + + def __cmd_execute(self, cmd: bytes, data: bytes=b'') -> bytes: + if (len(cmd) != 1): + raise ValueError('Command must contain only one byte') + if (len(data) >= 256): + raise ValueError('Data size too big') + + packet = b'CMD' + cmd + packet += len(data).to_bytes(1, byteorder='little') + packet += data + packet += crc32(packet).to_bytes(4, byteorder='little') + self.__write(packet) + + response = self.__read(5) + if (len(response) != 5): + raise LCMXO2PrimerException(f'No response received [{cmd}]') + length = int.from_bytes(response[4:5], byteorder='little') + response += self.__read(length) + calculated_checksum = crc32(response) + received_checksum = int.from_bytes(self.__read(4), byteorder='little') + + if (response[0:3] != b'RSP'): + raise LCMXO2PrimerException(f'Invalid response token [{response[0:3]} / {cmd}]') + if (response[3:4] != cmd): + raise LCMXO2PrimerException(f'Invalid response command [{cmd} / {response[3]}]') + if (calculated_checksum != received_checksum): + raise LCMXO2PrimerException(f'Invalid response checksum [{cmd}]') + + return response[5:] + + def connect(self, id: bytes) -> None: + primer_id = self.__cmd_execute(self.__CMD_GET_PRIMER_ID) + if (primer_id != self.__PRIMER_ID_LCMXO2): + raise LCMXO2PrimerException('Invalid primer ID received') + + dev_id = self.__cmd_execute(self.__CMD_GET_DEVICE_ID) + if (dev_id != id): + raise LCMXO2PrimerException('Invalid FPGA device id received') + + def load_flash_and_run(self, data: bytes, description: str) -> None: + erase_description = f'{description} / Erase' + program_description = f'{description} / Program' + verify_description = f'{description} / Verify' + + length = len(data) + if (length > (self.__FLASH_PAGE_SIZE * self.__FLASH_NUM_PAGES)): + raise LCMXO2PrimerException('FPGA data size too big') + if ((length % self.__FLASH_PAGE_SIZE) != 0): + raise LCMXO2PrimerException('FPGA data size not aligned to page size') + + self.__cmd_execute(self.__CMD_ENABLE_FLASH) + + self.__progress(length, 0, erase_description) + self.__cmd_execute(self.__CMD_ERASE_FLASH) + self.__progress(length, length, erase_description) + + try: + self.__cmd_execute(self.__CMD_RESET_ADDRESS) + self.__progress(length, 0, program_description) + for offset in range(0, length, self.__FLASH_PAGE_SIZE): + page_data = data[offset:(offset + self.__FLASH_PAGE_SIZE)] + self.__cmd_execute(self.__CMD_WRITE_PAGE, page_data) + self.__progress(length, offset, program_description) + self.__progress(length, length, program_description) + + self.__cmd_execute(self.__CMD_RESET_ADDRESS) + self.__progress(length, 0, verify_description) + for offset in range(0, length, self.__FLASH_PAGE_SIZE): + page_data = data[offset:(offset + self.__FLASH_PAGE_SIZE)] + verify_data = self.__cmd_execute(self.__CMD_READ_PAGE) + self.__progress(length, offset, verify_description) + if (page_data != verify_data): + raise LCMXO2PrimerException('FPGA verification error') + self.__progress(length, length, verify_description) + + self.__cmd_execute(self.__CMD_INIT_FEATBITS) + + self.__cmd_execute(self.__CMD_PROGRAM_DONE) + + self.__cmd_execute(self.__CMD_REFRESH) + + if (self.__cmd_execute(self.__CMD_PROBE_FPGA) != self.__FPGA_PROBE_VALUE): + raise LCMXO2PrimerException('Invalid FPGA ID value received') + + except LCMXO2PrimerException as e: + self.__cmd_execute(self.__CMD_ENABLE_FLASH) + self.__cmd_execute(self.__CMD_ERASE_FLASH) + self.__cmd_execute(self.__CMD_REFRESH) + self.__cmd_execute(self.__CMD_RESTART) + raise LCMXO2PrimerException(e) + + self.__cmd_execute(self.__CMD_RESTART) + + +class SC64Exception(Exception): + pass + +class SC64: + __serial: Optional[serial.Serial] = None + __packets = queue.Queue() + + class __UpdateStatus(IntEnum): + MCU = 1 + FPGA = 2 + BOOTLOADER = 3 + DONE = 0x80 + ERROR = 0xFF + + def __init__(self) -> None: + SC64_VID = 0x0403 + SC64_PID = 0x6014 + SC64_SID = "SC64" + for p in list_ports.comports(): + if (p.vid == SC64_VID and p.pid == SC64_PID and p.serial_number.startswith(SC64_SID)): + try: + self.__serial = serial.Serial(p.device, timeout=10.0, write_timeout=10.0) + except serial.SerialException: + if (self.__serial): + self.__serial.close() + continue + return + raise SC64Exception('No SC64 USB device found') + + def __reset(self) -> None: + WAIT_DURATION = 0.01 + RETRY_COUNT = 100 + + self.__serial.dtr = 1 + for n in range(0, RETRY_COUNT + 1): + self.__serial.reset_input_buffer() + self.__serial.reset_output_buffer() + time.sleep(WAIT_DURATION) + if (self.__serial.dsr == 1): + break + if n == RETRY_COUNT: + raise SC64Exception('Couldn\'t reset SC64 device (on)') + + self.__serial.dtr = 0 + for n in range(0, RETRY_COUNT + 1): + time.sleep(WAIT_DURATION) + if (self.__serial.dsr == 0): + break + if n == RETRY_COUNT: + raise SC64Exception('Couldn\'t reset SC64 device (on)') + + def __process_incoming_data(self, wait_for_response: bool) -> Optional[tuple[bytes, bytes]]: + while (wait_for_response or self.__serial.in_waiting >= 4): + buffer = self.__serial.read(4) + token = buffer[0:3] + id = buffer[3:4] + if (token == b'CMP'): + length = int.from_bytes(self.__serial.read(4), byteorder='big') + data = self.__serial.read(length) + return (id, data) + elif (token == b'PKT'): + length = int.from_bytes(self.__serial.read(4), byteorder='big') + data = self.__serial.read(length) + self.__packets.put((id, data)) + if (not wait_for_response): + break + elif (token == b'ERR'): + raise SC64Exception('Command response error') + else: + raise SC64Exception('Invalid token received') + return None + + def __execute_command(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'') -> bytes: + if (len(cmd) != 1): + raise SC64Exception('Length of command is different than 1 byte') + if (len(args) != 2): + raise SC64Exception('Number of arguments is different than 2') + try: + self.__serial.write(b'CMD' + cmd) + self.__serial.write(args[0].to_bytes(4, byteorder='big')) + self.__serial.write(args[1].to_bytes(4, byteorder='big')) + if (len(data) > 0): + self.__serial.write(data) + self.__serial.flush() + (id, response) = self.__process_incoming_data(True) + if (cmd != id): + raise SC64Exception('Command response ID didn\'t match') + return response + except serial.SerialException as e: + raise SC64Exception(f'Serial exception: {e}') + + def __receive_data_packet(self) -> Optional[tuple[bytes, bytes]]: + if (self.__packets.empty()): + try: + if (self.__process_incoming_data(False) != None): + raise SC64Exception('Unexpected command response') + except serial.SerialException as e: + raise SC64Exception(f'Serial exception: {e}') + if (not self.__packets.empty()): + packet = self.__packets.get() + self.__packets.task_done() + return packet + return None + + def update_firmware(self, data: bytes) -> None: + FIRMWARE_ADDRESS = 0x00100000 + FIRMWARE_UPDATE_TIMEOUT = 90.0 + + self.__reset() + self.__execute_command(b'R') + self.__execute_command(b'M', [FIRMWARE_ADDRESS, len(data)], data) + self.__execute_command(b'F', [FIRMWARE_ADDRESS, len(data)]) + + timeout = time.time() + FIRMWARE_UPDATE_TIMEOUT + while True: + if (time.time() > timeout): + raise SC64Exception('Firmware update timeout') + packet = self.__receive_data_packet() + if (packet == None): + time.sleep(0.001) + continue + (id, packet_data) = packet + if (id != b'F'): + raise SC64Exception('Unexpected packet id received') + status = self.__UpdateStatus(int.from_bytes(packet_data[0:4], byteorder='big')) + if (status == self.__UpdateStatus.ERROR): + raise SC64Exception('Firmware update error') + if (status == self.__UpdateStatus.DONE): + time.sleep(2) + break + + +class SC64BringUp: + __SERIAL_BAUD: int = 115200 + __SERIAL_TIMEOUT: float = 6.0 + __INTERVAL_TIME: float = 0.5 + + def __init__(self, progress: Callable[[int, int, str], None]) -> None: + self.__progress = progress + + def load_update_data(self, path: str) -> None: + self.__sc64_update_data = SC64UpdateData() + self.__sc64_update_data.load(path, require_all=True) + self.__bootloader_only_firmware = self.__sc64_update_data.create_bootloader_only_firmware() + + def get_update_info(self) -> str: + return self.__sc64_update_data.get_update_info() + + def start_bring_up(self, port: str, bootloader_only: bool=False) -> None: + link = None + sc64 = SC64() + + try: + if (not bootloader_only): + link = serial.Serial( + port, + baudrate=self.__SERIAL_BAUD, + parity=serial.PARITY_EVEN, + timeout=self.__SERIAL_TIMEOUT, + write_timeout=self.__SERIAL_TIMEOUT + ) + + stm32_bootloader = STM32Bootloader(link.write, link.read, self.__progress) + lcmxo2_primer = LCMXO2Primer(link.write, link.read, self.__progress) + + stm32_bootloader.connect(stm32_bootloader.DEV_ID_STM32G030XX) + stm32_bootloader.load_ram_and_run(self.__sc64_update_data.get_primer_data(), 'FPGA primer -> STM32 RAM') + time.sleep(self.__INTERVAL_TIME) + link.read_all() + + lcmxo2_primer.connect(lcmxo2_primer.DEV_ID_LCMXO2_7000HC) + lcmxo2_primer.load_flash_and_run(self.__sc64_update_data.get_fpga_data(), 'FPGA configuration -> LCMXO2 FLASH') + time.sleep(self.__INTERVAL_TIME) + link.read_all() + + stm32_bootloader.connect(stm32_bootloader.DEV_ID_STM32G030XX) + stm32_bootloader.load_flash_and_run(self.__sc64_update_data.get_mcu_data(), 'MCU software -> STM32 FLASH') + time.sleep(self.__INTERVAL_TIME) + link.read_all() + + bootloader_description = 'Bootloader -> SC64 FLASH (no progress reporting)' + bootloader_length = len(self.__bootloader_only_firmware) + self.__progress(bootloader_length, 0, bootloader_description) + sc64.update_firmware(self.__bootloader_only_firmware) + self.__progress(bootloader_length, bootloader_length, bootloader_description) + finally: + if (link and link.is_open): + link.close() + + +if __name__ == '__main__': + nargs = len(sys.argv) + if (nargs < 3 or nargs > 4): + Utils.die(f'Usage: {sys.argv[0]} serial_port update_file [--bootloader-only]') + + port = sys.argv[1] + update_data_path = sys.argv[2] + bootloader_only = False + if (nargs == 4): + if (sys.argv[3] == '--bootloader-only'): + bootloader_only = True + else: + Utils.die(f'Unknown argument: {sys.argv[3]}') + + utils = Utils() + sc64_bring_up = SC64BringUp(progress=utils.progress) + + Utils.log() + Utils.info('[ Welcome to SC64 flashcart board bring-up! ]') + Utils.log() + + Utils.log(f'Serial port: {port}') + Utils.log(f'Update data path: {os.path.abspath(update_data_path)}') + try: + sc64_bring_up.load_update_data(update_data_path) + except SC64UpdateDataException as e: + Utils.die(f'Provided \'{update_data_path}\' file is invalid: {e}') + Utils.log('Update info: ') + Utils.log(sc64_bring_up.get_update_info()) + Utils.log() + + if bootloader_only: + Utils.log('Running in "bootloader only" mode') + Utils.log() + + Utils.warning('[ CAUTION ]') + Utils.warning('Configure FTDI chip with provided ft232h_config.xml before continuing') + Utils.warning('Connect SC64 USB port to the same computer you\'re running this script') + Utils.warning('Make sure SC64 USB port is recognized in system before continuing') + Utils.log() + + Utils.warning('[ IMPORTANT ]') + Utils.warning('Unplug SC64 board from power and reconnect it before proceeding') + Utils.log() + + try: + if (input('Type YES to continue: ') != 'YES'): + Utils.die('No confirmation received. Exiting') + Utils.log() + except KeyboardInterrupt: + Utils.log() + Utils.die('Aborted') + + original_sigint_handler = signal.getsignal(signal.SIGINT) + try: + signal.signal(signal.SIGINT, lambda *kwargs: utils.exit_warning()) + sc64_bring_up.start_bring_up(port, bootloader_only) + except (serial.SerialException, STM32BootloaderException, LCMXO2PrimerException, SC64Exception) as e: + if (utils.get_progress_active): + Utils.log() + Utils.die(f'Error while running bring-up: {e}') + finally: + signal.signal(signal.SIGINT, original_sigint_handler) + + Utils.log() + Utils.info('[ SC64 flashcart board bring-up finished successfully! ]') + Utils.log() diff --git a/sw/pc/requirements.txt b/sw/tools/requirements.txt similarity index 50% rename from sw/pc/requirements.txt rename to sw/tools/requirements.txt index c3f0da9..d0dfd58 100644 --- a/sw/pc/requirements.txt +++ b/sw/tools/requirements.txt @@ -1,2 +1 @@ -Pillow==9.2.0 pyserial==3.5 diff --git a/sw/update/update.py b/sw/tools/update.py old mode 100755 new mode 100644 similarity index 97% rename from sw/update/update.py rename to sw/tools/update.py index 8828577..3b0c338 --- a/sw/update/update.py +++ b/sw/tools/update.py @@ -145,13 +145,13 @@ class JedecFile: class SC64UpdateData: - __UPDATE_TOKEN = b'SC64 Update v2.0' + __UPDATE_TOKEN = b'SC64 Update v2.0' - __CHUNK_ID_UPDATE_INFO = 1 - __CHUNK_ID_MCU_DATA = 2 - __CHUNK_ID_FPGA_DATA = 3 - __CHUNK_ID_BOOTLOADER_DATA = 4 - __CHUNK_ID_PRIMER_DATA = 5 + __CHUNK_ID_UPDATE_INFO = 1 + __CHUNK_ID_MCU_DATA = 2 + __CHUNK_ID_FPGA_DATA = 3 + __CHUNK_ID_BOOTLOADER_DATA = 4 + __CHUNK_ID_PRIMER_DATA = 5 __data = b''