add a simple test framework for testing QObjects and their signal/slot mechanism
This commit is contained in:
85
electrum/tests/test_qml_types.py
Normal file
85
electrum/tests/test_qml_types.py
Normal file
@@ -0,0 +1,85 @@
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from electrum import SimpleConfig
|
||||
from electrum.gui.qml.qetypes import QEAmount
|
||||
from electrum.tests.test_qt_base import QETestCase, QEventReceiver, qt_test
|
||||
|
||||
|
||||
class WalletMock:
|
||||
def __init__(self, electrum_path):
|
||||
self.config = SimpleConfig({
|
||||
'electrum_path': electrum_path,
|
||||
'decimal_point': 5
|
||||
})
|
||||
self.contacts = None
|
||||
|
||||
|
||||
class QETestTypes(QETestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.electrum_path = tempfile.mkdtemp()
|
||||
self.wallet = WalletMock(self.electrum_path)
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
shutil.rmtree(self.electrum_path)
|
||||
|
||||
@qt_test
|
||||
def test_qeamount(self):
|
||||
a = QEAmount()
|
||||
self.assertTrue(a.isEmpty)
|
||||
a_er = QEventReceiver(a.valueChanged)
|
||||
a.satsInt = 1
|
||||
self.assertTrue(bool(a_er.received))
|
||||
self.assertFalse(a.isEmpty)
|
||||
|
||||
a_er.clear()
|
||||
a.clear()
|
||||
self.assertTrue(a.isEmpty)
|
||||
self.assertTrue(bool(a_er.received))
|
||||
|
||||
a_er.clear()
|
||||
a.isMax = True
|
||||
self.assertTrue(a.isMax)
|
||||
self.assertFalse(a.isEmpty)
|
||||
self.assertTrue(bool(a_er.received))
|
||||
|
||||
@qt_test
|
||||
def test_qeamount_copy(self):
|
||||
a = QEAmount()
|
||||
b = QEAmount()
|
||||
b.satsInt = 1
|
||||
c = QEAmount()
|
||||
c.msatsInt = 1
|
||||
d = QEAmount()
|
||||
d.isMax = True
|
||||
|
||||
t = QEAmount()
|
||||
t_er = QEventReceiver(t.valueChanged)
|
||||
|
||||
t.copyFrom(a)
|
||||
self.assertTrue(t.isEmpty)
|
||||
self.assertEqual(0, len(t_er.received))
|
||||
|
||||
t.clear()
|
||||
t_er.clear()
|
||||
t.copyFrom(b)
|
||||
self.assertFalse(t.isEmpty)
|
||||
self.assertEqual(t.satsInt, 1)
|
||||
self.assertEqual(1, len(t_er.received))
|
||||
|
||||
t.clear()
|
||||
t_er.clear()
|
||||
t.copyFrom(c)
|
||||
self.assertFalse(t.isEmpty)
|
||||
self.assertEqual(t.msatsInt, 1)
|
||||
self.assertEqual(1, len(t_er.received))
|
||||
|
||||
t.clear()
|
||||
t_er.clear()
|
||||
t.copyFrom(d)
|
||||
self.assertFalse(t.isEmpty)
|
||||
self.assertTrue(t.isMax)
|
||||
self.assertEqual(1, len(t_er.received))
|
||||
92
electrum/tests/test_qt_base.py
Normal file
92
electrum/tests/test_qt_base.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import threading
|
||||
import unittest
|
||||
from functools import wraps, partial
|
||||
from unittest import SkipTest
|
||||
|
||||
from PyQt6.QtCore import QCoreApplication, QTimer, QMetaObject, Qt, pyqtSlot, QObject
|
||||
|
||||
|
||||
class TestQCoreApplication(QCoreApplication):
|
||||
@pyqtSlot()
|
||||
def doInvoke(self):
|
||||
getattr(self._instance, self._method)()
|
||||
|
||||
|
||||
class QEventReceiver(QObject):
|
||||
def __init__(self, *signals):
|
||||
super().__init__()
|
||||
self.received = []
|
||||
self.signals = []
|
||||
for signal in signals:
|
||||
self.signals.append(signal)
|
||||
signal.connect(partial(self.doReceive, signal))
|
||||
|
||||
# intentionally no pyqtSlot decorator, to catch all
|
||||
def doReceive(self, signal, *args):
|
||||
self.received.append((signal, args))
|
||||
|
||||
def receivedForSignal(self, signal):
|
||||
return list(filter(lambda x: x[0] == signal, self.received))
|
||||
|
||||
def clear(self):
|
||||
self.received.clear()
|
||||
|
||||
|
||||
class QETestCase(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app = None
|
||||
self._e = None
|
||||
self._event = threading.Event()
|
||||
|
||||
def start_qt_task():
|
||||
self.app = TestQCoreApplication([])
|
||||
|
||||
self.timer = QTimer(self.app)
|
||||
self.timer.setSingleShot(False)
|
||||
self.timer.setInterval(500) # msec
|
||||
self.timer.timeout.connect(lambda: None) # periodically enter python scope
|
||||
|
||||
self.app.exec()
|
||||
self.app = None
|
||||
|
||||
self.qt_thread = threading.Thread(target=start_qt_task)
|
||||
self.qt_thread.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.app.exit()
|
||||
if self.qt_thread.is_alive():
|
||||
self.qt_thread.join()
|
||||
|
||||
|
||||
def qt_test(func):
|
||||
@wraps(func)
|
||||
def decorator(self, *args):
|
||||
if threading.current_thread().name == 'MainThread':
|
||||
self.app._instance = self
|
||||
self.app._method = func.__name__
|
||||
QMetaObject.invokeMethod(self.app, 'doInvoke', Qt.ConnectionType.QueuedConnection)
|
||||
self._event.wait(15)
|
||||
if self._e:
|
||||
print(f'raising ex: {self._e!r}')
|
||||
# deallocate stored exception from qt thread otherwise we SEGV garbage collector
|
||||
# instead, re-create using the exception message, special casing AssertionError and SkipTest
|
||||
e = None
|
||||
if isinstance(self._e, AssertionError):
|
||||
e = AssertionError(str(self._e))
|
||||
elif isinstance(self._e, SkipTest):
|
||||
e = SkipTest(str(self._e))
|
||||
else:
|
||||
e = Exception(str(self._e))
|
||||
self._e = None
|
||||
raise e
|
||||
return
|
||||
try:
|
||||
func(self, *args)
|
||||
except Exception as e:
|
||||
self._e = e
|
||||
finally:
|
||||
self._event.set()
|
||||
self._event.clear()
|
||||
return decorator
|
||||
Reference in New Issue
Block a user