interface: don't request same tx from server that we just broadcast to it
Often when the wallet creates a tx, the flow is: - create unsigned tx - sign tx - broadcast tx, but don't save it in history - server sends notification that status of a subscribed address changed - client calls scripthash.get_history - client sees txid in scripthash.get_history response - client calls blockchain.transaction.get to request missing tx Instead, now when we broadcast a tx on an interface, we cache that tx *for that interface*, and just before calling blockchain.transaction.get, we lookup in the cache. Hence this will often save a network request.
This commit is contained in:
@@ -64,6 +64,7 @@ from .i18n import _
|
|||||||
from .logging import Logger
|
from .logging import Logger
|
||||||
from .transaction import Transaction
|
from .transaction import Transaction
|
||||||
from .fee_policy import FEE_ETA_TARGETS
|
from .fee_policy import FEE_ETA_TARGETS
|
||||||
|
from .lrucache import LRUCache
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .network import Network
|
from .network import Network
|
||||||
@@ -558,6 +559,7 @@ class Interface(Logger):
|
|||||||
self.tip = 0
|
self.tip = 0
|
||||||
|
|
||||||
self._headers_cache = {} # type: Dict[int, bytes]
|
self._headers_cache = {} # type: Dict[int, bytes]
|
||||||
|
self._rawtx_cache = LRUCache(maxsize=20) # type: LRUCache[str, bytes] # txid->rawtx
|
||||||
|
|
||||||
self.fee_estimates_eta = {} # type: Dict[int, int]
|
self.fee_estimates_eta = {} # type: Dict[int, int]
|
||||||
|
|
||||||
@@ -1318,6 +1320,8 @@ class Interface(Logger):
|
|||||||
async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
|
async def get_transaction(self, tx_hash: str, *, timeout=None) -> str:
|
||||||
if not is_hash256_str(tx_hash):
|
if not is_hash256_str(tx_hash):
|
||||||
raise Exception(f"{repr(tx_hash)} is not a txid")
|
raise Exception(f"{repr(tx_hash)} is not a txid")
|
||||||
|
if rawtx_bytes := self._rawtx_cache.get(tx_hash):
|
||||||
|
return rawtx_bytes.hex()
|
||||||
raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
|
raw = await self.session.send_request('blockchain.transaction.get', [tx_hash], timeout=timeout)
|
||||||
# validate response
|
# validate response
|
||||||
if not is_hex_str(raw):
|
if not is_hex_str(raw):
|
||||||
@@ -1329,16 +1333,21 @@ class Interface(Logger):
|
|||||||
raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
|
raise RequestCorrupted(f"cannot deserialize received transaction (txid {tx_hash})") from e
|
||||||
if tx.txid() != tx_hash:
|
if tx.txid() != tx_hash:
|
||||||
raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
|
raise RequestCorrupted(f"received tx does not match expected txid {tx_hash} (got {tx.txid()})")
|
||||||
|
self._rawtx_cache[tx_hash] = bytes.fromhex(raw)
|
||||||
return raw
|
return raw
|
||||||
|
|
||||||
async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
|
async def broadcast_transaction(self, tx: 'Transaction', *, timeout=None) -> None:
|
||||||
"""caller should handle TxBroadcastError and RequestTimedOut"""
|
"""caller should handle TxBroadcastError and RequestTimedOut"""
|
||||||
|
txid_calc = tx.txid()
|
||||||
|
assert txid_calc is not None
|
||||||
|
rawtx = tx.serialize()
|
||||||
|
assert is_hex_str(rawtx)
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
|
timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Urgent)
|
||||||
if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
|
if any(DummyAddress.is_dummy_address(txout.address) for txout in tx.outputs()):
|
||||||
raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
|
raise DummyAddressUsedInTxException("tried to broadcast tx with dummy address!")
|
||||||
try:
|
try:
|
||||||
out = await self.session.send_request('blockchain.transaction.broadcast', [tx.serialize()], timeout=timeout)
|
out = await self.session.send_request('blockchain.transaction.broadcast', [rawtx], timeout=timeout)
|
||||||
# note: both 'out' and exception messages are untrusted input from the server
|
# note: both 'out' and exception messages are untrusted input from the server
|
||||||
except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
|
except (RequestTimedOut, asyncio.CancelledError, asyncio.TimeoutError):
|
||||||
raise # pass-through
|
raise # pass-through
|
||||||
@@ -1349,10 +1358,14 @@ class Interface(Logger):
|
|||||||
self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
|
self.logger.info(f"broadcast_transaction error2 [DO NOT TRUST THIS MESSAGE]: {error_text_str_to_safe_str(repr(e))}. tx={str(tx)}")
|
||||||
send_exception_to_crash_reporter(e)
|
send_exception_to_crash_reporter(e)
|
||||||
raise TxBroadcastUnknownError() from e
|
raise TxBroadcastUnknownError() from e
|
||||||
if out != tx.txid():
|
if out != txid_calc:
|
||||||
self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
|
self.logger.info(f"unexpected txid for broadcast_transaction [DO NOT TRUST THIS MESSAGE]: "
|
||||||
f"{error_text_str_to_safe_str(out)} != {tx.txid()}. tx={str(tx)}")
|
f"{error_text_str_to_safe_str(out)} != {txid_calc}. tx={str(tx)}")
|
||||||
raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
|
raise TxBroadcastHashMismatch(_("Server returned unexpected transaction ID."))
|
||||||
|
# broadcast succeeded.
|
||||||
|
# We now cache the rawtx, for *this interface only*. The tx likely touches some ismine addresses, affecting
|
||||||
|
# the status of a scripthash we are subscribed to. Caching here will save a future get_transaction RPC.
|
||||||
|
self._rawtx_cache[txid_calc] = bytes.fromhex(rawtx)
|
||||||
|
|
||||||
async def get_history_for_scripthash(self, sh: str) -> List[dict]:
|
async def get_history_for_scripthash(self, sh: str) -> List[dict]:
|
||||||
if not is_hash256_str(sh):
|
if not is_hash256_str(sh):
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import collections
|
||||||
|
|
||||||
import aiorpcx
|
import aiorpcx
|
||||||
from aiorpcx import RPCError
|
from aiorpcx import RPCError
|
||||||
@@ -117,6 +118,7 @@ class ServerSession(aiorpcx.RPCSession, Logger):
|
|||||||
self.txs = {
|
self.txs = {
|
||||||
"bdae818ad3c1f261317738ae9284159bf54874356f186dbc7afd631dc1527fcb": bfh("020000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025100ffffffff0200f2052a010000001600140297bde2689a3c79ffe050583b62f86f2d9dae540000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf90120000000000000000000000000000000000000000000000000000000000000000000000000"),
|
"bdae818ad3c1f261317738ae9284159bf54874356f186dbc7afd631dc1527fcb": bfh("020000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025100ffffffff0200f2052a010000001600140297bde2689a3c79ffe050583b62f86f2d9dae540000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf90120000000000000000000000000000000000000000000000000000000000000000000000000"),
|
||||||
} # type: dict[str, bytes]
|
} # type: dict[str, bytes]
|
||||||
|
self._method_counts = collections.defaultdict(int) # type: dict[str, int]
|
||||||
_active_server_sessions.add(self)
|
_active_server_sessions.add(self)
|
||||||
|
|
||||||
async def connection_lost(self):
|
async def connection_lost(self):
|
||||||
@@ -136,6 +138,7 @@ class ServerSession(aiorpcx.RPCSession, Logger):
|
|||||||
'server.ping': self._handle_ping,
|
'server.ping': self._handle_ping,
|
||||||
}
|
}
|
||||||
handler = handlers.get(request.method)
|
handler = handlers.get(request.method)
|
||||||
|
self._method_counts[request.method] += 1
|
||||||
coro = aiorpcx.handler_invocation(handler, request)()
|
coro = aiorpcx.handler_invocation(handler, request)()
|
||||||
return await coro
|
return await coro
|
||||||
|
|
||||||
@@ -220,11 +223,18 @@ class TestInterface(ElectrumTestCase):
|
|||||||
# try requesting known tx:
|
# try requesting known tx:
|
||||||
rawtx = await interface.get_transaction("bdae818ad3c1f261317738ae9284159bf54874356f186dbc7afd631dc1527fcb")
|
rawtx = await interface.get_transaction("bdae818ad3c1f261317738ae9284159bf54874356f186dbc7afd631dc1527fcb")
|
||||||
self.assertEqual(rawtx, _get_active_server_session().txs["bdae818ad3c1f261317738ae9284159bf54874356f186dbc7afd631dc1527fcb"].hex())
|
self.assertEqual(rawtx, _get_active_server_session().txs["bdae818ad3c1f261317738ae9284159bf54874356f186dbc7afd631dc1527fcb"].hex())
|
||||||
|
self.assertEqual(_get_active_server_session()._method_counts["blockchain.transaction.get"], 2)
|
||||||
|
|
||||||
async def test_transaction_broadcast(self):
|
async def test_transaction_broadcast(self):
|
||||||
interface = await self._start_iface_and_wait_for_sync()
|
interface = await self._start_iface_and_wait_for_sync()
|
||||||
rawtx1 = "020000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025200ffffffff0200f2052a010000001600140297bde2689a3c79ffe050583b62f86f2d9dae540000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf90120000000000000000000000000000000000000000000000000000000000000000000000000"
|
rawtx1 = "020000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff025200ffffffff0200f2052a010000001600140297bde2689a3c79ffe050583b62f86f2d9dae540000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf90120000000000000000000000000000000000000000000000000000000000000000000000000"
|
||||||
tx = Transaction(rawtx1)
|
tx = Transaction(rawtx1)
|
||||||
|
# broadcast
|
||||||
await interface.broadcast_transaction(tx)
|
await interface.broadcast_transaction(tx)
|
||||||
|
self.assertEqual(bfh(rawtx1), _get_active_server_session().txs.get(tx.txid()))
|
||||||
|
# now request tx.
|
||||||
|
# as we just broadcast this same tx, this will hit the client iface cache, and won't call the server.
|
||||||
|
self.assertEqual(_get_active_server_session()._method_counts["blockchain.transaction.get"], 0)
|
||||||
rawtx2 = await interface.get_transaction(tx.txid())
|
rawtx2 = await interface.get_transaction(tx.txid())
|
||||||
self.assertEqual(rawtx1, rawtx2)
|
self.assertEqual(rawtx1, rawtx2)
|
||||||
|
self.assertEqual(_get_active_server_session()._method_counts["blockchain.transaction.get"], 0)
|
||||||
|
|||||||
Reference in New Issue
Block a user