1
0

call lnwatcher callbacks in asyncio thread

This partially reverts fbebe7de1a
lnwatcher.trigger_callbacks is called manually in commands.py
This commit is contained in:
ThomasV
2025-05-06 11:59:53 +02:00
parent 855aff7c44
commit 5be646dfd2
3 changed files with 28 additions and 23 deletions

View File

@@ -1061,6 +1061,9 @@ class Commands(Logger):
arg:bool:show_fees:Show miner fees paid by transactions arg:bool:show_fees:Show miner fees paid by transactions
arg:int:year:Show history for a given year arg:int:year:Show history for a given year
""" """
# trigger lnwatcher callbacks for their side effects: setting labels and accounting_addresses
await wallet.lnworker.lnwatcher.trigger_callbacks(requires_synchronizer=False)
#'from_height': (None, "Only show transactions that confirmed after given block height"), #'from_height': (None, "Only show transactions that confirmed after given block height"),
#'to_height': (None, "Only show transactions that confirmed before given block height"), #'to_height': (None, "Only show transactions that confirmed before given block height"),
kwargs = self.get_year_timestamps(year) kwargs = self.get_year_timestamps(year)

View File

@@ -5,7 +5,7 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from .util import TxMinedInfo, BelowDustLimit from .util import TxMinedInfo, BelowDustLimit
from .util import EventListener, event_listener from .util import EventListener, event_listener, log_exceptions, ignore_exceptions
from .transaction import Transaction, TxOutpoint from .transaction import Transaction, TxOutpoint
from .logging import Logger from .logging import Logger
@@ -48,12 +48,12 @@ class LNWatcher(Logger, EventListener):
self.adb.add_address(address) self.adb.add_address(address)
self.callbacks[address] = callback self.callbacks[address] = callback
def trigger_callbacks(self): async def trigger_callbacks(self, *, requires_synchronizer=True):
if not self.adb.synchronizer: if requires_synchronizer and not self.adb.synchronizer:
self.logger.info("synchronizer not set yet") self.logger.info("synchronizer not set yet")
return return
for address, callback in list(self.callbacks.items()): for address, callback in list(self.callbacks.items()):
callback() await callback()
@event_listener @event_listener
async def on_event_blockchain_updated(self, *args): async def on_event_blockchain_updated(self, *args):
@@ -62,32 +62,31 @@ class LNWatcher(Logger, EventListener):
# (hold invoice preimage revealed, MPP completed, etc) # (hold invoice preimage revealed, MPP completed, etc)
for chan in self.lnworker.channels.values(): for chan in self.lnworker.channels.values():
chan._sweep_info.clear() chan._sweep_info.clear()
self.trigger_callbacks() await self.trigger_callbacks()
@event_listener @event_listener
def on_event_wallet_updated(self, wallet): async def on_event_wallet_updated(self, wallet):
# called if we add local tx # called if we add local tx
if wallet.adb != self.adb: if wallet.adb != self.adb:
return return
self.trigger_callbacks() await self.trigger_callbacks()
@event_listener @event_listener
def on_event_adb_added_verified_tx(self, adb, tx_hash): async def on_event_adb_added_verified_tx(self, adb, tx_hash):
if adb != self.adb: if adb != self.adb:
return return
self.trigger_callbacks() await self.trigger_callbacks()
@event_listener @event_listener
def on_event_adb_set_up_to_date(self, adb): async def on_event_adb_set_up_to_date(self, adb):
if adb != self.adb: if adb != self.adb:
return return
self.trigger_callbacks() await self.trigger_callbacks()
def add_channel(self, chan: 'AbstractChannel') -> None: def add_channel(self, chan: 'AbstractChannel') -> None:
outpoint = chan.funding_outpoint.to_str() outpoint = chan.funding_outpoint.to_str()
address = chan.get_funding_address() address = chan.get_funding_address()
callback = lambda: self.check_onchain_situation(address, outpoint) callback = lambda: self.check_onchain_situation(address, outpoint)
callback() # run once, for side effects
if chan.need_to_subscribe(): if chan.need_to_subscribe():
self.add_callback(address, callback) self.add_callback(address, callback)
@@ -95,7 +94,9 @@ class LNWatcher(Logger, EventListener):
self.logger.info(f'unwatching {funding_outpoint}') self.logger.info(f'unwatching {funding_outpoint}')
self.remove_callback(address) self.remove_callback(address)
def check_onchain_situation(self, address, funding_outpoint): @ignore_exceptions
@log_exceptions
async def check_onchain_situation(self, address, funding_outpoint):
# early return if address has not been added yet # early return if address has not been added yet
if not self.adb.is_mine(address): if not self.adb.is_mine(address):
return return
@@ -107,13 +108,13 @@ class LNWatcher(Logger, EventListener):
if closing_txid: if closing_txid:
closing_tx = self.adb.get_transaction(closing_txid) closing_tx = self.adb.get_transaction(closing_txid)
if closing_tx: if closing_tx:
keep_watching = self.sweep_commitment_transaction(funding_outpoint, closing_tx) keep_watching = await self.sweep_commitment_transaction(funding_outpoint, closing_tx)
else: else:
self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...") self.logger.info(f"channel {funding_outpoint} closed by {closing_txid}. still waiting for tx itself...")
keep_watching = True keep_watching = True
else: else:
keep_watching = True keep_watching = True
self.update_channel_state( await self.update_channel_state(
funding_outpoint=funding_outpoint, funding_outpoint=funding_outpoint,
funding_txid=funding_txid, funding_txid=funding_txid,
funding_height=funding_height, funding_height=funding_height,
@@ -126,9 +127,10 @@ class LNWatcher(Logger, EventListener):
def diagnostic_name(self): def diagnostic_name(self):
return f"{self.lnworker.wallet.diagnostic_name()}-LNW" return f"{self.lnworker.wallet.diagnostic_name()}-LNW"
def update_channel_state(self, *, funding_outpoint: str, funding_txid: str, async def update_channel_state(
funding_height: TxMinedInfo, closing_txid: str, self, *, funding_outpoint: str, funding_txid: str,
closing_height: TxMinedInfo, keep_watching: bool) -> None: funding_height: TxMinedInfo, closing_txid: str,
closing_height: TxMinedInfo, keep_watching: bool) -> None:
chan = self.lnworker.channel_by_txo(funding_outpoint) chan = self.lnworker.channel_by_txo(funding_outpoint)
if not chan: if not chan:
return return
@@ -138,9 +140,9 @@ class LNWatcher(Logger, EventListener):
closing_txid=closing_txid, closing_txid=closing_txid,
closing_height=closing_height, closing_height=closing_height,
keep_watching=keep_watching) keep_watching=keep_watching)
self.lnworker.handle_onchain_state(chan) await self.lnworker.handle_onchain_state(chan)
def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool: async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool:
"""This function is called when a channel was closed. In this case """This function is called when a channel was closed. In this case
we need to check for redeemable outputs of the commitment transaction we need to check for redeemable outputs of the commitment transaction
or spenders down the line (HTLC-timeout/success transactions). or spenders down the line (HTLC-timeout/success transactions).

View File

@@ -1207,7 +1207,7 @@ class LNWallet(LNWorker):
if chan.funding_outpoint.to_str() == txo: if chan.funding_outpoint.to_str() == txo:
return chan return chan
def handle_onchain_state(self, chan: Channel): async def handle_onchain_state(self, chan: Channel):
if self.network is None: if self.network is None:
# network not started yet # network not started yet
return return
@@ -1219,7 +1219,7 @@ class LNWallet(LNWorker):
if (chan.get_state() in (ChannelState.OPEN, ChannelState.SHUTDOWN) if (chan.get_state() in (ChannelState.OPEN, ChannelState.SHUTDOWN)
and chan.should_be_closed_due_to_expiring_htlcs(self.wallet.adb.get_local_height())): and chan.should_be_closed_due_to_expiring_htlcs(self.wallet.adb.get_local_height())):
self.logger.info(f"force-closing due to expiring htlcs") self.logger.info(f"force-closing due to expiring htlcs")
asyncio.ensure_future(self.schedule_force_closing(chan.channel_id)) await self.schedule_force_closing(chan.channel_id)
elif chan.get_state() == ChannelState.FUNDED: elif chan.get_state() == ChannelState.FUNDED:
peer = self._peers.get(chan.node_id) peer = self._peers.get(chan.node_id)
@@ -1238,7 +1238,7 @@ class LNWallet(LNWorker):
height = self.lnwatcher.adb.get_tx_height(txid).height height = self.lnwatcher.adb.get_tx_height(txid).height
if height == TX_HEIGHT_LOCAL: if height == TX_HEIGHT_LOCAL:
self.logger.info('REBROADCASTING CLOSING TX') self.logger.info('REBROADCASTING CLOSING TX')
asyncio.ensure_future(self.network.try_broadcasting(force_close_tx, 'force-close')) await self.network.try_broadcasting(force_close_tx, 'force-close')
def get_peer_by_static_jit_scid_alias(self, scid_alias: bytes) -> Optional[Peer]: def get_peer_by_static_jit_scid_alias(self, scid_alias: bytes) -> Optional[Peer]:
for nodeid, peer in self.peers.items(): for nodeid, peer in self.peers.items():