@@ -30,6 +30,7 @@ from collections import defaultdict
|
|||||||
from typing import Sequence, List, Tuple, Optional, Dict, NamedTuple, TYPE_CHECKING, Set
|
from typing import Sequence, List, Tuple, Optional, Dict, NamedTuple, TYPE_CHECKING, Set
|
||||||
import binascii
|
import binascii
|
||||||
import base64
|
import base64
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
|
||||||
from .sql_db import SqlDB, sql
|
from .sql_db import SqlDB, sql
|
||||||
@@ -250,6 +251,7 @@ class ChannelDB(SqlDB):
|
|||||||
self._nodes = {}
|
self._nodes = {}
|
||||||
self._addresses = defaultdict(set)
|
self._addresses = defaultdict(set)
|
||||||
self._channels_for_node = defaultdict(set)
|
self._channels_for_node = defaultdict(set)
|
||||||
|
self.data_loaded = asyncio.Event()
|
||||||
|
|
||||||
def update_counts(self):
|
def update_counts(self):
|
||||||
self.num_channels = len(self._channels)
|
self.num_channels = len(self._channels)
|
||||||
@@ -278,6 +280,7 @@ class ChannelDB(SqlDB):
|
|||||||
return LNPeerAddr(host, port, node_id)
|
return LNPeerAddr(host, port, node_id)
|
||||||
|
|
||||||
def get_recent_peers(self):
|
def get_recent_peers(self):
|
||||||
|
assert self.data_loaded.is_set(), "channelDB load_data did not finish yet!"
|
||||||
r = [self.get_last_good_address(x) for x in self._addresses.keys()]
|
r = [self.get_last_good_address(x) for x in self._addresses.keys()]
|
||||||
r = r[-self.NUM_MAX_RECENT_PEERS:]
|
r = r[-self.NUM_MAX_RECENT_PEERS:]
|
||||||
return r
|
return r
|
||||||
@@ -546,6 +549,7 @@ class ChannelDB(SqlDB):
|
|||||||
self.logger.info(f'load data {len(self._channels)} {len(self._policies)} {len(self._channels_for_node)}')
|
self.logger.info(f'load data {len(self._channels)} {len(self._policies)} {len(self._channels_for_node)}')
|
||||||
self.update_counts()
|
self.update_counts()
|
||||||
self.count_incomplete_channels()
|
self.count_incomplete_channels()
|
||||||
|
self.data_loaded.set()
|
||||||
|
|
||||||
def count_incomplete_channels(self):
|
def count_incomplete_channels(self):
|
||||||
out = set()
|
out = set()
|
||||||
|
|||||||
@@ -241,6 +241,7 @@ class Peer(Logger):
|
|||||||
await group.spawn(self.process_gossip())
|
await group.spawn(self.process_gossip())
|
||||||
|
|
||||||
async def process_gossip(self):
|
async def process_gossip(self):
|
||||||
|
await self.channel_db.data_loaded.wait()
|
||||||
# verify in peer's TaskGroup so that we fail the connection
|
# verify in peer's TaskGroup so that we fail the connection
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
|
|||||||
@@ -154,6 +154,7 @@ class LNWorker(Logger):
|
|||||||
|
|
||||||
async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
|
async def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
await self.channel_db.data_loaded.wait()
|
||||||
recent_peers = self.channel_db.get_recent_peers()
|
recent_peers = self.channel_db.get_recent_peers()
|
||||||
# maintenance for last tried times
|
# maintenance for last tried times
|
||||||
# due to this, below we can just test membership in _last_tried_peer
|
# due to this, below we can just test membership in _last_tried_peer
|
||||||
|
|||||||
Reference in New Issue
Block a user