From b8d989e13b6c4c1e9dec3d795f511f32e21321ff Mon Sep 17 00:00:00 2001 From: SomberNight Date: Tue, 19 Aug 2025 14:49:38 +0000 Subject: [PATCH] lnpeer: rate-limit reply_channel_range --- electrum/lnpeer.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 216bdd895..1b10d2c40 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -106,7 +106,7 @@ class Peer(Logger, EventListener): assert self.node_ids[0] != self.node_ids[1] self.last_message_time = 0 self.pong_event = asyncio.Event() - self.reply_channel_range = asyncio.Queue() + self.reply_channel_range = None # type: Optional[asyncio.Queue] # gossip uses a single queue to preserve message order self.recv_gossip_queue = asyncio.Queue(maxsize=self.RECV_GOSSIP_QUEUE_HARD_MAXSIZE) self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter] @@ -709,6 +709,7 @@ class Peer(Logger, EventListener): self.outgoing_gossip_reply = False async def get_channel_range(self): + self.reply_channel_range = asyncio.Queue() first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS num_blocks = self.lnworker.network.get_local_height() - first_block self.query_channel_range(first_block, num_blocks) @@ -747,6 +748,7 @@ class Peer(Logger, EventListener): a, b = intervals[0] if a <= first_block and b >= first_block + num_blocks: break + self.reply_channel_range = None return ids, complete def request_gossip(self, timestamp=0): @@ -784,7 +786,7 @@ class Peer(Logger, EventListener): ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)] return ids - def on_reply_channel_range(self, payload): + async def on_reply_channel_range(self, payload): first = payload['first_blocknum'] num = payload['number_of_blocks'] complete = bool(int.from_bytes(payload['sync_complete'], 'big')) @@ -792,6 +794,12 @@ class Peer(Logger, EventListener): ids = self.decode_short_ids(encoded) # self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, " # f"num_ids {len(ids)}, complete {complete}") + if self.reply_channel_range is None: + raise Exception("received 'reply_channel_range' without corresponding 'query_channel_range'") + while self.reply_channel_range.qsize() > 10: + # we block process_message until the queue gets consumed + self.logger.info("reply_channel_range queue is overflowing. sleeping...") + await asyncio.sleep(0.1) self.reply_channel_range.put_nowait((first, num, complete, ids)) async def _send_reply_short_channel_ids(self, payload: dict):