2023-06-26 05:39:56 +02:00
|
|
|
"""
|
|
|
|
Module for deserializing/serializing to and from VDF
|
|
|
|
"""
|
|
|
|
__version__ = "3.4"
|
|
|
|
__author__ = "Rossen Georgiev"
|
2023-06-26 05:39:11 +02:00
|
|
|
|
2023-06-26 05:39:56 +02:00
|
|
|
import re
|
|
|
|
import sys
|
|
|
|
import struct
|
|
|
|
from binascii import crc32
|
|
|
|
from io import BytesIO
|
|
|
|
from io import StringIO as unicodeIO
|
|
|
|
|
|
|
|
try:
|
|
|
|
from collections.abc import Mapping
|
|
|
|
except:
|
|
|
|
from collections import Mapping
|
|
|
|
|
|
|
|
# Py2 & Py3 compatibility
|
|
|
|
if sys.version_info[0] < 3:
    # Python 2: text may arrive either as byte ``str`` or as ``unicode``,
    # so both flavours of BOM characters are kept around.
    from StringIO import StringIO as strIO

    string_type, int_type = basestring, long
    BOMS = '\xef\xbb\xbf\xff\xfe\xfe\xff'
    BOMS_UNICODE = '\\ufffe\\ufeff'.decode('unicode-escape')

    def strip_bom(line):
        """Remove leading byte-order-mark characters from ``line``."""
        if isinstance(line, str):
            return line.lstrip(BOMS)
        return line.lstrip(BOMS_UNICODE)
else:
    # Python 3: all text is ``str`` and all integers are ``int``.
    string_type, int_type = str, int
    BOMS = '\ufffe\ufeff'

    def strip_bom(line):
        """Remove leading byte-order-mark characters from ``line``."""
        return line.lstrip(BOMS)
|
|
|
|
|
|
|
|
# string escaping
|
|
|
|
# Two-character escape sequence (as it appears in VDF text) -> the single
# character it represents.
_unescape_char_map = dict([
    (r"\n", "\n"),
    (r"\t", "\t"),
    (r"\v", "\v"),
    (r"\b", "\b"),
    (r"\r", "\r"),
    (r"\f", "\f"),
    (r"\a", "\a"),
    (r"\\", "\\"),
    (r"\?", "?"),
    (r"\"", "\""),
    (r"\'", "\'"),
])
# Reverse table: raw character -> escape sequence used when dumping.
_escape_char_map = dict(
    (char, sequence) for sequence, char in _unescape_char_map.items()
)
|
|
|
|
|
|
|
|
def _re_escape_match(m):
    """``re.sub`` callback: map the matched character to its escape sequence."""
    matched_char = m.group(0)
    return _escape_char_map[matched_char]
|
|
|
|
|
|
|
|
def _re_unescape_match(m):
    """``re.sub`` callback: map the matched escape sequence to its character."""
    matched_sequence = m.group(0)
    return _unescape_char_map[matched_sequence]
