synchronizer/verifier: ensure fairness between wallets (follow-up)
follow-up to 4346d2fc76
It's not just about the Synchronizer, the Verifier should not starve other jobs either...
(previously I thought the Verifier is not too important as it only makes
requests if there are new txs; however with LNWatcher its progress is not persisted)
This commit is contained in:
@@ -110,8 +110,9 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
|
||||
# we are verifying channel announcements as they are from untrusted ln peers.
|
||||
# we use electrum servers to do this. however we don't trust electrum servers either...
|
||||
try:
|
||||
result = await self.network.get_txid_from_txpos(
|
||||
block_height, short_channel_id.txpos, True)
|
||||
async with self._network_request_semaphore:
|
||||
result = await self.network.get_txid_from_txpos(
|
||||
block_height, short_channel_id.txpos, True)
|
||||
except aiorpcx.jsonrpc.RPCError:
|
||||
# the electrum server is complaining about the txpos for given block.
|
||||
# it is not clear what to do now, but let's believe the server.
|
||||
@@ -128,7 +129,8 @@ class LNChannelVerifier(NetworkJobOnDefaultServer):
|
||||
# the electrum server sent an incorrect proof. blame is on server, not the ln peer
|
||||
raise GracefulDisconnect(e) from e
|
||||
try:
|
||||
raw_tx = await self.network.get_transaction(tx_hash)
|
||||
async with self._network_request_semaphore:
|
||||
raw_tx = await self.network.get_transaction(tx_hash)
|
||||
except aiorpcx.jsonrpc.RPCError as e:
|
||||
# the electrum server can't find the tx; but it was the
|
||||
# one who told us about the txid!! blame is on server
|
||||
|
||||
@@ -61,9 +61,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
def __init__(self, network: 'Network'):
|
||||
self.asyncio_loop = network.asyncio_loop
|
||||
self._reset_request_counters()
|
||||
# Ensure fairness between Synchronizers. e.g. if multiple wallets
|
||||
# are open, a large wallet should not starve the small wallets:
|
||||
self._network_request_semaphore = asyncio.Semaphore(100)
|
||||
|
||||
NetworkJobOnDefaultServer.__init__(self, network)
|
||||
|
||||
|
||||
@@ -1171,6 +1171,10 @@ class NetworkJobOnDefaultServer(Logger):
|
||||
self.network = network
|
||||
self.interface = None # type: Interface
|
||||
self._restart_lock = asyncio.Lock()
|
||||
# Ensure fairness between NetworkJobs. e.g. if multiple wallets
|
||||
# are open, a large wallet's Synchronizer should not starve the small wallets:
|
||||
self._network_request_semaphore = asyncio.Semaphore(100)
|
||||
|
||||
self._reset()
|
||||
asyncio.run_coroutine_threadsafe(self._restart(), network.asyncio_loop)
|
||||
register_callback(self._restart, ['default_server_changed'])
|
||||
|
||||
@@ -96,7 +96,8 @@ class SPV(NetworkJobOnDefaultServer):
|
||||
|
||||
async def _request_and_verify_single_proof(self, tx_hash, tx_height):
|
||||
try:
|
||||
merkle = await self.network.get_merkle_for_transaction(tx_hash, tx_height)
|
||||
async with self._network_request_semaphore:
|
||||
merkle = await self.network.get_merkle_for_transaction(tx_hash, tx_height)
|
||||
except UntrustedServerReturnedError as e:
|
||||
if not isinstance(e.original_exception, aiorpcx.jsonrpc.RPCError):
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user