diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 1b10d2c40..90ad535a3 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -12,6 +12,7 @@ import time from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set, Callable, Awaitable, List from datetime import datetime import functools +from functools import partial import electrum_ecc as ecc from electrum_ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey @@ -112,7 +113,7 @@ class Peer(Logger, EventListener): self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter] self.their_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter] self.outgoing_gossip_reply = False # type: bool - self.ordered_message_queues = defaultdict(asyncio.Queue) # type: Dict[bytes, asyncio.Queue] # for messages that are ordered + self.ordered_message_queues = defaultdict(partial(asyncio.Queue, maxsize=10)) # type: Dict[bytes, asyncio.Queue] # for messages that are ordered self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages self.funding_created_sent = set() # for channels in PREOPENING self.funding_signed_sent = set() # for channels in PREOPENING @@ -228,6 +229,12 @@ class Peer(Logger, EventListener): return if message_type in self.ORDERED_MESSAGES: chan_id = payload.get('channel_id') or payload["temporary_channel_id"] + if ( + chan_id not in self.channels + and chan_id not in self.temp_id_to_id + and chan_id not in self.temp_id_to_id.values() + ): + raise Exception(f"received {message_type} for unknown {chan_id.hex()=}") self.ordered_message_queues[chan_id].put_nowait((message_type, payload)) else: if message_type not in ('error', 'warning') and 'channel_id' in payload: @@ -1085,8 +1092,8 @@ class Peer(Logger, EventListener): int.from_bytes(per_commitment_secret_first, 'big')) # store the temp id now, so that it is recognized for e.g. 'error' messages - # TODO: this is never cleaned up; the dict grows unbounded until disconnect self.temp_id_to_id[temp_channel_id] = None + self._cleanup_temp_channelids() self.send_message( "open_channel", temporary_channel_id=temp_channel_id, @@ -1305,8 +1312,8 @@ class Peer(Logger, EventListener): feerate = payload['feerate_per_kw'] # note: we are not validating this temp_chan_id = payload['temporary_channel_id'] # store the temp id now, so that it is recognized for e.g. 'error' messages - # TODO: this is never cleaned up; the dict grows unbounded until disconnect self.temp_id_to_id[temp_chan_id] = None + self._cleanup_temp_channelids() channel_opening_fee = open_channel_tlvs.get('channel_opening_fee') if open_channel_tlvs else None if channel_opening_fee: # todo check that the fee is reasonable @@ -1448,6 +1455,15 @@ class Peer(Logger, EventListener): self.send_channel_ready(chan) self.lnworker.add_new_channel(chan) + def _cleanup_temp_channelids(self) -> None: + self.temp_id_to_id = { + tmp_id: chan_id for (tmp_id, chan_id) in self.temp_id_to_id.items() + if chan_id not in self.channels + } + if len(self.temp_id_to_id) > 25: + # which one of us is opening all these chans?! let's disconnect + raise Exception("temp_id_to_id is getting too large.") + async def request_force_close(self, channel_id: bytes): """Try to trigger the remote peer to force-close.""" await self.initialized