|
|
|
|
|
|
|
|
def _escape(text):
|
|
|
|
return re.sub(r"[\n\t\v\b\r\f\a\\\?\"']", _re_escape_match, text)
|
|
|
|
|
|
|
|
def _unescape(text):
|
|
|
|
return re.sub(r"(\\n|\\t|\\v|\\b|\\r|\\f|\\a|\\\\|\\\?|\\\"|\\')", _re_unescape_match, text)
|
|
|
|
|
|
|
|
# parsing and dumping for KV1
|
|
|
|
def parse(fp, mapper=dict, merge_duplicate_keys=True, escaped=True):
    """
    Deserialize ``fp`` (a file-like object that can be iterated line by line
    and contains a text VDF document) to a Python object.

    ``mapper`` specifies the Python object used after deserialization. ``dict`` is
    used by default. Alternatively, ``collections.OrderedDict`` can be used if you
    wish to preserve key order. Or any object that acts like a ``dict``.

    ``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
    same key into one instead of overwriting. You can set this to ``False`` if you are
    using ``VDFDict`` and need to preserve the duplicates.

    ``escaped`` when ``True`` (the default) converts backslash escape sequences
    in keys and values to the characters they represent.

    Raises ``TypeError`` when ``mapper`` is not a ``Mapping`` subclass or ``fp``
    has no ``readline`` attribute, and ``SyntaxError`` when the document is
    malformed (unbalanced brackets, unterminated quotes, missing opening bracket).
    """
    if not issubclass(mapper, Mapping):
        raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))
    if not hasattr(fp, 'readline'):
        raise TypeError("Expected fp to be a file-like object supporting line iteration")

    # stack[0] is the root mapping; stack[-1] is the mapping currently being filled
    stack = [mapper()]
    expect_bracket = False

    # name shown in the SyntaxError details
    source_name = getattr(fp, 'name', '<%s>' % fp.__class__.__name__)

    re_keyvalue = re.compile(r'^("(?P<qkey>(?:\\.|[^\\"])*)"|(?P<key>#?[a-z0-9\-\_\\\?$%<>]+))'
                             r'([ \t]*('
                             r'"(?P<qval>(?:\\.|[^\\"])*)(?P<vq_end>")?'
                             r'|(?P<val>(?:(?<!/)/(?!/)|[a-z0-9\-\_\\\?\*\.$<> ])+)'
                             r'|(?P<sblock>{[ \t]*)(?P<eblock>})?'
                             r'))?',
                             flags=re.I)

    # Lines are counted by hand: continuation lines are pulled with next(fp)
    # below, and an enumerate() around fp would not see them, which made every
    # later error report a line number that was too small.
    lines_read = 0

    for line in fp:
        lines_read += 1
        # line on which the current statement starts (used in error reports)
        lineno = lines_read

        if lines_read == 1:
            line = strip_bom(line)

        line = line.lstrip()

        # skip empty and comment lines
        if line == "" or line[0] == '/':
            continue

        # one level deeper
        if line[0] == "{":
            expect_bracket = False
            continue

        if expect_bracket:
            raise SyntaxError("vdf.parse: expected openning bracket",
                              (source_name, lineno, 1, line))

        # one level back
        if line[0] == "}":
            if len(stack) > 1:
                stack.pop()
                continue

            raise SyntaxError("vdf.parse: one too many closing parenthasis",
                              (source_name, lineno, 0, line))

        # parse keyvalue pairs
        while True:
            match = re_keyvalue.match(line)

            if not match:
                # no key found yet; pull in the next line and retry
                try:
                    line += next(fp)
                except StopIteration:
                    raise SyntaxError("vdf.parse: unexpected EOF (open key quote?)",
                                      (source_name, lineno, 0, line))
                lines_read += 1
                continue

            key = match.group('key') if match.group('qkey') is None else match.group('qkey')
            val = match.group('qval')
            if val is None:
                val = match.group('val')
                if val is not None:
                    val = val.rstrip()
                    if val == "":
                        val = None

            if escaped:
                key = _unescape(key)

            # we have a key with value in parenthesis, so we make a new dict obj (level deeper)
            if val is None:
                if merge_duplicate_keys and key in stack[-1]:
                    _m = stack[-1][key]
                    # we've descended a level deeper, if value is str, we have to overwrite it to mapper
                    if not isinstance(_m, mapper):
                        _m = stack[-1][key] = mapper()
                else:
                    _m = mapper()
                    stack[-1][key] = _m

                if match.group('eblock') is None:
                    # only expect a bracket if it's not already closed or on the same line
                    stack.append(_m)
                    if match.group('sblock') is None:
                        expect_bracket = True

            # we've matched a simple keyvalue pair, map it to the last dict obj in the stack
            else:
                # the quoted value is still open: consume one more line and
                # try to match again, until we get the whole KeyValue pair
                if match.group('vq_end') is None and match.group('qval') is not None:
                    try:
                        line += next(fp)
                    except StopIteration:
                        raise SyntaxError("vdf.parse: unexpected EOF (open quote for value?)",
                                          (source_name, lineno, 0, line))
                    lines_read += 1
                    continue

                stack[-1][key] = _unescape(val) if escaped else val

            # exit the loop
            break

    if len(stack) != 1:
        raise SyntaxError("vdf.parse: unclosed parenthasis or quotes (EOF)",
                          (source_name, lineno, 0, line))

    return stack.pop()
|
|
|
|
|
|
|
|
|
|
|
|
def loads(s, **kwargs):
    """
    Deserialize ``s`` (a ``str`` or ``unicode`` instance containing a text
    VDF document) to a Python object.

    Extra keyword arguments are passed through to ``parse``.
    Raises ``TypeError`` when ``s`` is not a string.
    """
    if isinstance(s, string_type):
        try:
            stream = unicodeIO(s)
        except TypeError:
            # byte strings cannot be wrapped by io.StringIO
            stream = strIO(s)

        return parse(stream, **kwargs)

    raise TypeError("Expected s to be a str, got %s" % type(s))
|
|
|
|
|
|
|
|
|
|
|
|
def load(fp, **kwargs):
    """
    Deserialize ``fp`` (a ``.readline()``-supporting file-like object containing
    a text VDF document) to a Python object.

    This is a thin wrapper: ``fp`` and all keyword arguments are handed to ``parse``.
    """
    parsed = parse(fp, **kwargs)
    return parsed
|
|
|
|
|
|
|
|
|
|
|
|
def dumps(obj, pretty=False, escaped=True):
    """
    Serialize ``obj`` to a VDF formatted ``str``.

    ``obj`` must be a ``Mapping``; ``pretty`` (indent nested levels with tabs)
    and ``escaped`` (escape special characters in strings) must be ``bool``.
    A ``TypeError`` is raised otherwise.
    """
    # validate arguments up front, before any output is generated
    if not isinstance(obj, Mapping):
        raise TypeError("Expected data to be an instance of``dict``")
    if not isinstance(pretty, bool):
        raise TypeError("Expected pretty to be of type bool")
    if not isinstance(escaped, bool):
        raise TypeError("Expected escaped to be of type bool")

    pieces = _dump_gen(obj, pretty, escaped)
    return ''.join(pieces)
|
|
|
|
|
|
|
|
|
|
|
|
def dump(obj, fp, pretty=False, escaped=True):
    """
    Serialize ``obj`` as a VDF formatted stream to ``fp`` (a
    ``.write()``-supporting file-like object).

    ``obj`` must be a ``Mapping``; ``pretty`` and ``escaped`` must be ``bool``.
    A ``TypeError`` is raised otherwise. Output is written chunk by chunk.
    """
    # validate arguments up front, before anything is written
    if not isinstance(obj, Mapping):
        raise TypeError("Expected data to be an instance of``dict``")
    if not hasattr(fp, 'write'):
        raise TypeError("Expected fp to have write() method")
    if not isinstance(pretty, bool):
        raise TypeError("Expected pretty to be of type bool")
    if not isinstance(escaped, bool):
        raise TypeError("Expected escaped to be of type bool")

    pieces = _dump_gen(obj, pretty, escaped)
    for piece in pieces:
        fp.write(piece)
|
|
|
|
|
|
|
|
|
|
|
|
def _dump_gen(data, pretty=False, escaped=True, level=0):
    """
    Generate the text VDF representation of ``data`` piece by piece.

    Nested mappings are emitted recursively. With ``pretty`` each line is
    indented by one tab per nesting ``level``; with ``escaped`` string keys
    and string values are passed through ``_escape`` first.
    """
    indent = "\t"
    line_indent = indent * level if pretty else ""

    for key, value in data.items():
        if escaped and isinstance(key, string_type):
            key = _escape(key)

        if not isinstance(value, Mapping):
            # plain key/value pair on a single line
            if escaped and isinstance(value, string_type):
                value = _escape(value)

            yield '%s"%s" "%s"\n' % (line_indent, key, value)
            continue

        # nested section: key, opening bracket, children, closing bracket
        yield '%s"%s"\n%s{\n' % (line_indent, key, line_indent)
        for chunk in _dump_gen(value, pretty, escaped, level+1):
            yield chunk
        yield "%s}\n" % line_indent
|
|
|
|
|
|
|
|
|
|
|
|
# binary VDF
|
|
|
|
class BASE_INT(int_type):
    """Integer subclass whose ``repr`` includes the class name, e.g. ``UINT_64(5)``.

    The subclasses below carry no extra behavior; they only tag a value so the
    binary serializer can choose the matching type tag.
    """

    def __repr__(self):
        return "%s(%d)" % (type(self).__name__, self)


class UINT_64(BASE_INT):
    """Integer written with the ``BIN_UINT64`` tag."""


class INT_64(BASE_INT):
    """Integer written with the ``BIN_INT64`` tag."""


class POINTER(BASE_INT):
    """Integer written with the ``BIN_POINTER`` tag."""


class COLOR(BASE_INT):
    """Integer written with the ``BIN_COLOR`` tag."""
|
|
|
|
|
|
|
|
# One-byte type tags that precede every entry in binary VDF.
BIN_NONE       = b'\x00'  # start of a nested mapping (also used as the string terminator)
BIN_STRING     = b'\x01'  # null-terminated UTF-8 string
BIN_INT32      = b'\x02'  # little-endian signed 32-bit integer
BIN_FLOAT32    = b'\x03'  # little-endian 32-bit float
BIN_POINTER    = b'\x04'  # 32-bit integer, loaded as POINTER
BIN_WIDESTRING = b'\x05'  # UTF-16 string terminated by two null bytes
BIN_COLOR      = b'\x06'  # 32-bit integer, loaded as COLOR
BIN_UINT64     = b'\x07'  # little-endian unsigned 64-bit integer
BIN_END        = b'\x08'  # end of the current mapping
BIN_INT64      = b'\x0A'  # little-endian signed 64-bit integer
BIN_END_ALT    = b'\x0B'  # end of the current mapping when alt_format is used
|
|
|
|
|
|
|
|
def binary_loads(b, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=True):
    """
    Deserialize ``b`` (``bytes`` containing a VDF in "binary form")
    to a Python object.

    ``b`` is wrapped in a ``BytesIO`` and every other argument is forwarded
    unchanged to ``binary_load``; see that function for the meaning of
    ``mapper``, ``merge_duplicate_keys``, ``alt_format`` and
    ``raise_on_remaining`` (which defaults to ``True`` here).

    Raises ``TypeError`` when ``b`` is not ``bytes``.
    """
    if isinstance(b, bytes):
        stream = BytesIO(b)
        return binary_load(stream, mapper, merge_duplicate_keys, alt_format, raise_on_remaining)

    raise TypeError("Expected s to be bytes, got %s" % type(b))
