synchronizer: ensure fairness between wallets
Scenario (prior to change): User opens wallet1 with 10k addresses, and then immediately opens wallet2 with 100 addresses. wallet1 will synchronise first, fully, and only then will wallet2 start syncing. Now, wallet1 and wallet2 will sync concurrently (and wallet2 will finish much sooner as expected).
This commit is contained in:
@@ -61,6 +61,10 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
|||||||
def __init__(self, network: 'Network'):
|
def __init__(self, network: 'Network'):
|
||||||
self.asyncio_loop = network.asyncio_loop
|
self.asyncio_loop = network.asyncio_loop
|
||||||
self._reset_request_counters()
|
self._reset_request_counters()
|
||||||
|
# Ensure fairness between Synchronizers. e.g. if multiple wallets
|
||||||
|
# are open, a large wallet should not starve the small wallets:
|
||||||
|
self._network_request_semaphore = asyncio.Semaphore(100)
|
||||||
|
|
||||||
NetworkJobOnDefaultServer.__init__(self, network)
|
NetworkJobOnDefaultServer.__init__(self, network)
|
||||||
|
|
||||||
def _reset(self):
|
def _reset(self):
|
||||||
@@ -106,7 +110,8 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
|||||||
self.scripthash_to_address[h] = addr
|
self.scripthash_to_address[h] = addr
|
||||||
self._requests_sent += 1
|
self._requests_sent += 1
|
||||||
try:
|
try:
|
||||||
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
async with self._network_request_semaphore:
|
||||||
|
await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue)
|
||||||
except RPCError as e:
|
except RPCError as e:
|
||||||
if e.message == 'history too large': # no unique error code
|
if e.message == 'history too large': # no unique error code
|
||||||
raise GracefulDisconnect(e, log_level=logging.ERROR) from e
|
raise GracefulDisconnect(e, log_level=logging.ERROR) from e
|
||||||
@@ -167,7 +172,8 @@ class Synchronizer(SynchronizerBase):
|
|||||||
self.requested_histories.add((addr, status))
|
self.requested_histories.add((addr, status))
|
||||||
h = address_to_scripthash(addr)
|
h = address_to_scripthash(addr)
|
||||||
self._requests_sent += 1
|
self._requests_sent += 1
|
||||||
result = await self.interface.get_history_for_scripthash(h)
|
async with self._network_request_semaphore:
|
||||||
|
result = await self.interface.get_history_for_scripthash(h)
|
||||||
self._requests_answered += 1
|
self._requests_answered += 1
|
||||||
self.logger.info(f"receiving history {addr} {len(result)}")
|
self.logger.info(f"receiving history {addr} {len(result)}")
|
||||||
hashes = set(map(lambda item: item['tx_hash'], result))
|
hashes = set(map(lambda item: item['tx_hash'], result))
|
||||||
@@ -210,7 +216,8 @@ class Synchronizer(SynchronizerBase):
|
|||||||
async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
|
async def _get_transaction(self, tx_hash, *, allow_server_not_finding_tx=False):
|
||||||
self._requests_sent += 1
|
self._requests_sent += 1
|
||||||
try:
|
try:
|
||||||
raw_tx = await self.interface.get_transaction(tx_hash)
|
async with self._network_request_semaphore:
|
||||||
|
raw_tx = await self.interface.get_transaction(tx_hash)
|
||||||
except RPCError as e:
|
except RPCError as e:
|
||||||
# most likely, "No such mempool or blockchain transaction"
|
# most likely, "No such mempool or blockchain transaction"
|
||||||
if allow_server_not_finding_tx:
|
if allow_server_not_finding_tx:
|
||||||
|
|||||||
Reference in New Issue
Block a user