diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 8adad4752..17c4c2701 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -434,10 +434,12 @@ class AddressSynchronizer(Logger, EventListener): children |= self.get_depending_transactions(other_hash) return children - def receive_tx_callback(self, tx: Transaction, *, tx_height: int) -> None: + def receive_tx_callback(self, tx: Transaction, *, tx_height: Optional[int] = None) -> None: txid = tx.txid() assert txid is not None - self.add_unverified_or_unconfirmed_tx(txid, tx_height) + if tx_height is not None: + # note: tx_height is only set by the unit tests: to inject a tx into the history + self.add_unverified_or_unconfirmed_tx(txid, tx_height) self.add_transaction(tx, allow_unrelated=True) def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]): @@ -618,7 +620,7 @@ class AddressSynchronizer(Logger, EventListener): assert self.is_mine(addr), "address needs to be is_mine to be watched" await self._address_history_changed_events[addr].wait() - def add_unverified_or_unconfirmed_tx(self, tx_hash, tx_height): + def add_unverified_or_unconfirmed_tx(self, tx_hash: str, tx_height: int) -> None: if self.db.is_in_verified_tx(tx_hash): if tx_height <= 0: # tx was previously SPV-verified but now in mempool (probably reorg) @@ -634,7 +636,7 @@ class AddressSynchronizer(Logger, EventListener): else: self.unconfirmed_tx[tx_hash] = tx_height - def remove_unverified_tx(self, tx_hash, tx_height): + def remove_unverified_tx(self, tx_hash: str, tx_height: int) -> None: with self.lock: new_height = self.unverified_tx.get(tx_hash) if new_height == tx_height: diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 3f64fd5f6..3fbdfcddb 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -143,7 +143,7 @@ class Synchronizer(SynchronizerBase): def _reset(self): super()._reset() self._init_done = False - self.requested_tx = {} + self.requested_tx = set() # type: Set[str] self.requested_histories = set() self._stale_histories = dict() # type: Dict[str, asyncio.Task] @@ -208,14 +208,15 @@ class Synchronizer(SynchronizerBase): async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False): # "hist" is a list of [tx_hash, tx_height] lists transaction_hashes = [] - for tx_hash, tx_height in hist: + for tx_hash, _tx_height in hist: if tx_hash in self.requested_tx: continue tx = self.adb.db.get_transaction(tx_hash) if tx and not isinstance(tx, PartialTransaction): continue # already have complete tx transaction_hashes.append(tx_hash) - self.requested_tx[tx_hash] = tx_height + # note: tx_height might change by the time we get the raw_tx + self.requested_tx.add(tx_hash) if not transaction_hashes: return async with OldTaskGroup() as group: @@ -230,7 +231,7 @@ class Synchronizer(SynchronizerBase): except RPCError as e: # most likely, "No such mempool or blockchain transaction" if allow_server_not_finding_tx: - self.requested_tx.pop(tx_hash) + self.requested_tx.remove(tx_hash) return else: raise @@ -239,9 +240,9 @@ class Synchronizer(SynchronizerBase): tx = Transaction(raw_tx) if tx_hash != tx.txid(): raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})") - tx_height = self.requested_tx.pop(tx_hash) - self.adb.receive_tx_callback(tx, tx_height=tx_height) - self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}") + self.requested_tx.remove(tx_hash) + self.adb.receive_tx_callback(tx) + self.logger.info(f"received tx {tx_hash}. bytes: {len(raw_tx)}") async def main(self): self.adb.up_to_date_changed()