|
|
|
|
|
|
|
|
def binary_load(fp, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=False):
    """
    Deserialize ``fp`` (a ``.read()``-supporting file-like object containing
    binary VDF) to a Python object.

    ``mapper`` specifies the Python object used after deserialization. ``dict`` is
    used by default. Alternatively, ``collections.OrderedDict`` can be used if you
    wish to preserve key order. Or any object that acts like a ``dict``.

    ``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
    same key into one instead of overwriting. You can set this to ``False`` if you are
    using ``VDFDict`` and need to preserve the duplicates.

    ``alt_format`` when ``True`` expects ``BIN_END_ALT`` instead of ``BIN_END``
    as the end-of-mapping marker.

    ``raise_on_remaining`` when ``True`` raises ``SyntaxError`` if any bytes are
    left in ``fp`` after the top-level mapping has been closed.

    Raises ``TypeError`` for an unsuitable ``fp`` or ``mapper`` and ``SyntaxError``
    for malformed data (unknown type tag, unterminated string, premature EOF).
    """
    if not hasattr(fp, 'read') or not hasattr(fp, 'tell') or not hasattr(fp, 'seek'):
        raise TypeError("Expected fp to be a file-like object with tell()/seek() and read() returning bytes")
    if not issubclass(mapper, Mapping):
        raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))

    # helpers
    int32 = struct.Struct('<i')
    uint64 = struct.Struct('<Q')
    int64 = struct.Struct('<q')
    float32 = struct.Struct('<f')

    def read_string(fp, wide=False):
        """Read a null-terminated string and leave fp just past the terminator."""
        terminator = b'\x00\x00' if wide else b'\x00'
        buf, end = b'', -1
        offset = fp.tell()

        # locate string end
        while end == -1:
            chunk = fp.read(64)

            if chunk == b'':
                raise SyntaxError("Unterminated cstring (offset: %d)" % offset)

            buf += chunk
            end = buf.find(terminator)

            # A wide terminator must start on a 2-byte boundary. A hit at an
            # odd offset is the high byte of one character followed by the low
            # byte of the next, so keep searching for an aligned one.
            while wide and end != -1 and end % 2:
                end = buf.find(terminator, end + 1)

        # rewind fp to the first byte after the terminator
        fp.seek(end - len(buf) + (2 if wide else 1), 1)

        # decode string
        result = buf[:end]

        if wide:
            result = result.decode('utf-16')
        elif bytes is not str:
            result = result.decode('utf-8', 'replace')
        else:
            # Python 2: keep pure ASCII as a byte string, decode anything else
            try:
                result.decode('ascii')
            except UnicodeDecodeError:
                result = result.decode('utf-8', 'replace')

        return result

    # stack[0] is the root mapping; stack[-1] is the mapping currently being filled
    stack = [mapper()]
    CURRENT_BIN_END = BIN_END if not alt_format else BIN_END_ALT

    for t in iter(lambda: fp.read(1), b''):
        if t == CURRENT_BIN_END:
            if len(stack) > 1:
                stack.pop()
                continue
            # end marker of the top-level mapping
            break

        key = read_string(fp)

        if t == BIN_NONE:
            if merge_duplicate_keys and key in stack[-1]:
                _m = stack[-1][key]
                # same rule as the text parser: a scalar stored earlier under
                # this key cannot be descended into, so replace it
                if not isinstance(_m, mapper):
                    _m = stack[-1][key] = mapper()
            else:
                _m = mapper()
                stack[-1][key] = _m
            stack.append(_m)
        elif t == BIN_STRING:
            stack[-1][key] = read_string(fp)
        elif t == BIN_WIDESTRING:
            stack[-1][key] = read_string(fp, wide=True)
        elif t in (BIN_INT32, BIN_POINTER, BIN_COLOR):
            val = int32.unpack(fp.read(int32.size))[0]

            if t == BIN_POINTER:
                val = POINTER(val)
            elif t == BIN_COLOR:
                val = COLOR(val)

            stack[-1][key] = val
        elif t == BIN_UINT64:
            stack[-1][key] = UINT_64(uint64.unpack(fp.read(uint64.size))[0])
        elif t == BIN_INT64:
            stack[-1][key] = INT_64(int64.unpack(fp.read(int64.size))[0])
        elif t == BIN_FLOAT32:
            stack[-1][key] = float32.unpack(fp.read(float32.size))[0]
        else:
            raise SyntaxError("Unknown data type at offset %d: %s" % (fp.tell() - 1, repr(t)))

    if len(stack) != 1:
        raise SyntaxError("Reached EOF, but Binary VDF is incomplete")
    if raise_on_remaining and fp.read(1) != b'':
        # put the probed byte back so fp is left where the VDF ended
        fp.seek(-1, 1)
        raise SyntaxError("Binary VDF ended at offset %d, but there is more data remaining" % (fp.tell() - 1))

    return stack.pop()
|
|
|
|
|
|
|
|
def binary_dumps(obj, alt_format=False):
    """
    Serialize ``obj`` to a binary VDF formatted ``bytes``.

    The work is delegated to ``binary_dump`` writing into an in-memory buffer;
    ``alt_format`` is forwarded to it unchanged.
    """
    with BytesIO() as out:
        binary_dump(obj, out, alt_format)
        return out.getvalue()
|
|
|
|
|
|
|
|
def binary_dump(obj, fp, alt_format=False):
    """
    Serialize ``obj`` to a binary VDF formatted ``bytes`` and write it to ``fp`` filelike object

    ``obj`` must be a ``Mapping`` and ``fp`` must have a ``write`` attribute,
    otherwise ``TypeError`` is raised. ``alt_format`` is forwarded to the
    chunk generator, which uses it to pick the end-of-mapping marker.
    """
    if not isinstance(obj, Mapping):
        raise TypeError("Expected obj to be type of Mapping")
    if not hasattr(fp, 'write'):
        raise TypeError("Expected fp to have write() method")

    pieces = _binary_dump_gen(obj, alt_format=alt_format)
    for piece in pieces:
        fp.write(piece)
