synchronizer: small refactor _on_address_status
This commit is contained in:
@@ -24,7 +24,7 @@
|
||||
# SOFTWARE.
|
||||
import asyncio
|
||||
import hashlib
|
||||
from typing import Dict, List, TYPE_CHECKING, Tuple, Set
|
||||
from typing import Dict, List, TYPE_CHECKING, Tuple, Set, Optional, Sequence
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
|
||||
@@ -45,7 +45,7 @@ if TYPE_CHECKING:
|
||||
class SynchronizerFailure(Exception): pass
|
||||
|
||||
|
||||
def history_status(h):
|
||||
def history_status(h: Sequence[tuple[str, int]]) -> Optional[str]:
|
||||
if not h:
|
||||
return None
|
||||
status = ''
|
||||
@@ -96,7 +96,7 @@ class SynchronizerBase(NetworkJobOnDefaultServer):
|
||||
finally:
|
||||
self._adding_addrs.discard(addr) # ok for addr not to be present
|
||||
|
||||
async def _on_address_status(self, addr, status):
|
||||
async def _on_address_status(self, addr: str, status: Optional[str]):
|
||||
"""Handle the change of the status of an address.
|
||||
Should remove addr from self._handling_addr_statuses when done.
|
||||
"""
|
||||
@@ -160,10 +160,19 @@ class Synchronizer(SynchronizerBase):
|
||||
and not self._stale_histories
|
||||
and self.status_queue.empty())
|
||||
|
||||
async def _maybe_request_history_for_addr(self, addr: str) -> List[dict]:
|
||||
sh = address_to_scripthash(addr)
|
||||
self._requests_sent += 1
|
||||
async with self._network_request_semaphore:
|
||||
result = await self.interface.get_history_for_scripthash(sh)
|
||||
self._requests_answered += 1
|
||||
self.logger.info(f"receiving history {addr} {len(result)}")
|
||||
return result
|
||||
|
||||
async def _on_address_status(self, addr, status):
|
||||
try:
|
||||
history = self.adb.db.get_addr_history(addr)
|
||||
if history_status(history) == status:
|
||||
old_history = self.adb.db.get_addr_history(addr)
|
||||
if history_status(old_history) == status:
|
||||
return
|
||||
# No point in requesting history twice for the same announced status.
|
||||
# However if we got announced a new status, we should request history again:
|
||||
@@ -174,12 +183,7 @@ class Synchronizer(SynchronizerBase):
|
||||
self._stale_histories.pop(addr, asyncio.Future()).cancel()
|
||||
finally:
|
||||
self._handling_addr_statuses.discard(addr)
|
||||
h = address_to_scripthash(addr)
|
||||
self._requests_sent += 1
|
||||
async with self._network_request_semaphore:
|
||||
result = await self.interface.get_history_for_scripthash(h)
|
||||
self._requests_answered += 1
|
||||
self.logger.info(f"receiving history {addr} {len(result)}")
|
||||
result = await self._maybe_request_history_for_addr(addr)
|
||||
hist = list(map(lambda item: (item['tx_hash'], item['height']), result))
|
||||
# tx_fees
|
||||
tx_fees = [(item['tx_hash'], item.get('fee')) for item in result]
|
||||
@@ -242,7 +246,7 @@ class Synchronizer(SynchronizerBase):
|
||||
raise SynchronizerFailure(f"received tx does not match expected txid ({tx_hash} != {tx.txid()})")
|
||||
self.requested_tx.remove(tx_hash)
|
||||
self.adb.receive_tx_callback(tx)
|
||||
self.logger.info(f"received tx {tx_hash}. bytes: {len(raw_tx)}")
|
||||
self.logger.info(f"received tx {tx_hash}. bytes-len: {len(raw_tx)//2}")
|
||||
|
||||
async def main(self):
|
||||
self.adb.up_to_date_changed()
|
||||
|
||||
Reference in New Issue
Block a user