@@ -50,6 +50,9 @@ class StorageEncryptionVersion(IntEnum):
|
||||
XPUB_PASSWORD = 2
|
||||
|
||||
|
||||
class StorageReadWriteError(Exception): pass
|
||||
|
||||
|
||||
class WalletStorage(Logger):
|
||||
|
||||
def __init__(self, path, *, manual_upgrades=False):
|
||||
@@ -61,7 +64,7 @@ class WalletStorage(Logger):
|
||||
DB_Class = JsonDB
|
||||
self.logger.info(f"wallet path {self.path}")
|
||||
self.pubkey = None
|
||||
# TODO we should test r/w permissions here (whether file exists or not)
|
||||
self._test_read_write_permissions(self.path)
|
||||
if self.file_exists():
|
||||
with open(self.path, "r", encoding='utf-8') as f:
|
||||
self.raw = f.read()
|
||||
@@ -74,6 +77,27 @@ class WalletStorage(Logger):
|
||||
# avoid new wallets getting 'upgraded'
|
||||
self.db = DB_Class('', manual_upgrades=False)
|
||||
|
||||
@classmethod
|
||||
def _test_read_write_permissions(cls, path):
|
||||
# note: There might already be a file at 'path'.
|
||||
# Make sure we do NOT overwrite/corrupt that!
|
||||
temp_path = "%s.tmptest.%s" % (path, os.getpid())
|
||||
echo = "fs r/w test"
|
||||
try:
|
||||
# test READ permissions for actual path
|
||||
if os.path.exists(path):
|
||||
with open(path, "r", encoding='utf-8') as f:
|
||||
f.read(1) # read 1 byte
|
||||
# test R/W sanity for "similar" path
|
||||
with open(temp_path, "w", encoding='utf-8') as f:
|
||||
f.write(echo)
|
||||
with open(temp_path, "r", encoding='utf-8') as f:
|
||||
echo2 = f.read()
|
||||
os.remove(temp_path)
|
||||
except Exception as e:
|
||||
raise StorageReadWriteError(e) from e
|
||||
if echo != echo2:
|
||||
raise StorageReadWriteError('echo sanity-check failed')
|
||||
|
||||
def load_plugins(self):
|
||||
wallet_type = self.db.get('wallet_type')
|
||||
|
||||
Reference in New Issue
Block a user