|
|
|
|
|
|
|
|
def _binary_dump_gen(obj, level=0, alt_format=False):
    """
    Generate the binary VDF representation of ``obj`` as a sequence of ``bytes`` chunks.

    Each entry is written as a type tag, the UTF-8 key, a null byte and the
    packed value. Nested mappings recurse with ``level + 1`` and every mapping
    is closed with ``BIN_END`` (``BIN_END_ALT`` when ``alt_format`` is true).
    An empty top-level mapping produces no output at all.

    Raises ``TypeError`` for non-string keys and for unsupported value types.
    """
    if level == 0 and len(obj) == 0:
        return

    int32 = struct.Struct('<i')
    uint64 = struct.Struct('<Q')
    int64 = struct.Struct('<q')
    float32 = struct.Struct('<f')

    for key, value in obj.items():
        if isinstance(key, string_type):
            key = key.encode('utf-8')
        else:
            raise TypeError("dict keys must be of type str, got %s" % type(key))

        if isinstance(value, Mapping):
            yield BIN_NONE + key + BIN_NONE
            for chunk in _binary_dump_gen(value, level+1, alt_format=alt_format):
                yield chunk
        # the tagged integer types are checked before the generic int branch
        elif isinstance(value, UINT_64):
            yield BIN_UINT64 + key + BIN_NONE + uint64.pack(value)
        elif isinstance(value, INT_64):
            yield BIN_INT64 + key + BIN_NONE + int64.pack(value)
        elif isinstance(value, string_type):
            # Only the encoding is guarded, and only against Unicode errors.
            # The yield stays outside the try so that closing the generator
            # (GeneratorExit) is never mistaken for an encoding failure.
            try:
                value = value.encode('utf-8') + BIN_NONE
            except UnicodeError:
                value = value.encode('utf-16') + BIN_NONE*2
                tag = BIN_WIDESTRING
            else:
                tag = BIN_STRING
            yield tag
            yield key + BIN_NONE + value
        elif isinstance(value, float):
            yield BIN_FLOAT32 + key + BIN_NONE + float32.pack(value)
        elif isinstance(value, (COLOR, POINTER, int, int_type)):
            if isinstance(value, COLOR):
                yield BIN_COLOR
            elif isinstance(value, POINTER):
                yield BIN_POINTER
            else:
                yield BIN_INT32
            yield key + BIN_NONE
            yield int32.pack(value)
        else:
            raise TypeError("Unsupported type: %s" % type(value))

    yield BIN_END if not alt_format else BIN_END_ALT
|
|
|
|
|
|
|
|
|
|
|
|
def vbkv_loads(s, mapper=dict, merge_duplicate_keys=True):
    """
    Deserialize ``s`` (``bytes`` containing a VBKV) to a Python object.

    A VBKV blob is the 4-byte magic ``VBKV``, a little-endian CRC32 of the
    payload, and the payload itself in alternative binary VDF format.

    ``mapper`` specifies the Python object used after deserialization. ``dict`` is
    used by default. Alternatively, ``collections.OrderedDict`` can be used if you
    wish to preserve key order. Or any object that acts like a ``dict``.

    ``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
    same key into one instead of overwriting. You can set this to ``False`` if you are
    using ``VDFDict`` and need to preserve the duplicates.

    Raises ``ValueError`` when the header is missing or truncated, or when the
    checksum does not match the payload.
    """
    # 4 bytes of magic followed by 4 bytes of checksum
    if s[:4] != b'VBKV' or len(s) < 8:
        raise ValueError("Invalid header")

    # Compare as unsigned 32-bit values: crc32() returns a signed result on
    # Python 2 and an unsigned one on Python 3, so a signed unpack rejected
    # every valid Python 3 payload whose CRC has the top bit set.
    checksum, = struct.unpack('<I', s[4:8])

    if checksum != crc32(s[8:]) & 0xffffffff:
        raise ValueError("Invalid checksum")

    return binary_loads(s[8:], mapper, merge_duplicate_keys, alt_format=True)
|
|
|
|
|
|
|
|
def vbkv_dumps(obj):
    """
    Serialize ``obj`` to a VBKV formatted ``bytes``.

    The result is the 4-byte magic ``VBKV``, the little-endian CRC32 of the
    payload, and the payload in alternative binary VDF format.
    """
    data = b''.join(_binary_dump_gen(obj, alt_format=True))

    # Normalize to unsigned 32-bit and pack as unsigned: crc32() is unsigned on
    # Python 3, where a signed pack raised struct.error for CRCs >= 2**31.
    # The bytes produced are the same as the signed pack gave on Python 2.
    checksum = crc32(data) & 0xffffffff

    return b'VBKV' + struct.pack('<I', checksum) + data
|