From 5be646dfd25dc76f1d36d257ea57e21cb0132882 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Tue, 6 May 2025 11:59:53 +0200 Subject: [PATCH] call lnwatcher callbacks in asyncio thread This partially reverts fbebe7de1a lnwatcher.trigger_callbacks is called manually in commands.py --- electrum/commands.py | 3 +++ electrum/lnwatcher.py | 42 ++++++++++++++++++++++-------------------- electrum/lnworker.py | 6 +++--- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/electrum/commands.py b/electrum/commands.py index 8ee37873b..e4601497a 100644 --- a/electrum/commands.py +++ b/electrum/commands.py @@ -1061,6 +1061,9 @@ class Commands(Logger): arg:bool:show_fees:Show miner fees paid by transactions arg:int:year:Show history for a given year """ + # trigger lnwatcher callbacks for their side effects: setting labels and accounting_addresses + await wallet.lnworker.lnwatcher.trigger_callbacks(requires_synchronizer=False) + #'from_height': (None, "Only show transactions that confirmed after given block height"), #'to_height': (None, "Only show transactions that confirmed before given block height"), kwargs = self.get_year_timestamps(year) diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py index 27958f11d..28e8f771c 100644 --- a/electrum/lnwatcher.py +++ b/electrum/lnwatcher.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING from .util import TxMinedInfo, BelowDustLimit -from .util import EventListener, event_listener +from .util import EventListener, event_listener, log_exceptions, ignore_exceptions from .transaction import Transaction, TxOutpoint from .logging import Logger @@ -48,12 +48,12 @@ class LNWatcher(Logger, EventListener): self.adb.add_address(address) self.callbacks[address] = callback - def trigger_callbacks(self): - if not self.adb.synchronizer: + async def trigger_callbacks(self, *, requires_synchronizer=True): + if requires_synchronizer and not self.adb.synchronizer: self.logger.info("synchronizer not set yet") return for address, callback in list(self.callbacks.items()): - callback() + await callback() @event_listener async def on_event_blockchain_updated(self, *args): @@ -62,32 +62,31 @@ class LNWatcher(Logger, EventListener): # (hold invoice preimage revealed, MPP completed, etc) for chan in self.lnworker.channels.values(): chan._sweep_info.clear() - self.trigger_callbacks() + await self.trigger_callbacks() @event_listener - def on_event_wallet_updated(self, wallet): + async def on_event_wallet_updated(self, wallet): # called if we add local tx if wallet.adb != self.adb: return - self.trigger_callbacks() + await self.trigger_callbacks() @event_listener - def on_event_adb_added_verified_tx(self, adb, tx_hash): + async def on_event_adb_added_verified_tx(self, adb, tx_hash): if adb != self.adb: return - self.trigger_callbacks() + await self.trigger_callbacks() @event_listener - def on_event_adb_set_up_to_date(self, adb): + async def on_event_adb_set_up_to_date(self, adb): if adb != self.adb: return - self.trigger_callbacks() + await self.trigger_callbacks() def add_channel(self, chan: 'AbstractChannel') -> None: outpoint = chan.funding_outpoint.to_str() address = chan.get_funding_address() callback = lambda: self.check_onchain_situation(address, outpoint) - callback() # run once, for side effects if chan.need_to_subscribe(): self.add_callback(address, callback) @@ -95,7 +94,9 @@ class LNWatcher(Logger, EventListener): self.logger.info(f'unwatching {funding_outpoint}') self.remove_callback(address) - def check_onchain_situation(self, address, funding_outpoint): + @ignore_exceptions + @log_exceptions + async def check_onchain_situation(self, address, funding_outpoint): # early return if address has not been added yet if not self.adb.is_mine(address): return @@ -107,13 +108,13 @@ class LNWatcher(Logger, EventListener): if closing_txid: closing_tx = self.adb.get_transaction(closing_txid) if closing_tx: - keep_watching = self.sweep_commitment_transaction(funding_outpoint, closing_tx) + keep_watching = await self.sweep_commitment_transaction(funding_outpoint, closing_tx) else: self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...") keep_watching = True else: keep_watching = True - self.update_channel_state( + await self.update_channel_state( funding_outpoint=funding_outpoint, funding_txid=funding_txid, funding_height=funding_height, @@ -126,9 +127,10 @@ class LNWatcher(Logger, EventListener): def diagnostic_name(self): return f"{self.lnworker.wallet.diagnostic_name()}-LNW" - def update_channel_state(self, *, funding_outpoint: str, funding_txid: str, - funding_height: TxMinedInfo, closing_txid: str, - closing_height: TxMinedInfo, keep_watching: bool) -> None: + async def update_channel_state( + self, *, funding_outpoint: str, funding_txid: str, + funding_height: TxMinedInfo, closing_txid: str, + closing_height: TxMinedInfo, keep_watching: bool) -> None: chan = self.lnworker.channel_by_txo(funding_outpoint) if not chan: return @@ -138,9 +140,9 @@ class LNWatcher(Logger, EventListener): closing_txid=closing_txid, closing_height=closing_height, keep_watching=keep_watching) - self.lnworker.handle_onchain_state(chan) + await self.lnworker.handle_onchain_state(chan) - def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool: + async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool: """This function is called when a channel was closed. In this case we need to check for redeemable outputs of the commitment transaction or spenders down the line (HTLC-timeout/success transactions). diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 57ff15b45..7be4673a1 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -1207,7 +1207,7 @@ class LNWallet(LNWorker): if chan.funding_outpoint.to_str() == txo: return chan - def handle_onchain_state(self, chan: Channel): + async def handle_onchain_state(self, chan: Channel): if self.network is None: # network not started yet return @@ -1219,7 +1219,7 @@ class LNWallet(LNWorker): if (chan.get_state() in (ChannelState.OPEN, ChannelState.SHUTDOWN) and chan.should_be_closed_due_to_expiring_htlcs(self.wallet.adb.get_local_height())): self.logger.info(f"force-closing due to expiring htlcs") - asyncio.ensure_future(self.schedule_force_closing(chan.channel_id)) + await self.schedule_force_closing(chan.channel_id) elif chan.get_state() == ChannelState.FUNDED: peer = self._peers.get(chan.node_id) @@ -1238,7 +1238,7 @@ class LNWallet(LNWorker): height = self.lnwatcher.adb.get_tx_height(txid).height if height == TX_HEIGHT_LOCAL: self.logger.info('REBROADCASTING CLOSING TX') - asyncio.ensure_future(self.network.try_broadcasting(force_close_tx, 'force-close')) + await self.network.try_broadcasting(force_close_tx, 'force-close') def get_peer_by_static_jit_scid_alias(self, scid_alias: bytes) -> Optional[Peer]: for nodeid, peer in self.peers.items():