address_sync: include verifier in sync_state progress indicator
This commit is contained in:
@@ -672,8 +672,14 @@ class AddressSynchronizer(Logger):
|
||||
with self.lock:
|
||||
status_changed = self._up_to_date != up_to_date
|
||||
self._up_to_date = up_to_date
|
||||
if self.network:
|
||||
self.network.notify('status')
|
||||
# reset sync state progress indicator
|
||||
if up_to_date:
|
||||
if self.synchronizer:
|
||||
self.synchronizer.reset_request_counters()
|
||||
if self.verifier:
|
||||
self.verifier.reset_request_counters()
|
||||
# fire triggers
|
||||
util.trigger_callback('status')
|
||||
if status_changed:
|
||||
self.logger.info(f'set_up_to_date: {up_to_date}')
|
||||
|
||||
@@ -681,10 +687,16 @@ class AddressSynchronizer(Logger):
|
||||
return self._up_to_date
|
||||
|
||||
def get_history_sync_state_details(self) -> Tuple[int, int]:
|
||||
nsent, nans = 0, 0
|
||||
if self.synchronizer:
|
||||
return self.synchronizer.num_requests_sent_and_answered()
|
||||
else:
|
||||
return 0, 0
|
||||
n1, n2 = self.synchronizer.num_requests_sent_and_answered()
|
||||
nsent += n1
|
||||
nans += n2
|
||||
if self.verifier:
|
||||
n1, n2 = self.verifier.num_requests_sent_and_answered()
|
||||
nsent += n1
|
||||
nans += n2
|
||||
return nsent, nans
|
||||
|
||||
@with_transaction_lock
|
||||
def get_tx_delta(self, tx_hash: str, address: str) -> int:
|
||||
|
||||
@@ -60,7 +60,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
"""
|
||||
def __init__(self, network: 'Network'):
|
||||
self.asyncio_loop = network.asyncio_loop
|
||||
self._reset_request_counters()
|
||||
|
||||
NetworkJobOnDefaultServer.__init__(self, network)
|
||||
|
||||
@@ -69,7 +68,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
self.requested_addrs = set()
|
||||
self.scripthash_to_address = {}
|
||||
self._processed_some_notifications = False # so that we don't miss them
|
||||
self._reset_request_counters()
|
||||
# Queues
|
||||
self.add_queue = asyncio.Queue()
|
||||
self.status_queue = asyncio.Queue()
|
||||
@@ -85,10 +83,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
# we are being cancelled now
|
||||
self.session.unsubscribe(self.status_queue)
|
||||
|
||||
def _reset_request_counters(self):
|
||||
self._requests_sent = 0
|
||||
self._requests_answered = 0
|
||||
|
||||
def add(self, addr):
|
||||
asyncio.run_coroutine_threadsafe(self._add_address(addr), self.asyncio_loop)
|
||||
|
||||
@@ -129,9 +123,6 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
await self.taskgroup.spawn(self._on_address_status, addr, status)
|
||||
self._processed_some_notifications = True
|
||||
|
||||
def num_requests_sent_and_answered(self) -> Tuple[int, int]:
|
||||
return self._requests_sent, self._requests_answered
|
||||
|
||||
async def main(self):
|
||||
raise NotImplementedError() # implemented by subclasses
|
||||
|
||||
@@ -271,8 +262,6 @@ class Synchronizer(SynchronizerBase):
|
||||
if (up_to_date != self.wallet.is_up_to_date()
|
||||
or up_to_date and self._processed_some_notifications):
|
||||
self._processed_some_notifications = False
|
||||
if up_to_date:
|
||||
self._reset_request_counters()
|
||||
self.wallet.set_up_to_date(up_to_date)
|
||||
util.trigger_callback('wallet_updated', self.wallet)
|
||||
|
||||
|
||||
@@ -1326,6 +1326,7 @@ class NetworkJobOnDefaultServer(Logger, ABC):
|
||||
server connection changes.
|
||||
"""
|
||||
self.taskgroup = OldTaskGroup()
|
||||
self.reset_request_counters()
|
||||
|
||||
async def _start(self, interface: 'Interface'):
|
||||
self.interface = interface
|
||||
@@ -1357,6 +1358,13 @@ class NetworkJobOnDefaultServer(Logger, ABC):
|
||||
self._reset()
|
||||
await self._start(interface)
|
||||
|
||||
def reset_request_counters(self):
|
||||
self._requests_sent = 0
|
||||
self._requests_answered = 0
|
||||
|
||||
def num_requests_sent_and_answered(self) -> Tuple[int, int]:
|
||||
return self._requests_sent, self._requests_answered
|
||||
|
||||
@property
|
||||
def session(self):
|
||||
s = self.interface.session
|
||||
|
||||
@@ -87,6 +87,7 @@ class SPV(NetworkJobOnDefaultServer):
|
||||
header = self.blockchain.read_header(tx_height)
|
||||
if header is None:
|
||||
if tx_height < constants.net.max_checkpoint():
|
||||
# FIXME these requests are not counted (self._requests_sent += 1)
|
||||
await self.taskgroup.spawn(self.interface.request_chunk(tx_height, None, can_return_early=True))
|
||||
continue
|
||||
# request now
|
||||
@@ -96,6 +97,7 @@ class SPV(NetworkJobOnDefaultServer):
|
||||
|
||||
async def _request_and_verify_single_proof(self, tx_hash, tx_height):
|
||||
try:
|
||||
self._requests_sent += 1
|
||||
async with self._network_request_semaphore:
|
||||
merkle = await self.interface.get_merkle_for_transaction(tx_hash, tx_height)
|
||||
except aiorpcx.jsonrpc.RPCError:
|
||||
@@ -103,6 +105,8 @@ class SPV(NetworkJobOnDefaultServer):
|
||||
self.wallet.remove_unverified_tx(tx_hash, tx_height)
|
||||
self.requested_merkle.discard(tx_hash)
|
||||
return
|
||||
finally:
|
||||
self._requests_answered += 1
|
||||
# Verify the hash of the server-provided merkle branch to a
|
||||
# transaction matches the merkle root of its block
|
||||
if tx_height != merkle.get('block_height'):
|
||||
|
||||
Reference in New Issue
Block a user