From bdb7a8222030752207a8e855ea2bbb47934bf761 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Fri, 14 Feb 2025 14:12:12 +0100 Subject: [PATCH] batch payment manager: The class TxBatcher handles the creation, broadcast and replacement of replaceable transactions. Callers (LNWatcher, SwapManager) use methods add_payment_output and add_sweep_info. Transactions created by TxBatcher may combine sweeps and outgoing payments. Transactions created by TxBatcher will have their fee bumped automatically (this was only the case for sweeps before). TxBatcher manages several TxBatches. TxBatches are created dynamically when needed. The GUI does not touch txbatcher transactions: - wallet.get_candidates_for_batching excludes txbatcher transactions - RBF dialogs do not work with txbatcher transactions wallet: - instead of reading config variables, make_unsigned_transaction takes new parameters: base_tx, send_change_to_lighting tests: - unit tests in test_txbatcher.py (replaces test_sswaps.py) - force all regtests to use MPP, so that we sweep transactions with several HTLCs. This forces the payment manager to aggregate first-stage HTLC tx inputs. second-stage are not batched for now. --- electrum/gui/qt/main_window.py | 5 + electrum/lnsweep.py | 23 +- electrum/lnwatcher.py | 156 ++--------- electrum/simple_config.py | 4 +- electrum/submarine_swaps.py | 163 +++++------- electrum/transaction.py | 3 + electrum/txbatcher.py | 459 +++++++++++++++++++++++++++++++++ electrum/wallet.py | 52 +++- tests/regtest.py | 2 + tests/regtest/regtest.sh | 3 + tests/test_sswaps.py | 80 ------ tests/test_txbatcher.py | 216 ++++++++++++++++ 12 files changed, 840 insertions(+), 326 deletions(-) create mode 100644 electrum/txbatcher.py delete mode 100644 tests/test_sswaps.py create mode 100644 tests/test_txbatcher.py diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index 92a9a36a2..4ce831971 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -461,6 +461,11 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener): def on_event_banner(self, *args): self.console.showMessage(args[0]) + @qt_event_listener + def on_event_adb_set_future_tx(self, adb, txid): + if adb == self.wallet.adb: + self.history_model.refresh('set_future_tx') + @qt_event_listener def on_event_verified(self, *args): wallet, tx_hash, tx_mined_status = args diff --git a/electrum/lnsweep.py b/electrum/lnsweep.py index c0e3229b2..722434ad5 100644 --- a/electrum/lnsweep.py +++ b/electrum/lnsweep.py @@ -43,6 +43,8 @@ class SweepInfo(NamedTuple): cltv_abs: Optional[int] # set to None only if the script has no cltv txin: PartialTxInput txout: Optional[PartialTxOutput] # only for first-stage htlc tx + can_be_batched: bool # todo: this could be more fine-grained + def sweep_their_ctx_watchtower( chan: 'Channel', @@ -251,7 +253,8 @@ def sweep_their_htlctx_justice( csv_delay=0, cltv_abs=None, txin=txin, - txout=None + txout=None, + can_be_batched=False, ) return index_to_sweepinfo @@ -329,6 +332,7 @@ def sweep_our_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # to_local @@ -350,6 +354,7 @@ def sweep_our_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) we_breached = ctn < chan.get_oldest_unrevoked_ctn(LOCAL) if we_breached: @@ -384,7 +389,11 @@ def sweep_our_ctx( csv_delay=0, cltv_abs=htlc_tx.locktime, txin=htlc_tx.inputs()[0], - txout=htlc_tx.outputs()[0]) + txout=htlc_tx.outputs()[0], + can_be_batched=False, # both parties can spend + # actually, we might want to batch depending on the context + # f(amount in htlc, remaining_time, number of available utxos for anchors) + ) else: # second-stage address = bitcoin.script_to_p2wsh(htlctx_witness_script) @@ -404,6 +413,9 @@ def sweep_our_ctx( cltv_abs=0, txin=sweep_txin, txout=None, + # this is safe to batch, we are the only ones who can spend + # (assuming we did not broadcast a revoked state) + can_be_batched=True, ) # offered HTLCs, in our ctx --> "timeout" @@ -541,6 +553,7 @@ def sweep_their_ctx_to_remote_backup( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # to_remote @@ -562,6 +575,7 @@ def sweep_their_ctx_to_remote_backup( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) return txs @@ -619,6 +633,7 @@ def sweep_their_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # to_local is handled by lnwatcher @@ -631,6 +646,7 @@ def sweep_their_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=False, ) # to_remote @@ -656,12 +672,14 @@ def sweep_their_ctx( our_payment_privkey=our_payment_privkey, has_anchors=chan.has_anchors() ): + # todo: we might not want to sweep this at all, if we add it to the wallet addresses txs[prevout] = SweepInfo( name='their_ctx_to_remote', csv_delay=csv_delay, cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # HTLCs @@ -701,6 +719,7 @@ def sweep_their_ctx( cltv_abs=cltv_abs, txin=txin, txout=None, + can_be_batched=False, ) # received HTLCs, in their ctx --> "timeout" # offered HTLCs, in their ctx --> "success" diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py index 70acd7692..086000362 100644 --- a/electrum/lnwatcher.py +++ b/electrum/lnwatcher.py @@ -2,20 +2,14 @@ # Distributed under the MIT software license, see the accompanying # file LICENCE or http://www.opensource.org/licenses/mit-license.php -from typing import NamedTuple, Iterable, TYPE_CHECKING -import copy -import asyncio +from typing import TYPE_CHECKING from enum import IntEnum, auto -from typing import NamedTuple, Dict -from . import util -from .util import log_exceptions, ignore_exceptions, TxMinedInfo +from .util import log_exceptions, ignore_exceptions, TxMinedInfo, BelowDustLimit from .util import EventListener, event_listener from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_FUTURE -from .transaction import Transaction, TxOutpoint, PartialTransaction +from .transaction import Transaction, TxOutpoint from .logging import Logger -from .bitcoin import dust_threshold -from .fee_policy import FeePolicy if TYPE_CHECKING: @@ -46,7 +40,6 @@ class LNWatcher(Logger, EventListener): self.register_callbacks() # status gets populated when we run self.channel_status = {} - self.fee_policy = FeePolicy('eta:2') async def stop(self): self.unregister_callbacks() @@ -75,6 +68,13 @@ class LNWatcher(Logger, EventListener): async def on_event_blockchain_updated(self, *args): await self.trigger_callbacks() + @event_listener + async def on_event_wallet_updated(self, wallet): + # called if we add local tx + if wallet.adb != self.adb: + return + await self.trigger_callbacks() + @event_listener async def on_event_adb_added_verified_tx(self, adb, tx_hash): if adb != self.adb: @@ -141,6 +141,10 @@ class LNWatcher(Logger, EventListener): """ prev_txid, index = outpoint.split(':') spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index)) + # discard local spenders + tx_mined_status = self.adb.get_tx_height(spender_txid) + if tx_mined_status.height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + spender_txid = None if not spender_txid: return spender_tx = self.adb.get_transaction(spender_txid) @@ -211,18 +215,6 @@ class LNWalletWatcher(LNWatcher): keep_watching=keep_watching) await self.lnworker.handle_onchain_state(chan) - def is_dust(self, sweep_info): - if sweep_info.name in ['local_anchor', 'remote_anchor']: - return False - if sweep_info.txout is not None: - return False - value = sweep_info.txin._trusted_value_sats - witness_size = len(sweep_info.txin.make_witness(71*b'\x00')) - tx_size_vbytes = 84 + witness_size//4 # assumes no batching, sweep to p2wpkh - self.logger.info(f'{sweep_info.name} size = {tx_size_vbytes}') - fee = self.fee_policy.estimate_fee(tx_size_vbytes, network=self.network, allow_fallback_to_static_rates=True) - return value - fee <= dust_threshold() - @log_exceptions async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool: """This function is called when a channel was closed. In this case @@ -235,19 +227,16 @@ class LNWalletWatcher(LNWatcher): return False # detect who closed and get information about how to claim outputs sweep_info_dict = chan.sweep_ctx(closing_tx) - self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}") + #self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}") keep_watching = False if sweep_info_dict else not self.is_deeply_mined(closing_tx.txid()) - # create and broadcast transactions for prevout, sweep_info in sweep_info_dict.items(): - if self.is_dust(sweep_info): - continue prev_txid, prev_index = prevout.split(':') name = sweep_info.name + ' ' + chan.get_id_for_log() self.lnworker.wallet.set_default_label(prevout, name) if not self.adb.get_transaction(prev_txid): # do not keep watching if prevout does not exist - self.logger.info(f'prevout does not exist for {name}: {prev_txid}') + self.logger.info(f'prevout does not exist for {name}: {prevout}') continue spender_txid = self.get_spender(prevout) spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None @@ -260,115 +249,20 @@ class LNWalletWatcher(LNWatcher): if htlc_tx_spender: keep_watching |= not self.is_deeply_mined(htlc_tx_spender) else: - keep_watching = True - await self.maybe_redeem(prevout2, htlc_sweep_info, name) + keep_watching |= self.maybe_redeem(htlc_sweep_info) keep_watching |= not self.is_deeply_mined(spender_txid) self.maybe_extract_preimage(chan, spender_tx, prevout) else: - keep_watching = True - # broadcast or maybe update our own tx - await self.maybe_redeem(prevout, sweep_info, name) - + keep_watching |= self.maybe_redeem(sweep_info) return keep_watching - def get_redeem_tx(self, prevout: str, sweep_info: 'SweepInfo', name: str): - # check if redeem tx needs to be updated - # if it is in the mempool, we need to check fee rise - txid = self.get_spender(prevout) - old_tx = self.adb.get_transaction(txid) - assert old_tx is not None or txid is None - tx_depth = self.get_tx_mined_depth(txid) if txid else None - if txid and tx_depth not in [TxMinedDepth.FREE, TxMinedDepth.MEMPOOL]: - assert old_tx is not None - return old_tx, None - # fixme: deepcopy is needed because tx.serialize() is destructive - inputs = [copy.deepcopy(sweep_info.txin)] - outputs = [sweep_info.txout] if sweep_info.txout else [] - if sweep_info.name == 'first-stage-htlc': - new_tx = PartialTransaction.from_io(inputs, outputs, locktime=sweep_info.cltv_abs, version=2) - self.lnworker.wallet.sign_transaction(new_tx, password=None, ignore_warnings=True) - else: - # password is needed for 1st stage htlc tx with anchors because we add inputs - password = self.lnworker.wallet.get_unlocked_password() - new_tx = self.lnworker.wallet.create_transaction( - fee_policy = self.fee_policy, - inputs = inputs, - outputs = outputs, - password = password, - locktime = sweep_info.cltv_abs, - BIP69_sort=False, - ) - if new_tx is None: - self.logger.info(f'{name} could not claim output: {prevout}, dust') - assert old_tx is not None - return old_tx, None - if txid is None: - return None, new_tx - elif tx_depth == TxMinedDepth.MEMPOOL: - delta = new_tx.get_fee() - self.adb.get_tx_fee(txid) - if delta > 1: - self.logger.info(f'increasing fee of mempool tx {name}: {prevout}') - return old_tx, new_tx - else: - assert old_tx is not None - return old_tx, None - elif tx_depth == TxMinedDepth.FREE: - # return new tx, even if it is equal to old_tx, - # because we need to test if it can be broadcast - return old_tx, new_tx - else: - assert old_tx is not None - return old_tx, None - - async def maybe_redeem(self, prevout, sweep_info: 'SweepInfo', name: str) -> None: - old_tx, new_tx = self.get_redeem_tx(prevout, sweep_info, name) - if new_tx is None: - return - prev_txid, prev_index = prevout.split(':') - can_broadcast = True - local_height = self.network.get_local_height() - if sweep_info.cltv_abs: - wanted_height = sweep_info.cltv_abs - if wanted_height - local_height > 0: - can_broadcast = False - # self.logger.debug(f"pending redeem for {prevout}. waiting for {name}: CLTV ({local_height=}, {wanted_height=})") - if sweep_info.csv_delay: - prev_height = self.adb.get_tx_height(prev_txid) - if prev_height.height > 0: - wanted_height = prev_height.height + sweep_info.csv_delay - 1 - else: - wanted_height = local_height + sweep_info.csv_delay - if wanted_height - local_height > 0: - can_broadcast = False - # self.logger.debug( - # f"pending redeem for {prevout}. waiting for {name}: CSV " - # f"({local_height=}, {wanted_height=}, {prev_height.height=}, {sweep_info.csv_delay=})") - if can_broadcast: - self.logger.info(f'we can broadcast: {name}') - if await self.network.try_broadcasting(new_tx, name): - tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None)) - else: - tx_was_added = False - else: - # we may have a tx with a different fee, in which case it will be replaced - if not old_tx or (old_tx and old_tx.txid() != new_tx.txid()): - try: - tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None)) - except Exception as e: - self.logger.info(f'could not add future tx: {name}. prevout: {prevout} {str(e)}') - tx_was_added = False - if tx_was_added: - self.logger.info(f'added redeem tx: {name}. prevout: {prevout}') - else: - tx_was_added = False - # set future tx regardless of tx_was_added, because it is not persisted - # (and wanted_height can change if input of CSV was not mined before) - self.adb.set_future_tx(new_tx.txid(), wanted_height=wanted_height) - if tx_was_added: - self.lnworker.wallet.set_label(new_tx.txid(), name) - if old_tx and old_tx.txid() != new_tx.txid(): - self.lnworker.wallet.set_label(old_tx.txid(), None) - util.trigger_callback('wallet_updated', self.lnworker.wallet) + def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool: + """ returns False if it was dust """ + try: + self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info, self.config.FEE_POLICY_LIGHTNING) + except BelowDustLimit: + return False + return True def maybe_extract_preimage(self, chan: 'AbstractChannel', spender_tx: Transaction, prevout: str): txin_idx = spender_tx.get_input_idx_that_spent_prevout(TxOutpoint.from_str(prevout)) diff --git a/electrum/simple_config.py b/electrum/simple_config.py index ad2036715..66d7d55c8 100644 --- a/electrum/simple_config.py +++ b/electrum/simple_config.py @@ -675,7 +675,9 @@ Warning: setting this to too low will result in lots of payment failures."""), TEST_SHUTDOWN_FEE_RANGE = ConfigVar('test_shutdown_fee_range', default=None) TEST_SHUTDOWN_LEGACY = ConfigVar('test_shutdown_legacy', default=False, type_=bool) - FEE_POLICY = ConfigVar('fee_policy', default='eta:2', type_=str) + FEE_POLICY = ConfigVar('fee_policy', default='eta:2', type_=str) # exposed to GUI + FEE_POLICY_LIGHTNING = ConfigVar('fee_policy_lightning', default='eta:2', type_=str) # for txbatcher (sweeping) + FEE_POLICY_SWAPS = ConfigVar('fee_policy_swaps', default='eta:2', type_=str) # for txbatcher (sweeping and sending if we are a swapserver) RPC_USERNAME = ConfigVar('rpcuser', default=None, type_=str) RPC_PASSWORD = ConfigVar('rpcpassword', default=None, type_=str) diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index fab40f8c0..cf00bcd7d 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -35,7 +35,7 @@ from .lnutil import hex_to_bytes from .lnaddr import lndecode from .json_db import StoredObject, stored_in from . import constants -from .address_synchronizer import TX_HEIGHT_LOCAL +from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE from .i18n import _ from .fee_policy import FeePolicy @@ -44,6 +44,7 @@ from .crypto import ripemd from .invoices import Invoice from .network import TxBroadcastError from .lnonion import OnionRoutingFailure, OnionFailureCode +from .lnsweep import SweepInfo if TYPE_CHECKING: @@ -154,38 +155,14 @@ class SwapData(StoredObject): _funding_prevout = None # type: Optional[TxOutpoint] # for RBF _payment_hash = None _zeroconf = False + _payment_pending = False # for forward swaps @property def payment_hash(self) -> bytes: return self._payment_hash def is_funded(self) -> bool: - return self.funding_txid is not None - - -def create_claim_tx( - *, - txin: PartialTxInput, - swap: SwapData, - network: 'Network', - fee_policy: FeePolicy, -) -> PartialTransaction: - """Create tx to either claim successful reverse-swap, - or to get refunded for timed-out forward-swap. - """ - # FIXME the mining fee should depend on swap.is_reverse. - # the txs are not the same size... - amount_sat = txin.value_sats() - SwapManager._get_fee(size=SWAP_TX_SIZE, fee_policy=fee_policy, network=network) - if amount_sat < dust_threshold(): - raise BelowDustLimit() - txin, locktime = SwapManager.create_claim_txin(txin=txin, swap=swap) - txout = PartialTxOutput.from_address_and_value(swap.receive_address, amount_sat) - tx = PartialTransaction.from_io([txin], [txout], version=2, locktime=locktime) - sig = tx.sign_txin(0, txin.privkey) - txin.script_sig = b'' - txin.witness = txin.make_witness(sig) - assert tx.is_complete() - return tx + return self._payment_pending or bool(self.funding_txid) class SwapManager(Logger): @@ -202,7 +179,6 @@ class SwapManager(Logger): self.wallet = wallet self.config = wallet.config - self.fee_policy = FeePolicy(wallet.config.FEE_POLICY) self.lnworker = lnworker self.config = wallet.config self.taskgroup = OldTaskGroup() @@ -371,55 +347,35 @@ class SwapManager(Logger): self._add_or_reindex_swap(swap) # to update _swaps_by_funding_outpoint funding_height = self.lnwatcher.adb.get_tx_height(txin.prevout.txid.hex()) spent_height = txin.spent_height - should_bump_fee = False + # set spending_txid (even if tx is local), for GUI grouping + swap.spending_txid = txin.spent_txid + # discard local spenders + if spent_height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + spent_height = None if spent_height is not None: - swap.spending_txid = txin.spent_txid if spent_height > 0: if current_height - spent_height > REDEEM_AFTER_DOUBLE_SPENT_DELAY: self.logger.info(f'stop watching swap {swap.lockup_address}') self.lnwatcher.remove_callback(swap.lockup_address) swap.is_redeemed = True - elif spent_height == TX_HEIGHT_LOCAL: - if funding_height.conf > 0 or (swap.is_reverse and swap._zeroconf): - tx = self.lnwatcher.adb.get_transaction(txin.spent_txid) - try: - await self.network.broadcast_transaction(tx) - except TxBroadcastError: - self.logger.info(f'error broadcasting claim tx {txin.spent_txid}') - elif funding_height.height == TX_HEIGHT_LOCAL: - # the funding tx was double spent. - # this will remove both funding and child (spending tx) from adb - self.lnwatcher.adb.remove_transaction(swap.funding_txid) - swap.funding_txid = None - swap.spending_txid = None - else: - # spending tx is in mempool - pass if not swap.is_reverse: if swap.preimage is None and spent_height is not None: # extract the preimage, add it to lnwatcher claim_tx = self.lnwatcher.adb.get_transaction(txin.spent_txid) - preimage = claim_tx.inputs()[0].witness_elements()[1] - if sha256(preimage) == swap.payment_hash: - swap.preimage = preimage - self.logger.info(f'found preimage: {preimage.hex()}') - self.lnworker.preimages[swap.payment_hash.hex()] = preimage.hex() - # note: we must check the payment secret before we broadcast the funding tx + for txin in claim_tx.inputs(): + preimage = txin.witness_elements()[1] + if sha256(preimage) == swap.payment_hash: + swap.preimage = preimage + self.logger.info(f'found preimage: {preimage.hex()}') + self.lnworker.preimages[swap.payment_hash.hex()] = preimage.hex() + break else: # this is our refund tx if spent_height > 0: self.logger.info(f'refund tx confirmed: {txin.spent_txid} {spent_height}') self._fail_swap(swap, 'refund tx confirmed') return - else: - claim_tx.add_info_from_wallet(self.wallet) - claim_tx_fee = claim_tx.get_fee() - recommended_fee = self.get_swap_tx_fee() - if claim_tx_fee * 1.1 < recommended_fee: - should_bump_fee = True - self.logger.info(f'claim tx fee too low {claim_tx_fee} < {recommended_fee}. we will bump the fee') - if remaining_time > 0: # too early for refund return @@ -447,32 +403,36 @@ class SwapManager(Logger): # for testing: do not create claim tx return - if spent_height is not None and not should_bump_fee: + if spent_height is not None and spent_height > 0: return + txin, locktime = self.create_claim_txin(txin=txin, swap=swap) + # note: there is no csv in the script, we just set this so that txbatcher waits for one confirmation + csv = 1 if (swap.is_reverse and not swap._zeroconf) else 0 + name = 'swap claim' if swap.is_reverse else 'swap refund' + can_be_batched = bool(csv) if swap.is_reverse else True + sweep_info = SweepInfo( + txin=txin, + csv_delay=csv, + cltv_abs=locktime, + txout=None, + name=name, + can_be_batched=can_be_batched, + ) try: - tx = create_claim_tx(txin=txin, swap=swap, fee_policy=self.fee_policy, network=self.network) + self.wallet.txbatcher.add_sweep_input('swaps', sweep_info, self.config.FEE_POLICY_SWAPS) except BelowDustLimit: self.logger.info('utxo value below dust threshold') return - self.logger.info(f'adding claim tx {tx.txid()}') - self.wallet.adb.add_transaction(tx) - swap.spending_txid = tx.txid() - if funding_height.conf > 0 or (swap.is_reverse and swap._zeroconf): - try: - await self.network.broadcast_transaction(tx) - except TxBroadcastError: - self.logger.info(f'error broadcasting claim tx {txin.spent_txid}') def get_swap_tx_fee(self): - return self.get_fee(SWAP_TX_SIZE) + return self._get_tx_fee(self.config.FEE_POLICY) - def get_fee(self, size): - # note: 'size' is in vbytes - return self._get_fee(size=size, fee_policy=self.fee_policy, network=self.network) + def get_fee_for_txbatcher(self): + return self._get_tx_fee(self.config.FEE_POLICY_SWAPS) - @classmethod - def _get_fee(cls, *, size, fee_policy: FeePolicy, network: 'Network'): - return fee_policy.estimate_fee(size, network=network, allow_fallback_to_static_rates=True) + def _get_tx_fee(self, policy_descriptor: str): + fee_policy = FeePolicy(policy_descriptor) + return fee_policy.estimate_fee(SWAP_TX_SIZE, network=self.network, allow_fallback_to_static_rates=True) def get_swap(self, payment_hash: bytes) -> Optional[SwapData]: # for history @@ -493,19 +453,11 @@ class SwapManager(Logger): if key in self.swaps: swap = self.swaps[key] if not swap.is_funded(): - password = self.wallet.get_unlocked_password() - for batch_rbf in [False]: - # FIXME: tx batching is disabled, because extra logic is needed to handle - # the case where the base tx gets mined. - tx = self.create_funding_tx(swap, None, password=password) - self.logger.info(f'adding funding_tx {tx.txid()}') - self.wallet.adb.add_transaction(tx) - try: - await self.broadcast_funding_tx(swap, tx) - except TxBroadcastError: - self.wallet.adb.remove_transaction(tx.txid()) - continue - break + output = self.create_funding_output(swap) + self.wallet.txbatcher.add_payment_output('swaps', output, self.config.FEE_POLICY_SWAPS) + swap._payment_pending = True + else: + self.logger.info(f'key not in swaps {key}') def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None): """ server method """ @@ -544,7 +496,7 @@ class SwapManager(Logger): ) -> Tuple[SwapData, str, Optional[str]]: """creates a hold invoice""" if prepay: - prepay_amount_sat = self.get_swap_tx_fee() * 2 + prepay_amount_sat = self.get_fee_for_txbatcher() * 2 invoice_amount_sat = lightning_amount_sat - prepay_amount_sat else: invoice_amount_sat = lightning_amount_sat @@ -806,13 +758,15 @@ class SwapManager(Logger): return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount) def create_funding_tx( - self, - swap: SwapData, - tx: Optional[PartialTransaction], - *, - password, + self, + swap: SwapData, + tx: Optional[PartialTransaction], + *, + password, ) -> PartialTransaction: # create funding tx + # use fee policy set by user (not using txbatcher) + fee_policy = FeePolicy(self.config.FEE_POLICY) # note: rbf must not decrease payment # this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output if tx is None: @@ -821,7 +775,7 @@ class SwapManager(Logger): outputs=[funding_output], rbf=True, password=password, - fee_policy=self.fee_policy, + fee_policy=fee_policy, ) else: tx.replace_output_address(DummyAddress.SWAP, swap.lockup_address) @@ -957,7 +911,7 @@ class SwapManager(Logger): self.percentage = float(self.config.SWAPSERVER_FEE_MILLIONTHS) / 10000 self._min_amount = 20000 self._max_amount = 10000000 - self.mining_fee = self.get_fee(SWAP_TX_SIZE) + self.mining_fee = self.get_fee_for_txbatcher() def update_pairs(self, pairs): self.logger.info(f'updating fees {pairs}') @@ -1075,13 +1029,12 @@ class SwapManager(Logger): swaps.append(swap) return swaps - def get_swap_by_claim_tx(self, tx: Transaction) -> Optional[SwapData]: - # note: we don't batch claim txs atm (batch_rbf cannot combine them - # as the inputs do not belong to the wallet) - if not (len(tx.inputs()) == 1 and len(tx.outputs()) == 1): - return None - txin = tx.inputs()[0] - return self.get_swap_by_claim_txin(txin) + def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable[SwapData]: + swaps = [] + for i, txin in enumerate(tx.inputs()): + if swap := self.get_swap_by_claim_txin(txin): + swaps.append((i, swap)) + return swaps def get_swap_by_claim_txin(self, txin: TxInput) -> Optional[SwapData]: return self._swaps_by_funding_outpoint.get(txin.prevout) diff --git a/electrum/transaction.py b/electrum/transaction.py index fcc5f338f..3abc4879d 100644 --- a/electrum/transaction.py +++ b/electrum/transaction.py @@ -1247,6 +1247,9 @@ class Transaction: def get_change_outputs(self): return [o for o in self._outputs if o.is_change] + def has_change(self): + return len(self.get_change_outputs()) > 0 + def get_dummy_output(self, dummy_addr: str) -> Optional['PartialTxOutput']: idxs = self.get_output_idxs_from_address(dummy_addr) if not idxs: diff --git a/electrum/txbatcher.py b/electrum/txbatcher.py new file mode 100644 index 000000000..5cdc80988 --- /dev/null +++ b/electrum/txbatcher.py @@ -0,0 +1,459 @@ +import asyncio +import threading +import copy + +from typing import Dict, Sequence +from . import util +from .bitcoin import dust_threshold +from .logging import Logger +from .util import log_exceptions, NotEnoughFunds, BelowDustLimit +from .transaction import PartialTransaction, PartialTxOutput, Transaction +from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE +from .lnsweep import SweepInfo +from typing import Optional + +# This class batches outgoing payments and incoming utxo sweeps. +# It ensures that we do not send a payment twice. +# +# Explanation of the problem: +# Suppose we are asked to send two payments: first o1, and then o2. +# We replace tx1(o1) (that pays to o1) with tx1'(o1,o2), that pays to o1 and o2. +# tx1 and tx1' use the same inputs, so they cannot both be mined in the same blockchain. +# If tx1 is mined instead of tx1', we now need to pay o2, so we will broadcast a new transaction tx2(o2). +# However, tx1 may be removed from the blockchain, due to a reorg, and a chain with tx1' can become the valid one. +# In that case, we might pay o2 twice: with tx1' and with tx2 +# +# The following code prevents that by making tx2 a child of tx1. +# This is denoted by tx2(tx1|o2). +# +# Example: +# +# output 1: tx1(o1) --------------- +# \ +# output 2: tx1'(o1,o2)------- ----> tx2(tx1|o2) ------ +# \ \ \ +# output 3: tx1''(o1,o2,o3) \ ---> tx2'(tx1|o2,o3) ----> tx3(tx2|o3) (if tx2 is mined) +# \ +# --------------------------------> tx3(tx1'|o3) (if tx1' is mined) +# +# In the above example, we have to make 3 payments. +# Suppose we have broadcast tx1, tx1' and tx1'' +# - if tx1 gets mined, we broadcast: tx2'(tx1|o2,o3) +# - if tx1' gets mined, we broadcast tx3(tx1'|o3) +# +# Note that there are two possible execution paths that may lead to the creation of tx3: +# - as a child of tx2 +# - as a child of tx1' +# +# A batch is a set of incompatible txs, such as [tx1, tx1', tx1'']. +# Note that we do not persist older batches. We only persist the current batch in self.batch_txids. +# Thus, if we need to broadcast tx2 or tx2', then self.batch_txids is reset, and the old batch is forgotten. +# +# If we cannot RBF a transaction (because the server returns an error), then we create a new batch, +# as if the transaction had been mined. +# if cannot_rbf(tx1) -> broadcast tx2(tx1,o2). The new base is now tx2(tx,o2) +# if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3) +# +# +# Notes: +# +# 1. CPFP: +# When a batch is forgotten but not mined (because the server returned an error), we no longer bump its fee. +# However, the current code does not theat the next batch as a CPFP when computing the fee. +# +# 2. Reorgs: +# This code does not guarantee that a payment or a sweep will happen. +# This is fine for sweeps; it is the responsibility of the caller (lnwatcher) to add them again. +# To make payments reorg-safe, we would need to persist more data and redo failed payments. +# +# 3. batch_payments and batch_inputs are not persisted. +# In the case of sweeps, lnwatcher ensures that SweepInfo is added again after a client restart. +# In order to generalize that logic to payments, callers would need to pass a unique ID along with +# the payment output, so that we can prevent paying twice. + +from .json_db import locked, StoredDict +from .fee_policy import FeePolicy + + + +class TxBatcher(Logger): + + SLEEP_INTERVAL = 1 + RETRY_DELAY = 60 + + def __init__(self, wallet): + Logger.__init__(self) + self.lock = threading.RLock() + self.storage = wallet.db.get_stored_item("tx_batches", {}) + self.tx_batches = {} + self.wallet = wallet + for key, item_storage in self.storage.items(): + self.tx_batches[key] = TxBatch(self.wallet, item_storage) + self._legacy_htlcs = {} + + @locked + def add_payment_output(self, key: str, output: 'PartialTxOutput', fee_policy_descriptor: str): + batch = self._maybe_create_new_batch(key, fee_policy_descriptor) + batch.add_payment_output(output) + + @locked + def add_sweep_input(self, key: str, sweep_info: 'SweepInfo', fee_policy_descriptor: str): + if sweep_info.txin and sweep_info.txout: + # todo: don't use name, detect sighash + if sweep_info.name == 'first-stage-htlc': + if sweep_info.txin.prevout not in self._legacy_htlcs: + self.logger.info(f'received {sweep_info.name}') + self._legacy_htlcs[sweep_info.txin.prevout] = sweep_info + return + if not sweep_info.can_be_batched: + # create a batch only for that input + key = sweep_info.txin.prevout.to_str() + batch = self._maybe_create_new_batch(key, fee_policy_descriptor) + batch.add_sweep_input(sweep_info) + + def _maybe_create_new_batch(self, key, fee_policy_descriptor: str): + if key not in self.storage: + self.storage[key] = { 'fee_policy': fee_policy_descriptor, 'txids': [] } + self.tx_batches[key] = TxBatch(self.wallet, self.storage[key]) + elif self.storage[key]['fee_policy'] != fee_policy_descriptor: + # maybe update policy? + self.logger.warning('fee policy passed to txbatcher inconsistent with existing batch') + return self.tx_batches[key] + + def _delete_batch(self, key): + self.logger.info(f'deleting TxBatch {key}') + self.storage.pop(key) + self.tx_batches.pop(key) + + def find_batch_of_txid(self, txid) -> str: + for k, v in self.tx_batches.items(): + if v.is_mine(txid): + return k + + def is_mine(self, txid): + # used to prevent GUI from interfering + return bool(self.find_batch_of_txid(txid)) + + @log_exceptions + async def run(self): + while True: + await asyncio.sleep(self.SLEEP_INTERVAL) + password = self.wallet.get_unlocked_password() + if self.wallet.has_keystore_encryption() and not password: + continue + for key, txbatch in list(self.tx_batches.items()): + try: + await txbatch.run_iteration(password) + if txbatch.is_done(): + self._delete_batch(key) + except Exception as e: + self.logger.exception(f'TxBatch error: {repr(e)}') + self._delete_batch(key) + continue + for sweep_info in self._legacy_htlcs.values(): + await self._maybe_redeem_legacy_htlcs(sweep_info) + + async def _maybe_redeem_legacy_htlcs(self, sweep_info): + local_height = self.wallet.network.get_local_height() + wanted_height = sweep_info.cltv_abs + if wanted_height - local_height > 0: + return + # fixme: what if sweep info has a csv? + outpoint = sweep_info.txin.prevout.to_str() + prev_txid, index = outpoint.split(':') + if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): + tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) + if tx_mined_status.height > 0: + return + if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + return + self.logger.info(f'will broadcast standalone tx {sweep_info.name}') + tx = PartialTransaction.from_io([sweep_info.txin], [sweep_info.txout], locktime=sweep_info.cltv_abs, version=2) + self.wallet.sign_transaction(tx, password=None, ignore_warnings=True) + if await self.wallet.network.try_broadcasting(tx, sweep_info.name): + self.wallet.adb.add_transaction(tx) + + + +class TxBatch(Logger): + + def __init__(self, wallet, storage: StoredDict): + Logger.__init__(self) + self.wallet = wallet + self.lock = threading.RLock() + self.batch_payments = [] # list of payments we need to make + self.batch_inputs = {} # list of inputs we need to sweep + # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined. + self._batch_txids = storage['txids'] + self.fee_policy = FeePolicy(storage['fee_policy']) + self._base_tx = None # current batch tx. last element of batch_txids + if self._batch_txids: + last_txid = self._batch_txids[-1] + tx = self.wallet.adb.get_transaction(last_txid) + if tx: + tx = PartialTransaction.from_tx(tx) + tx.add_info_from_wallet(self.wallet) # this adds input amounts + self._base_tx = tx + self.logger.info(f'found base_tx {last_txid}') + + self._parent_tx = None + self._unconfirmed_sweeps = set() # list of inputs we are sweeping (until spending tx is confirmed) + + def is_mine(self, txid): + return txid in self._batch_txids + + @locked + def add_payment_output(self, output: 'PartialTxOutput'): + # todo: maybe we should raise NotEnoughFunds here + self.batch_payments.append(output) + + def is_dust(self, sweep_info): + if sweep_info.name in ['local_anchor', 'remote_anchor']: + return False + if sweep_info.txout is not None: + return False + value = sweep_info.txin._trusted_value_sats + witness_size = len(sweep_info.txin.make_witness(71*b'\x00')) + tx_size_vbytes = 84 + witness_size//4 # assumes no batching, sweep to p2wpkh + self.logger.info(f'{sweep_info.name} size = {tx_size_vbytes}') + fee = self.fee_policy.estimate_fee(tx_size_vbytes, network=self.wallet.network, allow_fallback_to_static_rates=True) + return value - fee <= dust_threshold() + + @locked + def add_sweep_input(self, sweep_info: 'SweepInfo'): + if self.is_dust(sweep_info): + raise BelowDustLimit + txin = sweep_info.txin + if txin.prevout in self._unconfirmed_sweeps: + return + # early return if the spending tx is confirmed + # if its block is orphaned, the txin will be added again + prevout = txin.prevout.to_str() + prev_txid, index = prevout.split(':') + if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): + tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) + if tx_mined_status.height > 0: + return + self._unconfirmed_sweeps.add(txin.prevout) + self.logger.info(f'add_sweep_info: {sweep_info.name} {sweep_info.txin.prevout.to_str()}') + self.batch_inputs[txin.prevout] = sweep_info + + def get_base_tx(self) -> Optional[Transaction]: + return self._base_tx + + def _find_confirmed_base_tx(self) -> Optional[Transaction]: + for txid in self._batch_txids: + tx_mined_status = self.wallet.adb.get_tx_height(txid) + if tx_mined_status.conf > 0: + tx = self.wallet.adb.get_transaction(txid) + tx = PartialTransaction.from_tx(tx) + tx.add_info_from_wallet(self.wallet) # needed for txid + return tx + + @locked + def _to_pay_after(self, tx) -> Sequence[PartialTxOutput]: + if not tx: + return self.batch_payments + to_pay = [] + outputs = copy.deepcopy(tx.outputs()) + for x in self.batch_payments: + if x not in outputs: + to_pay.append(x) + else: + outputs.remove(x) + return to_pay + + @locked + def _to_sweep_after(self, tx) -> Dict[str, SweepInfo]: + tx_prevouts = set(txin.prevout for txin in tx.inputs()) if tx else set() + result = [] + for k,v in self.batch_inputs.items(): + prevout = v.txin.prevout + prev_txid, index = prevout.to_str().split(':') + if not self.wallet.adb.db.get_transaction(prev_txid): + continue + if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): + tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) + if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + continue + if prevout in tx_prevouts: + continue + result.append((k,v)) + return dict(result) + + def _should_bump_fee(self, base_tx) -> bool: + if base_tx is None: + return False + if not self.is_mine(base_tx.txid()): + return False + base_tx_fee = base_tx.get_fee() + recommended_fee = self.fee_policy.estimate_fee(base_tx.estimated_size(), network=self.wallet.network, allow_fallback_to_static_rates=True) + should_bump_fee = base_tx_fee * 1.1 < recommended_fee + if should_bump_fee: + self.logger.info(f'base tx fee too low {base_tx_fee} < {recommended_fee}. we will bump the fee') + return should_bump_fee + + def is_done(self): + # todo: require more than one confirmation + return len(self.batch_inputs) == 0 and len(self.batch_payments) == 0 and len(self._batch_txids) == 0 + + async def run_iteration(self, password): + conf_tx = self._find_confirmed_base_tx() + if conf_tx: + self.logger.info(f'base tx confirmed {conf_tx.txid()}') + self._clear_unconfirmed_sweeps(conf_tx) + self._start_new_batch(conf_tx) + + base_tx = self.get_base_tx() + # if base tx has been RBF-replaced, detect it here + try: + tx = self.create_next_transaction(base_tx, password) + except Exception as e: + if base_tx: + self.logger.exception(f'Cannot create batch transaction: {repr(e)}') + self._start_new_batch(base_tx) + return + else: + # will be caught by txBatcher + raise + + if tx is None: + # nothing to do + return + + if await self.wallet.network.try_broadcasting(tx, 'batch'): + self.wallet.adb.add_transaction(tx) + if tx.has_change(): + self._batch_txids.append(tx.txid()) + self._base_tx = tx + else: + self.logger.info(f'starting new batch because current base tx does not have change') + self._start_new_batch(tx) + else: + # most likely reason is that base_tx is not replaceable + # this may be the case if it has children (because we don't pay enough fees to replace them) + # or if we are trying to sweep unconfirmed inputs (replacement-adds-unconfirmed error) + self.logger.info(f'cannot broadcast tx {tx}') + if base_tx: + self.logger.info(f'starting new batch because could not broadcast') + self._start_new_batch(base_tx) + + + def create_next_transaction(self, base_tx, password): + to_pay = self._to_pay_after(base_tx) + to_sweep = self._to_sweep_after(base_tx) + to_sweep_now = {} + for k, v in to_sweep.items(): + can_broadcast, wanted_height = self._can_broadcast(v, base_tx) + if can_broadcast: + to_sweep_now[k] = v + else: + self.wallet.add_future_tx(v, wanted_height) + if not to_pay and not to_sweep_now and not self._should_bump_fee(base_tx): + return + while True: + tx = self._create_batch_tx(base_tx, to_sweep_now, to_pay, password) + # 100 kb max standardness rule + if tx.estimated_size() < 100_000: + break + to_sweep_now = to_sweep_now[0:len(to_sweep_now)//2] + to_pay = to_pay[0:len(to_pay)//2] + + self.logger.info(f'created tx with {len(tx.inputs())} inputs and {len(tx.outputs())} outputs') + self.logger.info(f'{str(tx)}') + return tx + + def _create_batch_tx(self, base_tx, to_sweep, to_pay, password): + self.logger.info(f'to_sweep: {list(to_sweep.keys())}') + self.logger.info(f'to_pay: {to_pay}') + inputs = [] + outputs = [] + locktime = base_tx.locktime if base_tx else None + # sort inputs so that txin-txout pairs are first + for sweep_info in sorted(to_sweep.values(), key=lambda x: not bool(x.txout)): + if sweep_info.cltv_abs is not None: + if locktime is None or locktime < sweep_info.cltv_abs: + # nLockTime must be greater than or equal to the stack operand. + locktime = sweep_info.cltv_abs + inputs.append(copy.deepcopy(sweep_info.txin)) + if sweep_info.txout: + outputs.append(sweep_info.txout) + self.logger.info(f'locktime: {locktime}') + outputs += to_pay + inputs += self._create_inputs_from_tx_change(self._parent_tx) if self._parent_tx else [] + # add sweep info base_tx inputs + if base_tx: + for txin in base_tx.inputs(): + if sweep_info := self.batch_inputs.get(txin.prevout): + if hasattr(txin, 'make_witness'): + txin.make_witness = sweep_info.txin.make_witness + txin.privkey = sweep_info.txin.privkey + txin.witness_script = sweep_info.txin.witness_script + txin.script_sig = sweep_info.txin.script_sig + # create tx + tx = self.wallet.create_transaction( + fee_policy=self.fee_policy, + base_tx=base_tx, + inputs=inputs, + outputs=outputs, + password=password, + locktime=locktime, + BIP69_sort=False, + merge_duplicate_outputs=False, + ) + # this assert will fail if we merge duplicate outputs + for o in outputs: assert o in tx.outputs() + assert tx.is_complete() + return tx + + def _clear_unconfirmed_sweeps(self, tx): + # this ensures that we can accept an input again, + # in case the sweeping tx has been removed from the blockchain after a reorg + for txin in tx.inputs(): + if txin.prevout in self._unconfirmed_sweeps: + self._unconfirmed_sweeps.remove(txin.prevout) + + @locked + def _start_new_batch(self, tx): + use_change = tx and tx.has_change() and any([txout in self.batch_payments for txout in tx.outputs()]) + self.batch_payments = self._to_pay_after(tx) + self.batch_inputs = self._to_sweep_after(tx) + self._batch_txids.clear() + self._base_tx = None + self._parent_tx = tx if use_change else None + + def _create_inputs_from_tx_change(self, parent_tx): + inputs = [] + for o in parent_tx.get_change_outputs(): + coins = self.wallet.adb.get_addr_utxo(o.address) + inputs += list(coins.values()) + for txin in inputs: + txin.nsequence = 0xffffffff - 2 + return inputs + + def _can_broadcast(self, sweep_info: 'SweepInfo', base_tx: 'Transaction'): + prevout = sweep_info.txin.prevout.to_str() + name = sweep_info.name + prev_txid, index = prevout.split(':') + can_broadcast = True + wanted_height = None + local_height = self.wallet.network.get_local_height() + if sweep_info.cltv_abs: + wanted_height = sweep_info.cltv_abs + if wanted_height - local_height > 0: + can_broadcast = False + prev_height = self.wallet.adb.get_tx_height(prev_txid).height + if sweep_info.csv_delay: + if prev_height > 0: + wanted_height = prev_height + sweep_info.csv_delay - 1 + if wanted_height - local_height > 0: + can_broadcast = False + else: + can_broadcast = False + if base_tx and prev_height <= 0: + # we cannot add unconfirmed inputs to existing base_tx (per RBF rules) + # thus, we will wait until the current batch is confirmed + can_broadcast = False + wanted_height = prev_height + return can_broadcast, wanted_height + diff --git a/electrum/wallet.py b/electrum/wallet.py index 8b3e92dd4..9b72948c2 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -33,7 +33,6 @@ import time import json import copy import errno -import traceback import operator import math from functools import partial @@ -60,7 +59,7 @@ from .util import (NotEnoughFunds, UserCancelled, profiler, OldTaskGroup, ignore InvalidPassword, format_time, timestamp_to_datetime, Satoshis, Fiat, bfh, TxMinedInfo, quantize_feerate, OrderedDictWithIndex) from .simple_config import SimpleConfig -from .fee_policy import FeePolicy, FeeMethod, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE +from .fee_policy import FeePolicy, FixedFeePolicy, FeeMethod, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE from .bitcoin import COIN, TYPE_ADDRESS from .bitcoin import is_address, address_to_script, is_minikey, relayfee, dust_threshold from .bitcoin import DummyAddress, DummyAddressUsedInTxException @@ -90,12 +89,14 @@ from .util import EventListener, event_listener from . import descriptor from .descriptor import Descriptor from .util import OnchainHistoryItem, LightningHistoryItem +from .txbatcher import TxBatcher if TYPE_CHECKING: from .network import Network from .exchange_rate import FxThread from .submarine_swaps import SwapData from .lnchannel import AbstractChannel + from .lnsweep import SweepInfo _logger = get_logger(__name__) @@ -431,6 +432,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): self._freeze_lock = threading.RLock() # for mutating/iterating frozen_{addresses,coins} self.load_keystore() + self.txbatcher = TxBatcher(self) self._init_lnworker() self._init_requests_rhash_index() self._prepare_onchain_invoice_paid_detection() @@ -461,6 +463,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): async with self.taskgroup as group: await group.spawn(asyncio.Event().wait) # run forever (until cancel) await group.spawn(self.do_synchronize_loop()) + await group.spawn(self.txbatcher.run()) except Exception as e: self.logger.exception("taskgroup died.") finally: @@ -855,8 +858,8 @@ class Abstract_Wallet(ABC, Logger, EventListener): return True return False - def get_swap_by_claim_tx(self, tx: Transaction) -> Optional['SwapData']: - return self.lnworker.swap_manager.get_swap_by_claim_tx(tx) if self.lnworker else None + def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable['SwapData']: + return self.lnworker.swap_manager.get_swaps_by_claim_tx(tx) if self.lnworker else [] def get_swaps_by_funding_tx(self, tx: Transaction) -> Iterable['SwapData']: return self.lnworker.swap_manager.get_swaps_by_funding_tx(tx) if self.lnworker else [] @@ -905,7 +908,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): tx_wallet_delta = self.get_wallet_delta(tx) is_relevant = tx_wallet_delta.is_relevant is_any_input_ismine = tx_wallet_delta.is_any_input_ismine - is_swap = bool(self.get_swap_by_claim_tx(tx)) + is_swap = bool(self.get_swaps_by_claim_tx(tx)) fee = tx_wallet_delta.fee exp_n = None can_broadcast = False @@ -1736,9 +1739,12 @@ class Abstract_Wallet(ABC, Logger, EventListener): tx = self.db.get_transaction(hist_item.txid) if not tx: continue + txid = tx.txid() + # tx should not belong to tx batcher + if self.txbatcher.is_mine(txid): + continue # is_mine outputs should not be spent yet # to avoid cancelling our own dependent transactions - txid = tx.txid() if any([self.is_mine(o.address) and self.db.get_spent_outpoint(txid, output_idx) for output_idx, o in enumerate(tx.outputs())]): continue @@ -2096,6 +2102,8 @@ class Abstract_Wallet(ABC, Logger, EventListener): tx.remove_signatures() if not self.can_rbf_tx(tx): raise CannotBumpFee(_('Transaction is final')) + if self.txbatcher.is_mine(tx.txid()): + raise CannotBumpFee('Transaction managed by txbatcher') new_fee_rate = quantize_feerate(new_fee_rate) # strip excess precision tx.add_info_from_wallet(self) if tx.is_missing_info_from_network(): @@ -2320,6 +2328,8 @@ class Abstract_Wallet(ABC, Logger, EventListener): # do not mutate LN funding txs, as that would change their txid if not is_dscancel and self.is_lightning_funding_tx(tx.txid()): return False + if self.txbatcher.is_mine(tx.txid()): + return False return tx.is_rbf_enabled() def cpfp(self, tx: Transaction, fee: int) -> Optional[PartialTransaction]: @@ -2527,7 +2537,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): for k in self.get_keystores(): if k.can_sign_txin(txin): return True - if self.get_swap_by_claim_tx(tx): + if self.get_swaps_by_claim_tx(tx): return True return False @@ -3327,6 +3337,34 @@ class Abstract_Wallet(ABC, Logger, EventListener): return None return self.config.format_amount_and_units(frozen_bal) + def add_future_tx(self, sweep_info: 'SweepInfo', wanted_height: int): + """ add local tx to provide user feedback """ + txin = copy.deepcopy(sweep_info.txin) + prevout = txin.prevout.to_str() + prev_txid, index = prevout.split(':') + if txid := self.adb.db.get_spent_outpoint(prev_txid, int(index)): + # set future tx of existing spender because it is not persisted + # (and wanted_height can change if input of CSV was not mined before) + self.adb.set_future_tx(txid, wanted_height=wanted_height) + return + name = sweep_info.name + # outputs = [] will send coins to a change address + tx = self.create_transaction( + inputs=[txin], + outputs=[], + password=None, + fee_policy=FixedFeePolicy(0), + sign=False, + ) + try: + self.adb.add_transaction(tx) + except Exception as e: + self.logger.info(f'could not add future tx: {name}. prevout: {prevout} {str(e)}') + return + self.logger.info(f'added future tx: {name}. prevout: {prevout}') + util.trigger_callback('wallet_updated', self) + self.adb.set_future_tx(tx.txid(), wanted_height=wanted_height) + class Simple_Wallet(Abstract_Wallet): # wallet with a single keystore diff --git a/tests/regtest.py b/tests/regtest.py index 603737923..cb4215a3d 100644 --- a/tests/regtest.py +++ b/tests/regtest.py @@ -47,6 +47,8 @@ class TestUnixSockets(TestLightning): class TestLightningAB(TestLightning): agents = { 'alice': { + 'test_force_disable_mpp': 'false', + 'test_force_mpp': 'true', }, 'bob': { 'lightning_listen': 'localhost:9735', diff --git a/tests/regtest/regtest.sh b/tests/regtest/regtest.sh index bc455d980..d7b2688ac 100755 --- a/tests/regtest/regtest.sh +++ b/tests/regtest/regtest.sh @@ -252,6 +252,8 @@ if [[ $1 == "swapserver_forceclose" ]]; then new_blocks 1 wait_until_spent $funding_txid 0 # alice reveals preimage new_blocks 1 + sleep 2 + new_blocks 144 wait_for_balance bob 0.999 fi @@ -407,6 +409,7 @@ if [[ $1 == "breach_with_unspent_htlc" ]]; then fi echo "alice breaches with old ctx" $bitcoin_cli sendrawtransaction $ctx + new_blocks 1 wait_for_balance bob 1.14 fi diff --git a/tests/test_sswaps.py b/tests/test_sswaps.py deleted file mode 100644 index ecae7b6d6..000000000 --- a/tests/test_sswaps.py +++ /dev/null @@ -1,80 +0,0 @@ -from electrum import SimpleConfig -from electrum.util import bfh -from electrum.transaction import PartialTxInput, TxOutpoint -from electrum.submarine_swaps import SwapData, create_claim_tx -from electrum.fee_policy import FeePolicy - -from . import ElectrumTestCase - - -class TestSwapTxs(ElectrumTestCase): - TESTNET = True - - def setUp(self): - super().setUp() - self.maxDiff = None - self.config = SimpleConfig({'electrum_path': self.electrum_path}) - self.fee_policy = FeePolicy('feerate:1000') - - def test_claim_tx_for_successful_reverse_swap(self): - swap_data = SwapData( - is_reverse=True, - locktime=2420532, - onchain_amount=198694, - lightning_amount=200000, - redeem_script=bytes.fromhex('8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac'), - preimage=bytes.fromhex('f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a36'), - prepay_hash=None, - privkey=bytes.fromhex('58fd0018a9a2737d1d6b81d380df96bf0c858473a9592015508a270a7c9b1d8d'), - lockup_address='tb1q2pvugjl4w56rqw4c7zg0q6mmmev0t5jjy3qzg7sl766phh9fxjxsrtl77t', - receive_address='tb1ql0adrj58g88xgz375yct63rclhv29hv03u0mel', - funding_txid='897eea7f53e917323e7472d7a2e3099173f7836c57f1b6850f5cbdfe8085dbf9', - spending_txid=None, - is_redeemed=False, - ) - txin = PartialTxInput( - prevout=TxOutpoint(txid=bfh(swap_data.funding_txid), out_idx=0), - ) - txin._trusted_value_sats = swap_data.onchain_amount - tx = create_claim_tx( - txin=txin, - swap=swap_data, - fee_policy=self.fee_policy, - network=None, - ) - self.assertEqual( - "02000000000101f9db8580febd5c0f85b6f1576c83f7739109e3a2d772743e3217e9537fea7e890000000000fdffffff019007030000000000160014fbfad1ca8741ce640a3ea130bd4478fdd8a2dd8f03473044022025506044aba4939f4f2faa94710673ca65530a621f1fa538a3d046dc98bb685e02205f8d463dc6f81e1083f26fa963e581dabc80ea42f8cd59c9e31f3bf531168a9c0120f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a366a8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac00000000", - str(tx) - ) - - def test_claim_tx_for_timing_out_forward_swap(self): - swap_data = SwapData( - is_reverse=False, - locktime=2420537, - onchain_amount=130000, - lightning_amount=129014, - redeem_script=bytes.fromhex('a914b12bd886ef4fd9ef1c03e899123f2c4b96cec0878763210267ca676c2ed05bb6c380880f1e50b6ef91025dfa963dc49d6c5cb9848f2acf7d670339ef24b1752103d8190cdfcc7dd929a583b7ea8fa8eb1d8463195d336be2f2df94f950ce8b659968ac'), - preimage=bytes.fromhex('116f62c3283e4eb0b947a9cb672f1de7321d2c2373d12cd010500adffc32b1f2'), - prepay_hash=None, - privkey=bytes.fromhex('8d30dead21f5a7a6eeab7456a9a9d449511e942abef9302153cfff84e436614c'), - lockup_address='tb1qte2qwev6qvmrhsddac82tnskmjg02ntn73xqg2rjt0qx2xpz693sw2ljzg', - receive_address='tb1qj76twx886pkfcs7d808n0yzsgxm33wqlwe0dt0', - funding_txid='08ecdcb19ab38fc1288c97da546b8c90549be2348ef306f476dcf6e505158706', - spending_txid=None, - is_redeemed=False, - ) - txin = PartialTxInput( - prevout=TxOutpoint(txid=bfh(swap_data.funding_txid), out_idx=0), - ) - txin._trusted_value_sats = swap_data.onchain_amount - tx = create_claim_tx( - txin=txin, - swap=swap_data, - fee_policy=self.fee_policy, - network=None, - ) - self.assertEqual( - "0200000000010106871505e5f6dc76f406f38e34e29b54908c6b54da978c28c18fb39ab1dcec080000000000fdffffff013afb01000000000016001497b4b718e7d06c9c43cd3bcf37905041b718b81f0347304402200ae708af1393f785c541bbc4d7351791b76a53077a292b71cb2a25ad13a15f9902206b7b91c414ec0d6e5098a1acc26de4b47f3aac414b7a49741e8f27cc6a967a19010065a914b12bd886ef4fd9ef1c03e899123f2c4b96cec0878763210267ca676c2ed05bb6c380880f1e50b6ef91025dfa963dc49d6c5cb9848f2acf7d670339ef24b1752103d8190cdfcc7dd929a583b7ea8fa8eb1d8463195d336be2f2df94f950ce8b659968ac39ef2400", - str(tx) - ) - diff --git a/tests/test_txbatcher.py b/tests/test_txbatcher.py new file mode 100644 index 000000000..1d127aed1 --- /dev/null +++ b/tests/test_txbatcher.py @@ -0,0 +1,216 @@ +import unittest +import logging +from unittest import mock +import asyncio + +from electrum import storage, bitcoin, keystore, wallet +from electrum import Transaction +from electrum import SimpleConfig +from electrum import util +from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_LOCAL +from electrum.transaction import Transaction, PartialTxInput, PartialTxOutput, TxOutpoint +from electrum.logging import console_stderr_handler, Logger +from electrum.submarine_swaps import SwapManager, SwapData +from electrum.lnsweep import SweepInfo +from electrum.fee_policy import FeeTimeEstimates + +from . import ElectrumTestCase +from .test_wallet_vertical import WalletIntegrityHelper + +class MockNetwork(Logger): + + def __init__(self, config): + self.config = config + self.fee_estimates = FeeTimeEstimates() + self.asyncio_loop = util.get_asyncio_loop() + self.interface = None + self.relay_fee = 1000 + self.wallets = [] + self._tx_event = asyncio.Event() + + def get_local_height(self): + return 42 + + def blockchain(self): + class BlockchainMock: + def is_tip_stale(self): + return True + return BlockchainMock() + + async def try_broadcasting(self, tx, name): + for w in self.wallets: + w.adb.receive_tx_callback(tx, TX_HEIGHT_UNCONFIRMED) + + self._tx_event.set() + self._tx = tx + self._tx_event.clear() + return tx.txid() + + async def next_tx(self): + await self._tx_event.wait() + return self._tx + + +WALLET_SEED = 'cause carbon luggage air humble mistake melt paper supreme sense gravity void' +FUNDING_TX = '020000000001021798e10f8b7220c57ea0d605316a52453ca9b3eed99996b5b7bdf4699548bb520000000000fdffffff277d82678d238ca45dd3490ac9fbb49272f0980b093b9197ff70ec8eb082cfb00100000000fdffffff028c360100000000001600147a9bfd90821be827275023849dd91ee80d494957a08601000000000016001476efaaa243327bf3a2c0f5380cb3914099448cec024730440220354b2a74f5ac039cca3618f7ff98229d243b89ac40550c8b027894f2c5cb88ff022064cb5ab1539b4c5367c2e01a8362e0aa12c2732bc8d08c3fce6eab9e56b7fe19012103e0a1499cb3d8047492c60466722c435dfbcffae8da9b83e758fbd203d12728f502473044022073cef8b0cfb093aed5b8eaacbb58c2fa6a69405a8e266cd65e76b726c9151d7602204d5820b23ab96acc57c272aac96d94740a20a6b89c016aa5aed7c06d1e6b9100012102f09e50a265c6a0dcf7c87153ea73d7b12a0fbe9d7d0bbec5db626b2402c1e85c02fa2400' +SWAP_FUNDING_TX = "01000000000101500e9d67647481864edfb020b5c45e1c40d90f06b0130f9faed1a5149c6d26450000000000ffffffff0226080300000000002200205059c44bf57534303ab8f090f06b7bde58f5d2522440247a1ff6b41bdca9348df312c20100000000160014021d4f3b17921d790e1c022367a5bb078ce4deb402483045022100d41331089a2031396a1db8e4dec6dda9cacefe1288644b92f8e08a23325aa19b02204159230691601f7d726e4e6e0b7124d3377620f400d699a01095f0b0a09ee26a012102d60315c72c0cefd41c6d07883c20b88be3fc37aac7912f0052722a95de0de71600000000" +SWAP_CLAIM_TX = "02000000000101f9db8580febd5c0f85b6f1576c83f7739109e3a2d772743e3217e9537fea7e890000000000fdffffff017005030000000000160014b113a47f3718da3fd161339a6681c150fef2cfe30347304402206736066ce15d34eed20951a9d974a100a72dc034f9c878769ddf27f9a584dcb1022042b14d627b8e8465a3a129bb43c0bd8369f49bbcf473879b9a477263655f1f930120f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a366a8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac00000000" + + +class TestTxBatcher(ElectrumTestCase): + + TESTNET = True + + @classmethod + def setUpClass(cls): + super().setUpClass() + console_stderr_handler.setLevel(logging.DEBUG) + + def setUp(self): + super().setUp() + self.config = SimpleConfig({'electrum_path': self.electrum_path}) + self.fee_policy_descriptor = 'feerate:5000' + + async def asyncSetUp(self): + await super().asyncSetUp() + self.network = MockNetwork(self.config) + + def create_standard_wallet_from_seed(self, seed_words, *, config=None, gap_limit=2): + if config is None: + config = self.config + ks = keystore.from_seed(seed_words, passphrase='', for_multisig=False) + return WalletIntegrityHelper.create_standard_wallet(ks, gap_limit=gap_limit, config=config) + + @mock.patch.object(wallet.Abstract_Wallet, 'save_db') + async def test_batch_payments(self, mock_save_db): + # output 1: tx1(o1) --------------- + # \ + # output 2: tx1'(o1,o2) ----> tx2(tx1|o2) + # + # tx1 is broadcast, and replaced by tx1' + # tx1 gets mined + # txbatcher creates a new transaction tx2, child of tx1 + # + OUTGOING_ADDRESS = 'tb1q7rl9cxr85962ztnsze089zs8ycv52hk43f3m9n' + # create wallet + wallet = self.create_standard_wallet_from_seed(WALLET_SEED) + wallet.start_network(self.network) + wallet.txbatcher.SLEEP_INTERVAL = 0.01 + self.network.wallets.append(wallet) + # fund wallet + funding_tx = Transaction(FUNDING_TX) + await self.network.try_broadcasting(funding_tx, 'funding') + assert wallet.adb.get_transaction(funding_tx.txid()) is not None + self.logger.info(f'wallet balance {wallet.get_balance()}') + # payment 1 -> tx1(output1) + output1 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 10_000) + wallet.txbatcher.add_payment_output('default', output1, self.fee_policy_descriptor) + tx1 = await self.network.next_tx() + assert output1 in tx1.outputs() + # payment 2 -> tx2(output1, output2) + output2 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 20_000) + wallet.txbatcher.add_payment_output('default', output2, self.fee_policy_descriptor) + tx1_prime = await self.network.next_tx() + assert wallet.adb.get_transaction(tx1_prime.txid()) is not None + assert len(tx1_prime.outputs()) == 3 + assert output1 in tx1_prime.outputs() + assert output2 in tx1_prime.outputs() + # tx1 gets confirmed, tx2 gets removed + wallet.adb.receive_tx_callback(tx1, 1) + tx_mined_status = wallet.adb.get_tx_height(tx1.txid()) + wallet.adb.add_verified_tx(tx1.txid(), tx_mined_status._replace(conf=1)) + assert wallet.adb.get_transaction(tx1.txid()) is not None + assert wallet.adb.get_transaction(tx1_prime.txid()) is None + # txbatcher creates tx2 + tx2 = await self.network.next_tx() + assert output1 in tx1.outputs() + assert output2 in tx2.outputs() + # check that tx2 is child of tx1 + assert len(tx2.inputs()) == 1 + assert tx2.inputs()[0].prevout.txid.hex() == tx1.txid() + + + @mock.patch.object(wallet.Abstract_Wallet, 'save_db') + async def test_rbf_batching__cannot_batch_as_would_need_to_use_ismine_outputs_of_basetx(self, mock_save_db): + """Wallet history contains unconf tx1 that spends all its coins to two ismine outputs, + one 'recv' address (20k sats) and one 'change' (80k sats). + The user tries to create tx2, that pays an invoice for 90k sats. + The tx batcher fails to batch, and should create a child transaction + """ + wallet = self.create_standard_wallet_from_seed(WALLET_SEED) + wallet.start_network(self.network) + wallet.txbatcher.SLEEP_INTERVAL = 0.01 + wallet.txbatcher.RETRY_DELAY = 0.60 + self.network.wallets.append(wallet) + + # fund wallet + funding_tx = Transaction(FUNDING_TX) + await self.network.try_broadcasting(funding_tx, 'funding') + assert wallet.adb.get_transaction(funding_tx.txid()) is not None + self.logger.info(f'wallet balance1 {wallet.get_balance()}') + + # to_self_payment tx1 + output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000) + wallet.txbatcher.add_payment_output('default', output1, self.fee_policy_descriptor) + toself_tx = await self.network.next_tx() + assert len(toself_tx.outputs()) == 2 + assert output1 in toself_tx.outputs() + + # outgoing payment tx2 + output2 = PartialTxOutput.from_address_and_value("tb1qkfn0fude7z789uys2u7sf80kd4805zpvs3na0h", 90_000) + wallet.txbatcher.add_payment_output('default', output2, self.fee_policy_descriptor) + tx2 = await self.network.next_tx() + assert len(tx2.outputs()) == 2 + assert output2 in tx2.outputs() + + + @mock.patch.object(wallet.Abstract_Wallet, 'save_db') + async def test_sweep_from_submarine_swap(self, mock_save_db): + self.maxDiff = None + # create wallet + wallet = self.create_standard_wallet_from_seed(WALLET_SEED) + wallet.start_network(self.network) + wallet.txbatcher.SLEEP_INTERVAL = 0.01 + self.network.wallets.append(wallet) + # add swap data + swap_data = SwapData( + is_reverse=True, + locktime=2420532, + onchain_amount=198694, + lightning_amount=200000, + redeem_script=bytes.fromhex('8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac'), + preimage=bytes.fromhex('f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a36'), + prepay_hash=None, + privkey=bytes.fromhex('58fd0018a9a2737d1d6b81d380df96bf0c858473a9592015508a270a7c9b1d8d'), + lockup_address='tb1q2pvugjl4w56rqw4c7zg0q6mmmev0t5jjy3qzg7sl766phh9fxjxsrtl77t', + receive_address='tb1ql0adrj58g88xgz375yct63rclhv29hv03u0mel', + funding_txid='897eea7f53e917323e7472d7a2e3099173f7836c57f1b6850f5cbdfe8085dbf9', + spending_txid=None, + is_redeemed=False, + ) + wallet.adb.db.transactions[swap_data.funding_txid] = Transaction(SWAP_FUNDING_TX) + txin = PartialTxInput( + prevout=TxOutpoint(txid=bytes.fromhex(swap_data.funding_txid), out_idx=0), + ) + txin._trusted_value_sats = swap_data.onchain_amount + txin, locktime = SwapManager.create_claim_txin(txin=txin, swap=swap_data) + sweep_info = SweepInfo( + txin=txin, + csv_delay=0, + cltv_abs=locktime, + txout=None, + name='swap claim', + can_be_batched=True, + ) + wallet.txbatcher.add_sweep_input('swaps', sweep_info, self.fee_policy_descriptor) + tx = await self.network.next_tx() + txid = tx.txid() + self.assertEqual(SWAP_CLAIM_TX, str(tx)) + # add a new payment, reusing the same input + # this tests that txin.make_witness() can be called more than once + output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000) + wallet.txbatcher.add_payment_output('swaps', output1, self.fee_policy_descriptor) + new_tx = await self.network.next_tx() + # check that we batched with previous tx + assert new_tx.inputs()[0].prevout == tx.inputs()[0].prevout == txin.prevout + assert output1 in new_tx.outputs()