synchronizer: get_transaction should discard tx_height as it can change
tx_height comes from the `get_history` RPC, we then call the `get_transaction` RPC.
By the time the `get_transaction` RPC returns, we might have received another
scripthash status update, called `get_history` again, and updated height for the txid.
Synchronizer._get_transaction() should not call adb.receive_tx_callback() with
the old tx_height (but it was doing exactly that).
Patch to trigger, e.g. regtest failures:
(e.g. for tests.regtest.TestLightningAB.test_extract_preimage)
```
diff --git a/electrum/interface.py b/electrum/interface.py
index 8649652b9c..fce7a1c6de 100644
--- a/electrum/interface.py
+++ b/electrum/interface.py
@@ -991,6 +991,7 @@ class Interface(Logger):
return res
async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
+ await asyncio.sleep(3)
if not is_hash256_str(tx_hash):
raise Exception(f"{repr(tx_hash)} is not a txid")
raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
```
This commit is contained in:
@@ -434,10 +434,12 @@ class AddressSynchronizer(Logger, EventListener):
|
|||||||
children |= self.get_depending_transactions(other_hash)
|
children |= self.get_depending_transactions(other_hash)
|
||||||
return children
|
return children
|
||||||
|
|
||||||
def receive_tx_callback(self, tx: Transaction, *, tx_height: int) -> None:
|
def receive_tx_callback(self, tx: Transaction, *, tx_height: Optional[int] = None) -> None:
|
||||||
txid = tx.txid()
|
txid = tx.txid()
|
||||||
assert txid is not None
|
assert txid is not None
|
||||||
self.add_unverified_or_unconfirmed_tx(txid, tx_height)
|
if tx_height is not None:
|
||||||
|
# note: tx_height is only set by the unit tests: to inject a tx into the history
|
||||||
|
self.add_unverified_or_unconfirmed_tx(txid, tx_height)
|
||||||
self.add_transaction(tx, allow_unrelated=True)
|
self.add_transaction(tx, allow_unrelated=True)
|
||||||
|
|
||||||
def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]):
|
def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]):
|
||||||
@@ -618,7 +620,7 @@ class AddressSynchronizer(Logger, EventListener):
|
|||||||
assert self.is_mine(addr), "address needs to be is_mine to be watched"
|
assert self.is_mine(addr), "address needs to be is_mine to be watched"
|
||||||
await self._address_history_changed_events[addr].wait()
|
await self._address_history_changed_events[addr].wait()
|
||||||
|
|
||||||
def add_unverified_or_unconfirmed_tx(self, tx_hash, tx_height):
|
def add_unverified_or_unconfirmed_tx(self, tx_hash: str, tx_height: int) -> None:
|
||||||
if self.db.is_in_verified_tx(tx_hash):
|
if self.db.is_in_verified_tx(tx_hash):
|
||||||
if tx_height <= 0:
|
if tx_height <= 0:
|
||||||
# tx was previously SPV-verified but now in mempool (probably reorg)
|
# tx was previously SPV-verified but now in mempool (probably reorg)
|
||||||
@@ -634,7 +636,7 @@ class AddressSynchronizer(Logger, EventListener):
|
|||||||
else:
|
else:
|
||||||
self.unconfirmed_tx[tx_hash] = tx_height
|
self.unconfirmed_tx[tx_hash] = tx_height
|
||||||
|
|
||||||
def remove_unverified_tx(self, tx_hash, tx_height):
|
def remove_unverified_tx(self, tx_hash: str, tx_height: int) -> None:
|
||||||
with self.lock:
|
with self.lock:
|
||||||
new_height = self.unverified_tx.get(tx_hash)
|
new_height = self.unverified_tx.get(tx_hash)
|
||||||
if new_height == tx_height:
|
if new_height == tx_height:
|
||||||
|
|||||||
@@ -143,7 +143,7 @@ class Synchronizer(SynchronizerBase):
|
|||||||
def _reset(self):
|
def _reset(self):
|
||||||
super()._reset()
|
super()._reset()
|
||||||
self._init_done = False
|
self._init_done = False
|
||||||
self.requested_tx = {}
|
self.requested_tx = set() # type: Set[str]
|
||||||
self.requested_histories = set()
|
self.requested_histories = set()
|
||||||
self._stale_histories = dict() # type: Dict[str, asyncio.Task]
|
self._stale_histories = dict() # type: Dict[str, asyncio.Task]
|
||||||
|
|
||||||
@@ -208,14 +208,15 @@ class Synchronizer(SynchronizerBase):
|
|||||||
async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False):
|
async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False):
|
||||||
# "hist" is a list of [tx_hash, tx_height] lists
|
# "hist" is a list of [tx_hash, tx_height] lists
|
||||||
transaction_hashes = []
|
transaction_hashes = []
|
||||||
for tx_hash, tx_height in hist:
|
for tx_hash, _tx_height in hist:
|
||||||
if tx_hash in self.requested_tx:
|
if tx_hash in self.requested_tx:
|
||||||
continue
|
continue
|
||||||
tx = self.adb.db.get_transaction(tx_hash)
|
tx = self.adb.db.get_transaction(tx_hash)
|
||||||
if tx and not isinstance(tx, PartialTransaction):
|
if tx and not isinstance(tx, PartialTransaction):
|
||||||
continue # already have complete tx
|
continue # already have complete tx
|
||||||
transaction_hashes.append(tx_hash)
|
transaction_hashes.append(tx_hash)
|
||||||
self.requested_tx[tx_hash] = tx_height
|
# note: tx_height might change by the time we get the raw_tx
|
||||||
|
self.requested_tx.add(tx_hash)
|
||||||
|
|
||||||
if not transaction_hashes: return
|
if not transaction_hashes: return
|
||||||
async with OldTaskGroup() as group:
|
async with OldTaskGroup() as group:
|
||||||
@@ -230,7 +231,7 @@ class Synchronizer(SynchronizerBase):
|
|||||||
except RPCError as e:
|
except RPCError as e:
|
||||||
# most likely, "No such mempool or blockchain transaction"
|
# most likely, "No such mempool or blockchain transaction"
|
||||||
if allow_server_not_finding_tx:
|
if allow_server_not_finding_tx:
|
||||||
self.requested_tx.pop(tx_hash)
|
self.requested_tx.remove(tx_hash)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
@@ -239,9 +240,9 @@ class Synchronizer(SynchronizerBase):
|
|||||||
tx = Transaction(raw_tx)
|
tx = Transaction(raw_tx)
|
||||||
if tx_hash != tx.txid():
|
if tx_hash != tx.txid():
|
||||||
raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
|
raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
|
||||||
tx_height = self.requested_tx.pop(tx_hash)
|
self.requested_tx.remove(tx_hash)
|
||||||
self.adb.receive_tx_callback(tx, tx_height=tx_height)
|
self.adb.receive_tx_callback(tx)
|
||||||
self.logger.info(f"received tx {tx_hash} height: {tx_height} bytes: {len(raw_tx)}")
|
self.logger.info(f"received tx {tx_hash}. bytes: {len(raw_tx)}")
|
||||||
|
|
||||||
async def main(self):
|
async def main(self):
|
||||||
self.adb.up_to_date_changed()
|
self.adb.up_to_date_changed()
|
||||||
|
|||||||
Reference in New Issue
Block a user