diff --git a/electrum/json_db.py b/electrum/json_db.py index b49398338..144a958f9 100644 --- a/electrum/json_db.py +++ b/electrum/json_db.py @@ -25,7 +25,7 @@ import threading import copy import json -from typing import TYPE_CHECKING, Optional, Sequence, List, Union +from typing import TYPE_CHECKING, Optional, Sequence, List, Union, Any import jsonpatch import jsonpointer @@ -97,13 +97,16 @@ def stored_in(name, _type=dict): return func return decorator +_FLEX_KEY = str | int | None -def key_path(path: Sequence[Union[str, int]], key: Optional[str]) -> str: - def to_str(x): +def key_path(path: Sequence[_FLEX_KEY], key: _FLEX_KEY) -> str: + def to_str(x: _FLEX_KEY) -> str: + assert isinstance(x, _FLEX_KEY), repr(x) + assert x is not None if isinstance(x, int): return str(int(x)) else: - assert isinstance(x, str) + assert isinstance(x, str), f"unexpected key type for: {x!r}" return x items = [to_str(x) for x in path] if key is not None: @@ -113,15 +116,17 @@ def key_path(path: Sequence[Union[str, int]], key: Optional[str]) -> str: class BaseStoredObject: _db: 'JsonDB' = None - _key = None - _parent = None - _lock = None + _key: _FLEX_KEY = None + _parent: Optional['BaseStoredObject'] = None + _lock: threading.RLock = None def set_db(self, db): self._db = db self._lock = self._db.lock if self._db else threading.RLock() - def set_parent(self, key, parent): + def set_parent(self, *, key: _FLEX_KEY, parent: Optional['BaseStoredObject']) -> None: + assert (key == "") == (parent is None), f"{key=!r}, {parent=!r}" + assert isinstance(key, _FLEX_KEY), repr(key) self._key = key self._parent = parent @@ -130,7 +135,7 @@ class BaseStoredObject: return self._lock @property - def path(self) -> Sequence[str]: + def path(self) -> Sequence[_FLEX_KEY] | None: # return None iff we are pruned from root x = self s = [x._key] @@ -142,15 +147,18 @@ class BaseStoredObject: assert self._db is not None return s - def db_add(self, key, value): + def db_add(self, key: _FLEX_KEY, value) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) if self.path: self._db.add(self.path, key, value) - def db_replace(self, key, value): + def db_replace(self, key: _FLEX_KEY, value) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) if self.path: self._db.replace(self.path, key, value) - def db_remove(self, key): + def db_remove(self, key: _FLEX_KEY) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) if self.path: self._db.remove(self.path, key) @@ -158,7 +166,8 @@ class BaseStoredObject: class StoredObject(BaseStoredObject): """for attr.s objects """ - def __setattr__(self, key, value): + def __setattr__(self, key: str, value): + assert isinstance(key, str), repr(key) if self.path and not key.startswith('_'): if value != getattr(self, key): self.db_replace(key, value) @@ -185,7 +194,8 @@ class StoredDict(dict, BaseStoredObject): self.__setitem__(k, v) @locked - def __setitem__(self, key, v): + def __setitem__(self, key: _FLEX_KEY, v) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) is_new = key not in self # early return to prevent unnecessary disk writes if not is_new and self._db and json.dumps(v, cls=self._db.encoder) == json.dumps(self[key], cls=self._db.encoder): @@ -204,18 +214,20 @@ class StoredDict(dict, BaseStoredObject): v.set_db(self._db) # set parent if isinstance(v, BaseStoredObject): - v.set_parent(key, self) + v.set_parent(key=key, parent=self) # set item dict.__setitem__(self, key, v) self.db_add(key, v) if is_new else self.db_replace(key, v) @locked - def __delitem__(self, key): + def __delitem__(self, key: _FLEX_KEY) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) dict.__delitem__(self, key) self.db_remove(key) @locked - def pop(self, key, v=_RaiseKeyError): + def pop(self, key: _FLEX_KEY, v=_RaiseKeyError) -> Any: + assert isinstance(key, _FLEX_KEY), repr(key) if key not in self: if v is _RaiseKeyError: raise KeyError(key) @@ -227,7 +239,8 @@ class StoredDict(dict, BaseStoredObject): r._parent = None return r - def setdefault(self, key, default = None, /): + def setdefault(self, key: _FLEX_KEY, default = None, /): + assert isinstance(key, _FLEX_KEY), repr(key) if key not in self: self.__setitem__(key, default) return self[key] @@ -283,7 +296,7 @@ class JsonDB(Logger): data = self._convert_dict([], data) # convert dict to StoredDict self.data = StoredDict(data, self) - self.data.set_parent('', None) + self.data.set_parent(key='', parent=None) # write file in case there was a db upgrade if self.storage and self.storage.file_exists(): self.write_and_force_consolidation() @@ -357,13 +370,16 @@ class JsonDB(Logger): self.pending_changes.append(json.dumps(patch, cls=self.encoder)) self.set_modified(True) - def add(self, path, key, value): + def add(self, path, key: _FLEX_KEY, value) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) self.add_patch({'op': 'add', 'path': key_path(path, key), 'value': value}) - def replace(self, path, key, value): + def replace(self, path, key: _FLEX_KEY, value) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) self.add_patch({'op': 'replace', 'path': key_path(path, key), 'value': value}) - def remove(self, path, key): + def remove(self, path, key: _FLEX_KEY) -> None: + assert isinstance(key, _FLEX_KEY), repr(key) self.add_patch({'op': 'remove', 'path': key_path(path, key)}) @locked @@ -419,7 +435,9 @@ class JsonDB(Logger): def _should_convert_to_stored_dict(self, key) -> bool: return True - def _convert_dict_key(self, path): + def _convert_dict_key(self, path: List[str]) -> _FLEX_KEY: + """Maybe convert key from str to python type (typically int or IntEnum)""" + assert all(isinstance(x, str) for x in path), repr(path) key = path[-1] parent_key = path[-2] if len(path) > 1 else None gp_key = path[-3] if len(path) > 2 else None @@ -431,9 +449,11 @@ class JsonDB(Logger): convert_key = None if convert_key: key = convert_key(key) + assert isinstance(key, _FLEX_KEY), f"unexpected type for {key=!r} at {path=}" return key - def _convert_dict_value(self, path, v): + def _convert_dict_value(self, path: List[str], v) -> Any: + assert all(isinstance(x, str) for x in path), repr(path) key = path[-1] if key in registered_dicts: constructor, _type = registered_dicts[key] @@ -453,8 +473,9 @@ class JsonDB(Logger): v = self._convert_dict(path, v) return v - def _convert_dict(self, path, data: dict): - # recursively convert dict to StoredDict + def _convert_dict(self, path: List[str], data: dict): + # recursively convert json dict to StoredDict + assert all(isinstance(x, str) for x in path), repr(path) d = {} for k, v in list(data.items()): child_path = path + [k]