Merge pull request #10026 from SomberNight/202507_jsonpatch_monkeypatch_exc
JsonDB: monkeypatch jsonpatch exceptions to avoid leaking secrets
This commit is contained in:
@@ -28,14 +28,30 @@ import json
|
||||
from typing import TYPE_CHECKING, Optional, Sequence, List, Union
|
||||
|
||||
import jsonpatch
|
||||
import jsonpointer
|
||||
|
||||
from . import util
|
||||
from .util import WalletFileException, profiler
|
||||
from .util import WalletFileException, profiler, sticky_property
|
||||
from .logging import Logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .storage import WalletStorage
|
||||
|
||||
|
||||
# We monkeypatch exceptions in the jsonpatch package to ensure they do not contain secrets from the DB.
|
||||
# We often log exceptions and offer to send them to the crash reporter, so they must not contain secrets.
|
||||
jsonpointer.JsonPointerException.__str__ = lambda self: """(JPE) 'redacted'"""
|
||||
jsonpointer.JsonPointerException.__repr__ = lambda self: """<JsonPointerException 'redacted'>"""
|
||||
setattr(jsonpointer.JsonPointerException, '__cause__', sticky_property(None))
|
||||
setattr(jsonpointer.JsonPointerException, '__context__', sticky_property(None))
|
||||
setattr(jsonpointer.JsonPointerException, '__suppress_context__', sticky_property(True))
|
||||
jsonpatch.JsonPatchException.__str__ = lambda self: """(JPE) 'redacted'"""
|
||||
jsonpatch.JsonPatchException.__repr__ = lambda self: """<JsonPatchException 'redacted'>"""
|
||||
setattr(jsonpatch.JsonPatchException, '__cause__', sticky_property(None))
|
||||
setattr(jsonpatch.JsonPatchException, '__context__', sticky_property(None))
|
||||
setattr(jsonpatch.JsonPatchException, '__suppress_context__', sticky_property(True))
|
||||
|
||||
|
||||
def modifier(func):
|
||||
def wrapper(self, *args, **kwargs):
|
||||
with self.lock:
|
||||
|
||||
@@ -2199,6 +2199,30 @@ class classproperty(property):
|
||||
return self.fget(owner_cls)
|
||||
|
||||
|
||||
def sticky_property(val):
|
||||
"""Creates a 'property' whose value cannot be changed and that cannot be deleted.
|
||||
Attempts to change the value are silently ignored.
|
||||
|
||||
>>> class C: pass
|
||||
...
|
||||
>>> setattr(C, 'x', sticky_property(3))
|
||||
>>> c = C()
|
||||
>>> c.x
|
||||
3
|
||||
>>> c.x = 2
|
||||
>>> c.x
|
||||
3
|
||||
>>> del c.x
|
||||
>>> c.x
|
||||
3
|
||||
"""
|
||||
return property(
|
||||
fget=lambda self: val,
|
||||
fset=lambda *args, **kwargs: None,
|
||||
fdel=lambda *args, **kwargs: None,
|
||||
)
|
||||
|
||||
|
||||
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
|
||||
"""Returns the asyncio event loop that is *running in this thread*, if any."""
|
||||
try:
|
||||
|
||||
86
tests/test_jsondb.py
Normal file
86
tests/test_jsondb.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import contextlib
|
||||
import copy
|
||||
import traceback
|
||||
|
||||
import jsonpatch
|
||||
from jsonpatch import JsonPatchException
|
||||
from jsonpointer import JsonPointerException
|
||||
|
||||
from . import ElectrumTestCase
|
||||
|
||||
|
||||
class TestJsonpatch(ElectrumTestCase):
|
||||
|
||||
async def test_op_replace(self):
|
||||
data1 = {'foo': 'bar', 'numbers': [1, 3, 4, 8], 'dictlevelA1': {'secret1': 2, 'secret2': 4, 'secret3': 6}}
|
||||
patches = [{"op": "replace", "path": "/dictlevelA1/secret2", "value": 2222}]
|
||||
jpatch = jsonpatch.JsonPatch(patches)
|
||||
data2 = jpatch.apply(data1)
|
||||
self.assertEqual(
|
||||
{'foo': 'bar', 'numbers': [1, 3, 4, 8], 'dictlevelA1': {'secret1': 2, 'secret2': 2222, 'secret3': 6}},
|
||||
data2
|
||||
)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _customAssertRaises(self, *args, **kwargs):
|
||||
with self.assertRaises(*args, **kwargs) as ctx:
|
||||
try:
|
||||
yield ctx
|
||||
except Exception as e:
|
||||
# save original traceback now, as assertRaises will destroy most of it imminently:
|
||||
ctx._customctx_original_tb = "".join(traceback.format_exception(e))
|
||||
raise
|
||||
|
||||
async def test_patch_does_not_leak_privatekeys(self):
|
||||
data1 = {
|
||||
'dictlevelB1': 'secret77',
|
||||
'dictlevelC1': [1, "secret99", 4, 8],
|
||||
'dictlevelA1': {"dictlevelA2_aa": "secret11", "dictlevelA2_bb": "secret12", "dictlevelA2_cc": "secret13"}}
|
||||
def fail_if_leaking_secret(ctx) -> None:
|
||||
self.assertNotIn("secret", str(ctx.exception))
|
||||
self.assertNotIn("secret", repr(ctx.exception))
|
||||
self.assertNotIn("secret", ctx._customctx_original_tb)
|
||||
self.assertNotIn("dictlevel", str(ctx.exception))
|
||||
self.assertNotIn("dictlevel", repr(ctx.exception))
|
||||
self.assertNotIn("dictlevel", ctx._customctx_original_tb)
|
||||
self.assertIn("redacted", str(ctx.exception)) # injected by our monkeypatching
|
||||
self.assertIn("redacted", repr(ctx.exception)) # injected by our monkeypatching
|
||||
# op "replace"
|
||||
with self.subTest(msg="replace_dict_inner_key_missing"):
|
||||
patches = [{"op": "replace", "path": "/dictlevelA1/dictlevelX2", "value": "nakamoto_secret"}]
|
||||
jpatch = jsonpatch.JsonPatch(patches)
|
||||
with self._customAssertRaises(JsonPatchException) as ctx:
|
||||
data2 = jpatch.apply(data1)
|
||||
fail_if_leaking_secret(ctx)
|
||||
with self.subTest(msg="replace_dict_outer_key_missing"):
|
||||
patches = [{"op": "replace", "path": "/dictlevelX1/dictlevelX2", "value": "nakamoto_secret"}]
|
||||
jpatch = jsonpatch.JsonPatch(patches)
|
||||
with self._customAssertRaises(JsonPointerException) as ctx:
|
||||
data2 = jpatch.apply(data1)
|
||||
fail_if_leaking_secret(ctx)
|
||||
# op "remove"
|
||||
with self.subTest(msg="remove_dict_inner_key_missing"):
|
||||
patches = [{"op": "remove", "path": "/dictlevelA1/dictlevelX2"}]
|
||||
jpatch = jsonpatch.JsonPatch(patches)
|
||||
with self._customAssertRaises(JsonPatchException) as ctx:
|
||||
data2 = jpatch.apply(data1)
|
||||
fail_if_leaking_secret(ctx)
|
||||
with self.subTest(msg="remove_dict_outer_key_missing"):
|
||||
patches = [{"op": "remove", "path": "/dictlevelX1/dictlevelX2"}]
|
||||
jpatch = jsonpatch.JsonPatch(patches)
|
||||
with self._customAssertRaises(JsonPointerException) as ctx:
|
||||
data2 = jpatch.apply(data1)
|
||||
fail_if_leaking_secret(ctx)
|
||||
# op "add"
|
||||
with self.subTest(msg="add_dict_inner_key_missing"):
|
||||
patches = [{"op": "add", "path": "/dictlevelA1/dictlevelX2/dictlevelX3/dictlevelX4", "value": "nakamoto_secret"}]
|
||||
jpatch = jsonpatch.JsonPatch(patches)
|
||||
with self._customAssertRaises(JsonPointerException) as ctx:
|
||||
data2 = jpatch.apply(data1)
|
||||
fail_if_leaking_secret(ctx)
|
||||
with self.subTest(msg="add_dict_outer_key_missing"):
|
||||
patches = [{"op": "add", "path": "/dictlevelX1/dictlevelX2/dictlevelX3/dictlevelX4", "value": "nakamoto_secret"}]
|
||||
jpatch = jsonpatch.JsonPatch(patches)
|
||||
with self._customAssertRaises(JsonPointerException) as ctx:
|
||||
data2 = jpatch.apply(data1)
|
||||
fail_if_leaking_secret(ctx)
|
||||
Reference in New Issue
Block a user