diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 3b516ecdb..216bdd895 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -78,6 +78,8 @@ class Peer(Logger, EventListener): 'query_short_channel_ids', 'reply_short_channel_ids', 'reply_short_channel_ids_end') DELAY_INC_MSG_PROCESSING_SLEEP = 0.01 + RECV_GOSSIP_QUEUE_SOFT_MAXSIZE = 2000 + RECV_GOSSIP_QUEUE_HARD_MAXSIZE = 5000 def __init__( self, @@ -106,8 +108,9 @@ class Peer(Logger, EventListener): self.pong_event = asyncio.Event() self.reply_channel_range = asyncio.Queue() # gossip uses a single queue to preserve message order - self.gossip_queue = asyncio.Queue() - self.gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter] + self.recv_gossip_queue = asyncio.Queue(maxsize=self.RECV_GOSSIP_QUEUE_HARD_MAXSIZE) + self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter] + self.their_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter] self.outgoing_gossip_reply = False # type: bool self.ordered_message_queues = defaultdict(asyncio.Queue) # type: Dict[bytes, asyncio.Queue] # for messages that are ordered self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages @@ -399,17 +402,26 @@ class Peer(Logger, EventListener): self.maybe_set_initialized() def on_node_announcement(self, payload): - if not self.lnworker.uses_trampoline(): - self.gossip_queue.put_nowait(('node_announcement', payload)) + if self.lnworker.uses_trampoline(): + return + if self.our_gossip_timestamp_filter is None: + return # why is the peer sending this? should we disconnect? + self.recv_gossip_queue.put_nowait(('node_announcement', payload)) def on_channel_announcement(self, payload): - if not self.lnworker.uses_trampoline(): - self.gossip_queue.put_nowait(('channel_announcement', payload)) + if self.lnworker.uses_trampoline(): + return + if self.our_gossip_timestamp_filter is None: + return # why is the peer sending this? should we disconnect? + self.recv_gossip_queue.put_nowait(('channel_announcement', payload)) def on_channel_update(self, payload): self.maybe_save_remote_update(payload) - if not self.lnworker.uses_trampoline(): - self.gossip_queue.put_nowait(('channel_update', payload)) + if self.lnworker.uses_trampoline(): + return + if self.our_gossip_timestamp_filter is None: + return # why is the peer sending this? should we disconnect? + self.recv_gossip_queue.put_nowait(('channel_update', payload)) def on_query_channel_range(self, payload): if self.lnworker == self.lnworker.network.lngossip or not self._should_forward_gossip(): @@ -419,7 +431,7 @@ class Peer(Logger, EventListener): if self.outgoing_gossip_reply: return self.send_warning(bytes(32), "received multiple queries at the same time") self.outgoing_gossip_reply = True - self.gossip_queue.put_nowait(('query_channel_range', payload)) + self.recv_gossip_queue.put_nowait(('query_channel_range', payload)) def on_query_short_channel_ids(self, payload): if self.lnworker == self.lnworker.network.lngossip or not self._should_forward_gossip(): @@ -429,7 +441,7 @@ class Peer(Logger, EventListener): if not self._is_valid_short_channel_id_query(payload): return self.send_warning(bytes(32), "invalid query_short_channel_ids") self.outgoing_gossip_reply = True - self.gossip_queue.put_nowait(('query_short_channel_ids', payload)) + self.recv_gossip_queue.put_nowait(('query_short_channel_ids', payload)) def on_gossip_timestamp_filter(self, payload): if self._should_forward_gossip(): @@ -441,11 +453,11 @@ class Peer(Logger, EventListener): if payload.get('chain_hash') != constants.net.rev_genesis_bytes(): return filter = GossipTimestampFilter.from_payload(payload) - self.gossip_timestamp_filter = filter + self.their_gossip_timestamp_filter = filter self.logger.debug(f"got gossip_ts_filter from peer {self.pubkey.hex()}: " - f"{str(self.gossip_timestamp_filter)}") + f"{str(self.their_gossip_timestamp_filter)}") if filter and not filter.only_forwarding: - self.gossip_queue.put_nowait(('gossip_timestamp_filter', None)) + self.recv_gossip_queue.put_nowait(('gossip_timestamp_filter', None)) def maybe_save_remote_update(self, payload): if not self.channels: @@ -521,7 +533,7 @@ class Peer(Logger, EventListener): chan_upds = [] node_anns = [] while True: - name, payload = await self.gossip_queue.get() + name, payload = await self.recv_gossip_queue.get() if name == 'channel_announcement': chan_anns.append(payload) elif name == 'channel_update': @@ -536,7 +548,7 @@ class Peer(Logger, EventListener): await self.taskgroup.spawn(self._handle_historical_gossip_request()) else: raise Exception('unknown message') - if self.gossip_queue.empty(): + if self.recv_gossip_queue.empty(): break if self.network.lngossip: await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds) @@ -577,7 +589,7 @@ class Peer(Logger, EventListener): last_gossip_batch_ts = 0 while True: await asyncio.sleep(10) - if not self.gossip_timestamp_filter: + if not self.their_gossip_timestamp_filter: continue # peer didn't request gossip new_gossip, last_lngossip_refresh_ts = await lngossip.get_forwarding_gossip() @@ -589,7 +601,7 @@ class Peer(Logger, EventListener): async def _handle_historical_gossip_request(self): """Called when a peer requests historical gossip with a gossip_timestamp_filter query.""" - filter = self.gossip_timestamp_filter + filter = self.their_gossip_timestamp_filter if not self._should_forward_gossip() or not filter or filter.only_forwarding: return async with self.network.lngossip.gossip_request_semaphore: @@ -603,7 +615,7 @@ class Peer(Logger, EventListener): async def _send_gossip_messages(self, messages: List[GossipForwardingMessage]) -> int: amount_sent = 0 for msg in messages: - if self.gossip_timestamp_filter.in_range(msg.timestamp) \ + if self.their_gossip_timestamp_filter.in_range(msg.timestamp) \ and self.pubkey != msg.sender_node_id: await self.transport.send_bytes_and_drain(msg.msg) amount_sent += 1 @@ -742,11 +754,17 @@ class Peer(Logger, EventListener): self.logger.info('requesting whole channel graph') else: self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).isoformat()}') + timestamp_range = 0xFFFFFFFF + self.our_gossip_timestamp_filter = GossipTimestampFilter( + first_timestamp=timestamp, + timestamp_range=timestamp_range, + ) self.send_message( 'gossip_timestamp_filter', chain_hash=constants.net.rev_genesis_bytes(), first_timestamp=timestamp, - timestamp_range=b'\xff'*4) + timestamp_range=timestamp_range, + ) def query_channel_range(self, first_block, num_blocks): self.logger.info(f'query channel range {first_block} {num_blocks}') @@ -830,6 +848,15 @@ class Peer(Logger, EventListener): # rate-limit message-processing a bit, to make it harder # for a single peer to bog down the event loop / cpu: await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP) + # If receiving too much gossip from this peer, we need to slow them down. + # note: if the gossip queue gets full, we will disconnect from them + # and throw away unprocessed gossip. + if self.recv_gossip_queue.qsize() > self.RECV_GOSSIP_QUEUE_SOFT_MAXSIZE: + sleep = self.recv_gossip_queue.qsize() / 1000 + self.logger.debug( + f"message_loop sleeping due to getting much gossip. qsize={self.recv_gossip_queue.qsize()}. " + f"waiting for existing gossip data to be processed first.") + await asyncio.sleep(sleep) def on_reply_short_channel_ids_end(self, payload): self.querying.set()