1
0

fix: use single long lived transport in qeswaphelper

changes qeswaphelper to shate a single, long lived transport instance
instead of opening new transports to do swaps and fetch offers.
This allows to continuosly fetch offers, so events which get returned
later by slow relays don't get missed and the fee values stay updated.
Also fixes a race causing the list to miss some swapservers, as the
current implementation fetches only until
`swap_manager.is_initialized()` is set, which will get set as soon as an
event of the configured swapserver is received. So if the event of the
configured swapserver is received as first, all server events coming in
after it would get ignored.
This commit is contained in:
f321x
2025-06-05 10:28:57 +02:00
parent e3ccee6d63
commit 3a39abc415
5 changed files with 202 additions and 132 deletions

View File

@@ -51,6 +51,19 @@ ElDialog {
clip: true
model: swaphelper.availableSwapServers
Connections {
target: swaphelper
function onOffersUpdated() {
listview.model = null
listview.model = swaphelper.availableSwapServers
listview.forceLayout()
if (dialog.selectedPubkey) {
listview.currentIndex = swaphelper.availableSwapServers.indexFor(dialog.selectedPubkey)
}
console.log("swapserver list refreshed")
}
}
delegate: ItemDelegate {
width: ListView.view.width
height: itemLayout.height

View File

@@ -271,7 +271,7 @@ ElDialog {
dialog.accepted.connect(function() {
if (Config.swapServerNPub != dialog.selectedPubkey) {
Config.swapServerNPub = dialog.selectedPubkey
_swaphelper.init_swap_manager()
_swaphelper.setReadyState()
}
})
dialog.open()

View File

@@ -500,7 +500,7 @@ ApplicationWindow
})
dialog.accepted.connect(function() {
Config.swapServerNPub = dialog.selectedPubkey
_swaphelper.init_swap_manager()
_swaphelper.setReadyState()
})
dialog.rejected.connect(function() {
_swaphelper.npubSelectionCancelled()

View File

@@ -1,6 +1,4 @@
import asyncio
import concurrent
import threading
from enum import IntEnum
from typing import Union, Optional, TYPE_CHECKING, Sequence
@@ -11,7 +9,8 @@ from electrum.i18n import _
from electrum.bitcoin import DummyAddress
from electrum.logging import get_logger
from electrum.transaction import PartialTxOutput, PartialTransaction
from electrum.util import NotEnoughFunds, NoDynamicFeeEstimates, profiler, get_asyncio_loop, age
from electrum.util import (NotEnoughFunds, NoDynamicFeeEstimates, profiler, get_asyncio_loop, age,
wait_for2)
from electrum.submarine_swaps import NostrTransport, SwapServerTransport
from electrum.fee_policy import FeePolicy
@@ -153,8 +152,14 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
self.requestTxUpdate.connect(self.tx_update_pushback_timer)
self.offersUpdated.connect(self.on_offers_updated)
self.transport_task: Optional[asyncio.Task] = None
self.swap_transport: Optional[SwapServerTransport] = None
self.recent_offers = []
def on_destroy(self):
if self.transport_task is not None:
self.transport_task.cancel()
self.transport_task = None
self.unregister_callbacks()
walletChanged = pyqtSignal()
@@ -166,7 +171,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
def wallet(self, wallet: QEWallet):
if self._wallet != wallet:
self._wallet = wallet
self.init_swap_manager()
self.run_swap_manager()
self.walletChanged.emit()
sliderPosChanged = pyqtSignal()
@@ -338,9 +343,8 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
def isNostr(self):
return True # TODO
@pyqtSlot()
def init_swap_manager(self):
self._logger.debug('init_swap_manager')
def run_swap_manager(self):
self._logger.debug('run_swap_manager')
if (lnworker := self._wallet.wallet.lnworker) is None:
return
swap_manager = lnworker.swap_manager
@@ -356,73 +360,107 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
swap_transport = swap_manager.create_transport()
def query_task(transport: SwapServerTransport):
with transport:
try:
async def wait_initialized():
try:
await asyncio.wait_for(swap_manager.is_initialized.wait(), timeout=15)
self._logger.debug('swapmanager initialized')
self.state = QESwapHelper.State.Initialized
except asyncio.TimeoutError:
self._logger.debug('swapmanager init timeout')
self.state = QESwapHelper.State.NoService
return
if not swap_manager.is_initialized.is_set():
self.userinfo = _('Initializing...')
fut = asyncio.run_coroutine_threadsafe(wait_initialized(), get_asyncio_loop())
fut.result()
except Exception as e:
try: # swaphelper might be destroyed at this point
self.userinfo = _('Error') + ': ' + str(e)
self.state = QESwapHelper.State.NoService
self._logger.error(str(e))
except RuntimeError:
pass
if isinstance(transport, NostrTransport):
if not swap_manager.is_initialized.is_set():
if not transport.is_connected.is_set():
async def swap_transport_task(transport: SwapServerTransport):
async with transport:
self.swap_transport = transport
if not swap_manager.is_initialized.is_set():
self.userinfo = _('Initializing...')
try:
# is_initialized is set if we receive the event of our configured SWAPSERVER_NPUB
# This will timeout if no server is configured, or our server didn't publish recently.
timeout = transport.connect_timeout + 1
await wait_for2(swap_manager.is_initialized.wait(), timeout=timeout)
self._logger.debug('swapmanager initialized')
self.state = QESwapHelper.State.Initialized
except asyncio.TimeoutError:
# only fail if we didn't get any offers or couldn't connect at all
# otherwise the timeout just means that no offer of the selected npub has
# been found (or that there is no npub selected at all), so the prompt should open
if isinstance(transport, NostrTransport) and not transport.is_connected.is_set():
self.userinfo = _('Error') + ': ' + '\n'.join([
_('Could not connect to a Nostr relay.'),
_('Please check your relays and network connection')
])
self.state = QESwapHelper.State.NoService
return
self.recent_offers = [x for x in transport.get_recent_offers()]
if not self.recent_offers:
elif not isinstance(transport, NostrTransport) or not transport.get_recent_offers():
self._logger.debug('Could not find a swap provider.')
self.userinfo = _('Could not find a swap provider.')
self.state = QESwapHelper.State.NoService
return
self.offersUpdated.emit()
self.undefinedNPub.emit()
except Exception as e:
try: # swaphelper might be destroyed at this point
self.userinfo = _('Error') + ': ' + str(e)
self.state = QESwapHelper.State.NoService
self._logger.error(str(e))
except RuntimeError:
pass
return
else:
self.recent_offers = [x for x in transport.get_recent_offers()]
if not self.recent_offers:
self.userinfo = _('Could not find a swap provider.')
self.state = QESwapHelper.State.NoService
return
self.offersUpdated.emit()
if isinstance(transport, NostrTransport) and not swap_manager.is_initialized.is_set():
# not is_initialized.is_set() = configured provider was not found (or no provider configured)
# prompt user to select a swapserver
self.recent_offers = transport.get_recent_offers()
self.offersUpdated.emit()
self.undefinedNPub.emit()
elif swap_manager.is_initialized.is_set():
self.setReadyState()
self.state = QESwapHelper.State.ServiceReady
self.userinfo = QESwapHelper.MESSAGE_SWAP_HOWTO
self.init_swap_slider_range()
while True:
# keep fetching new incoming offer events
# the slider range will not get updated continuously as it would irritate the user
if isinstance(transport, NostrTransport):
if (recent_offers := transport.get_recent_offers()) != self.recent_offers:
self._logger.debug(f"received new swap offer")
self.recent_offers = recent_offers
self.offersUpdated.emit()
await asyncio.sleep(1)
threading.Thread(target=query_task, args=(swap_transport,), daemon=True).start()
async def handle_swap_transport(transport: SwapServerTransport):
# ensures that swap_transport is always set None if transport closes
try:
await swap_transport_task(transport)
finally:
self.swap_transport = None
self.transport_task = asyncio.run_coroutine_threadsafe(
handle_swap_transport(swap_transport),
get_asyncio_loop()
)
@pyqtSlot()
def npubSelectionCancelled(self):
if not self._wallet.wallet.config.SWAPSERVER_NPUB:
if (self._wallet.wallet.config.SWAPSERVER_NPUB
not in [offer.server_npub for offer in self.recent_offers]):
self._logger.debug('nostr is preferred but swapserver npub still undefined')
self.userinfo = _('No swap provider selected.')
if not self._wallet.wallet.config.SWAPSERVER_NPUB:
self.userinfo = _('No swap provider selected.')
else:
self.userinfo = _('Select one of the available swap providers.')
self.state = QESwapHelper.State.NoService
def init_swap_slider_range(self):
@pyqtSlot()
def setReadyState(self):
if self._wallet.wallet.config.SWAPSERVER_NPUB \
or not isinstance(self.swap_transport, NostrTransport):
self.state = QESwapHelper.State.ServiceReady
self.userinfo = QESwapHelper.MESSAGE_SWAP_HOWTO
self.initSwapSliderRange()
def update_swap_manager_pair(self):
"""Updates the swap manager pairs to the recent pairs of the selected server"""
assert self.swap_transport is not None, "No swap transport"
if isinstance(self.swap_transport, NostrTransport):
swap_manager = self._wallet.wallet.lnworker.swap_manager
pair = self.swap_transport.get_offer(self._wallet.wallet.config.SWAPSERVER_NPUB)
swap_manager.update_pairs(pair.pairs)
@pyqtSlot()
def initSwapSliderRange(self):
lnworker = self._wallet.wallet.lnworker
swap_manager = lnworker.swap_manager
# update the swap_manager pair so the newest available data is used below
self.update_swap_manager_pair()
"""Sets the minimal and maximal amount that can be swapped for the swap
slider."""
@@ -443,7 +481,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
max_onchain_spend))
# we expect range to adjust the value of the swap slider to be in the
# correct range, i.e., to correct an overflow when reducing the limits
self._logger.debug(f'Slider range {-reverse} - {forward}')
self._logger.debug(f'Slider range {-reverse} - {forward}. Pos {self._sliderPos}')
self.rangeMin = -reverse
self.rangeMax = forward
# percentage of void, right or left
@@ -459,6 +497,12 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
self.leftVoidChanged.emit()
self.rightVoidChanged.emit()
if not self.rangeMin <= self._sliderPos <= self.rangeMax:
# clamp the slider pos into the given limits
if abs(self._sliderPos - self.rangeMin) < abs(self._sliderPos - self.rangeMax):
self._sliderPos = self.rangeMin
else:
self._sliderPos = self.rangeMax
self.swap_slider_moved()
@profiler
@@ -504,7 +548,7 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
self._receive_amount = swap_manager.get_recv_amount(send_amount=self._send_amount, is_reverse=self.isReverse)
self.toreceive = QEAmount(amount_sat=self._receive_amount)
# fee breakdown
self.serverfeeperc = f'{swap_manager.percentage:0.1f}%'
self.serverfeeperc = f'{swap_manager.percentage:0.2f}%'
server_miningfee = swap_manager.mining_fee
self.serverMiningfee = QEAmount(amount_sat=server_miningfee)
if self.isReverse:
@@ -540,57 +584,54 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
assert self._tx
if lightning_amount is None or onchain_amount is None:
return
loop = get_asyncio_loop()
def swap_task():
with self._wallet.wallet.lnworker.swap_manager.create_transport() as transport:
coro = self._wallet.wallet.lnworker.swap_manager.request_normal_swap(
transport,
async def swap_task():
assert self.swap_transport is not None, "Swap transport not available"
try:
dummy_tx = self._create_tx(onchain_amount)
self.userinfo = _('Performing swap...')
self.state = QESwapHelper.State.Started
self._swap, invoice = await self._wallet.wallet.lnworker.swap_manager.request_normal_swap(
self.swap_transport,
lightning_amount_sat=lightning_amount,
expected_onchain_amount_sat=onchain_amount,
)
try:
dummy_tx = self._create_tx(onchain_amount)
fut = asyncio.run_coroutine_threadsafe(coro, loop)
self.userinfo = _('Performing swap...')
self.state = QESwapHelper.State.Started
self._swap, invoice = fut.result()
tx = self._wallet.wallet.lnworker.swap_manager.create_funding_tx(self._swap, dummy_tx, password=self._wallet.password)
coro2 = self._wallet.wallet.lnworker.swap_manager.wait_for_htlcs_and_broadcast(transport, swap=self._swap, invoice=invoice, tx=tx)
self._fut_htlc_wait = fut = asyncio.run_coroutine_threadsafe(coro2, loop)
tx = self._wallet.wallet.lnworker.swap_manager.create_funding_tx(self._swap, dummy_tx, password=self._wallet.password)
coro2 = self._wallet.wallet.lnworker.swap_manager.wait_for_htlcs_and_broadcast(self.swap_transport, swap=self._swap, invoice=invoice, tx=tx)
self._fut_htlc_wait = fut = asyncio.create_task(coro2)
self.canCancel = True
txid = fut.result()
try: # swaphelper might be destroyed at this point
if txid:
self.userinfo = _('Success!')
self.state = QESwapHelper.State.Success
else:
self.userinfo = _('Swap failed!')
self.state = QESwapHelper.State.Failed
except RuntimeError:
pass
except concurrent.futures.CancelledError:
self._wallet.wallet.lnworker.swap_manager.cancel_normal_swap(self._swap)
self.userinfo = _('Swap cancelled')
self.state = QESwapHelper.State.Cancelled
except Exception as e:
try: # swaphelper might be destroyed at this point
self.canCancel = True
txid = await fut
try: # swaphelper might be destroyed at this point
if txid:
self.userinfo = _('Success!')
self.state = QESwapHelper.State.Success
else:
self.userinfo = _('Swap failed!')
self.state = QESwapHelper.State.Failed
self.userinfo = _('Error') + ': ' + str(e)
self._logger.error(str(e))
except RuntimeError:
pass
finally:
try: # swaphelper might be destroyed at this point
self.canCancel = False
self._swap = None
self._fut_htlc_wait = None
except RuntimeError:
pass
except RuntimeError:
pass
except asyncio.CancelledError:
self._wallet.wallet.lnworker.swap_manager.cancel_normal_swap(self._swap)
self.userinfo = _('Swap cancelled')
self.state = QESwapHelper.State.Cancelled
except Exception as e:
try: # swaphelper might be destroyed at this point
self.state = QESwapHelper.State.Failed
self.userinfo = _('Error') + ': ' + str(e)
self._logger.error(str(e))
except RuntimeError:
pass
finally:
try: # swaphelper might be destroyed at this point
self.canCancel = False
self._swap = None
self._fut_htlc_wait = None
except RuntimeError:
pass
threading.Thread(target=swap_task, daemon=True).start()
asyncio.run_coroutine_threadsafe(swap_task(), get_asyncio_loop())
def _create_tx(self, onchain_amount: Union[int, str, None]) -> PartialTransaction:
# TODO: func taken from qt GUI, this should be common code
@@ -623,41 +664,37 @@ class QESwapHelper(AuthMixin, QObject, QtEventListener):
if lightning_amount is None or onchain_amount is None:
return
def swap_task():
async def swap_task():
assert self.swap_transport is not None, "Swap transport not available"
swap_manager = self._wallet.wallet.lnworker.swap_manager
loop = get_asyncio_loop()
with self._wallet.wallet.lnworker.swap_manager.create_transport() as transport:
async def coro():
await swap_manager.is_initialized.wait()
return await swap_manager.reverse_swap(
transport,
lightning_amount_sat=lightning_amount,
expected_onchain_amount_sat=onchain_amount + swap_manager.get_swap_tx_fee(),
)
try:
fut = asyncio.run_coroutine_threadsafe(coro(), loop)
self.userinfo = _('Performing swap...')
self.state = QESwapHelper.State.Started
txid = fut.result()
try: # swaphelper might be destroyed at this point
if txid:
self.userinfo = _('Success!')
self.state = QESwapHelper.State.Success
else:
self.userinfo = _('Swap failed!')
self.state = QESwapHelper.State.Failed
except RuntimeError:
pass
except Exception as e:
try: # swaphelper might be destroyed at this point
try:
self.userinfo = _('Performing swap...')
self.state = QESwapHelper.State.Started
await swap_manager.is_initialized.wait()
txid = await swap_manager.reverse_swap(
self.swap_transport,
lightning_amount_sat=lightning_amount,
expected_onchain_amount_sat=onchain_amount + swap_manager.get_swap_tx_fee(),
)
try: # swaphelper might be destroyed at this point
if txid:
self.userinfo = _('Success!')
self.state = QESwapHelper.State.Success
else:
self.userinfo = _('Swap failed!')
self.state = QESwapHelper.State.Failed
msg = _('Timeout') if isinstance(e, TimeoutError) else str(e)
self.userinfo = _('Error') + ': ' + msg
self._logger.error(str(e))
except RuntimeError:
pass
except RuntimeError:
pass
except Exception as e:
try: # swaphelper might be destroyed at this point
self.state = QESwapHelper.State.Failed
msg = _('Timeout') if isinstance(e, TimeoutError) else str(e)
self.userinfo = _('Error') + ': ' + msg
self._logger.error(str(e))
except RuntimeError:
pass
threading.Thread(target=swap_task, daemon=True).start()
asyncio.run_coroutine_threadsafe(swap_task(), get_asyncio_loop())
@pyqtSlot()
def executeSwap(self):

View File

@@ -1333,6 +1333,12 @@ class SwapServerTransport(Logger):
def __exit__(self, ex_type, ex, tb):
pass
async def __aenter__(self):
pass
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict:
pass
@@ -1358,6 +1364,13 @@ class HttpTransport(SwapServerTransport):
def __exit__(self, ex_type, ex, tb):
pass
async def __aenter__(self):
asyncio.create_task(self.get_pairs())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
async def send_request_to_server(self, method, request_data):
response = await self.network.async_send_http_on_proxy(
'post' if request_data else 'get',
@@ -1418,6 +1431,13 @@ class NostrTransport(SwapServerTransport):
fut = asyncio.run_coroutine_threadsafe(self.stop(), self.network.asyncio_loop)
fut.result(timeout=5)
async def __aenter__(self):
asyncio.create_task(self.main_loop())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await wait_for2(self.stop(), timeout=5)
@log_exceptions
async def main_loop(self):
self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}')