diff --git a/electrum/txbatcher.py b/electrum/txbatcher.py index 419621c62..ccf614365 100644 --- a/electrum/txbatcher.py +++ b/electrum/txbatcher.py @@ -352,7 +352,7 @@ class TxBatch(Logger): # todo: require more than one confirmation return len(self.batch_inputs) == 0 and len(self.batch_payments) == 0 and len(self._batch_txids) == 0 - def find_base_tx(self) -> Optional[PartialTransaction]: + async def find_base_tx(self) -> Optional[PartialTransaction]: if not self._prevout: return None prev_txid, index = self._prevout.split(':') @@ -376,17 +376,16 @@ class TxBatch(Logger): self.logger.info(f'base tx confirmed {txid}') self._clear_unconfirmed_sweeps(tx) self._start_new_batch(tx) - elif tx_mined_status.height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]: - # fixme: adb may return TX_HEIGHT_LOCAL when not up to date - if self.wallet.adb.is_up_to_date(): - self.logger.info(f'removing local base_tx {txid}') - self.wallet.adb.remove_transaction(txid) - self._start_new_batch(None) + if tx_mined_status.height in [TX_HEIGHT_LOCAL]: + # this may happen if our Electrum server is unresponsive + # server could also be lying to us. Rebroadcasting might + # help, if we have switched to another server. + await self.wallet.network.try_broadcasting(tx, 'batch') return self._base_tx async def run_iteration(self) -> None: - base_tx = self.find_base_tx() + base_tx = await self.find_base_tx() try: tx = self.create_next_transaction(base_tx) except NoDynamicFeeEstimates: @@ -419,15 +418,33 @@ class TxBatch(Logger): self._new_base_tx(tx) if not await self.wallet.network.try_broadcasting(tx, 'batch'): - # most likely reason is that base_tx is not replaceable - # this may be the case if it has children (because we don't pay enough fees to replace them) - # or if we are trying to sweep unconfirmed inputs (replacement-adds-unconfirmed error) self.logger.info(f'cannot broadcast tx {tx.txid()}') - self.wallet.adb.remove_transaction(tx.txid()) if base_tx: + # The most likely cause is that base_tx is not + # replaceable. This may be the case if it has children + # (because we don't pay enough fees to replace them) + # or if we are trying to sweep unconfirmed inputs + # (replacement-adds-unconfirmed error) + + # it is OK to remove the transaction, because + # create_next_transaction will create a new tx that is + # incompatible with the one we remove here, so we + # cannot double pay. + self.wallet.adb.remove_transaction(tx.txid()) self.logger.info(f'starting new batch because could not broadcast') self._start_new_batch(base_tx) - + else: + # it is dangerous to remove the transaction if there + # is no base_tx. Indeed, the transaction might have + # been broadcast. So, we just keep the transaction as + # local, and we will try to rebroadcast it later (see + # above). + # + # FIXME: it should be possible to ensure that + # create_next_transaction creates transactions that + # spend the same coins, using self._prevout. This + # would make them incompatible, and safe to broadcast. + pass async def sign_transaction(self, tx: PartialTransaction) -> Optional[PartialTransaction]: tx.add_info_from_wallet(self.wallet) # this adds input amounts diff --git a/tests/test_txbatcher.py b/tests/test_txbatcher.py index d257e5799..c7a1b07ce 100644 --- a/tests/test_txbatcher.py +++ b/tests/test_txbatcher.py @@ -222,34 +222,6 @@ class TestTxBatcher(ElectrumTestCase): assert new_tx.inputs()[0].prevout == tx.inputs()[0].prevout == txin.prevout assert output1 in new_tx.outputs() - @mock.patch.object(wallet.Abstract_Wallet, 'save_db') - async def test_remove_local_base_tx(self, mock_save_db): - """ - The swap claim tx does not get broadcast - we test that txbatcher.find_base_tx() removes the local tx - """ - self.maxDiff = None - # create wallet - wallet = self._create_wallet() - # mock is_up_to_date - wallet.adb.is_up_to_date = lambda: True - # do not broadcast, wait forever - async def do_wait(x, y): - await asyncio.sleep(100000000) - self.network.try_broadcasting = do_wait - # add swap data - wallet.adb.db.transactions[SWAPDATA.funding_txid] = tx = Transaction(SWAP_FUNDING_TX) - wallet.adb.receive_tx_callback(tx, tx_height=1) - wallet.txbatcher.add_sweep_input('default', SWAP_SWEEP_INFO) - txbatch = wallet.txbatcher.tx_batches.get('default') - base_tx = await self._wait_for_base_tx(txbatch) - self.assertEqual(base_tx.txid(), '80a8cbc42de74cb48a09644c1e438c8b39144bd3b55c574f21d89d05c85fed34') - await wallet.stop() - txbatch.batch_inputs.clear() - wallet.start_network(self.network) - base_tx = await self._wait_for_base_tx(txbatch, should_be_none=True) - self.assertEqual(base_tx, None) - async def _wait_for_base_tx(self, txbatch, should_be_none=False): async with timeout_after(10): while True: