diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index 92a9a36a2..4ce831971 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -461,6 +461,11 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener): def on_event_banner(self, *args): self.console.showMessage(args[0]) + @qt_event_listener + def on_event_adb_set_future_tx(self, adb, txid): + if adb == self.wallet.adb: + self.history_model.refresh('set_future_tx') + @qt_event_listener def on_event_verified(self, *args): wallet, tx_hash, tx_mined_status = args diff --git a/electrum/lnsweep.py b/electrum/lnsweep.py index c0e3229b2..722434ad5 100644 --- a/electrum/lnsweep.py +++ b/electrum/lnsweep.py @@ -43,6 +43,8 @@ class SweepInfo(NamedTuple): cltv_abs: Optional[int] # set to None only if the script has no cltv txin: PartialTxInput txout: Optional[PartialTxOutput] # only for first-stage htlc tx + can_be_batched: bool # todo: this could be more fine-grained + def sweep_their_ctx_watchtower( chan: 'Channel', @@ -251,7 +253,8 @@ def sweep_their_htlctx_justice( csv_delay=0, cltv_abs=None, txin=txin, - txout=None + txout=None, + can_be_batched=False, ) return index_to_sweepinfo @@ -329,6 +332,7 @@ def sweep_our_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # to_local @@ -350,6 +354,7 @@ def sweep_our_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) we_breached = ctn < chan.get_oldest_unrevoked_ctn(LOCAL) if we_breached: @@ -384,7 +389,11 @@ def sweep_our_ctx( csv_delay=0, cltv_abs=htlc_tx.locktime, txin=htlc_tx.inputs()[0], - txout=htlc_tx.outputs()[0]) + txout=htlc_tx.outputs()[0], + can_be_batched=False, # both parties can spend + # actually, we might want to batch depending on the context + # f(amount in htlc, remaining_time, number of available utxos for anchors) + ) else: # second-stage address = bitcoin.script_to_p2wsh(htlctx_witness_script) @@ -404,6 +413,9 @@ def sweep_our_ctx( cltv_abs=0, txin=sweep_txin, txout=None, + # this is safe to batch, we are the only ones who can spend + # (assuming we did not broadcast a revoked state) + can_be_batched=True, ) # offered HTLCs, in our ctx --> "timeout" @@ -541,6 +553,7 @@ def sweep_their_ctx_to_remote_backup( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # to_remote @@ -562,6 +575,7 @@ def sweep_their_ctx_to_remote_backup( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) return txs @@ -619,6 +633,7 @@ def sweep_their_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # to_local is handled by lnwatcher @@ -631,6 +646,7 @@ def sweep_their_ctx( cltv_abs=None, txin=txin, txout=None, + can_be_batched=False, ) # to_remote @@ -656,12 +672,14 @@ def sweep_their_ctx( our_payment_privkey=our_payment_privkey, has_anchors=chan.has_anchors() ): + # todo: we might not want to sweep this at all, if we add it to the wallet addresses txs[prevout] = SweepInfo( name='their_ctx_to_remote', csv_delay=csv_delay, cltv_abs=None, txin=txin, txout=None, + can_be_batched=True, ) # HTLCs @@ -701,6 +719,7 @@ def sweep_their_ctx( cltv_abs=cltv_abs, txin=txin, txout=None, + can_be_batched=False, ) # received HTLCs, in their ctx --> "timeout" # offered HTLCs, in their ctx --> "success" diff --git a/electrum/lnwatcher.py b/electrum/lnwatcher.py index 70acd7692..086000362 100644 --- a/electrum/lnwatcher.py +++ b/electrum/lnwatcher.py @@ -2,20 +2,14 @@ # Distributed under the MIT software license, see the accompanying # file LICENCE or http://www.opensource.org/licenses/mit-license.php -from typing import NamedTuple, Iterable, TYPE_CHECKING -import copy -import asyncio +from typing import TYPE_CHECKING from enum import IntEnum, auto -from typing import NamedTuple, Dict -from . import util -from .util import log_exceptions, ignore_exceptions, TxMinedInfo +from .util import log_exceptions, ignore_exceptions, TxMinedInfo, BelowDustLimit from .util import EventListener, event_listener from .address_synchronizer import AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_FUTURE -from .transaction import Transaction, TxOutpoint, PartialTransaction +from .transaction import Transaction, TxOutpoint from .logging import Logger -from .bitcoin import dust_threshold -from .fee_policy import FeePolicy if TYPE_CHECKING: @@ -46,7 +40,6 @@ class LNWatcher(Logger, EventListener): self.register_callbacks() # status gets populated when we run self.channel_status = {} - self.fee_policy = FeePolicy('eta:2') async def stop(self): self.unregister_callbacks() @@ -75,6 +68,13 @@ class LNWatcher(Logger, EventListener): async def on_event_blockchain_updated(self, *args): await self.trigger_callbacks() + @event_listener + async def on_event_wallet_updated(self, wallet): + # called if we add local tx + if wallet.adb != self.adb: + return + await self.trigger_callbacks() + @event_listener async def on_event_adb_added_verified_tx(self, adb, tx_hash): if adb != self.adb: @@ -141,6 +141,10 @@ class LNWatcher(Logger, EventListener): """ prev_txid, index = outpoint.split(':') spender_txid = self.adb.db.get_spent_outpoint(prev_txid, int(index)) + # discard local spenders + tx_mined_status = self.adb.get_tx_height(spender_txid) + if tx_mined_status.height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + spender_txid = None if not spender_txid: return spender_tx = self.adb.get_transaction(spender_txid) @@ -211,18 +215,6 @@ class LNWalletWatcher(LNWatcher): keep_watching=keep_watching) await self.lnworker.handle_onchain_state(chan) - def is_dust(self, sweep_info): - if sweep_info.name in ['local_anchor', 'remote_anchor']: - return False - if sweep_info.txout is not None: - return False - value = sweep_info.txin._trusted_value_sats - witness_size = len(sweep_info.txin.make_witness(71*b'\x00')) - tx_size_vbytes = 84 + witness_size//4 # assumes no batching, sweep to p2wpkh - self.logger.info(f'{sweep_info.name} size = {tx_size_vbytes}') - fee = self.fee_policy.estimate_fee(tx_size_vbytes, network=self.network, allow_fallback_to_static_rates=True) - return value - fee <= dust_threshold() - @log_exceptions async def sweep_commitment_transaction(self, funding_outpoint, closing_tx) -> bool: """This function is called when a channel was closed. In this case @@ -235,19 +227,16 @@ class LNWalletWatcher(LNWatcher): return False # detect who closed and get information about how to claim outputs sweep_info_dict = chan.sweep_ctx(closing_tx) - self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}") + #self.logger.info(f"do_breach_remedy: {[x.name for x in sweep_info_dict.values()]}") keep_watching = False if sweep_info_dict else not self.is_deeply_mined(closing_tx.txid()) - # create and broadcast transactions for prevout, sweep_info in sweep_info_dict.items(): - if self.is_dust(sweep_info): - continue prev_txid, prev_index = prevout.split(':') name = sweep_info.name + ' ' + chan.get_id_for_log() self.lnworker.wallet.set_default_label(prevout, name) if not self.adb.get_transaction(prev_txid): # do not keep watching if prevout does not exist - self.logger.info(f'prevout does not exist for {name}: {prev_txid}') + self.logger.info(f'prevout does not exist for {name}: {prevout}') continue spender_txid = self.get_spender(prevout) spender_tx = self.adb.get_transaction(spender_txid) if spender_txid else None @@ -260,115 +249,20 @@ class LNWalletWatcher(LNWatcher): if htlc_tx_spender: keep_watching |= not self.is_deeply_mined(htlc_tx_spender) else: - keep_watching = True - await self.maybe_redeem(prevout2, htlc_sweep_info, name) + keep_watching |= self.maybe_redeem(htlc_sweep_info) keep_watching |= not self.is_deeply_mined(spender_txid) self.maybe_extract_preimage(chan, spender_tx, prevout) else: - keep_watching = True - # broadcast or maybe update our own tx - await self.maybe_redeem(prevout, sweep_info, name) - + keep_watching |= self.maybe_redeem(sweep_info) return keep_watching - def get_redeem_tx(self, prevout: str, sweep_info: 'SweepInfo', name: str): - # check if redeem tx needs to be updated - # if it is in the mempool, we need to check fee rise - txid = self.get_spender(prevout) - old_tx = self.adb.get_transaction(txid) - assert old_tx is not None or txid is None - tx_depth = self.get_tx_mined_depth(txid) if txid else None - if txid and tx_depth not in [TxMinedDepth.FREE, TxMinedDepth.MEMPOOL]: - assert old_tx is not None - return old_tx, None - # fixme: deepcopy is needed because tx.serialize() is destructive - inputs = [copy.deepcopy(sweep_info.txin)] - outputs = [sweep_info.txout] if sweep_info.txout else [] - if sweep_info.name == 'first-stage-htlc': - new_tx = PartialTransaction.from_io(inputs, outputs, locktime=sweep_info.cltv_abs, version=2) - self.lnworker.wallet.sign_transaction(new_tx, password=None, ignore_warnings=True) - else: - # password is needed for 1st stage htlc tx with anchors because we add inputs - password = self.lnworker.wallet.get_unlocked_password() - new_tx = self.lnworker.wallet.create_transaction( - fee_policy = self.fee_policy, - inputs = inputs, - outputs = outputs, - password = password, - locktime = sweep_info.cltv_abs, - BIP69_sort=False, - ) - if new_tx is None: - self.logger.info(f'{name} could not claim output: {prevout}, dust') - assert old_tx is not None - return old_tx, None - if txid is None: - return None, new_tx - elif tx_depth == TxMinedDepth.MEMPOOL: - delta = new_tx.get_fee() - self.adb.get_tx_fee(txid) - if delta > 1: - self.logger.info(f'increasing fee of mempool tx {name}: {prevout}') - return old_tx, new_tx - else: - assert old_tx is not None - return old_tx, None - elif tx_depth == TxMinedDepth.FREE: - # return new tx, even if it is equal to old_tx, - # because we need to test if it can be broadcast - return old_tx, new_tx - else: - assert old_tx is not None - return old_tx, None - - async def maybe_redeem(self, prevout, sweep_info: 'SweepInfo', name: str) -> None: - old_tx, new_tx = self.get_redeem_tx(prevout, sweep_info, name) - if new_tx is None: - return - prev_txid, prev_index = prevout.split(':') - can_broadcast = True - local_height = self.network.get_local_height() - if sweep_info.cltv_abs: - wanted_height = sweep_info.cltv_abs - if wanted_height - local_height > 0: - can_broadcast = False - # self.logger.debug(f"pending redeem for {prevout}. waiting for {name}: CLTV ({local_height=}, {wanted_height=})") - if sweep_info.csv_delay: - prev_height = self.adb.get_tx_height(prev_txid) - if prev_height.height > 0: - wanted_height = prev_height.height + sweep_info.csv_delay - 1 - else: - wanted_height = local_height + sweep_info.csv_delay - if wanted_height - local_height > 0: - can_broadcast = False - # self.logger.debug( - # f"pending redeem for {prevout}. waiting for {name}: CSV " - # f"({local_height=}, {wanted_height=}, {prev_height.height=}, {sweep_info.csv_delay=})") - if can_broadcast: - self.logger.info(f'we can broadcast: {name}') - if await self.network.try_broadcasting(new_tx, name): - tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None)) - else: - tx_was_added = False - else: - # we may have a tx with a different fee, in which case it will be replaced - if not old_tx or (old_tx and old_tx.txid() != new_tx.txid()): - try: - tx_was_added = self.adb.add_transaction(new_tx, is_new=(old_tx is None)) - except Exception as e: - self.logger.info(f'could not add future tx: {name}. prevout: {prevout} {str(e)}') - tx_was_added = False - if tx_was_added: - self.logger.info(f'added redeem tx: {name}. prevout: {prevout}') - else: - tx_was_added = False - # set future tx regardless of tx_was_added, because it is not persisted - # (and wanted_height can change if input of CSV was not mined before) - self.adb.set_future_tx(new_tx.txid(), wanted_height=wanted_height) - if tx_was_added: - self.lnworker.wallet.set_label(new_tx.txid(), name) - if old_tx and old_tx.txid() != new_tx.txid(): - self.lnworker.wallet.set_label(old_tx.txid(), None) - util.trigger_callback('wallet_updated', self.lnworker.wallet) + def maybe_redeem(self, sweep_info: 'SweepInfo') -> bool: + """ returns False if it was dust """ + try: + self.lnworker.wallet.txbatcher.add_sweep_input('lnwatcher', sweep_info, self.config.FEE_POLICY_LIGHTNING) + except BelowDustLimit: + return False + return True def maybe_extract_preimage(self, chan: 'AbstractChannel', spender_tx: Transaction, prevout: str): txin_idx = spender_tx.get_input_idx_that_spent_prevout(TxOutpoint.from_str(prevout)) diff --git a/electrum/simple_config.py b/electrum/simple_config.py index ad2036715..66d7d55c8 100644 --- a/electrum/simple_config.py +++ b/electrum/simple_config.py @@ -675,7 +675,9 @@ Warning: setting this to too low will result in lots of payment failures."""), TEST_SHUTDOWN_FEE_RANGE = ConfigVar('test_shutdown_fee_range', default=None) TEST_SHUTDOWN_LEGACY = ConfigVar('test_shutdown_legacy', default=False, type_=bool) - FEE_POLICY = ConfigVar('fee_policy', default='eta:2', type_=str) + FEE_POLICY = ConfigVar('fee_policy', default='eta:2', type_=str) # exposed to GUI + FEE_POLICY_LIGHTNING = ConfigVar('fee_policy_lightning', default='eta:2', type_=str) # for txbatcher (sweeping) + FEE_POLICY_SWAPS = ConfigVar('fee_policy_swaps', default='eta:2', type_=str) # for txbatcher (sweeping and sending if we are a swapserver) RPC_USERNAME = ConfigVar('rpcuser', default=None, type_=str) RPC_PASSWORD = ConfigVar('rpcpassword', default=None, type_=str) diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index fab40f8c0..cf00bcd7d 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -35,7 +35,7 @@ from .lnutil import hex_to_bytes from .lnaddr import lndecode from .json_db import StoredObject, stored_in from . import constants -from .address_synchronizer import TX_HEIGHT_LOCAL +from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE from .i18n import _ from .fee_policy import FeePolicy @@ -44,6 +44,7 @@ from .crypto import ripemd from .invoices import Invoice from .network import TxBroadcastError from .lnonion import OnionRoutingFailure, OnionFailureCode +from .lnsweep import SweepInfo if TYPE_CHECKING: @@ -154,38 +155,14 @@ class SwapData(StoredObject): _funding_prevout = None # type: Optional[TxOutpoint] # for RBF _payment_hash = None _zeroconf = False + _payment_pending = False # for forward swaps @property def payment_hash(self) -> bytes: return self._payment_hash def is_funded(self) -> bool: - return self.funding_txid is not None - - -def create_claim_tx( - *, - txin: PartialTxInput, - swap: SwapData, - network: 'Network', - fee_policy: FeePolicy, -) -> PartialTransaction: - """Create tx to either claim successful reverse-swap, - or to get refunded for timed-out forward-swap. - """ - # FIXME the mining fee should depend on swap.is_reverse. - # the txs are not the same size... - amount_sat = txin.value_sats() - SwapManager._get_fee(size=SWAP_TX_SIZE, fee_policy=fee_policy, network=network) - if amount_sat < dust_threshold(): - raise BelowDustLimit() - txin, locktime = SwapManager.create_claim_txin(txin=txin, swap=swap) - txout = PartialTxOutput.from_address_and_value(swap.receive_address, amount_sat) - tx = PartialTransaction.from_io([txin], [txout], version=2, locktime=locktime) - sig = tx.sign_txin(0, txin.privkey) - txin.script_sig = b'' - txin.witness = txin.make_witness(sig) - assert tx.is_complete() - return tx + return self._payment_pending or bool(self.funding_txid) class SwapManager(Logger): @@ -202,7 +179,6 @@ class SwapManager(Logger): self.wallet = wallet self.config = wallet.config - self.fee_policy = FeePolicy(wallet.config.FEE_POLICY) self.lnworker = lnworker self.config = wallet.config self.taskgroup = OldTaskGroup() @@ -371,55 +347,35 @@ class SwapManager(Logger): self._add_or_reindex_swap(swap) # to update _swaps_by_funding_outpoint funding_height = self.lnwatcher.adb.get_tx_height(txin.prevout.txid.hex()) spent_height = txin.spent_height - should_bump_fee = False + # set spending_txid (even if tx is local), for GUI grouping + swap.spending_txid = txin.spent_txid + # discard local spenders + if spent_height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + spent_height = None if spent_height is not None: - swap.spending_txid = txin.spent_txid if spent_height > 0: if current_height - spent_height > REDEEM_AFTER_DOUBLE_SPENT_DELAY: self.logger.info(f'stop watching swap {swap.lockup_address}') self.lnwatcher.remove_callback(swap.lockup_address) swap.is_redeemed = True - elif spent_height == TX_HEIGHT_LOCAL: - if funding_height.conf > 0 or (swap.is_reverse and swap._zeroconf): - tx = self.lnwatcher.adb.get_transaction(txin.spent_txid) - try: - await self.network.broadcast_transaction(tx) - except TxBroadcastError: - self.logger.info(f'error broadcasting claim tx {txin.spent_txid}') - elif funding_height.height == TX_HEIGHT_LOCAL: - # the funding tx was double spent. - # this will remove both funding and child (spending tx) from adb - self.lnwatcher.adb.remove_transaction(swap.funding_txid) - swap.funding_txid = None - swap.spending_txid = None - else: - # spending tx is in mempool - pass if not swap.is_reverse: if swap.preimage is None and spent_height is not None: # extract the preimage, add it to lnwatcher claim_tx = self.lnwatcher.adb.get_transaction(txin.spent_txid) - preimage = claim_tx.inputs()[0].witness_elements()[1] - if sha256(preimage) == swap.payment_hash: - swap.preimage = preimage - self.logger.info(f'found preimage: {preimage.hex()}') - self.lnworker.preimages[swap.payment_hash.hex()] = preimage.hex() - # note: we must check the payment secret before we broadcast the funding tx + for txin in claim_tx.inputs(): + preimage = txin.witness_elements()[1] + if sha256(preimage) == swap.payment_hash: + swap.preimage = preimage + self.logger.info(f'found preimage: {preimage.hex()}') + self.lnworker.preimages[swap.payment_hash.hex()] = preimage.hex() + break else: # this is our refund tx if spent_height > 0: self.logger.info(f'refund tx confirmed: {txin.spent_txid} {spent_height}') self._fail_swap(swap, 'refund tx confirmed') return - else: - claim_tx.add_info_from_wallet(self.wallet) - claim_tx_fee = claim_tx.get_fee() - recommended_fee = self.get_swap_tx_fee() - if claim_tx_fee * 1.1 < recommended_fee: - should_bump_fee = True - self.logger.info(f'claim tx fee too low {claim_tx_fee} < {recommended_fee}. we will bump the fee') - if remaining_time > 0: # too early for refund return @@ -447,32 +403,36 @@ class SwapManager(Logger): # for testing: do not create claim tx return - if spent_height is not None and not should_bump_fee: + if spent_height is not None and spent_height > 0: return + txin, locktime = self.create_claim_txin(txin=txin, swap=swap) + # note: there is no csv in the script, we just set this so that txbatcher waits for one confirmation + csv = 1 if (swap.is_reverse and not swap._zeroconf) else 0 + name = 'swap claim' if swap.is_reverse else 'swap refund' + can_be_batched = bool(csv) if swap.is_reverse else True + sweep_info = SweepInfo( + txin=txin, + csv_delay=csv, + cltv_abs=locktime, + txout=None, + name=name, + can_be_batched=can_be_batched, + ) try: - tx = create_claim_tx(txin=txin, swap=swap, fee_policy=self.fee_policy, network=self.network) + self.wallet.txbatcher.add_sweep_input('swaps', sweep_info, self.config.FEE_POLICY_SWAPS) except BelowDustLimit: self.logger.info('utxo value below dust threshold') return - self.logger.info(f'adding claim tx {tx.txid()}') - self.wallet.adb.add_transaction(tx) - swap.spending_txid = tx.txid() - if funding_height.conf > 0 or (swap.is_reverse and swap._zeroconf): - try: - await self.network.broadcast_transaction(tx) - except TxBroadcastError: - self.logger.info(f'error broadcasting claim tx {txin.spent_txid}') def get_swap_tx_fee(self): - return self.get_fee(SWAP_TX_SIZE) + return self._get_tx_fee(self.config.FEE_POLICY) - def get_fee(self, size): - # note: 'size' is in vbytes - return self._get_fee(size=size, fee_policy=self.fee_policy, network=self.network) + def get_fee_for_txbatcher(self): + return self._get_tx_fee(self.config.FEE_POLICY_SWAPS) - @classmethod - def _get_fee(cls, *, size, fee_policy: FeePolicy, network: 'Network'): - return fee_policy.estimate_fee(size, network=network, allow_fallback_to_static_rates=True) + def _get_tx_fee(self, policy_descriptor: str): + fee_policy = FeePolicy(policy_descriptor) + return fee_policy.estimate_fee(SWAP_TX_SIZE, network=self.network, allow_fallback_to_static_rates=True) def get_swap(self, payment_hash: bytes) -> Optional[SwapData]: # for history @@ -493,19 +453,11 @@ class SwapManager(Logger): if key in self.swaps: swap = self.swaps[key] if not swap.is_funded(): - password = self.wallet.get_unlocked_password() - for batch_rbf in [False]: - # FIXME: tx batching is disabled, because extra logic is needed to handle - # the case where the base tx gets mined. - tx = self.create_funding_tx(swap, None, password=password) - self.logger.info(f'adding funding_tx {tx.txid()}') - self.wallet.adb.add_transaction(tx) - try: - await self.broadcast_funding_tx(swap, tx) - except TxBroadcastError: - self.wallet.adb.remove_transaction(tx.txid()) - continue - break + output = self.create_funding_output(swap) + self.wallet.txbatcher.add_payment_output('swaps', output, self.config.FEE_POLICY_SWAPS) + swap._payment_pending = True + else: + self.logger.info(f'key not in swaps {key}') def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None): """ server method """ @@ -544,7 +496,7 @@ class SwapManager(Logger): ) -> Tuple[SwapData, str, Optional[str]]: """creates a hold invoice""" if prepay: - prepay_amount_sat = self.get_swap_tx_fee() * 2 + prepay_amount_sat = self.get_fee_for_txbatcher() * 2 invoice_amount_sat = lightning_amount_sat - prepay_amount_sat else: invoice_amount_sat = lightning_amount_sat @@ -806,13 +758,15 @@ class SwapManager(Logger): return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount) def create_funding_tx( - self, - swap: SwapData, - tx: Optional[PartialTransaction], - *, - password, + self, + swap: SwapData, + tx: Optional[PartialTransaction], + *, + password, ) -> PartialTransaction: # create funding tx + # use fee policy set by user (not using txbatcher) + fee_policy = FeePolicy(self.config.FEE_POLICY) # note: rbf must not decrease payment # this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output if tx is None: @@ -821,7 +775,7 @@ class SwapManager(Logger): outputs=[funding_output], rbf=True, password=password, - fee_policy=self.fee_policy, + fee_policy=fee_policy, ) else: tx.replace_output_address(DummyAddress.SWAP, swap.lockup_address) @@ -957,7 +911,7 @@ class SwapManager(Logger): self.percentage = float(self.config.SWAPSERVER_FEE_MILLIONTHS) / 10000 self._min_amount = 20000 self._max_amount = 10000000 - self.mining_fee = self.get_fee(SWAP_TX_SIZE) + self.mining_fee = self.get_fee_for_txbatcher() def update_pairs(self, pairs): self.logger.info(f'updating fees {pairs}') @@ -1075,13 +1029,12 @@ class SwapManager(Logger): swaps.append(swap) return swaps - def get_swap_by_claim_tx(self, tx: Transaction) -> Optional[SwapData]: - # note: we don't batch claim txs atm (batch_rbf cannot combine them - # as the inputs do not belong to the wallet) - if not (len(tx.inputs()) == 1 and len(tx.outputs()) == 1): - return None - txin = tx.inputs()[0] - return self.get_swap_by_claim_txin(txin) + def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable[SwapData]: + swaps = [] + for i, txin in enumerate(tx.inputs()): + if swap := self.get_swap_by_claim_txin(txin): + swaps.append((i, swap)) + return swaps def get_swap_by_claim_txin(self, txin: TxInput) -> Optional[SwapData]: return self._swaps_by_funding_outpoint.get(txin.prevout) diff --git a/electrum/transaction.py b/electrum/transaction.py index fcc5f338f..3abc4879d 100644 --- a/electrum/transaction.py +++ b/electrum/transaction.py @@ -1247,6 +1247,9 @@ class Transaction: def get_change_outputs(self): return [o for o in self._outputs if o.is_change] + def has_change(self): + return len(self.get_change_outputs()) > 0 + def get_dummy_output(self, dummy_addr: str) -> Optional['PartialTxOutput']: idxs = self.get_output_idxs_from_address(dummy_addr) if not idxs: diff --git a/electrum/txbatcher.py b/electrum/txbatcher.py new file mode 100644 index 000000000..5cdc80988 --- /dev/null +++ b/electrum/txbatcher.py @@ -0,0 +1,459 @@ +import asyncio +import threading +import copy + +from typing import Dict, Sequence +from . import util +from .bitcoin import dust_threshold +from .logging import Logger +from .util import log_exceptions, NotEnoughFunds, BelowDustLimit +from .transaction import PartialTransaction, PartialTxOutput, Transaction +from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE +from .lnsweep import SweepInfo +from typing import Optional + +# This class batches outgoing payments and incoming utxo sweeps. +# It ensures that we do not send a payment twice. +# +# Explanation of the problem: +# Suppose we are asked to send two payments: first o1, and then o2. +# We replace tx1(o1) (that pays to o1) with tx1'(o1,o2), that pays to o1 and o2. +# tx1 and tx1' use the same inputs, so they cannot both be mined in the same blockchain. +# If tx1 is mined instead of tx1', we now need to pay o2, so we will broadcast a new transaction tx2(o2). +# However, tx1 may be removed from the blockchain, due to a reorg, and a chain with tx1' can become the valid one. +# In that case, we might pay o2 twice: with tx1' and with tx2 +# +# The following code prevents that by making tx2 a child of tx1. +# This is denoted by tx2(tx1|o2). +# +# Example: +# +# output 1: tx1(o1) --------------- +# \ +# output 2: tx1'(o1,o2)------- ----> tx2(tx1|o2) ------ +# \ \ \ +# output 3: tx1''(o1,o2,o3) \ ---> tx2'(tx1|o2,o3) ----> tx3(tx2|o3) (if tx2 is mined) +# \ +# --------------------------------> tx3(tx1'|o3) (if tx1' is mined) +# +# In the above example, we have to make 3 payments. +# Suppose we have broadcast tx1, tx1' and tx1'' +# - if tx1 gets mined, we broadcast: tx2'(tx1|o2,o3) +# - if tx1' gets mined, we broadcast tx3(tx1'|o3) +# +# Note that there are two possible execution paths that may lead to the creation of tx3: +# - as a child of tx2 +# - as a child of tx1' +# +# A batch is a set of incompatible txs, such as [tx1, tx1', tx1'']. +# Note that we do not persist older batches. We only persist the current batch in self.batch_txids. +# Thus, if we need to broadcast tx2 or tx2', then self.batch_txids is reset, and the old batch is forgotten. +# +# If we cannot RBF a transaction (because the server returns an error), then we create a new batch, +# as if the transaction had been mined. +# if cannot_rbf(tx1) -> broadcast tx2(tx1,o2). The new base is now tx2(tx,o2) +# if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3) +# +# +# Notes: +# +# 1. CPFP: +# When a batch is forgotten but not mined (because the server returned an error), we no longer bump its fee. +# However, the current code does not theat the next batch as a CPFP when computing the fee. +# +# 2. Reorgs: +# This code does not guarantee that a payment or a sweep will happen. +# This is fine for sweeps; it is the responsibility of the caller (lnwatcher) to add them again. +# To make payments reorg-safe, we would need to persist more data and redo failed payments. +# +# 3. batch_payments and batch_inputs are not persisted. +# In the case of sweeps, lnwatcher ensures that SweepInfo is added again after a client restart. +# In order to generalize that logic to payments, callers would need to pass a unique ID along with +# the payment output, so that we can prevent paying twice. + +from .json_db import locked, StoredDict +from .fee_policy import FeePolicy + + + +class TxBatcher(Logger): + + SLEEP_INTERVAL = 1 + RETRY_DELAY = 60 + + def __init__(self, wallet): + Logger.__init__(self) + self.lock = threading.RLock() + self.storage = wallet.db.get_stored_item("tx_batches", {}) + self.tx_batches = {} + self.wallet = wallet + for key, item_storage in self.storage.items(): + self.tx_batches[key] = TxBatch(self.wallet, item_storage) + self._legacy_htlcs = {} + + @locked + def add_payment_output(self, key: str, output: 'PartialTxOutput', fee_policy_descriptor: str): + batch = self._maybe_create_new_batch(key, fee_policy_descriptor) + batch.add_payment_output(output) + + @locked + def add_sweep_input(self, key: str, sweep_info: 'SweepInfo', fee_policy_descriptor: str): + if sweep_info.txin and sweep_info.txout: + # todo: don't use name, detect sighash + if sweep_info.name == 'first-stage-htlc': + if sweep_info.txin.prevout not in self._legacy_htlcs: + self.logger.info(f'received {sweep_info.name}') + self._legacy_htlcs[sweep_info.txin.prevout] = sweep_info + return + if not sweep_info.can_be_batched: + # create a batch only for that input + key = sweep_info.txin.prevout.to_str() + batch = self._maybe_create_new_batch(key, fee_policy_descriptor) + batch.add_sweep_input(sweep_info) + + def _maybe_create_new_batch(self, key, fee_policy_descriptor: str): + if key not in self.storage: + self.storage[key] = { 'fee_policy': fee_policy_descriptor, 'txids': [] } + self.tx_batches[key] = TxBatch(self.wallet, self.storage[key]) + elif self.storage[key]['fee_policy'] != fee_policy_descriptor: + # maybe update policy? + self.logger.warning('fee policy passed to txbatcher inconsistent with existing batch') + return self.tx_batches[key] + + def _delete_batch(self, key): + self.logger.info(f'deleting TxBatch {key}') + self.storage.pop(key) + self.tx_batches.pop(key) + + def find_batch_of_txid(self, txid) -> str: + for k, v in self.tx_batches.items(): + if v.is_mine(txid): + return k + + def is_mine(self, txid): + # used to prevent GUI from interfering + return bool(self.find_batch_of_txid(txid)) + + @log_exceptions + async def run(self): + while True: + await asyncio.sleep(self.SLEEP_INTERVAL) + password = self.wallet.get_unlocked_password() + if self.wallet.has_keystore_encryption() and not password: + continue + for key, txbatch in list(self.tx_batches.items()): + try: + await txbatch.run_iteration(password) + if txbatch.is_done(): + self._delete_batch(key) + except Exception as e: + self.logger.exception(f'TxBatch error: {repr(e)}') + self._delete_batch(key) + continue + for sweep_info in self._legacy_htlcs.values(): + await self._maybe_redeem_legacy_htlcs(sweep_info) + + async def _maybe_redeem_legacy_htlcs(self, sweep_info): + local_height = self.wallet.network.get_local_height() + wanted_height = sweep_info.cltv_abs + if wanted_height - local_height > 0: + return + # fixme: what if sweep info has a csv? + outpoint = sweep_info.txin.prevout.to_str() + prev_txid, index = outpoint.split(':') + if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): + tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) + if tx_mined_status.height > 0: + return + if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + return + self.logger.info(f'will broadcast standalone tx {sweep_info.name}') + tx = PartialTransaction.from_io([sweep_info.txin], [sweep_info.txout], locktime=sweep_info.cltv_abs, version=2) + self.wallet.sign_transaction(tx, password=None, ignore_warnings=True) + if await self.wallet.network.try_broadcasting(tx, sweep_info.name): + self.wallet.adb.add_transaction(tx) + + + +class TxBatch(Logger): + + def __init__(self, wallet, storage: StoredDict): + Logger.__init__(self) + self.wallet = wallet + self.lock = threading.RLock() + self.batch_payments = [] # list of payments we need to make + self.batch_inputs = {} # list of inputs we need to sweep + # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined. + self._batch_txids = storage['txids'] + self.fee_policy = FeePolicy(storage['fee_policy']) + self._base_tx = None # current batch tx. last element of batch_txids + if self._batch_txids: + last_txid = self._batch_txids[-1] + tx = self.wallet.adb.get_transaction(last_txid) + if tx: + tx = PartialTransaction.from_tx(tx) + tx.add_info_from_wallet(self.wallet) # this adds input amounts + self._base_tx = tx + self.logger.info(f'found base_tx {last_txid}') + + self._parent_tx = None + self._unconfirmed_sweeps = set() # list of inputs we are sweeping (until spending tx is confirmed) + + def is_mine(self, txid): + return txid in self._batch_txids + + @locked + def add_payment_output(self, output: 'PartialTxOutput'): + # todo: maybe we should raise NotEnoughFunds here + self.batch_payments.append(output) + + def is_dust(self, sweep_info): + if sweep_info.name in ['local_anchor', 'remote_anchor']: + return False + if sweep_info.txout is not None: + return False + value = sweep_info.txin._trusted_value_sats + witness_size = len(sweep_info.txin.make_witness(71*b'\x00')) + tx_size_vbytes = 84 + witness_size//4 # assumes no batching, sweep to p2wpkh + self.logger.info(f'{sweep_info.name} size = {tx_size_vbytes}') + fee = self.fee_policy.estimate_fee(tx_size_vbytes, network=self.wallet.network, allow_fallback_to_static_rates=True) + return value - fee <= dust_threshold() + + @locked + def add_sweep_input(self, sweep_info: 'SweepInfo'): + if self.is_dust(sweep_info): + raise BelowDustLimit + txin = sweep_info.txin + if txin.prevout in self._unconfirmed_sweeps: + return + # early return if the spending tx is confirmed + # if its block is orphaned, the txin will be added again + prevout = txin.prevout.to_str() + prev_txid, index = prevout.split(':') + if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): + tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) + if tx_mined_status.height > 0: + return + self._unconfirmed_sweeps.add(txin.prevout) + self.logger.info(f'add_sweep_info: {sweep_info.name} {sweep_info.txin.prevout.to_str()}') + self.batch_inputs[txin.prevout] = sweep_info + + def get_base_tx(self) -> Optional[Transaction]: + return self._base_tx + + def _find_confirmed_base_tx(self) -> Optional[Transaction]: + for txid in self._batch_txids: + tx_mined_status = self.wallet.adb.get_tx_height(txid) + if tx_mined_status.conf > 0: + tx = self.wallet.adb.get_transaction(txid) + tx = PartialTransaction.from_tx(tx) + tx.add_info_from_wallet(self.wallet) # needed for txid + return tx + + @locked + def _to_pay_after(self, tx) -> Sequence[PartialTxOutput]: + if not tx: + return self.batch_payments + to_pay = [] + outputs = copy.deepcopy(tx.outputs()) + for x in self.batch_payments: + if x not in outputs: + to_pay.append(x) + else: + outputs.remove(x) + return to_pay + + @locked + def _to_sweep_after(self, tx) -> Dict[str, SweepInfo]: + tx_prevouts = set(txin.prevout for txin in tx.inputs()) if tx else set() + result = [] + for k,v in self.batch_inputs.items(): + prevout = v.txin.prevout + prev_txid, index = prevout.to_str().split(':') + if not self.wallet.adb.db.get_transaction(prev_txid): + continue + if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): + tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) + if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: + continue + if prevout in tx_prevouts: + continue + result.append((k,v)) + return dict(result) + + def _should_bump_fee(self, base_tx) -> bool: + if base_tx is None: + return False + if not self.is_mine(base_tx.txid()): + return False + base_tx_fee = base_tx.get_fee() + recommended_fee = self.fee_policy.estimate_fee(base_tx.estimated_size(), network=self.wallet.network, allow_fallback_to_static_rates=True) + should_bump_fee = base_tx_fee * 1.1 < recommended_fee + if should_bump_fee: + self.logger.info(f'base tx fee too low {base_tx_fee} < {recommended_fee}. we will bump the fee') + return should_bump_fee + + def is_done(self): + # todo: require more than one confirmation + return len(self.batch_inputs) == 0 and len(self.batch_payments) == 0 and len(self._batch_txids) == 0 + + async def run_iteration(self, password): + conf_tx = self._find_confirmed_base_tx() + if conf_tx: + self.logger.info(f'base tx confirmed {conf_tx.txid()}') + self._clear_unconfirmed_sweeps(conf_tx) + self._start_new_batch(conf_tx) + + base_tx = self.get_base_tx() + # if base tx has been RBF-replaced, detect it here + try: + tx = self.create_next_transaction(base_tx, password) + except Exception as e: + if base_tx: + self.logger.exception(f'Cannot create batch transaction: {repr(e)}') + self._start_new_batch(base_tx) + return + else: + # will be caught by txBatcher + raise + + if tx is None: + # nothing to do + return + + if await self.wallet.network.try_broadcasting(tx, 'batch'): + self.wallet.adb.add_transaction(tx) + if tx.has_change(): + self._batch_txids.append(tx.txid()) + self._base_tx = tx + else: + self.logger.info(f'starting new batch because current base tx does not have change') + self._start_new_batch(tx) + else: + # most likely reason is that base_tx is not replaceable + # this may be the case if it has children (because we don't pay enough fees to replace them) + # or if we are trying to sweep unconfirmed inputs (replacement-adds-unconfirmed error) + self.logger.info(f'cannot broadcast tx {tx}') + if base_tx: + self.logger.info(f'starting new batch because could not broadcast') + self._start_new_batch(base_tx) + + + def create_next_transaction(self, base_tx, password): + to_pay = self._to_pay_after(base_tx) + to_sweep = self._to_sweep_after(base_tx) + to_sweep_now = {} + for k, v in to_sweep.items(): + can_broadcast, wanted_height = self._can_broadcast(v, base_tx) + if can_broadcast: + to_sweep_now[k] = v + else: + self.wallet.add_future_tx(v, wanted_height) + if not to_pay and not to_sweep_now and not self._should_bump_fee(base_tx): + return + while True: + tx = self._create_batch_tx(base_tx, to_sweep_now, to_pay, password) + # 100 kb max standardness rule + if tx.estimated_size() < 100_000: + break + to_sweep_now = to_sweep_now[0:len(to_sweep_now)//2] + to_pay = to_pay[0:len(to_pay)//2] + + self.logger.info(f'created tx with {len(tx.inputs())} inputs and {len(tx.outputs())} outputs') + self.logger.info(f'{str(tx)}') + return tx + + def _create_batch_tx(self, base_tx, to_sweep, to_pay, password): + self.logger.info(f'to_sweep: {list(to_sweep.keys())}') + self.logger.info(f'to_pay: {to_pay}') + inputs = [] + outputs = [] + locktime = base_tx.locktime if base_tx else None + # sort inputs so that txin-txout pairs are first + for sweep_info in sorted(to_sweep.values(), key=lambda x: not bool(x.txout)): + if sweep_info.cltv_abs is not None: + if locktime is None or locktime < sweep_info.cltv_abs: + # nLockTime must be greater than or equal to the stack operand. + locktime = sweep_info.cltv_abs + inputs.append(copy.deepcopy(sweep_info.txin)) + if sweep_info.txout: + outputs.append(sweep_info.txout) + self.logger.info(f'locktime: {locktime}') + outputs += to_pay + inputs += self._create_inputs_from_tx_change(self._parent_tx) if self._parent_tx else [] + # add sweep info base_tx inputs + if base_tx: + for txin in base_tx.inputs(): + if sweep_info := self.batch_inputs.get(txin.prevout): + if hasattr(txin, 'make_witness'): + txin.make_witness = sweep_info.txin.make_witness + txin.privkey = sweep_info.txin.privkey + txin.witness_script = sweep_info.txin.witness_script + txin.script_sig = sweep_info.txin.script_sig + # create tx + tx = self.wallet.create_transaction( + fee_policy=self.fee_policy, + base_tx=base_tx, + inputs=inputs, + outputs=outputs, + password=password, + locktime=locktime, + BIP69_sort=False, + merge_duplicate_outputs=False, + ) + # this assert will fail if we merge duplicate outputs + for o in outputs: assert o in tx.outputs() + assert tx.is_complete() + return tx + + def _clear_unconfirmed_sweeps(self, tx): + # this ensures that we can accept an input again, + # in case the sweeping tx has been removed from the blockchain after a reorg + for txin in tx.inputs(): + if txin.prevout in self._unconfirmed_sweeps: + self._unconfirmed_sweeps.remove(txin.prevout) + + @locked + def _start_new_batch(self, tx): + use_change = tx and tx.has_change() and any([txout in self.batch_payments for txout in tx.outputs()]) + self.batch_payments = self._to_pay_after(tx) + self.batch_inputs = self._to_sweep_after(tx) + self._batch_txids.clear() + self._base_tx = None + self._parent_tx = tx if use_change else None + + def _create_inputs_from_tx_change(self, parent_tx): + inputs = [] + for o in parent_tx.get_change_outputs(): + coins = self.wallet.adb.get_addr_utxo(o.address) + inputs += list(coins.values()) + for txin in inputs: + txin.nsequence = 0xffffffff - 2 + return inputs + + def _can_broadcast(self, sweep_info: 'SweepInfo', base_tx: 'Transaction'): + prevout = sweep_info.txin.prevout.to_str() + name = sweep_info.name + prev_txid, index = prevout.split(':') + can_broadcast = True + wanted_height = None + local_height = self.wallet.network.get_local_height() + if sweep_info.cltv_abs: + wanted_height = sweep_info.cltv_abs + if wanted_height - local_height > 0: + can_broadcast = False + prev_height = self.wallet.adb.get_tx_height(prev_txid).height + if sweep_info.csv_delay: + if prev_height > 0: + wanted_height = prev_height + sweep_info.csv_delay - 1 + if wanted_height - local_height > 0: + can_broadcast = False + else: + can_broadcast = False + if base_tx and prev_height <= 0: + # we cannot add unconfirmed inputs to existing base_tx (per RBF rules) + # thus, we will wait until the current batch is confirmed + can_broadcast = False + wanted_height = prev_height + return can_broadcast, wanted_height + diff --git a/electrum/wallet.py b/electrum/wallet.py index 8b3e92dd4..9b72948c2 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -33,7 +33,6 @@ import time import json import copy import errno -import traceback import operator import math from functools import partial @@ -60,7 +59,7 @@ from .util import (NotEnoughFunds, UserCancelled, profiler, OldTaskGroup, ignore InvalidPassword, format_time, timestamp_to_datetime, Satoshis, Fiat, bfh, TxMinedInfo, quantize_feerate, OrderedDictWithIndex) from .simple_config import SimpleConfig -from .fee_policy import FeePolicy, FeeMethod, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE +from .fee_policy import FeePolicy, FixedFeePolicy, FeeMethod, FEE_RATIO_HIGH_WARNING, FEERATE_WARNING_HIGH_FEE from .bitcoin import COIN, TYPE_ADDRESS from .bitcoin import is_address, address_to_script, is_minikey, relayfee, dust_threshold from .bitcoin import DummyAddress, DummyAddressUsedInTxException @@ -90,12 +89,14 @@ from .util import EventListener, event_listener from . import descriptor from .descriptor import Descriptor from .util import OnchainHistoryItem, LightningHistoryItem +from .txbatcher import TxBatcher if TYPE_CHECKING: from .network import Network from .exchange_rate import FxThread from .submarine_swaps import SwapData from .lnchannel import AbstractChannel + from .lnsweep import SweepInfo _logger = get_logger(__name__) @@ -431,6 +432,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): self._freeze_lock = threading.RLock() # for mutating/iterating frozen_{addresses,coins} self.load_keystore() + self.txbatcher = TxBatcher(self) self._init_lnworker() self._init_requests_rhash_index() self._prepare_onchain_invoice_paid_detection() @@ -461,6 +463,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): async with self.taskgroup as group: await group.spawn(asyncio.Event().wait) # run forever (until cancel) await group.spawn(self.do_synchronize_loop()) + await group.spawn(self.txbatcher.run()) except Exception as e: self.logger.exception("taskgroup died.") finally: @@ -855,8 +858,8 @@ class Abstract_Wallet(ABC, Logger, EventListener): return True return False - def get_swap_by_claim_tx(self, tx: Transaction) -> Optional['SwapData']: - return self.lnworker.swap_manager.get_swap_by_claim_tx(tx) if self.lnworker else None + def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable['SwapData']: + return self.lnworker.swap_manager.get_swaps_by_claim_tx(tx) if self.lnworker else [] def get_swaps_by_funding_tx(self, tx: Transaction) -> Iterable['SwapData']: return self.lnworker.swap_manager.get_swaps_by_funding_tx(tx) if self.lnworker else [] @@ -905,7 +908,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): tx_wallet_delta = self.get_wallet_delta(tx) is_relevant = tx_wallet_delta.is_relevant is_any_input_ismine = tx_wallet_delta.is_any_input_ismine - is_swap = bool(self.get_swap_by_claim_tx(tx)) + is_swap = bool(self.get_swaps_by_claim_tx(tx)) fee = tx_wallet_delta.fee exp_n = None can_broadcast = False @@ -1736,9 +1739,12 @@ class Abstract_Wallet(ABC, Logger, EventListener): tx = self.db.get_transaction(hist_item.txid) if not tx: continue + txid = tx.txid() + # tx should not belong to tx batcher + if self.txbatcher.is_mine(txid): + continue # is_mine outputs should not be spent yet # to avoid cancelling our own dependent transactions - txid = tx.txid() if any([self.is_mine(o.address) and self.db.get_spent_outpoint(txid, output_idx) for output_idx, o in enumerate(tx.outputs())]): continue @@ -2096,6 +2102,8 @@ class Abstract_Wallet(ABC, Logger, EventListener): tx.remove_signatures() if not self.can_rbf_tx(tx): raise CannotBumpFee(_('Transaction is final')) + if self.txbatcher.is_mine(tx.txid()): + raise CannotBumpFee('Transaction managed by txbatcher') new_fee_rate = quantize_feerate(new_fee_rate) # strip excess precision tx.add_info_from_wallet(self) if tx.is_missing_info_from_network(): @@ -2320,6 +2328,8 @@ class Abstract_Wallet(ABC, Logger, EventListener): # do not mutate LN funding txs, as that would change their txid if not is_dscancel and self.is_lightning_funding_tx(tx.txid()): return False + if self.txbatcher.is_mine(tx.txid()): + return False return tx.is_rbf_enabled() def cpfp(self, tx: Transaction, fee: int) -> Optional[PartialTransaction]: @@ -2527,7 +2537,7 @@ class Abstract_Wallet(ABC, Logger, EventListener): for k in self.get_keystores(): if k.can_sign_txin(txin): return True - if self.get_swap_by_claim_tx(tx): + if self.get_swaps_by_claim_tx(tx): return True return False @@ -3327,6 +3337,34 @@ class Abstract_Wallet(ABC, Logger, EventListener): return None return self.config.format_amount_and_units(frozen_bal) + def add_future_tx(self, sweep_info: 'SweepInfo', wanted_height: int): + """ add local tx to provide user feedback """ + txin = copy.deepcopy(sweep_info.txin) + prevout = txin.prevout.to_str() + prev_txid, index = prevout.split(':') + if txid := self.adb.db.get_spent_outpoint(prev_txid, int(index)): + # set future tx of existing spender because it is not persisted + # (and wanted_height can change if input of CSV was not mined before) + self.adb.set_future_tx(txid, wanted_height=wanted_height) + return + name = sweep_info.name + # outputs = [] will send coins to a change address + tx = self.create_transaction( + inputs=[txin], + outputs=[], + password=None, + fee_policy=FixedFeePolicy(0), + sign=False, + ) + try: + self.adb.add_transaction(tx) + except Exception as e: + self.logger.info(f'could not add future tx: {name}. prevout: {prevout} {str(e)}') + return + self.logger.info(f'added future tx: {name}. prevout: {prevout}') + util.trigger_callback('wallet_updated', self) + self.adb.set_future_tx(tx.txid(), wanted_height=wanted_height) + class Simple_Wallet(Abstract_Wallet): # wallet with a single keystore diff --git a/tests/regtest.py b/tests/regtest.py index 603737923..cb4215a3d 100644 --- a/tests/regtest.py +++ b/tests/regtest.py @@ -47,6 +47,8 @@ class TestUnixSockets(TestLightning): class TestLightningAB(TestLightning): agents = { 'alice': { + 'test_force_disable_mpp': 'false', + 'test_force_mpp': 'true', }, 'bob': { 'lightning_listen': 'localhost:9735', diff --git a/tests/regtest/regtest.sh b/tests/regtest/regtest.sh index bc455d980..d7b2688ac 100755 --- a/tests/regtest/regtest.sh +++ b/tests/regtest/regtest.sh @@ -252,6 +252,8 @@ if [[ $1 == "swapserver_forceclose" ]]; then new_blocks 1 wait_until_spent $funding_txid 0 # alice reveals preimage new_blocks 1 + sleep 2 + new_blocks 144 wait_for_balance bob 0.999 fi @@ -407,6 +409,7 @@ if [[ $1 == "breach_with_unspent_htlc" ]]; then fi echo "alice breaches with old ctx" $bitcoin_cli sendrawtransaction $ctx + new_blocks 1 wait_for_balance bob 1.14 fi diff --git a/tests/test_sswaps.py b/tests/test_sswaps.py deleted file mode 100644 index ecae7b6d6..000000000 --- a/tests/test_sswaps.py +++ /dev/null @@ -1,80 +0,0 @@ -from electrum import SimpleConfig -from electrum.util import bfh -from electrum.transaction import PartialTxInput, TxOutpoint -from electrum.submarine_swaps import SwapData, create_claim_tx -from electrum.fee_policy import FeePolicy - -from . import ElectrumTestCase - - -class TestSwapTxs(ElectrumTestCase): - TESTNET = True - - def setUp(self): - super().setUp() - self.maxDiff = None - self.config = SimpleConfig({'electrum_path': self.electrum_path}) - self.fee_policy = FeePolicy('feerate:1000') - - def test_claim_tx_for_successful_reverse_swap(self): - swap_data = SwapData( - is_reverse=True, - locktime=2420532, - onchain_amount=198694, - lightning_amount=200000, - redeem_script=bytes.fromhex('8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac'), - preimage=bytes.fromhex('f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a36'), - prepay_hash=None, - privkey=bytes.fromhex('58fd0018a9a2737d1d6b81d380df96bf0c858473a9592015508a270a7c9b1d8d'), - lockup_address='tb1q2pvugjl4w56rqw4c7zg0q6mmmev0t5jjy3qzg7sl766phh9fxjxsrtl77t', - receive_address='tb1ql0adrj58g88xgz375yct63rclhv29hv03u0mel', - funding_txid='897eea7f53e917323e7472d7a2e3099173f7836c57f1b6850f5cbdfe8085dbf9', - spending_txid=None, - is_redeemed=False, - ) - txin = PartialTxInput( - prevout=TxOutpoint(txid=bfh(swap_data.funding_txid), out_idx=0), - ) - txin._trusted_value_sats = swap_data.onchain_amount - tx = create_claim_tx( - txin=txin, - swap=swap_data, - fee_policy=self.fee_policy, - network=None, - ) - self.assertEqual( - "02000000000101f9db8580febd5c0f85b6f1576c83f7739109e3a2d772743e3217e9537fea7e890000000000fdffffff019007030000000000160014fbfad1ca8741ce640a3ea130bd4478fdd8a2dd8f03473044022025506044aba4939f4f2faa94710673ca65530a621f1fa538a3d046dc98bb685e02205f8d463dc6f81e1083f26fa963e581dabc80ea42f8cd59c9e31f3bf531168a9c0120f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a366a8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac00000000", - str(tx) - ) - - def test_claim_tx_for_timing_out_forward_swap(self): - swap_data = SwapData( - is_reverse=False, - locktime=2420537, - onchain_amount=130000, - lightning_amount=129014, - redeem_script=bytes.fromhex('a914b12bd886ef4fd9ef1c03e899123f2c4b96cec0878763210267ca676c2ed05bb6c380880f1e50b6ef91025dfa963dc49d6c5cb9848f2acf7d670339ef24b1752103d8190cdfcc7dd929a583b7ea8fa8eb1d8463195d336be2f2df94f950ce8b659968ac'), - preimage=bytes.fromhex('116f62c3283e4eb0b947a9cb672f1de7321d2c2373d12cd010500adffc32b1f2'), - prepay_hash=None, - privkey=bytes.fromhex('8d30dead21f5a7a6eeab7456a9a9d449511e942abef9302153cfff84e436614c'), - lockup_address='tb1qte2qwev6qvmrhsddac82tnskmjg02ntn73xqg2rjt0qx2xpz693sw2ljzg', - receive_address='tb1qj76twx886pkfcs7d808n0yzsgxm33wqlwe0dt0', - funding_txid='08ecdcb19ab38fc1288c97da546b8c90549be2348ef306f476dcf6e505158706', - spending_txid=None, - is_redeemed=False, - ) - txin = PartialTxInput( - prevout=TxOutpoint(txid=bfh(swap_data.funding_txid), out_idx=0), - ) - txin._trusted_value_sats = swap_data.onchain_amount - tx = create_claim_tx( - txin=txin, - swap=swap_data, - fee_policy=self.fee_policy, - network=None, - ) - self.assertEqual( - "0200000000010106871505e5f6dc76f406f38e34e29b54908c6b54da978c28c18fb39ab1dcec080000000000fdffffff013afb01000000000016001497b4b718e7d06c9c43cd3bcf37905041b718b81f0347304402200ae708af1393f785c541bbc4d7351791b76a53077a292b71cb2a25ad13a15f9902206b7b91c414ec0d6e5098a1acc26de4b47f3aac414b7a49741e8f27cc6a967a19010065a914b12bd886ef4fd9ef1c03e899123f2c4b96cec0878763210267ca676c2ed05bb6c380880f1e50b6ef91025dfa963dc49d6c5cb9848f2acf7d670339ef24b1752103d8190cdfcc7dd929a583b7ea8fa8eb1d8463195d336be2f2df94f950ce8b659968ac39ef2400", - str(tx) - ) - diff --git a/tests/test_txbatcher.py b/tests/test_txbatcher.py new file mode 100644 index 000000000..1d127aed1 --- /dev/null +++ b/tests/test_txbatcher.py @@ -0,0 +1,216 @@ +import unittest +import logging +from unittest import mock +import asyncio + +from electrum import storage, bitcoin, keystore, wallet +from electrum import Transaction +from electrum import SimpleConfig +from electrum import util +from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_LOCAL +from electrum.transaction import Transaction, PartialTxInput, PartialTxOutput, TxOutpoint +from electrum.logging import console_stderr_handler, Logger +from electrum.submarine_swaps import SwapManager, SwapData +from electrum.lnsweep import SweepInfo +from electrum.fee_policy import FeeTimeEstimates + +from . import ElectrumTestCase +from .test_wallet_vertical import WalletIntegrityHelper + +class MockNetwork(Logger): + + def __init__(self, config): + self.config = config + self.fee_estimates = FeeTimeEstimates() + self.asyncio_loop = util.get_asyncio_loop() + self.interface = None + self.relay_fee = 1000 + self.wallets = [] + self._tx_event = asyncio.Event() + + def get_local_height(self): + return 42 + + def blockchain(self): + class BlockchainMock: + def is_tip_stale(self): + return True + return BlockchainMock() + + async def try_broadcasting(self, tx, name): + for w in self.wallets: + w.adb.receive_tx_callback(tx, TX_HEIGHT_UNCONFIRMED) + + self._tx_event.set() + self._tx = tx + self._tx_event.clear() + return tx.txid() + + async def next_tx(self): + await self._tx_event.wait() + return self._tx + + +WALLET_SEED = 'cause carbon luggage air humble mistake melt paper supreme sense gravity void' +FUNDING_TX = '020000000001021798e10f8b7220c57ea0d605316a52453ca9b3eed99996b5b7bdf4699548bb520000000000fdffffff277d82678d238ca45dd3490ac9fbb49272f0980b093b9197ff70ec8eb082cfb00100000000fdffffff028c360100000000001600147a9bfd90821be827275023849dd91ee80d494957a08601000000000016001476efaaa243327bf3a2c0f5380cb3914099448cec024730440220354b2a74f5ac039cca3618f7ff98229d243b89ac40550c8b027894f2c5cb88ff022064cb5ab1539b4c5367c2e01a8362e0aa12c2732bc8d08c3fce6eab9e56b7fe19012103e0a1499cb3d8047492c60466722c435dfbcffae8da9b83e758fbd203d12728f502473044022073cef8b0cfb093aed5b8eaacbb58c2fa6a69405a8e266cd65e76b726c9151d7602204d5820b23ab96acc57c272aac96d94740a20a6b89c016aa5aed7c06d1e6b9100012102f09e50a265c6a0dcf7c87153ea73d7b12a0fbe9d7d0bbec5db626b2402c1e85c02fa2400' +SWAP_FUNDING_TX = "01000000000101500e9d67647481864edfb020b5c45e1c40d90f06b0130f9faed1a5149c6d26450000000000ffffffff0226080300000000002200205059c44bf57534303ab8f090f06b7bde58f5d2522440247a1ff6b41bdca9348df312c20100000000160014021d4f3b17921d790e1c022367a5bb078ce4deb402483045022100d41331089a2031396a1db8e4dec6dda9cacefe1288644b92f8e08a23325aa19b02204159230691601f7d726e4e6e0b7124d3377620f400d699a01095f0b0a09ee26a012102d60315c72c0cefd41c6d07883c20b88be3fc37aac7912f0052722a95de0de71600000000" +SWAP_CLAIM_TX = "02000000000101f9db8580febd5c0f85b6f1576c83f7739109e3a2d772743e3217e9537fea7e890000000000fdffffff017005030000000000160014b113a47f3718da3fd161339a6681c150fef2cfe30347304402206736066ce15d34eed20951a9d974a100a72dc034f9c878769ddf27f9a584dcb1022042b14d627b8e8465a3a129bb43c0bd8369f49bbcf473879b9a477263655f1f930120f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a366a8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac00000000" + + +class TestTxBatcher(ElectrumTestCase): + + TESTNET = True + + @classmethod + def setUpClass(cls): + super().setUpClass() + console_stderr_handler.setLevel(logging.DEBUG) + + def setUp(self): + super().setUp() + self.config = SimpleConfig({'electrum_path': self.electrum_path}) + self.fee_policy_descriptor = 'feerate:5000' + + async def asyncSetUp(self): + await super().asyncSetUp() + self.network = MockNetwork(self.config) + + def create_standard_wallet_from_seed(self, seed_words, *, config=None, gap_limit=2): + if config is None: + config = self.config + ks = keystore.from_seed(seed_words, passphrase='', for_multisig=False) + return WalletIntegrityHelper.create_standard_wallet(ks, gap_limit=gap_limit, config=config) + + @mock.patch.object(wallet.Abstract_Wallet, 'save_db') + async def test_batch_payments(self, mock_save_db): + # output 1: tx1(o1) --------------- + # \ + # output 2: tx1'(o1,o2) ----> tx2(tx1|o2) + # + # tx1 is broadcast, and replaced by tx1' + # tx1 gets mined + # txbatcher creates a new transaction tx2, child of tx1 + # + OUTGOING_ADDRESS = 'tb1q7rl9cxr85962ztnsze089zs8ycv52hk43f3m9n' + # create wallet + wallet = self.create_standard_wallet_from_seed(WALLET_SEED) + wallet.start_network(self.network) + wallet.txbatcher.SLEEP_INTERVAL = 0.01 + self.network.wallets.append(wallet) + # fund wallet + funding_tx = Transaction(FUNDING_TX) + await self.network.try_broadcasting(funding_tx, 'funding') + assert wallet.adb.get_transaction(funding_tx.txid()) is not None + self.logger.info(f'wallet balance {wallet.get_balance()}') + # payment 1 -> tx1(output1) + output1 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 10_000) + wallet.txbatcher.add_payment_output('default', output1, self.fee_policy_descriptor) + tx1 = await self.network.next_tx() + assert output1 in tx1.outputs() + # payment 2 -> tx2(output1, output2) + output2 = PartialTxOutput.from_address_and_value(OUTGOING_ADDRESS, 20_000) + wallet.txbatcher.add_payment_output('default', output2, self.fee_policy_descriptor) + tx1_prime = await self.network.next_tx() + assert wallet.adb.get_transaction(tx1_prime.txid()) is not None + assert len(tx1_prime.outputs()) == 3 + assert output1 in tx1_prime.outputs() + assert output2 in tx1_prime.outputs() + # tx1 gets confirmed, tx2 gets removed + wallet.adb.receive_tx_callback(tx1, 1) + tx_mined_status = wallet.adb.get_tx_height(tx1.txid()) + wallet.adb.add_verified_tx(tx1.txid(), tx_mined_status._replace(conf=1)) + assert wallet.adb.get_transaction(tx1.txid()) is not None + assert wallet.adb.get_transaction(tx1_prime.txid()) is None + # txbatcher creates tx2 + tx2 = await self.network.next_tx() + assert output1 in tx1.outputs() + assert output2 in tx2.outputs() + # check that tx2 is child of tx1 + assert len(tx2.inputs()) == 1 + assert tx2.inputs()[0].prevout.txid.hex() == tx1.txid() + + + @mock.patch.object(wallet.Abstract_Wallet, 'save_db') + async def test_rbf_batching__cannot_batch_as_would_need_to_use_ismine_outputs_of_basetx(self, mock_save_db): + """Wallet history contains unconf tx1 that spends all its coins to two ismine outputs, + one 'recv' address (20k sats) and one 'change' (80k sats). + The user tries to create tx2, that pays an invoice for 90k sats. + The tx batcher fails to batch, and should create a child transaction + """ + wallet = self.create_standard_wallet_from_seed(WALLET_SEED) + wallet.start_network(self.network) + wallet.txbatcher.SLEEP_INTERVAL = 0.01 + wallet.txbatcher.RETRY_DELAY = 0.60 + self.network.wallets.append(wallet) + + # fund wallet + funding_tx = Transaction(FUNDING_TX) + await self.network.try_broadcasting(funding_tx, 'funding') + assert wallet.adb.get_transaction(funding_tx.txid()) is not None + self.logger.info(f'wallet balance1 {wallet.get_balance()}') + + # to_self_payment tx1 + output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000) + wallet.txbatcher.add_payment_output('default', output1, self.fee_policy_descriptor) + toself_tx = await self.network.next_tx() + assert len(toself_tx.outputs()) == 2 + assert output1 in toself_tx.outputs() + + # outgoing payment tx2 + output2 = PartialTxOutput.from_address_and_value("tb1qkfn0fude7z789uys2u7sf80kd4805zpvs3na0h", 90_000) + wallet.txbatcher.add_payment_output('default', output2, self.fee_policy_descriptor) + tx2 = await self.network.next_tx() + assert len(tx2.outputs()) == 2 + assert output2 in tx2.outputs() + + + @mock.patch.object(wallet.Abstract_Wallet, 'save_db') + async def test_sweep_from_submarine_swap(self, mock_save_db): + self.maxDiff = None + # create wallet + wallet = self.create_standard_wallet_from_seed(WALLET_SEED) + wallet.start_network(self.network) + wallet.txbatcher.SLEEP_INTERVAL = 0.01 + self.network.wallets.append(wallet) + # add swap data + swap_data = SwapData( + is_reverse=True, + locktime=2420532, + onchain_amount=198694, + lightning_amount=200000, + redeem_script=bytes.fromhex('8201208763a914d7a62ef0270960fe23f0f351b28caadab62c21838821030bfd61153816df786036ea293edce851d3a4b9f4a1c66bdc1a17f00ffef3d6b167750334ef24b1752102fc8128f17f9e666ea281c702171ab16c1dd2a4337b71f08970f5aa10c608a93268ac'), + preimage=bytes.fromhex('f1939b5723155713855d7ebea6e174f77d41d669269e7f138856c3de190e7a36'), + prepay_hash=None, + privkey=bytes.fromhex('58fd0018a9a2737d1d6b81d380df96bf0c858473a9592015508a270a7c9b1d8d'), + lockup_address='tb1q2pvugjl4w56rqw4c7zg0q6mmmev0t5jjy3qzg7sl766phh9fxjxsrtl77t', + receive_address='tb1ql0adrj58g88xgz375yct63rclhv29hv03u0mel', + funding_txid='897eea7f53e917323e7472d7a2e3099173f7836c57f1b6850f5cbdfe8085dbf9', + spending_txid=None, + is_redeemed=False, + ) + wallet.adb.db.transactions[swap_data.funding_txid] = Transaction(SWAP_FUNDING_TX) + txin = PartialTxInput( + prevout=TxOutpoint(txid=bytes.fromhex(swap_data.funding_txid), out_idx=0), + ) + txin._trusted_value_sats = swap_data.onchain_amount + txin, locktime = SwapManager.create_claim_txin(txin=txin, swap=swap_data) + sweep_info = SweepInfo( + txin=txin, + csv_delay=0, + cltv_abs=locktime, + txout=None, + name='swap claim', + can_be_batched=True, + ) + wallet.txbatcher.add_sweep_input('swaps', sweep_info, self.fee_policy_descriptor) + tx = await self.network.next_tx() + txid = tx.txid() + self.assertEqual(SWAP_CLAIM_TX, str(tx)) + # add a new payment, reusing the same input + # this tests that txin.make_witness() can be called more than once + output1 = PartialTxOutput.from_address_and_value("tb1qyfnv3y866ufedugxxxfksyratv4pz3h78g9dad", 20_000) + wallet.txbatcher.add_payment_output('swaps', output1, self.fee_policy_descriptor) + new_tx = await self.network.next_tx() + # check that we batched with previous tx + assert new_tx.inputs()[0].prevout == tx.inputs()[0].prevout == txin.prevout + assert output1 in new_tx.outputs()