diff --git a/electrum/gui/qml/components/NostrSwapServersDialog.qml b/electrum/gui/qml/components/NostrSwapServersDialog.qml index b134b4f23..e31b455a8 100644 --- a/electrum/gui/qml/components/NostrSwapServersDialog.qml +++ b/electrum/gui/qml/components/NostrSwapServersDialog.qml @@ -51,6 +51,16 @@ ElDialog { clip: true model: swaphelper.availableSwapServers + Connections { + target: swaphelper + function onOffersUpdated() { + if (dialog.selectedPubkey) { + listview.currentIndex = swaphelper.availableSwapServers.indexFor(dialog.selectedPubkey) + } + console.log("swapserver list refreshed") + } + } + delegate: ItemDelegate { width: ListView.view.width height: itemLayout.height diff --git a/electrum/gui/qml/components/SwapDialog.qml b/electrum/gui/qml/components/SwapDialog.qml index 68c4f0d6e..a0ab14cd3 100644 --- a/electrum/gui/qml/components/SwapDialog.qml +++ b/electrum/gui/qml/components/SwapDialog.qml @@ -271,7 +271,7 @@ ElDialog { dialog.accepted.connect(function() { if (Config.swapServerNPub != dialog.selectedPubkey) { Config.swapServerNPub = dialog.selectedPubkey - _swaphelper.init_swap_manager() + _swaphelper.setReadyState() } }) dialog.open() diff --git a/electrum/gui/qml/components/main.qml b/electrum/gui/qml/components/main.qml index 3b50d00d2..826d7a27d 100644 --- a/electrum/gui/qml/components/main.qml +++ b/electrum/gui/qml/components/main.qml @@ -500,7 +500,7 @@ ApplicationWindow }) dialog.accepted.connect(function() { Config.swapServerNPub = dialog.selectedPubkey - _swaphelper.init_swap_manager() + _swaphelper.setReadyState() }) dialog.rejected.connect(function() { _swaphelper.npubSelectionCancelled() diff --git a/electrum/gui/qml/qeswaphelper.py b/electrum/gui/qml/qeswaphelper.py index b694b7962..edce023ad 100644 --- a/electrum/gui/qml/qeswaphelper.py +++ b/electrum/gui/qml/qeswaphelper.py @@ -1,6 +1,4 @@ import asyncio -import concurrent -import threading from enum import IntEnum from typing import Union, Optional, TYPE_CHECKING, Sequence @@ -11,7 +9,8 @@ from electrum.i18n import _ from electrum.bitcoin import DummyAddress from electrum.logging import get_logger from electrum.transaction import PartialTxOutput, PartialTransaction -from electrum.util import NotEnoughFunds, NoDynamicFeeEstimates, profiler, get_asyncio_loop, age +from electrum.util import (NotEnoughFunds, NoDynamicFeeEstimates, profiler, get_asyncio_loop, age, + wait_for2) from electrum.submarine_swaps import NostrTransport, SwapServerTransport from electrum.fee_policy import FeePolicy @@ -67,9 +66,8 @@ class QESwapServerNPubListModel(QAbstractListModel): self._services = [] self.endResetModel() - def initModel(self, items: Sequence['SwapOffer']): - self.beginInsertRows(QModelIndex(), len(items), len(items)) - self._services = [{ + def offer_to_model(self, x: 'SwapOffer'): + return { 'npub': x.server_npub, 'percentage_fee': x.pairs.percentage, 'mining_fee': x.pairs.mining_fee, @@ -77,9 +75,39 @@ class QESwapServerNPubListModel(QAbstractListModel): 'max_forward_amount': x.pairs.max_forward, 'max_reverse_amount': x.pairs.max_reverse, 'timestamp': age(x.timestamp), - } for x in items] - self.endInsertRows() - self.countChanged.emit() + } + + def updateModel(self, items: Sequence['SwapOffer']): + offers = items.copy() + + remove = [] + + for i, x in enumerate(self._services): + if matches := list(filter(lambda offer: offer.server_npub == x['npub'], offers)): + # update + self._services[i] = self.offer_to_model(matches[0]) + index = self.index(i, 0) + self.dataChanged.emit(index, index, self._ROLE_KEYS) + offers.remove(matches[0]) + else: + # add offer to remove items + remove.append(i) + + # # remove offers from model + for ri in reversed(remove): + self.beginRemoveRows(QModelIndex(), ri, ri) + self._services.pop(ri) + self.endRemoveRows() + + # add new offers + if offers: + self.beginInsertRows(QModelIndex(), len(self._services), len(self._services) + len(offers) - 1) + for offer in offers: + self._services.append(self.offer_to_model(offer)) + self.endInsertRows() + + if offers or remove: + self.countChanged.emit() @pyqtSlot(str, result=int) def indexFor(self, npub: str): @@ -153,8 +181,14 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): self.requestTxUpdate.connect(self.tx_update_pushback_timer) self.offersUpdated.connect(self.on_offers_updated) + self.transport_task: Optional[asyncio.Task] = None + self.swap_transport: Optional[SwapServerTransport] = None + self.recent_offers = [] def on_destroy(self): + if self.transport_task is not None: + self.transport_task.cancel() + self.transport_task = None self.unregister_callbacks() walletChanged = pyqtSignal() @@ -166,7 +200,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): def wallet(self, wallet: QEWallet): if self._wallet != wallet: self._wallet = wallet - self.init_swap_manager() + self.run_swap_manager() self.walletChanged.emit() sliderPosChanged = pyqtSignal() @@ -332,15 +366,14 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): return self._available_swapservers def on_offers_updated(self): - self.availableSwapServers.initModel(self.recent_offers) + self.availableSwapServers.updateModel(self.recent_offers) @pyqtSlot(result=bool) def isNostr(self): return True # TODO - @pyqtSlot() - def init_swap_manager(self): - self._logger.debug('init_swap_manager') + def run_swap_manager(self): + self._logger.debug('run_swap_manager') if (lnworker := self._wallet.wallet.lnworker) is None: return swap_manager = lnworker.swap_manager @@ -356,73 +389,107 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): swap_transport = swap_manager.create_transport() - def query_task(transport: SwapServerTransport): - with transport: - try: - async def wait_initialized(): - try: - await asyncio.wait_for(swap_manager.is_initialized.wait(), timeout=15) - self._logger.debug('swapmanager initialized') - self.state = QESwapHelper.State.Initialized - except asyncio.TimeoutError: - self._logger.debug('swapmanager init timeout') - self.state = QESwapHelper.State.NoService - return - - if not swap_manager.is_initialized.is_set(): - self.userinfo = _('Initializing...') - fut = asyncio.run_coroutine_threadsafe(wait_initialized(), get_asyncio_loop()) - fut.result() - except Exception as e: - try: # swaphelper might be destroyed at this point - self.userinfo = _('Error') + ': ' + str(e) - self.state = QESwapHelper.State.NoService - self._logger.error(str(e)) - except RuntimeError: - pass - if isinstance(transport, NostrTransport): - if not swap_manager.is_initialized.is_set(): - if not transport.is_connected.is_set(): + async def swap_transport_task(transport: SwapServerTransport): + async with transport: + self.swap_transport = transport + if not swap_manager.is_initialized.is_set(): + self.userinfo = _('Initializing...') + try: + # is_initialized is set if we receive the event of our configured SWAPSERVER_NPUB + # This will timeout if no server is configured, or our server didn't publish recently. + timeout = transport.connect_timeout + 1 + await wait_for2(swap_manager.is_initialized.wait(), timeout=timeout) + self._logger.debug('swapmanager initialized') + self.state = QESwapHelper.State.Initialized + except asyncio.TimeoutError: + # only fail if we didn't get any offers or couldn't connect at all + # otherwise the timeout just means that no offer of the selected npub has + # been found (or that there is no npub selected at all), so the prompt should open + if isinstance(transport, NostrTransport) and not transport.is_connected.is_set(): self.userinfo = _('Error') + ': ' + '\n'.join([ _('Could not connect to a Nostr relay.'), _('Please check your relays and network connection') ]) self.state = QESwapHelper.State.NoService return - self.recent_offers = [x for x in transport.get_recent_offers()] - if not self.recent_offers: + elif not isinstance(transport, NostrTransport) or not transport.get_recent_offers(): + self._logger.debug('Could not find a swap provider.') self.userinfo = _('Could not find a swap provider.') self.state = QESwapHelper.State.NoService return - - self.offersUpdated.emit() - self.undefinedNPub.emit() + except Exception as e: + try: # swaphelper might be destroyed at this point + self.userinfo = _('Error') + ': ' + str(e) + self.state = QESwapHelper.State.NoService + self._logger.error(str(e)) + except RuntimeError: + pass return - else: - self.recent_offers = [x for x in transport.get_recent_offers()] - if not self.recent_offers: - self.userinfo = _('Could not find a swap provider.') - self.state = QESwapHelper.State.NoService - return - self.offersUpdated.emit() + if isinstance(transport, NostrTransport) and not swap_manager.is_initialized.is_set(): + # not is_initialized.is_set() = configured provider was not found (or no provider configured) + # prompt user to select a swapserver + self.recent_offers = transport.get_recent_offers() + self.offersUpdated.emit() + self.undefinedNPub.emit() + elif swap_manager.is_initialized.is_set(): + self.setReadyState() - self.state = QESwapHelper.State.ServiceReady - self.userinfo = QESwapHelper.MESSAGE_SWAP_HOWTO - self.init_swap_slider_range() + while True: + # keep fetching new incoming offer events + # the slider range will not get updated continuously as it would irritate the user + if isinstance(transport, NostrTransport): + if (recent_offers := transport.get_recent_offers()) != self.recent_offers: + self._logger.debug(f"received new swap offer") + self.recent_offers = recent_offers + self.offersUpdated.emit() + await asyncio.sleep(1) - threading.Thread(target=query_task, args=(swap_transport,), daemon=True).start() + async def handle_swap_transport(transport: SwapServerTransport): + # ensures that swap_transport is always set None if transport closes + try: + await swap_transport_task(transport) + finally: + self.swap_transport = None + + self.transport_task = asyncio.run_coroutine_threadsafe( + handle_swap_transport(swap_transport), + get_asyncio_loop() + ) @pyqtSlot() def npubSelectionCancelled(self): - if not self._wallet.wallet.config.SWAPSERVER_NPUB: + if (self._wallet.wallet.config.SWAPSERVER_NPUB + not in [offer.server_npub for offer in self.recent_offers]): self._logger.debug('nostr is preferred but swapserver npub still undefined') - self.userinfo = _('No swap provider selected.') + if not self._wallet.wallet.config.SWAPSERVER_NPUB: + self.userinfo = _('No swap provider selected.') + else: + self.userinfo = _('Select one of the available swap providers.') self.state = QESwapHelper.State.NoService - def init_swap_slider_range(self): + @pyqtSlot() + def setReadyState(self): + if self._wallet.wallet.config.SWAPSERVER_NPUB \ + or not isinstance(self.swap_transport, NostrTransport): + self.state = QESwapHelper.State.ServiceReady + self.userinfo = QESwapHelper.MESSAGE_SWAP_HOWTO + self.initSwapSliderRange() + + def update_swap_manager_pair(self): + """Updates the swap manager pairs to the recent pairs of the selected server""" + assert self.swap_transport is not None, "No swap transport" + if isinstance(self.swap_transport, NostrTransport): + swap_manager = self._wallet.wallet.lnworker.swap_manager + pair = self.swap_transport.get_offer(self._wallet.wallet.config.SWAPSERVER_NPUB) + swap_manager.update_pairs(pair.pairs) + + @pyqtSlot() + def initSwapSliderRange(self): lnworker = self._wallet.wallet.lnworker swap_manager = lnworker.swap_manager + # update the swap_manager pair so the newest available data is used below + self.update_swap_manager_pair() """Sets the minimal and maximal amount that can be swapped for the swap slider.""" @@ -443,7 +510,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): max_onchain_spend)) # we expect range to adjust the value of the swap slider to be in the # correct range, i.e., to correct an overflow when reducing the limits - self._logger.debug(f'Slider range {-reverse} - {forward}') + self._logger.debug(f'Slider range {-reverse} - {forward}. Pos {self._sliderPos}') self.rangeMin = -reverse self.rangeMax = forward # percentage of void, right or left @@ -459,6 +526,12 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): self.leftVoidChanged.emit() self.rightVoidChanged.emit() + if not self.rangeMin <= self._sliderPos <= self.rangeMax: + # clamp the slider pos into the given limits + if abs(self._sliderPos - self.rangeMin) < abs(self._sliderPos - self.rangeMax): + self._sliderPos = self.rangeMin + else: + self._sliderPos = self.rangeMax self.swap_slider_moved() @profiler @@ -504,7 +577,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): self._receive_amount = swap_manager.get_recv_amount(send_amount=self._send_amount, is_reverse=self.isReverse) self.toreceive = QEAmount(amount_sat=self._receive_amount) # fee breakdown - self.serverfeeperc = f'{swap_manager.percentage:0.1f}%' + self.serverfeeperc = f'{swap_manager.percentage:0.2f}%' server_miningfee = swap_manager.mining_fee self.serverMiningfee = QEAmount(amount_sat=server_miningfee) if self.isReverse: @@ -540,57 +613,54 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): assert self._tx if lightning_amount is None or onchain_amount is None: return - loop = get_asyncio_loop() - def swap_task(): - with self._wallet.wallet.lnworker.swap_manager.create_transport() as transport: - coro = self._wallet.wallet.lnworker.swap_manager.request_normal_swap( - transport, + async def swap_task(): + assert self.swap_transport is not None, "Swap transport not available" + try: + dummy_tx = self._create_tx(onchain_amount) + self.userinfo = _('Performing swap...') + self.state = QESwapHelper.State.Started + self._swap, invoice = await self._wallet.wallet.lnworker.swap_manager.request_normal_swap( + self.swap_transport, lightning_amount_sat=lightning_amount, expected_onchain_amount_sat=onchain_amount, ) - try: - dummy_tx = self._create_tx(onchain_amount) - fut = asyncio.run_coroutine_threadsafe(coro, loop) - self.userinfo = _('Performing swap...') - self.state = QESwapHelper.State.Started - self._swap, invoice = fut.result() - tx = self._wallet.wallet.lnworker.swap_manager.create_funding_tx(self._swap, dummy_tx, password=self._wallet.password) - coro2 = self._wallet.wallet.lnworker.swap_manager.wait_for_htlcs_and_broadcast(transport, swap=self._swap, invoice=invoice, tx=tx) - self._fut_htlc_wait = fut = asyncio.run_coroutine_threadsafe(coro2, loop) + tx = self._wallet.wallet.lnworker.swap_manager.create_funding_tx(self._swap, dummy_tx, password=self._wallet.password) + coro2 = self._wallet.wallet.lnworker.swap_manager.wait_for_htlcs_and_broadcast(self.swap_transport, swap=self._swap, invoice=invoice, tx=tx) + self._fut_htlc_wait = fut = asyncio.create_task(coro2) - self.canCancel = True - txid = fut.result() - try: # swaphelper might be destroyed at this point - if txid: - self.userinfo = _('Success!') - self.state = QESwapHelper.State.Success - else: - self.userinfo = _('Swap failed!') - self.state = QESwapHelper.State.Failed - except RuntimeError: - pass - except concurrent.futures.CancelledError: - self._wallet.wallet.lnworker.swap_manager.cancel_normal_swap(self._swap) - self.userinfo = _('Swap cancelled') - self.state = QESwapHelper.State.Cancelled - except Exception as e: - try: # swaphelper might be destroyed at this point + self.canCancel = True + txid = await fut + try: # swaphelper might be destroyed at this point + if txid: + self.userinfo = _('Success!') + self.state = QESwapHelper.State.Success + else: + self.userinfo = _('Swap failed!') self.state = QESwapHelper.State.Failed - self.userinfo = _('Error') + ': ' + str(e) - self._logger.error(str(e)) - except RuntimeError: - pass - finally: - try: # swaphelper might be destroyed at this point - self.canCancel = False - self._swap = None - self._fut_htlc_wait = None - except RuntimeError: - pass + except RuntimeError: + pass + except asyncio.CancelledError: + self._wallet.wallet.lnworker.swap_manager.cancel_normal_swap(self._swap) + self.userinfo = _('Swap cancelled') + self.state = QESwapHelper.State.Cancelled + except Exception as e: + try: # swaphelper might be destroyed at this point + self.state = QESwapHelper.State.Failed + self.userinfo = _('Error') + ': ' + str(e) + self._logger.error(str(e)) + except RuntimeError: + pass + finally: + try: # swaphelper might be destroyed at this point + self.canCancel = False + self._swap = None + self._fut_htlc_wait = None + except RuntimeError: + pass - threading.Thread(target=swap_task, daemon=True).start() + asyncio.run_coroutine_threadsafe(swap_task(), get_asyncio_loop()) def _create_tx(self, onchain_amount: Union[int, str, None]) -> PartialTransaction: # TODO: func taken from qt GUI, this should be common code @@ -623,41 +693,37 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener): if lightning_amount is None or onchain_amount is None: return - def swap_task(): + async def swap_task(): + assert self.swap_transport is not None, "Swap transport not available" swap_manager = self._wallet.wallet.lnworker.swap_manager - loop = get_asyncio_loop() - with self._wallet.wallet.lnworker.swap_manager.create_transport() as transport: - async def coro(): - await swap_manager.is_initialized.wait() - return await swap_manager.reverse_swap( - transport, - lightning_amount_sat=lightning_amount, - expected_onchain_amount_sat=onchain_amount + swap_manager.get_swap_tx_fee(), - ) - try: - fut = asyncio.run_coroutine_threadsafe(coro(), loop) - self.userinfo = _('Performing swap...') - self.state = QESwapHelper.State.Started - txid = fut.result() - try: # swaphelper might be destroyed at this point - if txid: - self.userinfo = _('Success!') - self.state = QESwapHelper.State.Success - else: - self.userinfo = _('Swap failed!') - self.state = QESwapHelper.State.Failed - except RuntimeError: - pass - except Exception as e: - try: # swaphelper might be destroyed at this point + try: + self.userinfo = _('Performing swap...') + self.state = QESwapHelper.State.Started + await swap_manager.is_initialized.wait() + txid = await swap_manager.reverse_swap( + self.swap_transport, + lightning_amount_sat=lightning_amount, + expected_onchain_amount_sat=onchain_amount + swap_manager.get_swap_tx_fee(), + ) + try: # swaphelper might be destroyed at this point + if txid: + self.userinfo = _('Success!') + self.state = QESwapHelper.State.Success + else: + self.userinfo = _('Swap failed!') self.state = QESwapHelper.State.Failed - msg = _('Timeout') if isinstance(e, TimeoutError) else str(e) - self.userinfo = _('Error') + ': ' + msg - self._logger.error(str(e)) - except RuntimeError: - pass + except RuntimeError: + pass + except Exception as e: + try: # swaphelper might be destroyed at this point + self.state = QESwapHelper.State.Failed + msg = _('Timeout') if isinstance(e, TimeoutError) else str(e) + self.userinfo = _('Error') + ': ' + msg + self._logger.error(str(e)) + except RuntimeError: + pass - threading.Thread(target=swap_task, daemon=True).start() + asyncio.run_coroutine_threadsafe(swap_task(), get_asyncio_loop()) @pyqtSlot() def executeSwap(self): diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index 876e37485..fa1904f6f 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -1333,6 +1333,12 @@ class SwapServerTransport(Logger): def __exit__(self, ex_type, ex, tb): pass + async def __aenter__(self): + pass + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict: pass @@ -1358,6 +1364,13 @@ class HttpTransport(SwapServerTransport): def __exit__(self, ex_type, ex, tb): pass + async def __aenter__(self): + asyncio.create_task(self.get_pairs()) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + async def send_request_to_server(self, method, request_data): response = await self.network.async_send_http_on_proxy( 'post' if request_data else 'get', @@ -1418,6 +1431,13 @@ class NostrTransport(SwapServerTransport): fut = asyncio.run_coroutine_threadsafe(self.stop(), self.network.asyncio_loop) fut.result(timeout=5) + async def __aenter__(self): + asyncio.create_task(self.main_loop()) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await wait_for2(self.stop(), timeout=5) + @log_exceptions async def main_loop(self): self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}')