# This class batches outgoing payments and incoming utxo sweeps. # It ensures that we do not send a payment twice. # # Explanation of the problem: # Suppose we are asked to send two payments: first o1, and then o2. # We replace tx1(o1) (that pays to o1) with tx1'(o1,o2), that pays to o1 and o2. # tx1 and tx1' use the same inputs, so they cannot both be mined in the same blockchain. # If tx1 is mined instead of tx1', we now need to pay o2, so we will broadcast a new transaction tx2(o2). # However, tx1 may be removed from the blockchain, due to a reorg, and a chain with tx1' can become the valid one. # In that case, we might pay o2 twice: with tx1' and with tx2 # # The following code prevents that by making tx2 a child of tx1. # This is denoted by tx2(tx1|o2). # # Example: # # output 1: tx1(o1) --------------- # \ # output 2: tx1'(o1,o2)------- ----> tx2(tx1|o2) ------ # \ \ \ # output 3: tx1''(o1,o2,o3) \ ---> tx2'(tx1|o2,o3) ----> tx3(tx2|o3) (if tx2 is mined) # \ # --------------------------------> tx3(tx1'|o3) (if tx1' is mined) # # In the above example, we have to make 3 payments. # Suppose we have broadcast tx1, tx1' and tx1'' # - if tx1 gets mined, we broadcast: tx2'(tx1|o2,o3) # - if tx1' gets mined, we broadcast tx3(tx1'|o3) # # Note that there are two possible execution paths that may lead to the creation of tx3: # - as a child of tx2 # - as a child of tx1' # # A batch is a set of incompatible txs, such as [tx1, tx1', tx1'']. # Note that we do not persist older batches. We only persist the current batch in self.batch_txids. # Thus, if we need to broadcast tx2 or tx2', then self.batch_txids is reset, and the old batch is forgotten. # # If we cannot RBF a transaction (because the server returns an error), then we create a new batch, # as if the transaction had been mined. # if cannot_rbf(tx1) -> broadcast tx2(tx1,o2). The new base is now tx2(tx,o2) # if cannot_rbf(tx1') -> broadcast tx3(tx1'|o3) # # # Notes: # # 1. CPFP: # When a batch is forgotten but not mined (because the server returned an error), we no longer bump its fee. # However, the current code does not theat the next batch as a CPFP when computing the fee. # # 2. Reorgs: # This code does not guarantee that a payment or a sweep will happen. # This is fine for sweeps; it is the responsibility of the caller (lnwatcher) to add them again. # To make payments reorg-safe, we would need to persist more data and redo failed payments. # # 3. batch_payments and batch_inputs are not persisted. # In the case of sweeps, lnwatcher ensures that SweepInfo is added again after a client restart. # In order to generalize that logic to payments, callers would need to pass a unique ID along with # the payment output, so that we can prevent paying twice. import asyncio import threading import copy from typing import Dict, Sequence, Optional, TYPE_CHECKING, Mapping, Set, List, Tuple from . import util from .bitcoin import dust_threshold from .logging import Logger from .util import log_exceptions, NotEnoughFunds, BelowDustLimit, NoDynamicFeeEstimates, OldTaskGroup from .transaction import PartialTransaction, PartialTxOutput, Transaction, TxOutpoint, PartialTxInput from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE from .lnsweep import SweepInfo from .json_db import locked, StoredDict from .fee_policy import FeePolicy if TYPE_CHECKING: from .wallet import Abstract_Wallet class TxBatcher(Logger): SLEEP_INTERVAL = 1 def __init__(self, wallet: 'Abstract_Wallet'): Logger.__init__(self) self.lock = threading.RLock() self.storage = wallet.db.get_stored_item("tx_batches", {}) self.tx_batches = {} # type: Dict[str, TxBatch] self.wallet = wallet for key, item_storage in self.storage.items(): self.tx_batches[key] = TxBatch(self.wallet, item_storage) self._legacy_htlcs = {} # type: Dict[TxOutpoint, SweepInfo] self.taskgroup = None # type: Optional[OldTaskGroup] self.password_future = None # type: Optional[asyncio.Future[Optional[str]]] @locked def add_payment_output(self, key: str, output: 'PartialTxOutput') -> None: batch = self._maybe_create_new_batch(key, fee_policy_name=key) batch.add_payment_output(output) @locked def add_sweep_input(self, key: str, sweep_info: 'SweepInfo') -> None: """Can raise BelowDustLimit or NoDynamicFeeEstimates.""" if sweep_info.txin and sweep_info.txout: # detect legacy htlc using name and csv delay if sweep_info.name in ['received-htlc', 'offered-htlc'] and sweep_info.csv_delay == 0: if sweep_info.txin.prevout not in self._legacy_htlcs: self.logger.info(f'received {sweep_info.name}') self._legacy_htlcs[sweep_info.txin.prevout] = sweep_info return fee_policy_name = key if not sweep_info.can_be_batched: # create a batch only for that input key = sweep_info.txin.prevout.to_str() batch = self._maybe_create_new_batch(key, fee_policy_name) batch.add_sweep_input(sweep_info) def _maybe_create_new_batch(self, key: str, fee_policy_name: str) -> 'TxBatch': assert util.get_running_loop() == util.get_asyncio_loop(), f"this must be run on the asyncio thread!" if key not in self.storage: self.logger.info(f'creating new batch: {key}') self.storage[key] = { 'fee_policy_name': fee_policy_name, 'txids': [], 'prevout': None } self.tx_batches[key] = batch = TxBatch(self.wallet, self.storage[key]) if self.taskgroup: asyncio.ensure_future(self.taskgroup.spawn(self.run_batch(key, batch))) return self.tx_batches[key] @locked def delete_batch(self, key: str) -> None: self.logger.info(f'deleting TxBatch {key}') self.storage.pop(key) self.tx_batches.pop(key) def find_batch_by_prevout(self, prevout: str) -> Optional['TxBatch']: for k, v in self.tx_batches.items(): if v._prevout == prevout: return v return None def find_batch_of_txid(self, txid: str) -> Optional[str]: for k, v in self.tx_batches.items(): if v.is_mine(txid): return k return None def is_mine(self, txid: str) -> bool: # used to prevent GUI from interfering return bool(self.find_batch_of_txid(txid)) async def run_batch(self, key: str, batch: 'TxBatch') -> None: await batch.run() self.delete_batch(key) @log_exceptions async def run(self): self.taskgroup = OldTaskGroup() for key, batch in self.tx_batches.items(): await self.taskgroup.spawn(self.run_batch(key, batch)) async with self.taskgroup as group: await group.spawn(self.redeem_legacy_htlcs()) async def redeem_legacy_htlcs(self) -> None: while True: await asyncio.sleep(self.SLEEP_INTERVAL) for sweep_info in self._legacy_htlcs.values(): await self._maybe_redeem_legacy_htlcs(sweep_info) async def _maybe_redeem_legacy_htlcs(self, sweep_info: 'SweepInfo') -> None: assert sweep_info.csv_delay == 0 local_height = self.wallet.network.get_local_height() wanted_height = sweep_info.cltv_abs if wanted_height - local_height > 0: return outpoint = sweep_info.txin.prevout.to_str() prev_txid, index = outpoint.split(':') if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) if tx_mined_status.height > 0: return if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: return self.logger.info(f'will broadcast standalone tx {sweep_info.name}') tx = PartialTransaction.from_io([sweep_info.txin], [sweep_info.txout], locktime=sweep_info.cltv_abs, version=2) self.wallet.sign_transaction(tx, password=None, ignore_warnings=True) if await self.wallet.network.try_broadcasting(tx, sweep_info.name): self.wallet.adb.add_transaction(tx) async def get_password(self, txid: str) -> Optional[str]: # daemon, android have password in memory password = self.wallet.get_unlocked_password() if password: return password future = self.get_password_future(txid) try: await future except asyncio.CancelledError as e: return None password = future.result() return password @locked def set_password_future(self, password: Optional[str]) -> None: if self.password_future is not None: if password is not None: self.password_future.set_result(password) else: self.password_future.cancel() self.password_future = None util.trigger_callback('password_not_required', self.wallet) @locked def get_password_future(self, txid: str): if self.password_future is None: self.password_future = asyncio.Future() self.password_future.txids = [] self.logger.info(f'password required: {txid}') self.password_future.txids.append(txid) util.trigger_callback('password_required', self.wallet) return self.password_future class TxBatch(Logger): def __init__(self, wallet: 'Abstract_Wallet', storage: StoredDict): Logger.__init__(self) self.wallet = wallet self.storage = storage self.lock = threading.RLock() self.batch_payments = [] # type: List[PartialTxOutput] # payments we need to make self.batch_inputs = {} # type: Dict[TxOutpoint, SweepInfo] # inputs we need to sweep # list of tx that were broadcast. Each tx is a RBF replacement of the previous one. Ony one can get mined. self._prevout = storage.get('prevout') # type: Optional[str] self._batch_txids = storage['txids'] # type: List[str] self._fee_policy_name = storage.get('fee_policy_name', 'default') # type: str self._base_tx = None # type: Optional[PartialTransaction] # current batch tx. last element of batch_txids self._parent_tx = None # type: Optional[PartialTransaction] self._unconfirmed_sweeps = set() # type: Set[TxOutpoint] # inputs we are sweeping (until spending tx is confirmed) @property def fee_policy(self) -> FeePolicy: # this assumes the descriptor is in config.fee_policy cv_name = 'fee_policy' + '.' + self._fee_policy_name descriptor = self.wallet.config.get(cv_name, 'eta:2') return FeePolicy(descriptor) @log_exceptions async def run(self) -> None: while not self.is_done(): await asyncio.sleep(self.wallet.txbatcher.SLEEP_INTERVAL) if not (self.wallet.network and self.wallet.network.is_connected()): continue try: await self.run_iteration() except Exception as e: self.logger.exception(f'TxBatch error: {repr(e)}') break def is_mine(self, txid: str) -> bool: return txid in self._batch_txids @locked def add_payment_output(self, output: 'PartialTxOutput') -> None: # todo: maybe we should raise NotEnoughFunds here self.batch_payments.append(output) def is_dust(self, sweep_info: SweepInfo) -> bool: """Can raise NoDynamicFeeEstimates.""" if sweep_info.is_anchor(): return False if sweep_info.txout is not None: return False value = sweep_info.txin.value_sats() witness_size = len(sweep_info.txin.make_witness(71*b'\x00')) tx_size_vbytes = 84 + witness_size//4 # assumes no batching, sweep to p2wpkh self.logger.info(f'{sweep_info.name} size = {tx_size_vbytes}') fee = self.fee_policy.estimate_fee(tx_size_vbytes, network=self.wallet.network) return value - fee <= dust_threshold() @locked def add_sweep_input(self, sweep_info: 'SweepInfo') -> None: """Can raise BelowDustLimit or NoDynamicFeeEstimates.""" if self.is_dust(sweep_info): # note: this uses the current fee estimates. Just because something is dust # at the current fee levels, if fees go down, it might still become # worthwhile to sweep. So callers might want to retry later. raise BelowDustLimit txin = sweep_info.txin if txin.prevout in self._unconfirmed_sweeps: return # early return if the spending tx is confirmed # if its block is orphaned, the txin will be added again prevout = txin.prevout.to_str() prev_txid, index = prevout.split(':') if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) if tx_mined_status.height > 0: return self._unconfirmed_sweeps.add(txin.prevout) self.logger.info(f'add_sweep_info: {sweep_info.name} {sweep_info.txin.prevout.to_str()}') self.batch_inputs[txin.prevout] = sweep_info @locked def _to_pay_after(self, tx: Optional[PartialTransaction]) -> Sequence[PartialTxOutput]: if not tx: return self.batch_payments to_pay = [] outputs = copy.deepcopy(tx.outputs()) for x in self.batch_payments: if x not in outputs: to_pay.append(x) else: outputs.remove(x) return to_pay @locked def _to_sweep_after(self, tx: Optional[PartialTransaction]) -> Dict[str, SweepInfo]: tx_prevouts = set(txin.prevout for txin in tx.inputs()) if tx else set() result = [] for k, v in list(self.batch_inputs.items()): prevout = v.txin.prevout prev_txid, index = prevout.to_str().split(':') if not self.wallet.adb.db.get_transaction(prev_txid): continue if v.is_anchor(): prev_tx_mined_status = self.wallet.adb.get_tx_height(prev_txid) if prev_tx_mined_status.conf > 0: self.logger.info(f"anchor not needed {k}") self.batch_inputs.pop(k) # note: if the input is already in a batch tx, this will trigger assert error continue if spender_txid := self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)): tx_mined_status = self.wallet.adb.get_tx_height(spender_txid) if tx_mined_status.height not in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: continue if prevout in tx_prevouts: continue result.append((k,v)) return dict(result) def _should_bump_fee(self, base_tx: Optional[PartialTransaction]) -> bool: if base_tx is None: return False if not self.is_mine(base_tx.txid()): return False base_tx_fee = base_tx.get_fee() recommended_fee = self.fee_policy.estimate_fee(base_tx.estimated_size(), network=self.wallet.network) should_bump_fee = base_tx_fee * 1.1 < recommended_fee if should_bump_fee: self.logger.info(f'base tx fee too low {base_tx_fee} < {recommended_fee}. we will bump the fee') return should_bump_fee def is_done(self): # todo: require more than one confirmation return len(self.batch_inputs) == 0 and len(self.batch_payments) == 0 and len(self._batch_txids) == 0 def find_base_tx(self) -> Optional[PartialTransaction]: if not self._prevout: return None prev_txid, index = self._prevout.split(':') txid = self.wallet.adb.db.get_spent_outpoint(prev_txid, int(index)) tx = self.wallet.adb.get_transaction(txid) if txid else None if not tx: return None tx = PartialTransaction.from_tx(tx) tx.add_info_from_wallet(self.wallet) # this sets is_change if self.is_mine(txid): if self._base_tx is None: self.logger.info(f'found base_tx {txid}') self._base_tx = tx else: self.logger.info(f'base tx was replaced by {tx.txid()}') self._new_base_tx(tx) # if tx is confirmed or local, we will start a new batch tx_mined_status = self.wallet.adb.get_tx_height(txid) if tx_mined_status.conf > 0: self.logger.info(f'base tx confirmed {txid}') self._clear_unconfirmed_sweeps(tx) self._start_new_batch(tx) elif tx_mined_status.height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: # fixme: adb may return TX_HEIGHT_LOCAL when not up to date if self.wallet.adb.is_up_to_date(): self.logger.info(f'removing local base_tx {txid}') self.wallet.adb.remove_transaction(txid) self._start_new_batch(None) return self._base_tx async def run_iteration(self) -> None: base_tx = self.find_base_tx() try: tx = self.create_next_transaction(base_tx) except NoDynamicFeeEstimates: self.logger.debug('no dynamic fee estimates available') return except Exception as e: if base_tx: self.logger.exception(f'Cannot create batch transaction: {repr(e)}') self._start_new_batch(base_tx) return else: # will be caught by txBatcher raise if tx is None: # nothing to do return # add tx to wallet, in order to reserve utxos # note: This saves the tx as local *unsigned*. # It will transition to local and signed, after we broadcast # the signed tx and get it back via the Synchronizer dance. self.wallet.adb.add_transaction(tx) # await password if not await self.sign_transaction(tx): self.wallet.adb.remove_transaction(tx.txid()) return # save local base_tx self._new_base_tx(tx) if not await self.wallet.network.try_broadcasting(tx, 'batch'): # most likely reason is that base_tx is not replaceable # this may be the case if it has children (because we don't pay enough fees to replace them) # or if we are trying to sweep unconfirmed inputs (replacement-adds-unconfirmed error) self.logger.info(f'cannot broadcast tx {tx.txid()}') self.wallet.adb.remove_transaction(tx.txid()) if base_tx: self.logger.info(f'starting new batch because could not broadcast') self._start_new_batch(base_tx) async def sign_transaction(self, tx: PartialTransaction) -> Optional[PartialTransaction]: tx.add_info_from_wallet(self.wallet) # this adds input amounts self.add_sweep_info_to_tx(tx) pw_required = self.wallet.has_keystore_encryption() and tx.requires_keystore() password = await self.wallet.txbatcher.get_password(tx.txid()) if pw_required else None if password is None and pw_required: return None self.wallet.sign_transaction(tx, password) assert tx.is_complete() return tx def create_next_transaction(self, base_tx: Optional[PartialTransaction]) -> Optional[PartialTransaction]: to_pay = self._to_pay_after(base_tx) to_sweep = self._to_sweep_after(base_tx) to_sweep_now = {} for k, v in to_sweep.items(): can_broadcast, wanted_height = self._can_broadcast(v, base_tx) if can_broadcast: to_sweep_now[k] = v else: self.wallet.add_future_tx(v, wanted_height) while True: if not to_pay and not to_sweep_now and not self._should_bump_fee(base_tx): return None try: tx = self._create_batch_tx(base_tx=base_tx, to_sweep=to_sweep_now, to_pay=to_pay) except NotEnoughFunds: if to_pay: k = max(to_pay, key=lambda x: x.value) self.logger.info(f'Not enough funds, removing output {k}') to_pay.remove(k) continue else: self.logger.info(f'Not enough funds, waiting') return None # 100 kb max standardness rule if tx.estimated_size() < 100_000: break to_sweep_now = to_sweep_now[0:len(to_sweep_now)//2] to_pay = to_pay[0:len(to_pay)//2] self.logger.info(f'created tx {tx.txid()} with {len(tx.inputs())} inputs and {len(tx.outputs())} outputs') return tx def add_sweep_info_to_tx(self, base_tx: PartialTransaction) -> None: for txin in base_tx.inputs(): if sweep_info := self.batch_inputs.get(txin.prevout): if hasattr(sweep_info.txin, 'make_witness'): txin.make_witness = sweep_info.txin.make_witness txin.privkey = sweep_info.txin.privkey txin.witness_script = sweep_info.txin.witness_script txin.script_sig = sweep_info.txin.script_sig def _create_batch_tx( self, *, base_tx: Optional[PartialTransaction], to_sweep: Mapping[str, SweepInfo], to_pay: Sequence[PartialTxOutput], ) -> PartialTransaction: self.logger.info(f'to_sweep: {list(to_sweep.keys())}') self.logger.info(f'to_pay: {to_pay}') inputs = [] # type: List[PartialTxInput] outputs = [] # type: List[PartialTxOutput] locktime = base_tx.locktime if base_tx else None # sort inputs so that txin-txout pairs are first for sweep_info in sorted(to_sweep.values(), key=lambda x: not bool(x.txout)): if sweep_info.cltv_abs is not None: if locktime is None or locktime < sweep_info.cltv_abs: # nLockTime must be greater than or equal to the stack operand. locktime = sweep_info.cltv_abs inputs.append(copy.deepcopy(sweep_info.txin)) if sweep_info.txout: outputs.append(sweep_info.txout) self.logger.info(f'locktime: {locktime}') outputs += to_pay inputs += self._create_inputs_from_tx_change(self._parent_tx) if self._parent_tx else [] # create tx coins = self.wallet.get_spendable_coins(nonlocal_only=True) tx = self.wallet.make_unsigned_transaction( coins=coins, fee_policy=self.fee_policy, base_tx=base_tx, inputs=inputs, outputs=outputs, locktime=locktime, BIP69_sort=False, merge_duplicate_outputs=False, ) # this assert will fail if we merge duplicate outputs for o in outputs: assert o in tx.outputs() return tx def _clear_unconfirmed_sweeps(self, tx: PartialTransaction) -> None: # this ensures that we can accept an input again, # in case the sweeping tx has been removed from the blockchain after a reorg for txin in tx.inputs(): if txin.prevout in self._unconfirmed_sweeps: self._unconfirmed_sweeps.remove(txin.prevout) @locked def _start_new_batch(self, tx: Optional[PartialTransaction]) -> None: use_change = tx and tx.has_change() and any([txout in self.batch_payments for txout in tx.outputs()]) self.batch_payments = self._to_pay_after(tx) self.batch_inputs = self._to_sweep_after(tx) self._batch_txids.clear() self._base_tx = None self._parent_tx = tx if use_change else None self._prevout = None @locked def _new_base_tx(self, tx: Transaction) -> None: self._prevout = tx.inputs()[0].prevout.to_str() self.storage['prevout'] = self._prevout if tx.has_change(): self._batch_txids.append(tx.txid()) self._base_tx = tx else: self.logger.info(f'starting new batch because current base tx does not have change') self._start_new_batch(tx) def _create_inputs_from_tx_change(self, parent_tx: PartialTransaction) -> List[PartialTxInput]: inputs = [] for o in parent_tx.get_change_outputs(): coins = self.wallet.adb.get_addr_utxo(o.address) inputs += list(coins.values()) for txin in inputs: txin.nsequence = 0xffffffff - 2 return inputs def _can_broadcast(self, sweep_info: 'SweepInfo', base_tx: 'Transaction') -> Tuple[bool, Optional[int]]: prevout = sweep_info.txin.prevout.to_str() name = sweep_info.name prev_txid, index = prevout.split(':') can_broadcast = True wanted_height_cltv = None wanted_height_csv = None local_height = self.wallet.network.get_local_height() if sweep_info.cltv_abs: wanted_height_cltv = sweep_info.cltv_abs if wanted_height_cltv - local_height > 0: can_broadcast = False prev_height = self.wallet.adb.get_tx_height(prev_txid).height if sweep_info.csv_delay: if prev_height > 0: wanted_height_csv = prev_height + sweep_info.csv_delay - 1 if wanted_height_csv - local_height > 0: can_broadcast = False else: can_broadcast = False wanted_height_csv = local_height + sweep_info.csv_delay if not can_broadcast: wanted_height = max((wanted_height_csv or 0), (wanted_height_cltv or 0)) else: wanted_height = None if base_tx and prev_height <= 0: # we cannot add unconfirmed inputs to existing base_tx (per RBF rules) # thus, we will wait until the current batch is confirmed if can_broadcast: can_broadcast = False wanted_height = local_height + 1 return can_broadcast, wanted_height