multi-wallet: properly stop lnworker/lnwatcher
This commit is contained in:
@@ -70,13 +70,17 @@ class AddressSynchronizer(Logger):
|
|||||||
inherited by wallet
|
inherited by wallet
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
network: Optional['Network']
|
||||||
|
synchronizer: Optional['Synchronizer']
|
||||||
|
verifier: Optional['SPV']
|
||||||
|
|
||||||
def __init__(self, db: 'WalletDB'):
|
def __init__(self, db: 'WalletDB'):
|
||||||
self.db = db
|
self.db = db
|
||||||
self.network = None # type: Network
|
self.network = None
|
||||||
Logger.__init__(self)
|
Logger.__init__(self)
|
||||||
# verifier (SPV) and synchronizer are started in start_network
|
# verifier (SPV) and synchronizer are started in start_network
|
||||||
self.synchronizer = None # type: Synchronizer
|
self.synchronizer = None
|
||||||
self.verifier = None # type: SPV
|
self.verifier = None
|
||||||
# locks: if you need to take multiple ones, acquire them in the order they are defined here!
|
# locks: if you need to take multiple ones, acquire them in the order they are defined here!
|
||||||
self.lock = threading.RLock()
|
self.lock = threading.RLock()
|
||||||
self.transaction_lock = threading.RLock()
|
self.transaction_lock = threading.RLock()
|
||||||
@@ -156,7 +160,7 @@ class AddressSynchronizer(Logger):
|
|||||||
# add it in case it was previously unconfirmed
|
# add it in case it was previously unconfirmed
|
||||||
self.add_unverified_tx(tx_hash, tx_height)
|
self.add_unverified_tx(tx_hash, tx_height)
|
||||||
|
|
||||||
def start_network(self, network):
|
def start_network(self, network: Optional['Network']) -> None:
|
||||||
self.network = network
|
self.network = network
|
||||||
if self.network is not None:
|
if self.network is not None:
|
||||||
self.synchronizer = Synchronizer(self)
|
self.synchronizer = Synchronizer(self)
|
||||||
@@ -166,7 +170,7 @@ class AddressSynchronizer(Logger):
|
|||||||
def on_blockchain_updated(self, event, *args):
|
def on_blockchain_updated(self, event, *args):
|
||||||
self._get_addr_balance_cache = {} # invalidate cache
|
self._get_addr_balance_cache = {} # invalidate cache
|
||||||
|
|
||||||
def stop_threads(self):
|
def stop(self):
|
||||||
if self.network:
|
if self.network:
|
||||||
if self.synchronizer:
|
if self.synchronizer:
|
||||||
asyncio.run_coroutine_threadsafe(self.synchronizer.stop(), self.network.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(self.synchronizer.stop(), self.network.asyncio_loop)
|
||||||
|
|||||||
@@ -455,7 +455,7 @@ class Daemon(Logger):
|
|||||||
wallet = self._wallets.pop(path, None)
|
wallet = self._wallets.pop(path, None)
|
||||||
if not wallet:
|
if not wallet:
|
||||||
return False
|
return False
|
||||||
wallet.stop_threads()
|
wallet.stop()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def run_cmdline(self, config_options):
|
async def run_cmdline(self, config_options):
|
||||||
@@ -501,7 +501,7 @@ class Daemon(Logger):
|
|||||||
self.gui_object.stop()
|
self.gui_object.stop()
|
||||||
# stop network/wallets
|
# stop network/wallets
|
||||||
for k, wallet in self._wallets.items():
|
for k, wallet in self._wallets.items():
|
||||||
wallet.stop_threads()
|
wallet.stop()
|
||||||
if self.network:
|
if self.network:
|
||||||
self.logger.info("shutting down network")
|
self.logger.info("shutting down network")
|
||||||
self.network.stop()
|
self.network.stop()
|
||||||
|
|||||||
@@ -147,6 +147,10 @@ class LNWatcher(AddressSynchronizer):
|
|||||||
# status gets populated when we run
|
# status gets populated when we run
|
||||||
self.channel_status = {}
|
self.channel_status = {}
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super().stop()
|
||||||
|
util.unregister_callback(self.on_network_update)
|
||||||
|
|
||||||
def get_channel_status(self, outpoint):
|
def get_channel_status(self, outpoint):
|
||||||
return self.channel_status.get(outpoint, 'unknown')
|
return self.channel_status.get(outpoint, 'unknown')
|
||||||
|
|
||||||
|
|||||||
@@ -254,6 +254,10 @@ class LNWorker(Logger, NetworkRetryManager[LNPeerAddr]):
|
|||||||
self._add_peers_from_config()
|
self._add_peers_from_config()
|
||||||
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
asyncio.run_coroutine_threadsafe(self.taskgroup.cancel_remaining(), self.network.asyncio_loop)
|
||||||
|
util.unregister_callback(self.on_proxy_changed)
|
||||||
|
|
||||||
def _add_peers_from_config(self):
|
def _add_peers_from_config(self):
|
||||||
peer_list = self.config.get('lightning_peers', [])
|
peer_list = self.config.get('lightning_peers', [])
|
||||||
for host, port, pubkey in peer_list:
|
for host, port, pubkey in peer_list:
|
||||||
@@ -569,6 +573,11 @@ class LNWallet(LNWorker):
|
|||||||
tg_coro = self.taskgroup.spawn(coro)
|
tg_coro = self.taskgroup.spawn(coro)
|
||||||
asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super().stop()
|
||||||
|
self.lnwatcher.stop()
|
||||||
|
self.lnwatcher = None
|
||||||
|
|
||||||
def peer_closed(self, peer):
|
def peer_closed(self, peer):
|
||||||
for chan in self.channels_for_peer(peer.pubkey).values():
|
for chan in self.channels_for_peer(peer.pubkey).values():
|
||||||
chan.peer_state = PeerState.DISCONNECTED
|
chan.peer_state = PeerState.DISCONNECTED
|
||||||
@@ -1404,6 +1413,10 @@ class LNBackups(Logger):
|
|||||||
for cb in self.channel_backups.values():
|
for cb in self.channel_backups.values():
|
||||||
self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())
|
self.lnwatcher.add_channel(cb.funding_outpoint.to_str(), cb.get_funding_address())
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.lnwatcher.stop()
|
||||||
|
self.lnwatcher = None
|
||||||
|
|
||||||
def import_channel_backup(self, encrypted):
|
def import_channel_backup(self, encrypted):
|
||||||
xpub = self.wallet.get_fingerprint()
|
xpub = self.wallet.get_fingerprint()
|
||||||
decrypted = pw_decode_bytes(encrypted, xpub, version=PW_HASH_VERSION_LATEST)
|
decrypted = pw_decode_bytes(encrypted, xpub, version=PW_HASH_VERSION_LATEST)
|
||||||
|
|||||||
@@ -222,6 +222,8 @@ class Abstract_Wallet(AddressSynchronizer, ABC):
|
|||||||
|
|
||||||
txin_type: str
|
txin_type: str
|
||||||
wallet_type: str
|
wallet_type: str
|
||||||
|
lnworker: Optional['LNWallet']
|
||||||
|
lnbackups: Optional['LNBackups']
|
||||||
|
|
||||||
def __init__(self, db: WalletDB, storage: Optional[WalletStorage], *, config: SimpleConfig):
|
def __init__(self, db: WalletDB, storage: Optional[WalletStorage], *, config: SimpleConfig):
|
||||||
if not db.is_ready_to_be_used_by_wallet():
|
if not db.is_ready_to_be_used_by_wallet():
|
||||||
@@ -310,10 +312,16 @@ class Abstract_Wallet(AddressSynchronizer, ABC):
|
|||||||
self.db.put('lightning_privkey2', None)
|
self.db.put('lightning_privkey2', None)
|
||||||
self.save_db()
|
self.save_db()
|
||||||
|
|
||||||
def stop_threads(self):
|
def stop(self):
|
||||||
super().stop_threads()
|
super().stop()
|
||||||
if any([ks.is_requesting_to_be_rewritten_to_wallet_file for ks in self.get_keystores()]):
|
if any([ks.is_requesting_to_be_rewritten_to_wallet_file for ks in self.get_keystores()]):
|
||||||
self.save_keystore()
|
self.save_keystore()
|
||||||
|
if self.network:
|
||||||
|
if self.lnworker:
|
||||||
|
self.lnworker.stop()
|
||||||
|
self.lnworker = None
|
||||||
|
self.lnbackups.stop()
|
||||||
|
self.lnbackups = None
|
||||||
self.save_db()
|
self.save_db()
|
||||||
|
|
||||||
def set_up_to_date(self, b):
|
def set_up_to_date(self, b):
|
||||||
|
|||||||
Reference in New Issue
Block a user