wallet: (fix) cannot just piggyback on adb.is_up_to_date()
The wallet needs its own up_to_date logic:
- the adb being up_to_date means all its addresses are synced
- but an HD wallet might decide to roll the gap limit and generate new addresses
- the adb does not know about this...
- the wallet should be considered *not* up_to_date
- relatedly, it is now the wallet that decides to reset the network request counters
- note that wallet.main() was never cleaned up previously.
- now wallet gets its own taskgroup, which is cleaned up in wallet.stop.
Follow-up to adb refactor: 121d8732f1
This commit is contained in:
@@ -672,12 +672,6 @@ class AddressSynchronizer(Logger):
|
||||
with self.lock:
|
||||
status_changed = self._up_to_date != up_to_date
|
||||
self._up_to_date = up_to_date
|
||||
# reset sync state progress indicator
|
||||
if up_to_date:
|
||||
if self.synchronizer:
|
||||
self.synchronizer.reset_request_counters()
|
||||
if self.verifier:
|
||||
self.verifier.reset_request_counters()
|
||||
# fire triggers
|
||||
util.trigger_callback('adb_set_up_to_date', self)
|
||||
if status_changed:
|
||||
@@ -686,6 +680,12 @@ class AddressSynchronizer(Logger):
|
||||
def is_up_to_date(self):
|
||||
return self._up_to_date
|
||||
|
||||
def reset_netrequest_counters(self) -> None:
|
||||
if self.synchronizer:
|
||||
self.synchronizer.reset_request_counters()
|
||||
if self.verifier:
|
||||
self.verifier.reset_request_counters()
|
||||
|
||||
def get_history_sync_state_details(self) -> Tuple[int, int]:
|
||||
nsent, nans = 0, 0
|
||||
if self.synchronizer:
|
||||
|
||||
@@ -47,13 +47,13 @@ import threading
|
||||
import enum
|
||||
import asyncio
|
||||
|
||||
from aiorpcx import timeout_after, TaskTimeout, ignore_after
|
||||
from aiorpcx import timeout_after, TaskTimeout, ignore_after, run_in_thread
|
||||
|
||||
from .i18n import _
|
||||
from .bip32 import BIP32Node, convert_bip32_intpath_to_strpath, convert_bip32_path_to_list_of_uint32
|
||||
from .crypto import sha256
|
||||
from . import util
|
||||
from .util import (NotEnoughFunds, UserCancelled, profiler, OldTaskGroup,
|
||||
from .util import (NotEnoughFunds, UserCancelled, profiler, OldTaskGroup, ignore_exceptions,
|
||||
format_satoshis, format_fee_satoshis, NoDynamicFeeEstimates,
|
||||
WalletFileException, BitcoinException,
|
||||
InvalidPassword, format_time, timestamp_to_datetime, Satoshis,
|
||||
@@ -79,7 +79,7 @@ from .invoices import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED, PR_UNCONFIRMED
|
||||
from .contacts import Contacts
|
||||
from .interface import NetworkException
|
||||
from .mnemonic import Mnemonic
|
||||
from .logging import get_logger
|
||||
from .logging import get_logger, Logger
|
||||
from .lnworker import LNWallet
|
||||
from .paymentrequest import PaymentRequest
|
||||
from .util import read_json_file, write_json_file, UserFacingException
|
||||
@@ -267,7 +267,7 @@ class TxWalletDetails(NamedTuple):
|
||||
is_lightning_funding_tx: bool
|
||||
|
||||
|
||||
class Abstract_Wallet(ABC):
|
||||
class Abstract_Wallet(ABC, Logger):
|
||||
"""
|
||||
Wallet classes are created to handle various address generation methods.
|
||||
Completion states (watching-only, single account, no seed, etc) are handled inside classes.
|
||||
@@ -292,6 +292,7 @@ class Abstract_Wallet(ABC):
|
||||
# load addresses needs to be called before constructor for sanity checks
|
||||
db.load_addresses(self.wallet_type)
|
||||
self.keystore = None # type: Optional[KeyStore] # will be set by load_keystore
|
||||
Logger.__init__(self)
|
||||
|
||||
self.network = None
|
||||
self.adb = AddressSynchronizer(db, config)
|
||||
@@ -300,6 +301,8 @@ class Abstract_Wallet(ABC):
|
||||
self.lock = self.adb.lock
|
||||
self.transaction_lock = self.adb.transaction_lock
|
||||
|
||||
self.taskgroup = OldTaskGroup()
|
||||
|
||||
# saved fields
|
||||
self.use_change = db.get('use_change', True)
|
||||
self.multiple_change = db.get('multiple_change', False)
|
||||
@@ -321,25 +324,45 @@ class Abstract_Wallet(ABC):
|
||||
self.contacts = Contacts(self.db)
|
||||
self._coin_price_cache = {}
|
||||
|
||||
# true when synchronized. this is stricter than adb.is_up_to_date():
|
||||
# to-be-generated (HD) addresses are also considered here (gap-limit-roll-forward)
|
||||
self._up_to_date = False
|
||||
|
||||
self.lnworker = None
|
||||
self.load_keystore()
|
||||
self.test_addresses_sanity()
|
||||
# callbacks
|
||||
util.register_callback(self.on_adb_set_up_to_date, ['adb_set_up_to_date'])
|
||||
util.register_callback(self.on_adb_added_tx, ['adb_added_tx'])
|
||||
util.register_callback(self.on_adb_added_verified_tx, ['adb_added_verified_tx'])
|
||||
util.register_callback(self.on_adb_removed_verified_tx, ['adb_removed_verified_tx'])
|
||||
|
||||
async def main(self):
|
||||
from aiorpcx import run_in_thread
|
||||
# calls synchronize
|
||||
@ignore_exceptions # don't kill outer taskgroup
|
||||
async def main_loop(self):
|
||||
self.logger.info("starting taskgroup.")
|
||||
try:
|
||||
async with self.taskgroup as group:
|
||||
await group.spawn(asyncio.Event().wait) # run forever (until cancel)
|
||||
await group.spawn(self.do_synchronize_loop())
|
||||
except Exception as e:
|
||||
self.logger.exception("taskgroup died.")
|
||||
finally:
|
||||
self.logger.info("taskgroup stopped.")
|
||||
|
||||
async def do_synchronize_loop(self):
|
||||
"""Generates new deterministic addresses if needed (gap limit roll-forward),
|
||||
and sets up_to_date.
|
||||
"""
|
||||
while True:
|
||||
# polling.
|
||||
# TODO if adb had "up_to_date_changed" asyncio.Event(), we could *also* trigger on that.
|
||||
# The polling would still be useful as often need to gen new addrs while adb.is_up_to_date() is False
|
||||
await asyncio.sleep(0.1)
|
||||
# note: we only generate new HD addresses if the existing ones
|
||||
# have history that are mined and SPV-verified. This inherently couples
|
||||
# the Sychronizer and the Verifier.
|
||||
# have history that are mined and SPV-verified.
|
||||
num_new_addrs = await run_in_thread(self.synchronize)
|
||||
up_to_date = self.adb.is_up_to_date() and num_new_addrs == 0
|
||||
if self.is_up_to_date() != up_to_date:
|
||||
self.set_up_to_date(up_to_date)
|
||||
|
||||
def save_db(self):
|
||||
if self.storage:
|
||||
@@ -398,32 +421,37 @@ class Abstract_Wallet(ABC):
|
||||
|
||||
async def stop(self):
|
||||
"""Stop all networking and save DB to disk."""
|
||||
util.unregister_callback(self.on_adb_set_up_to_date)
|
||||
util.unregister_callback(self.on_adb_added_tx)
|
||||
util.unregister_callback(self.on_adb_added_verified_tx)
|
||||
util.unregister_callback(self.on_adb_removed_verified_tx)
|
||||
try:
|
||||
async with ignore_after(5):
|
||||
await self.adb.stop()
|
||||
if self.network:
|
||||
if self.lnworker:
|
||||
await self.lnworker.stop()
|
||||
self.lnworker = None
|
||||
await self.adb.stop()
|
||||
await self.taskgroup.cancel_remaining()
|
||||
finally: # even if we get cancelled
|
||||
if any([ks.is_requesting_to_be_rewritten_to_wallet_file for ks in self.get_keystores()]):
|
||||
self.save_keystore()
|
||||
self.save_db()
|
||||
|
||||
def is_up_to_date(self):
|
||||
return self.adb.is_up_to_date()
|
||||
def is_up_to_date(self) -> bool:
|
||||
return self._up_to_date
|
||||
|
||||
def on_adb_set_up_to_date(self, event, adb):
|
||||
if adb != self.adb:
|
||||
return
|
||||
if adb.is_up_to_date():
|
||||
def set_up_to_date(self, up_to_date: bool) -> None:
|
||||
with self.lock:
|
||||
status_changed = self._up_to_date != up_to_date
|
||||
self._up_to_date = up_to_date
|
||||
if up_to_date:
|
||||
self.adb.reset_netrequest_counters() # sync progress indicator
|
||||
self.save_db()
|
||||
# fire triggers
|
||||
util.trigger_callback('wallet_updated', self)
|
||||
util.trigger_callback('status')
|
||||
if status_changed:
|
||||
self.logger.info(f'set_up_to_date: {up_to_date}')
|
||||
|
||||
def on_adb_added_tx(self, event, adb, tx_hash):
|
||||
if self.adb != adb:
|
||||
@@ -460,7 +488,7 @@ class Abstract_Wallet(ABC):
|
||||
def start_network(self, network):
|
||||
self.network = network
|
||||
if network:
|
||||
asyncio.run_coroutine_threadsafe(self.main(), self.network.asyncio_loop)
|
||||
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
||||
self.adb.start_network(network)
|
||||
if self.lnworker:
|
||||
self.lnworker.start_network(network)
|
||||
|
||||
Reference in New Issue
Block a user