mirror of
https://github.com/Polprzewodnikowy/SummerCart64.git
synced 2024-11-28 08:24:14 +01:00
806 lines
30 KiB
Python
806 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import io
|
|
import os
|
|
import queue
|
|
import serial
|
|
import signal
|
|
import struct
|
|
import sys
|
|
import time
|
|
from binascii import crc32
|
|
from enum import IntEnum, StrEnum
|
|
from serial.tools import list_ports
|
|
from sys import exit
|
|
from typing import Callable, Optional
|
|
|
|
|
|
|
|
class Utils:
|
|
__progress_active = False
|
|
|
|
@staticmethod
|
|
def log(message: str='') -> None:
|
|
print(message)
|
|
|
|
@staticmethod
|
|
def log_no_end(message: str='') -> None:
|
|
print(message, end='', flush=True)
|
|
|
|
@staticmethod
|
|
def info(message: str='') -> None:
|
|
print(f'\033[92m{message}\033[0m')
|
|
|
|
@staticmethod
|
|
def warning(message: str='') -> None:
|
|
print(f'\033[93m{message}\033[0m')
|
|
|
|
@staticmethod
|
|
def die(reason: str) -> None:
|
|
print(f'\033[91m{reason}\033[0m')
|
|
exit(-1)
|
|
|
|
@property
|
|
def get_progress_active(self):
|
|
return self.__progress_active
|
|
|
|
def progress(self, length: int, position: int, description: str) -> None:
|
|
value = ((position / length) * 100.0)
|
|
if (position == 0):
|
|
self.__progress_active = True
|
|
Utils.log_no_end(f'\r{value:5.1f}%: [{description}]')
|
|
if (position == length):
|
|
Utils.log()
|
|
self.__progress_active = False
|
|
|
|
def exit_warning(self):
|
|
if (self.__progress_active):
|
|
Utils.log()
|
|
Utils.warning('Ctrl-C is prohibited during bring-up procedure')
|
|
|
|
|
|
class SC64UpdateDataException(Exception):
|
|
pass
|
|
|
|
class SC64UpdateData:
|
|
__UPDATE_TOKEN = b'SC64 Update v2.0'
|
|
|
|
__CHUNK_ID_UPDATE_INFO = 1
|
|
__CHUNK_ID_MCU_DATA = 2
|
|
__CHUNK_ID_FPGA_DATA = 3
|
|
__CHUNK_ID_BOOTLOADER_DATA = 4
|
|
__CHUNK_ID_PRIMER_DATA = 5
|
|
|
|
__update_info: Optional[str]
|
|
__mcu_data: Optional[bytes]
|
|
__fpga_data: Optional[bytes]
|
|
__bootloader_data: Optional[bytes]
|
|
__primer_data: Optional[bytes]
|
|
|
|
def __int_to_bytes(self, value: int) -> bytes:
|
|
return value.to_bytes(4, byteorder='little')
|
|
|
|
def __align(self, value: int) -> int:
|
|
if (value % 16 != 0):
|
|
value += (16 - (value % 16))
|
|
return value
|
|
|
|
def __load_int(self, f: io.BufferedReader) -> int:
|
|
try:
|
|
data = f.read(4)
|
|
if (len(data) != 4):
|
|
raise ValueError('Read size did not match requested amount')
|
|
value = int.from_bytes(data, byteorder='little')
|
|
except ValueError as e:
|
|
raise SC64UpdateDataException(f'Error while reading chunk header: {e}')
|
|
return value
|
|
|
|
def __load_chunk(self, f: io.BufferedReader) -> tuple[int, bytes]:
|
|
id = self.__load_int(f)
|
|
aligned_length = self.__load_int(f)
|
|
checksum = self.__load_int(f)
|
|
data_length = self.__load_int(f)
|
|
|
|
data = f.read(data_length)
|
|
|
|
align = (aligned_length - 4 - 4 - data_length)
|
|
f.seek(align, io.SEEK_CUR)
|
|
|
|
if (crc32(data) != checksum):
|
|
raise SC64UpdateDataException(f'Invalid checksum for chunk id [{id}] inside update file')
|
|
|
|
return (id, data)
|
|
|
|
def load(self, path: str, require_all: bool=False) -> None:
|
|
self.__update_info = None
|
|
self.__mcu_data = None
|
|
self.__fpga_data = None
|
|
self.__bootloader_data = None
|
|
self.__primer_data = None
|
|
|
|
try:
|
|
with open(path, 'rb') as f:
|
|
if (f.read(len(self.__UPDATE_TOKEN)) != self.__UPDATE_TOKEN):
|
|
raise SC64UpdateDataException('Invalid update file header')
|
|
|
|
while (f.peek(1) != b''):
|
|
(id, data) = self.__load_chunk(f)
|
|
if (id == self.__CHUNK_ID_UPDATE_INFO):
|
|
self.__update_info = data.decode('ascii')
|
|
elif (id == self.__CHUNK_ID_MCU_DATA):
|
|
self.__mcu_data = data
|
|
elif (id == self.__CHUNK_ID_FPGA_DATA):
|
|
self.__fpga_data = data
|
|
elif (id == self.__CHUNK_ID_BOOTLOADER_DATA):
|
|
self.__bootloader_data = data
|
|
elif (id == self.__CHUNK_ID_PRIMER_DATA):
|
|
self.__primer_data = data
|
|
else:
|
|
raise SC64UpdateDataException('Unknown chunk inside update file')
|
|
|
|
if (require_all):
|
|
if (not self.__update_info):
|
|
raise SC64UpdateDataException('No update info inside update file')
|
|
if (not self.__mcu_data):
|
|
raise SC64UpdateDataException('No MCU data inside update file')
|
|
if (not self.__fpga_data):
|
|
raise SC64UpdateDataException('No FPGA data inside update file')
|
|
if (not self.__bootloader_data):
|
|
raise SC64UpdateDataException('No bootloader data inside update file')
|
|
if (not self.__primer_data):
|
|
raise SC64UpdateDataException('No primer data inside update file')
|
|
|
|
except IOError as e:
|
|
raise SC64UpdateDataException(f'IO error while loading update data: {e}')
|
|
|
|
def get_update_info(self) -> Optional[str]:
|
|
return self.__update_info
|
|
|
|
def get_mcu_data(self) -> Optional[bytes]:
|
|
return self.__mcu_data
|
|
|
|
def get_fpga_data(self) -> Optional[bytes]:
|
|
return self.__fpga_data
|
|
|
|
def get_bootloader_data(self) -> Optional[bytes]:
|
|
return self.__bootloader_data
|
|
|
|
def get_primer_data(self) -> Optional[bytes]:
|
|
return self.__primer_data
|
|
|
|
def create_bootloader_only_firmware(self):
|
|
if (self.__bootloader_data == None):
|
|
raise SC64UpdateDataException('No bootloader data available for firmware creation')
|
|
|
|
chunk = b''
|
|
chunk += self.__int_to_bytes(self.__CHUNK_ID_BOOTLOADER_DATA)
|
|
chunk += self.__int_to_bytes(8 + self.__align(len(self.__bootloader_data)))
|
|
chunk += self.__int_to_bytes(crc32(self.__bootloader_data))
|
|
chunk += self.__int_to_bytes(len(self.__bootloader_data))
|
|
chunk += self.__bootloader_data
|
|
chunk += bytes([0] * (self.__align(len(chunk)) - len(chunk)))
|
|
|
|
data = b''
|
|
data += self.__UPDATE_TOKEN
|
|
data += chunk
|
|
|
|
return data
|
|
|
|
|
|
class STM32BootloaderException(Exception):
|
|
pass
|
|
|
|
class STM32Bootloader:
|
|
__INIT = b'\x7F'
|
|
__ACK = b'\x79'
|
|
__NACK = b'\x1F'
|
|
|
|
__MEMORY_RW_MAX_SIZE = 256
|
|
__FLASH_LOAD_ADDRESS = 0x08000000
|
|
__FLASH_MAX_LOAD_SIZE = 0x8000
|
|
__RAM_LOAD_ADDRESS = 0x20001000
|
|
__RAM_MAX_LOAD_SIZE = 0x1000
|
|
|
|
DEV_ID_STM32G030XX = b'\x04\x66'
|
|
|
|
__connected = False
|
|
|
|
def __init__(
|
|
self,
|
|
write: Callable[[bytes], None],
|
|
read: Callable[[int], bytes],
|
|
flush: Callable[[None], None],
|
|
progress: Callable[[int, int, str], None]
|
|
):
|
|
self.__write = write
|
|
self.__read = read
|
|
self.__flush = flush
|
|
self.__progress = progress
|
|
|
|
def __append_xor(self, data: bytes) -> bytes:
|
|
xor = (0xFF if (len(data) == 1) else 0x00)
|
|
for b in data:
|
|
xor ^= b
|
|
return bytes([*data, xor])
|
|
|
|
def __check_ack(self) -> None:
|
|
response = self.__read(1)
|
|
if (len(response) != 1):
|
|
raise STM32BootloaderException('No ACK/NACK byte received')
|
|
if (response == self.__NACK):
|
|
raise STM32BootloaderException('NACK byte received')
|
|
if (response != self.__ACK):
|
|
raise STM32BootloaderException('Unknown ACK/NACK byte received')
|
|
|
|
def __cmd_send(self, cmd: bytes) -> None:
|
|
if (len(cmd) != 1):
|
|
raise ValueError('Command must contain only one byte')
|
|
self.__write(self.__append_xor(cmd))
|
|
self.__flush()
|
|
self.__check_ack()
|
|
|
|
def __data_write(self, data: bytes) -> None:
|
|
self.__write(self.__append_xor(data))
|
|
self.__flush()
|
|
self.__check_ack()
|
|
|
|
def __data_read(self) -> bytes:
|
|
length = self.__read(1)
|
|
if (len(length) != 1):
|
|
raise STM32BootloaderException('Did not receive length byte')
|
|
length = (length[0] + 1)
|
|
data = self.__read(length)
|
|
if (len(data) != length):
|
|
raise STM32BootloaderException('Did not receive requested data bytes')
|
|
self.__check_ack()
|
|
return data
|
|
|
|
def __get_id(self) -> bytes:
|
|
self.__cmd_send(b'\x02')
|
|
return self.__data_read()
|
|
|
|
def __read_memory(self, address: int, length: int) -> bytes:
|
|
if (length == 0 or length > self.__MEMORY_RW_MAX_SIZE):
|
|
raise ValueError('Wrong data size for read memory command')
|
|
self.__cmd_send(b'\x11')
|
|
self.__data_write(address.to_bytes(4, byteorder='big'))
|
|
self.__data_write(bytes([length - 1]))
|
|
data = self.__read(length)
|
|
if (len(data) != length):
|
|
raise STM32BootloaderException(f'Did not receive requested memory bytes')
|
|
return data
|
|
|
|
def __go(self, address: int) -> None:
|
|
self.__cmd_send(b'\x21')
|
|
self.__data_write(address.to_bytes(4, byteorder='big'))
|
|
self.__connected = False
|
|
|
|
def __write_memory(self, address: int, data: bytes) -> None:
|
|
length = len(data)
|
|
if (length == 0 or length > self.__MEMORY_RW_MAX_SIZE):
|
|
raise ValueError('Wrong data size for write memory command')
|
|
if (((address % 4) != 0) or ((length % 4) != 0)):
|
|
raise ValueError('Write memory command requires 4 byte alignment')
|
|
self.__cmd_send(b'\x31')
|
|
self.__data_write(address.to_bytes(4, byteorder='big'))
|
|
self.__data_write(bytes([length - 1, *data]))
|
|
|
|
def __mass_erase(self) -> None:
|
|
self.__cmd_send(b'\x44')
|
|
self.__data_write(b'\xFF\xFF')
|
|
|
|
def __load_memory(self, address: int, data: bytes, description: str='') -> None:
|
|
length = len(data)
|
|
self.__progress(length, 0, description)
|
|
for offset in range(0, length, self.__MEMORY_RW_MAX_SIZE):
|
|
chunk = data[offset:offset + self.__MEMORY_RW_MAX_SIZE]
|
|
self.__write_memory(address + offset, chunk)
|
|
verify = self.__read_memory(address + offset, len(chunk))
|
|
if (chunk != verify):
|
|
raise STM32BootloaderException('Memory verify failed')
|
|
self.__progress(length, offset, description)
|
|
self.__progress(length, length, description)
|
|
|
|
def connect(self, id: int) -> None:
|
|
if (not self.__connected):
|
|
try:
|
|
self.__write(self.__INIT)
|
|
self.__flush()
|
|
self.__check_ack()
|
|
except STM32BootloaderException as e:
|
|
raise STM32BootloaderException(f'Could not connect to the STM32 ({e})')
|
|
self.__connected = True
|
|
dev_id = self.__get_id()
|
|
if (dev_id != id):
|
|
raise STM32BootloaderException('Unknown chip detected')
|
|
|
|
def load_ram_and_run(self, data: bytes, description: str='') -> None:
|
|
if (len(data) > self.__RAM_MAX_LOAD_SIZE):
|
|
raise STM32BootloaderException('RAM image too big')
|
|
self.__load_memory(self.__RAM_LOAD_ADDRESS, data, description)
|
|
self.__go(self.__RAM_LOAD_ADDRESS)
|
|
|
|
def load_flash_and_run(self, data: bytes, description: str='') -> None:
|
|
if (len(data) > self.__FLASH_MAX_LOAD_SIZE):
|
|
raise STM32BootloaderException('Flash image too big')
|
|
self.__mass_erase()
|
|
try:
|
|
self.__load_memory(self.__FLASH_LOAD_ADDRESS, data, description)
|
|
self.__go(self.__FLASH_LOAD_ADDRESS)
|
|
except STM32BootloaderException as e:
|
|
self.__mass_erase()
|
|
raise STM32BootloaderException(e)
|
|
|
|
|
|
class LCMXO2PrimerException(Exception):
|
|
pass
|
|
|
|
class LCMXO2Primer:
|
|
__PRIMER_ID_LCMXO2 = b'MXO2'
|
|
|
|
__CMD_GET_PRIMER_ID = b'?'
|
|
__CMD_PROBE_FPGA = b'#'
|
|
__CMD_RESTART = b'$'
|
|
__CMD_GET_DEVICE_ID = b'I'
|
|
__CMD_ENABLE_FLASH = b'E'
|
|
__CMD_ERASE_FLASH = b'X'
|
|
__CMD_RESET_ADDRESS = b'A'
|
|
__CMD_WRITE_PAGE = b'W'
|
|
__CMD_READ_PAGE = b'R'
|
|
__CMD_PROGRAM_DONE = b'F'
|
|
__CMD_INIT_FEATBITS = b'Q'
|
|
__CMD_REFRESH = b'B'
|
|
|
|
__FLASH_PAGE_SIZE = 16
|
|
__FLASH_NUM_PAGES = 11260
|
|
|
|
__FPGA_PROBE_VALUE = b'\x64'
|
|
|
|
DEV_ID_LCMXO2_7000HC = b'\x01\x2B\xD0\x43'
|
|
|
|
def __init__(
|
|
self,
|
|
write: Callable[[bytes], None],
|
|
read: Callable[[int], bytes],
|
|
flush: Callable[[None], None],
|
|
progress: Callable[[int, int, str], None]
|
|
):
|
|
self.__write = write
|
|
self.__read = read
|
|
self.__flush = flush
|
|
self.__progress = progress
|
|
|
|
def __cmd_execute(self, cmd: bytes, data: bytes=b'') -> bytes:
|
|
if (len(cmd) != 1):
|
|
raise ValueError('Command must contain only one byte')
|
|
if (len(data) >= 256):
|
|
raise ValueError('Data size too big')
|
|
|
|
packet = b'CMD' + cmd
|
|
packet += len(data).to_bytes(1, byteorder='little')
|
|
packet += data
|
|
packet += crc32(packet).to_bytes(4, byteorder='little')
|
|
self.__write(packet)
|
|
self.__flush()
|
|
|
|
response = self.__read(5)
|
|
if (len(response) != 5):
|
|
raise LCMXO2PrimerException(f'No response received [{cmd}]')
|
|
length = int.from_bytes(response[4:5], byteorder='little')
|
|
response_data = self.__read(length)
|
|
if (len(response_data) != length):
|
|
raise LCMXO2PrimerException(f'No response data received [{cmd}]')
|
|
checksum = self.__read(4)
|
|
if (len(checksum) != 4):
|
|
raise LCMXO2PrimerException(f'No response data checksum received [{cmd}]')
|
|
calculated_checksum = crc32(response + response_data)
|
|
received_checksum = int.from_bytes(checksum, byteorder='little')
|
|
|
|
if (response[0:3] != b'RSP'):
|
|
raise LCMXO2PrimerException(f'Invalid response token [{response[0:3]} / {cmd}]')
|
|
if (response[3:4] != cmd):
|
|
raise LCMXO2PrimerException(f'Invalid response command [{cmd} / {response[3]}]')
|
|
if (calculated_checksum != received_checksum):
|
|
raise LCMXO2PrimerException(f'Invalid response checksum [{cmd}]')
|
|
|
|
return response_data
|
|
|
|
def connect(self, id: bytes) -> None:
|
|
try:
|
|
primer_id = self.__cmd_execute(self.__CMD_GET_PRIMER_ID)
|
|
if (primer_id != self.__PRIMER_ID_LCMXO2):
|
|
raise LCMXO2PrimerException('Invalid primer ID received')
|
|
|
|
dev_id = self.__cmd_execute(self.__CMD_GET_DEVICE_ID)
|
|
if (dev_id != id):
|
|
raise LCMXO2PrimerException('Invalid FPGA device id received')
|
|
except LCMXO2PrimerException as e:
|
|
raise LCMXO2PrimerException(f'Could not connect to the LCMXO2 primer ({e})')
|
|
|
|
def load_flash_and_run(self, data: bytes, description: str) -> None:
|
|
erase_description = f'{description} / Erase'
|
|
program_description = f'{description} / Program'
|
|
verify_description = f'{description} / Verify'
|
|
|
|
length = len(data)
|
|
if (length > (self.__FLASH_PAGE_SIZE * self.__FLASH_NUM_PAGES)):
|
|
raise LCMXO2PrimerException('FPGA data size too big')
|
|
if ((length % self.__FLASH_PAGE_SIZE) != 0):
|
|
raise LCMXO2PrimerException('FPGA data size not aligned to page size')
|
|
|
|
self.__cmd_execute(self.__CMD_ENABLE_FLASH)
|
|
|
|
self.__progress(length, 0, erase_description)
|
|
self.__cmd_execute(self.__CMD_ERASE_FLASH)
|
|
self.__progress(length, length, erase_description)
|
|
|
|
try:
|
|
self.__cmd_execute(self.__CMD_RESET_ADDRESS)
|
|
self.__progress(length, 0, program_description)
|
|
for offset in range(0, length, self.__FLASH_PAGE_SIZE):
|
|
page_data = data[offset:(offset + self.__FLASH_PAGE_SIZE)]
|
|
self.__cmd_execute(self.__CMD_WRITE_PAGE, page_data)
|
|
self.__progress(length, offset, program_description)
|
|
self.__progress(length, length, program_description)
|
|
|
|
self.__cmd_execute(self.__CMD_RESET_ADDRESS)
|
|
self.__progress(length, 0, verify_description)
|
|
for offset in range(0, length, self.__FLASH_PAGE_SIZE):
|
|
page_data = data[offset:(offset + self.__FLASH_PAGE_SIZE)]
|
|
verify_data = self.__cmd_execute(self.__CMD_READ_PAGE)
|
|
self.__progress(length, offset, verify_description)
|
|
if (page_data != verify_data):
|
|
raise LCMXO2PrimerException('FPGA verification error')
|
|
self.__progress(length, length, verify_description)
|
|
|
|
self.__cmd_execute(self.__CMD_INIT_FEATBITS)
|
|
|
|
self.__cmd_execute(self.__CMD_PROGRAM_DONE)
|
|
|
|
self.__cmd_execute(self.__CMD_REFRESH)
|
|
|
|
if (self.__cmd_execute(self.__CMD_PROBE_FPGA) != self.__FPGA_PROBE_VALUE):
|
|
raise LCMXO2PrimerException('Invalid FPGA ID value received')
|
|
|
|
except LCMXO2PrimerException as e:
|
|
self.__cmd_execute(self.__CMD_ENABLE_FLASH)
|
|
self.__cmd_execute(self.__CMD_ERASE_FLASH)
|
|
self.__cmd_execute(self.__CMD_REFRESH)
|
|
self.__cmd_execute(self.__CMD_RESTART)
|
|
raise LCMXO2PrimerException(e)
|
|
|
|
self.__cmd_execute(self.__CMD_RESTART)
|
|
|
|
|
|
class SC64Exception(Exception):
|
|
pass
|
|
|
|
class SC64:
|
|
__serial: Optional[serial.Serial] = None
|
|
__packets = queue.Queue()
|
|
|
|
SDRAM_SIZE = 64 * 1024 * 1024
|
|
|
|
class __UpdateStatus(IntEnum):
|
|
MCU = 1
|
|
FPGA = 2
|
|
BOOTLOADER = 3
|
|
DONE = 0x80
|
|
ERROR = 0xFF
|
|
|
|
def __init__(self, progress: Callable[[int, int, str], None]) -> None:
|
|
self.__progress = progress
|
|
SC64_VID = 0x0403
|
|
SC64_PID = 0x6014
|
|
SC64_SID = "SC64"
|
|
for p in list_ports.comports():
|
|
if (p.vid == SC64_VID and p.pid == SC64_PID and p.serial_number.startswith(SC64_SID)):
|
|
try:
|
|
self.__serial = serial.Serial(p.device, timeout=10.0, write_timeout=10.0)
|
|
except serial.SerialException:
|
|
if (self.__serial):
|
|
self.__serial.close()
|
|
continue
|
|
return
|
|
raise SC64Exception('No SC64 USB device found')
|
|
|
|
def __reset(self) -> None:
|
|
WAIT_DURATION = 0.01
|
|
RETRY_COUNT = 100
|
|
|
|
self.__serial.dtr = 1
|
|
for n in range(0, RETRY_COUNT + 1):
|
|
self.__serial.reset_input_buffer()
|
|
self.__serial.reset_output_buffer()
|
|
time.sleep(WAIT_DURATION)
|
|
if (self.__serial.dsr == 1):
|
|
break
|
|
if n == RETRY_COUNT:
|
|
raise SC64Exception('Couldn\'t reset SC64 device (on)')
|
|
|
|
self.__serial.dtr = 0
|
|
for n in range(0, RETRY_COUNT + 1):
|
|
time.sleep(WAIT_DURATION)
|
|
if (self.__serial.dsr == 0):
|
|
break
|
|
if n == RETRY_COUNT:
|
|
raise SC64Exception('Couldn\'t reset SC64 device (on)')
|
|
|
|
def __process_incoming_data(self, wait_for_response: bool) -> Optional[tuple[bytes, bytes]]:
|
|
while (wait_for_response or self.__serial.in_waiting >= 4):
|
|
buffer = self.__serial.read(4)
|
|
token = buffer[0:3]
|
|
id = buffer[3:4]
|
|
if (token == b'CMP'):
|
|
length = int.from_bytes(self.__serial.read(4), byteorder='big')
|
|
data = self.__serial.read(length)
|
|
return (id, data)
|
|
elif (token == b'PKT'):
|
|
length = int.from_bytes(self.__serial.read(4), byteorder='big')
|
|
data = self.__serial.read(length)
|
|
self.__packets.put((id, data))
|
|
if (not wait_for_response):
|
|
break
|
|
elif (token == b'ERR'):
|
|
raise SC64Exception('Command response error')
|
|
else:
|
|
raise SC64Exception('Invalid token received')
|
|
return None
|
|
|
|
def __execute_command(self, cmd: bytes, args: list[int]=[0, 0], data: bytes=b'') -> bytes:
|
|
if (len(cmd) != 1):
|
|
raise SC64Exception('Length of command is different than 1 byte')
|
|
if (len(args) != 2):
|
|
raise SC64Exception('Number of arguments is different than 2')
|
|
try:
|
|
self.__serial.write(b'CMD' + cmd)
|
|
self.__serial.write(args[0].to_bytes(4, byteorder='big'))
|
|
self.__serial.write(args[1].to_bytes(4, byteorder='big'))
|
|
if (len(data) > 0):
|
|
self.__serial.write(data)
|
|
self.__serial.flush()
|
|
(id, response) = self.__process_incoming_data(True)
|
|
if (cmd != id):
|
|
raise SC64Exception('Command response ID didn\'t match')
|
|
return response
|
|
except serial.SerialException as e:
|
|
raise SC64Exception(f'Serial exception: {e}')
|
|
|
|
def __receive_data_packet(self) -> Optional[tuple[bytes, bytes]]:
|
|
if (self.__packets.empty()):
|
|
try:
|
|
if (self.__process_incoming_data(False) != None):
|
|
raise SC64Exception('Unexpected command response')
|
|
except serial.SerialException as e:
|
|
raise SC64Exception(f'Serial exception: {e}')
|
|
if (not self.__packets.empty()):
|
|
packet = self.__packets.get()
|
|
self.__packets.task_done()
|
|
return packet
|
|
return None
|
|
|
|
def __cmd_state_reset(self) -> None:
|
|
self.__execute_command(b'R')
|
|
|
|
def __cmd_memory_read(self, address: int, length: int) -> bytes:
|
|
return self.__execute_command(b'm', [address, length])
|
|
|
|
def __cmd_memory_write(self, address: int, data: bytes) -> None:
|
|
self.__execute_command(b'M', [address, len(data)], data)
|
|
|
|
def __cmd_firmware_update(self, address: int, length: int) -> None:
|
|
self.__execute_command(b'F', [address, length])
|
|
|
|
def update_firmware(self, data: bytes, description: str) -> None:
|
|
FIRMWARE_ADDRESS = 0x00100000
|
|
FIRMWARE_UPDATE_TIMEOUT = 90.0
|
|
STEPS = 6
|
|
|
|
self.__progress(STEPS, 0, description)
|
|
|
|
self.__reset()
|
|
self.__progress(STEPS, 1, description)
|
|
|
|
self.__cmd_state_reset()
|
|
self.__progress(STEPS, 2, description)
|
|
|
|
self.__cmd_memory_write(FIRMWARE_ADDRESS, data)
|
|
self.__progress(STEPS, 3, description)
|
|
|
|
self.__cmd_firmware_update(FIRMWARE_ADDRESS, len(data))
|
|
self.__progress(STEPS, 4, description)
|
|
|
|
timeout = time.time() + FIRMWARE_UPDATE_TIMEOUT
|
|
while True:
|
|
if (time.time() > timeout):
|
|
raise SC64Exception('Firmware update timeout')
|
|
packet = self.__receive_data_packet()
|
|
if (packet == None):
|
|
time.sleep(0.001)
|
|
continue
|
|
(id, packet_data) = packet
|
|
if (id != b'F'):
|
|
raise SC64Exception('Unexpected packet id received')
|
|
status = self.__UpdateStatus(int.from_bytes(packet_data[0:4], byteorder='big'))
|
|
if (status == self.__UpdateStatus.BOOTLOADER):
|
|
self.__progress(STEPS, 5, description)
|
|
elif (status == self.__UpdateStatus.DONE):
|
|
self.__progress(STEPS, 6, description)
|
|
time.sleep(2)
|
|
break
|
|
elif (status == self.__UpdateStatus.ERROR):
|
|
raise SC64Exception('Firmware update error')
|
|
|
|
class __RamTestPattern(StrEnum):
|
|
OWN_ADDRESS = 'own address'
|
|
ALL_ZEROS = 'all zeros'
|
|
ALL_ONES = 'all ones'
|
|
RANDOM_DATA = 'random data'
|
|
|
|
def __create_ram_test_pattern(self, pattern: __RamTestPattern) -> bytes:
|
|
if (pattern == self.__RamTestPattern.OWN_ADDRESS):
|
|
addresses = list(range(0, self.SDRAM_SIZE, 4))
|
|
data = struct.pack(f'>{len(addresses)}I', *addresses)
|
|
elif (pattern == self.__RamTestPattern.ALL_ZEROS):
|
|
data = b'\x00' * self.SDRAM_SIZE
|
|
elif (pattern == self.__RamTestPattern.ALL_ONES):
|
|
data = b'\xFF' * self.SDRAM_SIZE
|
|
elif (pattern == self.__RamTestPattern.RANDOM_DATA):
|
|
data = os.urandom(self.SDRAM_SIZE)
|
|
return bytes(data)
|
|
|
|
def sdram_test(self, description: str) -> None:
|
|
CHUNK_LENGTH = 1 * 1024 * 1024
|
|
|
|
self.__reset()
|
|
|
|
self.__cmd_state_reset()
|
|
|
|
for pattern in self.__RamTestPattern:
|
|
write_description = f'{description} / Write {pattern.value}'
|
|
check_description = f'{description} / Check {pattern.value}'
|
|
|
|
test_data = self.__create_ram_test_pattern(pattern)
|
|
|
|
self.__progress(self.SDRAM_SIZE, 0, write_description)
|
|
for offset in range(0, self.SDRAM_SIZE, CHUNK_LENGTH):
|
|
self.__cmd_memory_write(offset, test_data[offset:offset+CHUNK_LENGTH])
|
|
self.__progress(self.SDRAM_SIZE, offset + CHUNK_LENGTH, write_description)
|
|
|
|
self.__progress(self.SDRAM_SIZE, 0, check_description)
|
|
for offset in range(0, self.SDRAM_SIZE, CHUNK_LENGTH):
|
|
check_data = self.__cmd_memory_read(offset, CHUNK_LENGTH)
|
|
if (check_data != test_data[offset:offset+CHUNK_LENGTH]):
|
|
for chunk_offset in range(0, CHUNK_LENGTH, 4):
|
|
test_address = offset + chunk_offset
|
|
expected_value = int.from_bytes(test_data[test_address:test_address+4], byteorder='big')
|
|
read_value = int.from_bytes(check_data[chunk_offset:chunk_offset+4], byteorder='big')
|
|
if (read_value != expected_value or test_address == 0x00100000):
|
|
raise SC64Exception(f'SDRAM test error at 0x{test_address:08X}: read 0x{read_value:08X} != expected 0x{expected_value:08X}')
|
|
self.__progress(self.SDRAM_SIZE, offset + CHUNK_LENGTH, check_description)
|
|
|
|
|
|
class SC64BringUp:
|
|
__SERIAL_BAUD: int = 115200
|
|
__SERIAL_TIMEOUT: float = 6.0
|
|
__INTERVAL_TIME: float = 0.5
|
|
|
|
def __init__(self, progress: Callable[[int, int, str], None]) -> None:
|
|
self.__progress = progress
|
|
|
|
def load_update_data(self, path: str) -> None:
|
|
self.__sc64_update_data = SC64UpdateData()
|
|
self.__sc64_update_data.load(path, require_all=True)
|
|
self.__bootloader_only_firmware = self.__sc64_update_data.create_bootloader_only_firmware()
|
|
|
|
def get_update_info(self) -> str:
|
|
return self.__sc64_update_data.get_update_info()
|
|
|
|
def start_bring_up(self, port: str, bootloader_only: bool=False) -> None:
|
|
link = None
|
|
sc64 = SC64(self.__progress)
|
|
|
|
try:
|
|
if (not bootloader_only):
|
|
link = serial.Serial(
|
|
port,
|
|
baudrate=self.__SERIAL_BAUD,
|
|
parity=serial.PARITY_EVEN,
|
|
timeout=self.__SERIAL_TIMEOUT,
|
|
write_timeout=self.__SERIAL_TIMEOUT
|
|
)
|
|
|
|
stm32_bootloader = STM32Bootloader(link.write, link.read, link.flush, self.__progress)
|
|
lcmxo2_primer = LCMXO2Primer(link.write, link.read, link.flush, self.__progress)
|
|
|
|
stm32_bootloader.connect(stm32_bootloader.DEV_ID_STM32G030XX)
|
|
stm32_bootloader.load_ram_and_run(self.__sc64_update_data.get_primer_data(), 'FPGA primer -> STM32 RAM')
|
|
time.sleep(self.__INTERVAL_TIME)
|
|
link.read_all()
|
|
|
|
lcmxo2_primer.connect(lcmxo2_primer.DEV_ID_LCMXO2_7000HC)
|
|
lcmxo2_primer.load_flash_and_run(self.__sc64_update_data.get_fpga_data(), 'FPGA configuration -> LCMXO2 FLASH')
|
|
time.sleep(self.__INTERVAL_TIME)
|
|
link.read_all()
|
|
|
|
stm32_bootloader.connect(stm32_bootloader.DEV_ID_STM32G030XX)
|
|
stm32_bootloader.load_flash_and_run(self.__sc64_update_data.get_mcu_data(), 'MCU software -> STM32 FLASH')
|
|
time.sleep(self.__INTERVAL_TIME)
|
|
link.read_all()
|
|
|
|
sc64.sdram_test('SC64 SDRAM test')
|
|
sc64.update_firmware(self.__bootloader_only_firmware, 'Bootloader -> SC64 FLASH')
|
|
finally:
|
|
if (link and link.is_open):
|
|
link.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
nargs = len(sys.argv)
|
|
if (nargs < 3 or nargs > 4):
|
|
Utils.die(f'Usage: {sys.argv[0]} serial_port update_file [--bootloader-only]')
|
|
|
|
port = sys.argv[1]
|
|
update_data_path = sys.argv[2]
|
|
bootloader_only = False
|
|
if (nargs == 4):
|
|
if (sys.argv[3] == '--bootloader-only'):
|
|
bootloader_only = True
|
|
else:
|
|
Utils.die(f'Unknown argument: {sys.argv[3]}')
|
|
|
|
utils = Utils()
|
|
sc64_bring_up = SC64BringUp(progress=utils.progress)
|
|
|
|
Utils.log()
|
|
Utils.info('[ Welcome to the SummerCart64 flashcart board bring-up! ]')
|
|
Utils.log()
|
|
|
|
Utils.log(f'Serial port: {port}')
|
|
Utils.log(f'Update data path: {os.path.abspath(update_data_path)}')
|
|
try:
|
|
sc64_bring_up.load_update_data(update_data_path)
|
|
except SC64UpdateDataException as e:
|
|
Utils.die(f'Provided \'{update_data_path}\' file is invalid: {e}')
|
|
Utils.log('Update info: ')
|
|
Utils.log(sc64_bring_up.get_update_info())
|
|
Utils.log()
|
|
|
|
if bootloader_only:
|
|
Utils.log('Running in "bootloader only" mode')
|
|
Utils.log()
|
|
|
|
Utils.warning('[ CAUTION ]')
|
|
Utils.warning('Configure FTDI chip with provided ft232h_config.xml before continuing')
|
|
Utils.warning('Connect SC64 USB port to the same computer you\'re running this script')
|
|
Utils.warning('Make sure SC64 USB port is recognized in system before continuing')
|
|
Utils.log()
|
|
|
|
Utils.warning('[ IMPORTANT ]')
|
|
Utils.warning('Unplug SC64 board from power and reconnect it before proceeding')
|
|
Utils.log()
|
|
|
|
try:
|
|
if (input('Type YES to continue: ') != 'YES'):
|
|
Utils.die('No confirmation received. Exiting')
|
|
Utils.log()
|
|
except KeyboardInterrupt:
|
|
Utils.log()
|
|
Utils.die('Aborted')
|
|
|
|
original_sigint_handler = signal.getsignal(signal.SIGINT)
|
|
try:
|
|
signal.signal(signal.SIGINT, lambda *kwargs: utils.exit_warning())
|
|
Utils.log('Starting SC64 flashcart board bring-up...')
|
|
sc64_bring_up.start_bring_up(port, bootloader_only)
|
|
except (serial.SerialException, STM32BootloaderException, LCMXO2PrimerException, SC64Exception) as e:
|
|
if (utils.get_progress_active):
|
|
Utils.log()
|
|
Utils.die(f'Error while running bring-up: {e}')
|
|
finally:
|
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
|
|
|
Utils.log()
|
|
Utils.info('[ SC64 flashcart board bring-up finished successfully! ]')
|
|
Utils.log()
|