From 3a39abc41560219042d5e161897d2a25b30710c2 Mon Sep 17 00:00:00 2001 From: f321x Date: Thu, 5 Jun 2025 10:28:57 +0200 Subject: [PATCH 1/2] fix: use single long lived transport in qeswaphelper changes qeswaphelper to shate a single, long lived transport instance instead of opening new transports to do swaps and fetch offers. This allows to continuosly fetch offers, so events which get returned later by slow relays don't get missed and the fee values stay updated. Also fixes a race causing the list to miss some swapservers, as the current implementation fetches only until `swap_manager.is_initialized()` is set, which will get set as soon as an event of the configured swapserver is received. So if the event of the configured swapserver is received as first, all server events coming in after it would get ignored. --- .../qml/components/NostrSwapServersDialog.qml | 13 + electrum/gui/qml/components/SwapDialog.qml | 2 +- electrum/gui/qml/components/main.qml | 2 +- electrum/gui/qml/qeswaphelper.py | 297 ++++++++++-------- electrum/submarine_swaps.py | 20 ++ 5 files changed, 202 insertions(+), 132 deletions(-) diff --git a/electrum/gui/qml/components/NostrSwapServersDialog.qml b/electrum/gui/qml/components/NostrSwapServersDialog.qml index b134b4f23..3d9b46fb5 100644 --- a/electrum/gui/qml/components/NostrSwapServersDialog.qml +++ b/electrum/gui/qml/components/NostrSwapServersDialog.qml @@ -51,6 +51,19 @@ ElDialog { clip: true model: swaphelper.availableSwapServers + Connections { + target: swaphelper + function onOffersUpdated() { + listview.model = null + listview.model = swaphelper.availableSwapServers + listview.forceLayout() + if (dialog.selectedPubkey) { + listview.currentIndex = swaphelper.availableSwapServers.indexFor(dialog.selectedPubkey) + } + console.log("swapserver list refreshed") + } + } + delegate: ItemDelegate { width: ListView.view.width height: itemLayout.height diff --git a/electrum/gui/qml/components/SwapDialog.qml b/electrum/gui/qml/components/SwapDialog.qml index 68c4f0d6e..a0ab14cd3 100644 --- a/electrum/gui/qml/components/SwapDialog.qml +++ b/electrum/gui/qml/components/SwapDialog.qml @@ -271,7 +271,7 @@ ElDialog { dialog.accepted.connect(function() { if (Config.swapServerNPub != dialog.selectedPubkey) { Config.swapServerNPub = dialog.selectedPubkey - _swaphelper.init_swap_manager() + _swaphelper.setReadyState() } }) dialog.open() diff --git a/electrum/gui/qml/components/main.qml b/electrum/gui/qml/components/main.qml index 3b50d00d2..826d7a27d 100644 --- a/electrum/gui/qml/components/main.qml +++ b/electrum/gui/qml/components/main.qml @@ -500,7 +500,7 @@ ApplicationWindow }) dialog.accepted.connect(function() { Config.swapServerNPub = dialog.selectedPubkey - _swaphelper.init_swap_manager() + _swaphelper.setReadyState() }) dialog.rejected.connect(function() { _swaphelper.npubSelectionCancelled() diff --git a/electrum/gui/qml/qeswaphelper.py b/electrum/gui/qml/qeswaphelper.py index b694b7962..666dc32f1 100644 --- a/electrum/gui/qml/qeswaphelper.py +++ b/electrum/gui/qml/qeswaphelper.py @@ -1,6 +1,4 @@ import asyncio -import concurrent -import threading from enum import IntEnum from typing import Union, Optional, TYPE_CHECKING, Sequence @@ -11,7 +9,8 @@ from electrum.i18n import _ from electrum.bitcoin import DummyAddress from electrum.logging import get_logger from electrum.transaction import PartialTxOutput, PartialTransaction -from electrum.util import NotEnoughFunds, NoDynamicFeeEstimates, profiler, get_asyncio_loop, age +from electrum.util import (NotEnoughFunds, NoDynamicFeeEstimates, profiler, get_asyncio_loop, age, + wait_for2) from electrum.submarine_swaps import NostrTransport, SwapServerTransport from electrum.fee_policy import FeePolicy @@ -153,8 +152,14 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): self.requestTxUpdate.connect(self.tx_update_pushback_timer) self.offersUpdated.connect(self.on_offers_updated) + self.transport_task: Optional[asyncio.Task] = None + self.swap_transport: Optional[SwapServerTransport] = None + self.recent_offers = [] def on_destroy(self): + if self.transport_task is not None: + self.transport_task.cancel() + self.transport_task = None self.unregister_callbacks() walletChanged = pyqtSignal() @@ -166,7 +171,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): def wallet(self, wallet: QEWallet): if self._wallet != wallet: self._wallet = wallet - self.init_swap_manager() + self.run_swap_manager() self.walletChanged.emit() sliderPosChanged = pyqtSignal() @@ -338,9 +343,8 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): def isNostr(self): return True # TODO - @pyqtSlot() - def init_swap_manager(self): - self._logger.debug('init_swap_manager') + def run_swap_manager(self): + self._logger.debug('run_swap_manager') if (lnworker := self._wallet.wallet.lnworker) is None: return swap_manager = lnworker.swap_manager @@ -356,73 +360,107 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): swap_transport = swap_manager.create_transport() - def query_task(transport: SwapServerTransport): - with transport: - try: - async def wait_initialized(): - try: - await asyncio.wait_for(swap_manager.is_initialized.wait(), timeout=15) - self._logger.debug('swapmanager initialized') - self.state = QESwapHelper.State.Initialized - except asyncio.TimeoutError: - self._logger.debug('swapmanager init timeout') - self.state = QESwapHelper.State.NoService - return - - if not swap_manager.is_initialized.is_set(): - self.userinfo = _('Initializing...') - fut = asyncio.run_coroutine_threadsafe(wait_initialized(), get_asyncio_loop()) - fut.result() - except Exception as e: - try: # swaphelper might be destroyed at this point - self.userinfo = _('Error') + ': ' + str(e) - self.state = QESwapHelper.State.NoService - self._logger.error(str(e)) - except RuntimeError: - pass - if isinstance(transport, NostrTransport): - if not swap_manager.is_initialized.is_set(): - if not transport.is_connected.is_set(): + async def swap_transport_task(transport: SwapServerTransport): + async with transport: + self.swap_transport = transport + if not swap_manager.is_initialized.is_set(): + self.userinfo = _('Initializing...') + try: + # is_initialized is set if we receive the event of our configured SWAPSERVER_NPUB + # This will timeout if no server is configured, or our server didn't publish recently. + timeout = transport.connect_timeout + 1 + await wait_for2(swap_manager.is_initialized.wait(), timeout=timeout) + self._logger.debug('swapmanager initialized') + self.state = QESwapHelper.State.Initialized + except asyncio.TimeoutError: + # only fail if we didn't get any offers or couldn't connect at all + # otherwise the timeout just means that no offer of the selected npub has + # been found (or that there is no npub selected at all), so the prompt should open + if isinstance(transport, NostrTransport) and not transport.is_connected.is_set(): self.userinfo = _('Error') + ': ' + '\n'.join([ _('Could not connect to a Nostr relay.'), _('Please check your relays and network connection') ]) self.state = QESwapHelper.State.NoService return - self.recent_offers = [x for x in transport.get_recent_offers()] - if not self.recent_offers: + elif not isinstance(transport, NostrTransport) or not transport.get_recent_offers(): + self._logger.debug('Could not find a swap provider.') self.userinfo = _('Could not find a swap provider.') self.state = QESwapHelper.State.NoService return - - self.offersUpdated.emit() - self.undefinedNPub.emit() + except Exception as e: + try: # swaphelper might be destroyed at this point + self.userinfo = _('Error') + ': ' + str(e) + self.state = QESwapHelper.State.NoService + self._logger.error(str(e)) + except RuntimeError: + pass return - else: - self.recent_offers = [x for x in transport.get_recent_offers()] - if not self.recent_offers: - self.userinfo = _('Could not find a swap provider.') - self.state = QESwapHelper.State.NoService - return - self.offersUpdated.emit() + if isinstance(transport, NostrTransport) and not swap_manager.is_initialized.is_set(): + # not is_initialized.is_set() = configured provider was not found (or no provider configured) + # prompt user to select a swapserver + self.recent_offers = transport.get_recent_offers() + self.offersUpdated.emit() + self.undefinedNPub.emit() + elif swap_manager.is_initialized.is_set(): + self.setReadyState() - self.state = QESwapHelper.State.ServiceReady - self.userinfo = QESwapHelper.MESSAGE_SWAP_HOWTO - self.init_swap_slider_range() + while True: + # keep fetching new incoming offer events + # the slider range will not get updated continuously as it would irritate the user + if isinstance(transport, NostrTransport): + if (recent_offers := transport.get_recent_offers()) != self.recent_offers: + self._logger.debug(f"received new swap offer") + self.recent_offers = recent_offers + self.offersUpdated.emit() + await asyncio.sleep(1) - threading.Thread(target=query_task, args=(swap_transport,), daemon=True).start() + async def handle_swap_transport(transport: SwapServerTransport): + # ensures that swap_transport is always set None if transport closes + try: + await swap_transport_task(transport) + finally: + self.swap_transport = None + + self.transport_task = asyncio.run_coroutine_threadsafe( + handle_swap_transport(swap_transport), + get_asyncio_loop() + ) @pyqtSlot() def npubSelectionCancelled(self): - if not self._wallet.wallet.config.SWAPSERVER_NPUB: + if (self._wallet.wallet.config.SWAPSERVER_NPUB + not in [offer.server_npub for offer in self.recent_offers]): self._logger.debug('nostr is preferred but swapserver npub still undefined') - self.userinfo = _('No swap provider selected.') + if not self._wallet.wallet.config.SWAPSERVER_NPUB: + self.userinfo = _('No swap provider selected.') + else: + self.userinfo = _('Select one of the available swap providers.') self.state = QESwapHelper.State.NoService - def init_swap_slider_range(self): + @pyqtSlot() + def setReadyState(self): + if self._wallet.wallet.config.SWAPSERVER_NPUB \ + or not isinstance(self.swap_transport, NostrTransport): + self.state = QESwapHelper.State.ServiceReady + self.userinfo = QESwapHelper.MESSAGE_SWAP_HOWTO + self.initSwapSliderRange() + + def update_swap_manager_pair(self): + """Updates the swap manager pairs to the recent pairs of the selected server""" + assert self.swap_transport is not None, "No swap transport" + if isinstance(self.swap_transport, NostrTransport): + swap_manager = self._wallet.wallet.lnworker.swap_manager + pair = self.swap_transport.get_offer(self._wallet.wallet.config.SWAPSERVER_NPUB) + swap_manager.update_pairs(pair.pairs) + + @pyqtSlot() + def initSwapSliderRange(self): lnworker = self._wallet.wallet.lnworker swap_manager = lnworker.swap_manager + # update the swap_manager pair so the newest available data is used below + self.update_swap_manager_pair() """Sets the minimal and maximal amount that can be swapped for the swap slider.""" @@ -443,7 +481,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): max_onchain_spend)) # we expect range to adjust the value of the swap slider to be in the # correct range, i.e., to correct an overflow when reducing the limits - self._logger.debug(f'Slider range {-reverse} - {forward}') + self._logger.debug(f'Slider range {-reverse} - {forward}. Pos {self._sliderPos}') self.rangeMin = -reverse self.rangeMax = forward # percentage of void, right or left @@ -459,6 +497,12 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): self.leftVoidChanged.emit() self.rightVoidChanged.emit() + if not self.rangeMin <= self._sliderPos <= self.rangeMax: + # clamp the slider pos into the given limits + if abs(self._sliderPos - self.rangeMin) < abs(self._sliderPos - self.rangeMax): + self._sliderPos = self.rangeMin + else: + self._sliderPos = self.rangeMax self.swap_slider_moved() @profiler @@ -504,7 +548,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): self._receive_amount = swap_manager.get_recv_amount(send_amount=self._send_amount, is_reverse=self.isReverse) self.toreceive = QEAmount(amount_sat=self._receive_amount) # fee breakdown - self.serverfeeperc = f'{swap_manager.percentage:0.1f}%' + self.serverfeeperc = f'{swap_manager.percentage:0.2f}%' server_miningfee = swap_manager.mining_fee self.serverMiningfee = QEAmount(amount_sat=server_miningfee) if self.isReverse: @@ -540,57 +584,54 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): assert self._tx if lightning_amount is None or onchain_amount is None: return - loop = get_asyncio_loop() - def swap_task(): - with self._wallet.wallet.lnworker.swap_manager.create_transport() as transport: - coro = self._wallet.wallet.lnworker.swap_manager.request_normal_swap( - transport, + async def swap_task(): + assert self.swap_transport is not None, "Swap transport not available" + try: + dummy_tx = self._create_tx(onchain_amount) + self.userinfo = _('Performing swap...') + self.state = QESwapHelper.State.Started + self._swap, invoice = await self._wallet.wallet.lnworker.swap_manager.request_normal_swap( + self.swap_transport, lightning_amount_sat=lightning_amount, expected_onchain_amount_sat=onchain_amount, ) - try: - dummy_tx = self._create_tx(onchain_amount) - fut = asyncio.run_coroutine_threadsafe(coro, loop) - self.userinfo = _('Performing swap...') - self.state = QESwapHelper.State.Started - self._swap, invoice = fut.result() - tx = self._wallet.wallet.lnworker.swap_manager.create_funding_tx(self._swap, dummy_tx, password=self._wallet.password) - coro2 = self._wallet.wallet.lnworker.swap_manager.wait_for_htlcs_and_broadcast(transport, swap=self._swap, invoice=invoice, tx=tx) - self._fut_htlc_wait = fut = asyncio.run_coroutine_threadsafe(coro2, loop) + tx = self._wallet.wallet.lnworker.swap_manager.create_funding_tx(self._swap, dummy_tx, password=self._wallet.password) + coro2 = self._wallet.wallet.lnworker.swap_manager.wait_for_htlcs_and_broadcast(self.swap_transport, swap=self._swap, invoice=invoice, tx=tx) + self._fut_htlc_wait = fut = asyncio.create_task(coro2) - self.canCancel = True - txid = fut.result() - try: # swaphelper might be destroyed at this point - if txid: - self.userinfo = _('Success!') - self.state = QESwapHelper.State.Success - else: - self.userinfo = _('Swap failed!') - self.state = QESwapHelper.State.Failed - except RuntimeError: - pass - except concurrent.futures.CancelledError: - self._wallet.wallet.lnworker.swap_manager.cancel_normal_swap(self._swap) - self.userinfo = _('Swap cancelled') - self.state = QESwapHelper.State.Cancelled - except Exception as e: - try: # swaphelper might be destroyed at this point + self.canCancel = True + txid = await fut + try: # swaphelper might be destroyed at this point + if txid: + self.userinfo = _('Success!') + self.state = QESwapHelper.State.Success + else: + self.userinfo = _('Swap failed!') self.state = QESwapHelper.State.Failed - self.userinfo = _('Error') + ': ' + str(e) - self._logger.error(str(e)) - except RuntimeError: - pass - finally: - try: # swaphelper might be destroyed at this point - self.canCancel = False - self._swap = None - self._fut_htlc_wait = None - except RuntimeError: - pass + except RuntimeError: + pass + except asyncio.CancelledError: + self._wallet.wallet.lnworker.swap_manager.cancel_normal_swap(self._swap) + self.userinfo = _('Swap cancelled') + self.state = QESwapHelper.State.Cancelled + except Exception as e: + try: # swaphelper might be destroyed at this point + self.state = QESwapHelper.State.Failed + self.userinfo = _('Error') + ': ' + str(e) + self._logger.error(str(e)) + except RuntimeError: + pass + finally: + try: # swaphelper might be destroyed at this point + self.canCancel = False + self._swap = None + self._fut_htlc_wait = None + except RuntimeError: + pass - threading.Thread(target=swap_task, daemon=True).start() + asyncio.run_coroutine_threadsafe(swap_task(), get_asyncio_loop()) def _create_tx(self, onchain_amount: Union[int, str, None]) -> PartialTransaction: # TODO: func taken from qt GUI, this should be common code @@ -623,41 +664,37 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): if lightning_amount is None or onchain_amount is None: return - def swap_task(): + async def swap_task(): + assert self.swap_transport is not None, "Swap transport not available" swap_manager = self._wallet.wallet.lnworker.swap_manager - loop = get_asyncio_loop() - with self._wallet.wallet.lnworker.swap_manager.create_transport() as transport: - async def coro(): - await swap_manager.is_initialized.wait() - return await swap_manager.reverse_swap( - transport, - lightning_amount_sat=lightning_amount, - expected_onchain_amount_sat=onchain_amount + swap_manager.get_swap_tx_fee(), - ) - try: - fut = asyncio.run_coroutine_threadsafe(coro(), loop) - self.userinfo = _('Performing swap...') - self.state = QESwapHelper.State.Started - txid = fut.result() - try: # swaphelper might be destroyed at this point - if txid: - self.userinfo = _('Success!') - self.state = QESwapHelper.State.Success - else: - self.userinfo = _('Swap failed!') - self.state = QESwapHelper.State.Failed - except RuntimeError: - pass - except Exception as e: - try: # swaphelper might be destroyed at this point + try: + self.userinfo = _('Performing swap...') + self.state = QESwapHelper.State.Started + await swap_manager.is_initialized.wait() + txid = await swap_manager.reverse_swap( + self.swap_transport, + lightning_amount_sat=lightning_amount, + expected_onchain_amount_sat=onchain_amount + swap_manager.get_swap_tx_fee(), + ) + try: # swaphelper might be destroyed at this point + if txid: + self.userinfo = _('Success!') + self.state = QESwapHelper.State.Success + else: + self.userinfo = _('Swap failed!') self.state = QESwapHelper.State.Failed - msg = _('Timeout') if isinstance(e, TimeoutError) else str(e) - self.userinfo = _('Error') + ': ' + msg - self._logger.error(str(e)) - except RuntimeError: - pass + except RuntimeError: + pass + except Exception as e: + try: # swaphelper might be destroyed at this point + self.state = QESwapHelper.State.Failed + msg = _('Timeout') if isinstance(e, TimeoutError) else str(e) + self.userinfo = _('Error') + ': ' + msg + self._logger.error(str(e)) + except RuntimeError: + pass - threading.Thread(target=swap_task, daemon=True).start() + asyncio.run_coroutine_threadsafe(swap_task(), get_asyncio_loop()) @pyqtSlot() def executeSwap(self): diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index 876e37485..fa1904f6f 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -1333,6 +1333,12 @@ class SwapServerTransport(Logger): def __exit__(self, ex_type, ex, tb): pass + async def __aenter__(self): + pass + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict: pass @@ -1358,6 +1364,13 @@ class HttpTransport(SwapServerTransport): def __exit__(self, ex_type, ex, tb): pass + async def __aenter__(self): + asyncio.create_task(self.get_pairs()) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + async def send_request_to_server(self, method, request_data): response = await self.network.async_send_http_on_proxy( 'post' if request_data else 'get', @@ -1418,6 +1431,13 @@ class NostrTransport(SwapServerTransport): fut = asyncio.run_coroutine_threadsafe(self.stop(), self.network.asyncio_loop) fut.result(timeout=5) + async def __aenter__(self): + asyncio.create_task(self.main_loop()) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await wait_for2(self.stop(), timeout=5) + @log_exceptions async def main_loop(self): self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}') From 89cab31b81108100ab96d86d2d28053595c47c30 Mon Sep 17 00:00:00 2001 From: Sander van Grieken Date: Thu, 5 Jun 2025 20:55:08 +0200 Subject: [PATCH 2/2] qml: swaphelper: update offer model properly --- .../qml/components/NostrSwapServersDialog.qml | 3 -- electrum/gui/qml/qeswaphelper.py | 43 ++++++++++++++++--- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/electrum/gui/qml/components/NostrSwapServersDialog.qml b/electrum/gui/qml/components/NostrSwapServersDialog.qml index 3d9b46fb5..e31b455a8 100644 --- a/electrum/gui/qml/components/NostrSwapServersDialog.qml +++ b/electrum/gui/qml/components/NostrSwapServersDialog.qml @@ -54,9 +54,6 @@ ElDialog { Connections { target: swaphelper function onOffersUpdated() { - listview.model = null - listview.model = swaphelper.availableSwapServers - listview.forceLayout() if (dialog.selectedPubkey) { listview.currentIndex = swaphelper.availableSwapServers.indexFor(dialog.selectedPubkey) } diff --git a/electrum/gui/qml/qeswaphelper.py b/electrum/gui/qml/qeswaphelper.py index 666dc32f1..edce023ad 100644 --- a/electrum/gui/qml/qeswaphelper.py +++ b/electrum/gui/qml/qeswaphelper.py @@ -66,9 +66,8 @@ class QESwapServerNPubListModel(QAbstractListModel): self._services = [] self.endResetModel() - def initModel(self, items: Sequence['SwapOffer']): - self.beginInsertRows(QModelIndex(), len(items), len(items)) - self._services = [{ + def offer_to_model(self, x: 'SwapOffer'): + return { 'npub': x.server_npub, 'percentage_fee': x.pairs.percentage, 'mining_fee': x.pairs.mining_fee, @@ -76,9 +75,39 @@ class QESwapServerNPubListModel(QAbstractListModel): 'max_forward_amount': x.pairs.max_forward, 'max_reverse_amount': x.pairs.max_reverse, 'timestamp': age(x.timestamp), - } for x in items] - self.endInsertRows() - self.countChanged.emit() + } + + def updateModel(self, items: Sequence['SwapOffer']): + offers = items.copy() + + remove = [] + + for i, x in enumerate(self._services): + if matches := list(filter(lambda offer: offer.server_npub == x['npub'], offers)): + # update + self._services[i] = self.offer_to_model(matches[0]) + index = self.index(i, 0) + self.dataChanged.emit(index, index, self._ROLE_KEYS) + offers.remove(matches[0]) + else: + # add offer to remove items + remove.append(i) + + # # remove offers from model + for ri in reversed(remove): + self.beginRemoveRows(QModelIndex(), ri, ri) + self._services.pop(ri) + self.endRemoveRows() + + # add new offers + if offers: + self.beginInsertRows(QModelIndex(), len(self._services), len(self._services) + len(offers) - 1) + for offer in offers: + self._services.append(self.offer_to_model(offer)) + self.endInsertRows() + + if offers or remove: + self.countChanged.emit() @pyqtSlot(str, result=int) def indexFor(self, npub: str): @@ -337,7 +366,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): return self._available_swapservers def on_offers_updated(self): - self.availableSwapServers.initModel(self.recent_offers) + self.availableSwapServers.updateModel(self.recent_offers) @pyqtSlot(result=bool) def isNostr(self):