1
0

Persist LNWatcher transactions in wallet file:

- separate AddressSynchronizer from Wallet and LNWatcher
 - the AddressSynchronizer class is referred to as 'adb' (address database)
 - Use callbacks to replace overloaded methods
This commit is contained in:
ThomasV
2022-06-01 23:03:35 +02:00
parent b6de15b95d
commit 121d8732f1
23 changed files with 439 additions and 366 deletions

View File

@@ -16,6 +16,7 @@ from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGH
from .transaction import Transaction, TxOutpoint
from .transaction import match_script_against_template
from .lnutil import WITNESS_TEMPLATE_RECEIVED_HTLC, WITNESS_TEMPLATE_OFFERED_HTLC
from .logging import Logger
if TYPE_CHECKING:
@@ -135,24 +136,34 @@ class SweepStore(SqlDB):
class LNWatcher(AddressSynchronizer):
class LNWatcher(Logger):
LOGGING_SHORTCUT = 'W'
def __init__(self, network: 'Network'):
AddressSynchronizer.__init__(self, WalletDB({}, manual_upgrades=False))
def __init__(self, adb, network: 'Network'):
Logger.__init__(self)
self.adb = adb
self.config = network.config
self.callbacks = {} # address -> lambda: coroutine
self.network = network
util.register_callback(
self.on_network_update,
['network_updated', 'blockchain_updated', 'verified', 'wallet_updated', 'fee'])
util.register_callback(self.on_fee, ['fee'])
util.register_callback(self.on_blockchain_updated, ['blockchain_updated'])
util.register_callback(self.on_network_updated, ['network_updated'])
util.register_callback(self.on_adb_added_verified_tx, ['adb_added_verified_tx'])
util.register_callback(self.on_adb_set_up_to_date, ['adb_set_up_to_date'])
# status gets populated when we run
self.channel_status = {}
async def stop(self):
await super().stop()
util.unregister_callback(self.on_network_update)
util.unregister_callback(self.on_fee)
util.unregister_callback(self.on_blockchain_updated)
util.unregister_callback(self.on_network_updated)
util.unregister_callback(self.on_adb_added_verified_tx)
util.unregister_callback(self.on_adb_set_up_to_date)
def get_channel_status(self, outpoint):
return self.channel_status.get(outpoint, 'unknown')
@@ -171,15 +182,31 @@ class LNWatcher(AddressSynchronizer):
self.callbacks.pop(address, None)
def add_callback(self, address, callback):
self.add_address(address)
self.adb.add_address(address)
self.callbacks[address] = callback
async def on_fee(self, event, *args):
await self.trigger_callbacks()
async def on_network_updated(self, event, *args):
await self.trigger_callbacks()
async def on_blockchain_updated(self, event, *args):
await self.trigger_callbacks()
async def on_adb_added_verified_tx(self, event, adb, tx_hash):
if adb != self.adb:
return
await self.trigger_callbacks()
async def on_adb_set_up_to_date(self, event, adb):
if adb != self.adb:
return
await self.trigger_callbacks()
@log_exceptions
async def on_network_update(self, event, *args):
if event in ('verified', 'wallet_updated'):
if args[0] != self:
return
if not self.synchronizer:
async def trigger_callbacks(self):
if not self.adb.synchronizer:
self.logger.info("synchronizer not set yet")
return
for address, callback in list(self.callbacks.items()):
@@ -187,18 +214,18 @@ class LNWatcher(AddressSynchronizer):
async def check_onchain_situation(self, address, funding_outpoint):
# early return if address has not been added yet
if not self.is_mine(address):
if not self.adb.is_mine(address):
return
spenders = self.inspect_tx_candidate(funding_outpoint, 0)
# inspect_tx_candidate might have added new addresses, in which case we return ealy
if not self.is_up_to_date():
if not self.adb.is_up_to_date():
return
funding_txid = funding_outpoint.split(':')[0]
funding_height = self.get_tx_height(funding_txid)
funding_height = self.adb.get_tx_height(funding_txid)
closing_txid = spenders.get(funding_outpoint)
closing_height = self.get_tx_height(closing_txid)
closing_height = self.adb.get_tx_height(closing_txid)
if closing_txid:
closing_tx = self.db.get_transaction(closing_txid)
closing_tx = self.adb.get_transaction(closing_txid)
if closing_tx:
keep_watching = await self.do_breach_remedy(funding_outpoint, closing_tx, spenders)
else:
@@ -233,18 +260,18 @@ class LNWatcher(AddressSynchronizer):
n==2 => outpoint is a second-stage htlc
"""
prev_txid, index = outpoint.split(':')
spender_txid = self.db.get_spent_outpoint(prev_txid, int(index))
spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index))
result = {outpoint:spender_txid}
if n == 0:
if spender_txid is None:
self.channel_status[outpoint] = 'open'
elif not self.is_deeply_mined(spender_txid):
self.channel_status[outpoint] = 'closed (%d)' % self.get_tx_height(spender_txid).conf
self.channel_status[outpoint] = 'closed (%d)' % self.adb.get_tx_height(spender_txid).conf
else:
self.channel_status[outpoint] = 'closed (deep)'
if spender_txid is None:
return result
spender_tx = self.db.get_transaction(spender_txid)
spender_tx = self.adb.get_transaction(spender_txid)
if n == 1:
# if tx input is not a first-stage HTLC, we can stop recursion
if len(spender_tx.inputs()) != 1:
@@ -263,8 +290,8 @@ class LNWatcher(AddressSynchronizer):
for i, o in enumerate(spender_tx.outputs()):
if o.address is None:
continue
if not self.is_mine(o.address):
self.add_address(o.address)
if not self.adb.is_mine(o.address):
self.adb.add_address(o.address)
elif n < 2:
r = self.inspect_tx_candidate(spender_txid+':%d'%i, n+1)
result.update(r)
@@ -273,7 +300,7 @@ class LNWatcher(AddressSynchronizer):
def get_tx_mined_depth(self, txid: str):
if not txid:
return TxMinedDepth.FREE
tx_mined_depth = self.get_tx_height(txid)
tx_mined_depth = self.adb.get_tx_height(txid)
height, conf = tx_mined_depth.height, tx_mined_depth.conf
if conf > 100:
return TxMinedDepth.DEEP
@@ -298,13 +325,19 @@ class WatchTower(LNWatcher):
LOGGING_SHORTCUT = 'W'
def __init__(self, network):
LNWatcher.__init__(self, network)
adb = AddressSynchronizer(WalletDB({}, manual_upgrades=False))
adb.start_network(network)
LNWatcher.__init__(self, adb, network)
self.network = network
self.sweepstore = SweepStore(os.path.join(self.network.config.path, "watchtower_db"), network)
# this maps funding_outpoints to ListenerItems, which have an event for when the watcher is done,
# and a queue for seeing which txs are being published
self.tx_progress = {} # type: Dict[str, ListenerItem]
async def stop(self):
await super().stop()
await self.adb.stop()
def diagnostic_name(self):
return "local_tower"
@@ -327,7 +360,7 @@ class WatchTower(LNWatcher):
return keep_watching
async def broadcast_or_log(self, funding_outpoint: str, tx: Transaction):
height = self.get_tx_height(tx.txid()).height
height = self.adb.get_tx_height(tx.txid()).height
if height != TX_HEIGHT_LOCAL:
return
try:
@@ -379,7 +412,7 @@ class LNWalletWatcher(LNWatcher):
def __init__(self, lnworker: 'LNWallet', network: 'Network'):
self.network = network
self.lnworker = lnworker
LNWatcher.__init__(self, network)
LNWatcher.__init__(self, lnworker.wallet.adb, network)
def diagnostic_name(self):
return f"{self.lnworker.wallet.diagnostic_name()}-LNW"
@@ -412,7 +445,7 @@ class LNWalletWatcher(LNWatcher):
name = sweep_info.name + ' ' + chan.get_id_for_log()
spender_txid = spenders.get(prevout)
if spender_txid is not None:
spender_tx = self.db.get_transaction(spender_txid)
spender_tx = self.adb.get_transaction(spender_txid)
if not spender_tx:
keep_watching = True
continue
@@ -446,7 +479,7 @@ class LNWalletWatcher(LNWatcher):
broadcast = False
reason = 'waiting for {}: CLTV ({} > {}), prevout {}'.format(name, local_height, sweep_info.cltv_expiry, prevout)
if sweep_info.csv_delay:
prev_height = self.get_tx_height(prev_txid)
prev_height = self.adb.get_tx_height(prev_txid)
wanted_height = sweep_info.csv_delay + prev_height.height - 1
if prev_height.height <= 0 or wanted_height - local_height > 0:
broadcast = False
@@ -460,24 +493,16 @@ class LNWalletWatcher(LNWatcher):
if broadcast:
await self.network.try_broadcasting(tx, name)
else:
if txid in self.lnworker.wallet.future_tx:
if txid in self.adb.future_tx:
return
self.logger.debug(f'(chan {chan_id_for_log}) trying to redeem {name}: {prevout}')
self.logger.info(reason)
# it's OK to add local transaction, the fee will be recomputed
try:
tx_was_added = self.lnworker.wallet.add_future_tx(tx, wanted_height)
tx_was_added = self.adb.add_future_tx(tx, wanted_height)
except Exception as e:
self.logger.info(f'could not add future tx: {name}. prevout: {prevout} {str(e)}')
tx_was_added = False
if tx_was_added:
self.logger.info(f'added future tx: {name}. prevout: {prevout}')
util.trigger_callback('wallet_updated', self.lnworker.wallet)
def add_verified_tx(self, tx_hash: str, info: TxMinedInfo):
# this method is overloaded so that we have the GUI refreshed
# TODO: LNWatcher should not be an AddressSynchronizer,
# we should use the existing wallet instead, and results would be persisted
super().add_verified_tx(tx_hash, info)
tx_mined_status = self.get_tx_height(tx_hash)
util.trigger_callback('verified', self.lnworker.wallet, tx_hash, tx_mined_status)