Move the part of process_gossip that requires access to channel_db into LNGossip.
This commit is contained in:
@@ -84,7 +84,6 @@ class Peer(Logger):
|
|||||||
self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
|
self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
|
||||||
assert self.node_ids[0] != self.node_ids[1]
|
assert self.node_ids[0] != self.node_ids[1]
|
||||||
self.network = lnworker.network
|
self.network = lnworker.network
|
||||||
self.channel_db = lnworker.network.channel_db
|
|
||||||
self.ping_time = 0
|
self.ping_time = 0
|
||||||
self.reply_channel_range = asyncio.Queue()
|
self.reply_channel_range = asyncio.Queue()
|
||||||
# gossip uses a single queue to preserve message order
|
# gossip uses a single queue to preserve message order
|
||||||
@@ -261,6 +260,15 @@ class Peer(Logger):
|
|||||||
if chan.short_channel_id == payload['short_channel_id']:
|
if chan.short_channel_id == payload['short_channel_id']:
|
||||||
chan.set_remote_update(payload['raw'])
|
chan.set_remote_update(payload['raw'])
|
||||||
self.logger.info("saved remote_update")
|
self.logger.info("saved remote_update")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# Save (some bounded number of) orphan channel updates for later
|
||||||
|
# as it might be for our own direct channel with this peer
|
||||||
|
# (and we might not yet know the short channel id for that)
|
||||||
|
short_channel_id = ShortChannelID(payload['short_channel_id'])
|
||||||
|
self.orphan_channel_updates[short_channel_id] = payload
|
||||||
|
while len(self.orphan_channel_updates) > 25:
|
||||||
|
self.orphan_channel_updates.popitem(last=False)
|
||||||
|
|
||||||
def on_announcement_signatures(self, chan: Channel, payload):
|
def on_announcement_signatures(self, chan: Channel, payload):
|
||||||
if chan.config[LOCAL].was_announced:
|
if chan.config[LOCAL].was_announced:
|
||||||
@@ -292,8 +300,6 @@ class Peer(Logger):
|
|||||||
await group.spawn(self.process_gossip())
|
await group.spawn(self.process_gossip())
|
||||||
|
|
||||||
async def process_gossip(self):
|
async def process_gossip(self):
|
||||||
await self.channel_db.data_loaded.wait()
|
|
||||||
# verify in peer's TaskGroup so that we fail the connection
|
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
chan_anns = []
|
chan_anns = []
|
||||||
@@ -311,35 +317,10 @@ class Peer(Logger):
|
|||||||
raise Exception('unknown message')
|
raise Exception('unknown message')
|
||||||
if self.gossip_queue.empty():
|
if self.gossip_queue.empty():
|
||||||
break
|
break
|
||||||
self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}')
|
# verify in peer's TaskGroup so that we fail the connection
|
||||||
# note: data processed in chunks to avoid taking sql lock for too long
|
self.verify_channel_announcements(chan_anns)
|
||||||
# channel announcements
|
self.verify_node_announcements(node_anns)
|
||||||
for chan_anns_chunk in chunks(chan_anns, 300):
|
await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
|
||||||
self.verify_channel_announcements(chan_anns_chunk)
|
|
||||||
self.channel_db.add_channel_announcement(chan_anns_chunk)
|
|
||||||
# node announcements
|
|
||||||
for node_anns_chunk in chunks(node_anns, 100):
|
|
||||||
self.verify_node_announcements(node_anns_chunk)
|
|
||||||
self.channel_db.add_node_announcement(node_anns_chunk)
|
|
||||||
# channel updates
|
|
||||||
for chan_upds_chunk in chunks(chan_upds, 1000):
|
|
||||||
categorized_chan_upds = self.channel_db.add_channel_updates(
|
|
||||||
chan_upds_chunk, max_age=self.network.lngossip.max_age)
|
|
||||||
orphaned = categorized_chan_upds.orphaned
|
|
||||||
if orphaned:
|
|
||||||
self.logger.info(f'adding {len(orphaned)} unknown channel ids')
|
|
||||||
orphaned_ids = [c['short_channel_id'] for c in orphaned]
|
|
||||||
await self.network.lngossip.add_new_ids(orphaned_ids)
|
|
||||||
# Save (some bounded number of) orphan channel updates for later
|
|
||||||
# as it might be for our own direct channel with this peer
|
|
||||||
# (and we might not yet know the short channel id for that)
|
|
||||||
for chan_upd_payload in orphaned:
|
|
||||||
short_channel_id = ShortChannelID(chan_upd_payload['short_channel_id'])
|
|
||||||
self.orphan_channel_updates[short_channel_id] = chan_upd_payload
|
|
||||||
while len(self.orphan_channel_updates) > 25:
|
|
||||||
self.orphan_channel_updates.popitem(last=False)
|
|
||||||
if categorized_chan_upds.good:
|
|
||||||
self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}')
|
|
||||||
|
|
||||||
def verify_channel_announcements(self, chan_anns):
|
def verify_channel_announcements(self, chan_anns):
|
||||||
for payload in chan_anns:
|
for payload in chan_anns:
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ from aiorpcx import run_in_thread, TaskGroup, NetAddress
|
|||||||
|
|
||||||
from . import constants, util
|
from . import constants, util
|
||||||
from . import keystore
|
from . import keystore
|
||||||
from .util import profiler
|
from .util import profiler, chunks
|
||||||
from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER
|
from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER
|
||||||
from .util import NetworkRetryManager, JsonRPCClient
|
from .util import NetworkRetryManager, JsonRPCClient
|
||||||
from .lnutil import LN_MAX_FUNDING_SAT
|
from .lnutil import LN_MAX_FUNDING_SAT
|
||||||
@@ -518,6 +518,27 @@ class LNGossip(LNWorker):
|
|||||||
progress_percent = 0
|
progress_percent = 0
|
||||||
return current_est, total_est, progress_percent
|
return current_est, total_est, progress_percent
|
||||||
|
|
||||||
|
async def process_gossip(self, chan_anns, node_anns, chan_upds):
|
||||||
|
await self.channel_db.data_loaded.wait()
|
||||||
|
self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}')
|
||||||
|
# note: data processed in chunks to avoid taking sql lock for too long
|
||||||
|
# channel announcements
|
||||||
|
for chan_anns_chunk in chunks(chan_anns, 300):
|
||||||
|
self.channel_db.add_channel_announcement(chan_anns_chunk)
|
||||||
|
# node announcements
|
||||||
|
for node_anns_chunk in chunks(node_anns, 100):
|
||||||
|
self.channel_db.add_node_announcement(node_anns_chunk)
|
||||||
|
# channel updates
|
||||||
|
for chan_upds_chunk in chunks(chan_upds, 1000):
|
||||||
|
categorized_chan_upds = self.channel_db.add_channel_updates(
|
||||||
|
chan_upds_chunk, max_age=self.max_age)
|
||||||
|
orphaned = categorized_chan_upds.orphaned
|
||||||
|
if orphaned:
|
||||||
|
self.logger.info(f'adding {len(orphaned)} unknown channel ids')
|
||||||
|
orphaned_ids = [c['short_channel_id'] for c in orphaned]
|
||||||
|
await self.add_new_ids(orphaned_ids)
|
||||||
|
if categorized_chan_upds.good:
|
||||||
|
self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}')
|
||||||
|
|
||||||
class LNWallet(LNWorker):
|
class LNWallet(LNWorker):
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user