ln gossip: run sig checks in a thread
to avoid blocking the asyncio event loop
This commit is contained in:
@@ -556,28 +556,30 @@ class LNGossip(LNWorker):
|
||||
# and disconnect only from that peer
|
||||
await self.channel_db.data_loaded.wait()
|
||||
self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}')
|
||||
# note: data processed in chunks to avoid taking sql lock for too long
|
||||
# channel announcements
|
||||
for payload in chan_anns:
|
||||
self.channel_db.verify_channel_announcement(payload)
|
||||
for chan_anns_chunk in chunks(chan_anns, 300):
|
||||
self.channel_db.add_channel_announcements(chan_anns_chunk)
|
||||
def process_chan_anns():
|
||||
for payload in chan_anns:
|
||||
self.channel_db.verify_channel_announcement(payload)
|
||||
self.channel_db.add_channel_announcements(chan_anns)
|
||||
await run_in_thread(process_chan_anns)
|
||||
# node announcements
|
||||
for payload in node_anns:
|
||||
self.channel_db.verify_node_announcement(payload)
|
||||
for node_anns_chunk in chunks(node_anns, 100):
|
||||
self.channel_db.add_node_announcements(node_anns_chunk)
|
||||
def process_node_anns():
|
||||
for payload in node_anns:
|
||||
self.channel_db.verify_node_announcement(payload)
|
||||
self.channel_db.add_node_announcements(node_anns)
|
||||
await run_in_thread(process_node_anns)
|
||||
# channel updates
|
||||
for chan_upds_chunk in chunks(chan_upds, 1000):
|
||||
categorized_chan_upds = self.channel_db.add_channel_updates(
|
||||
chan_upds_chunk, max_age=self.max_age)
|
||||
orphaned = categorized_chan_upds.orphaned
|
||||
if orphaned:
|
||||
self.logger.info(f'adding {len(orphaned)} unknown channel ids')
|
||||
orphaned_ids = [c['short_channel_id'] for c in orphaned]
|
||||
await self.add_new_ids(orphaned_ids)
|
||||
if categorized_chan_upds.good:
|
||||
self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}')
|
||||
categorized_chan_upds = await run_in_thread(partial(
|
||||
self.channel_db.add_channel_updates,
|
||||
chan_upds,
|
||||
max_age=self.max_age))
|
||||
orphaned = categorized_chan_upds.orphaned
|
||||
if orphaned:
|
||||
self.logger.info(f'adding {len(orphaned)} unknown channel ids')
|
||||
orphaned_ids = [c['short_channel_id'] for c in orphaned]
|
||||
await self.add_new_ids(orphaned_ids)
|
||||
if categorized_chan_upds.good:
|
||||
self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds)}')
|
||||
|
||||
|
||||
class LNWallet(LNWorker):
|
||||
|
||||
@@ -13,7 +13,7 @@ def sql(func):
|
||||
"""wrapper for sql methods"""
|
||||
def wrapper(self: 'SqlDB', *args, **kwargs):
|
||||
assert threading.currentThread() != self.sql_thread
|
||||
f = asyncio.Future()
|
||||
f = self.asyncio_loop.create_future()
|
||||
self.db_requests.put((f, func, args, kwargs))
|
||||
return f
|
||||
return wrapper
|
||||
|
||||
Reference in New Issue
Block a user