Files
sphaira/tools/usb_export.py

93 lines
2.6 KiB
Python

import crc32c
import sys
import os
from pathlib import Path
from usb_common import *
def get_file_name(usb: Usb, name_length: int) -> str:
return bytes(usb.read(name_length)).decode('utf-8')
def create_file_folder(root: Path, file_path: Path) -> Path:
# todo: check if it already exists.
full_path = Path(root + "/" + file_path)
full_path.parent.mkdir(exist_ok=True, parents=True)
print("created folder")
return full_path
def wait_for_input(usb: Usb, path: Path) -> None:
print("now waiting for intput\n")
with open(path, "wb") as file:
print("opened file {}".format(path))
while True:
[off, size, crc32c_want] = usb.get_send_data_header()
# todo: this isn't needed really.
usb.send_result(RESULT_OK)
# check if we should finish now.
if (off == 0 and size == 0):
break
# read the buffer and calculate the crc32c.
buf = usb.read(size)
crc32c_got = crc32c.crc32c(buf)
# validate the crc32c matches.
if (crc32c_want != crc32c_got):
usb.send_result(RESULT_ERROR)
continue
try:
file.seek(off)
file.write(buf)
usb.send_result(RESULT_OK)
except BlockingIOError as e:
print("Error: failed to write: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e)))
usb.send_result(RESULT_ERROR)
if __name__ == '__main__':
print("hello world")
# check which mode the user has selected.
args = len(sys.argv)
if (args != 2):
print("pass the folder path")
sys.exit(1)
root_path = sys.argv[1]
if (not os.path.isdir(root_path)):
raise ValueError('must be a dir!')
usb = Usb()
try:
# get usb endpoints.
usb.wait_for_connect()
# wait for command.
while True:
[cmd, arg3, arg4] = usb.get_send_header()
if (cmd == CMD_QUIT):
usb.send_result(RESULT_OK)
break
elif (cmd == CMD_EXPORT):
usb.send_result(RESULT_OK)
# todo: handle and return errors here.
file_name = get_file_name(usb, arg3)
full_path = create_file_folder(root_path, file_name)
usb.send_result(RESULT_OK)
wait_for_input(usb, full_path)
else:
usb.send_result(RESULT_ERROR)
break
except Exception as inst:
print("An exception occurred " + str(inst))