lnpeer: rate-limit ordered_message_queues
This commit is contained in:
@@ -12,6 +12,7 @@ import time
|
|||||||
from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set, Callable, Awaitable, List
|
from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union, Set, Callable, Awaitable, List
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import functools
|
import functools
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
import electrum_ecc as ecc
|
import electrum_ecc as ecc
|
||||||
from electrum_ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey
|
from electrum_ecc import ecdsa_sig64_from_r_and_s, ecdsa_der_sig_from_ecdsa_sig64, ECPubkey
|
||||||
@@ -112,7 +113,7 @@ class Peer(Logger, EventListener):
|
|||||||
self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
||||||
self.their_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
self.their_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
||||||
self.outgoing_gossip_reply = False # type: bool
|
self.outgoing_gossip_reply = False # type: bool
|
||||||
self.ordered_message_queues = defaultdict(asyncio.Queue) # type: Dict[bytes, asyncio.Queue] # for messages that are ordered
|
self.ordered_message_queues = defaultdict(partial(asyncio.Queue, maxsize=10)) # type: Dict[bytes, asyncio.Queue] # for messages that are ordered
|
||||||
self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages
|
self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages
|
||||||
self.funding_created_sent = set() # for channels in PREOPENING
|
self.funding_created_sent = set() # for channels in PREOPENING
|
||||||
self.funding_signed_sent = set() # for channels in PREOPENING
|
self.funding_signed_sent = set() # for channels in PREOPENING
|
||||||
@@ -228,6 +229,12 @@ class Peer(Logger, EventListener):
|
|||||||
return
|
return
|
||||||
if message_type in self.ORDERED_MESSAGES:
|
if message_type in self.ORDERED_MESSAGES:
|
||||||
chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
|
chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
|
||||||
|
if (
|
||||||
|
chan_id not in self.channels
|
||||||
|
and chan_id not in self.temp_id_to_id
|
||||||
|
and chan_id not in self.temp_id_to_id.values()
|
||||||
|
):
|
||||||
|
raise Exception(f"received {message_type} for unknown {chan_id.hex()=}")
|
||||||
self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
|
self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
|
||||||
else:
|
else:
|
||||||
if message_type not in ('error', 'warning') and 'channel_id' in payload:
|
if message_type not in ('error', 'warning') and 'channel_id' in payload:
|
||||||
@@ -1085,8 +1092,8 @@ class Peer(Logger, EventListener):
|
|||||||
int.from_bytes(per_commitment_secret_first, 'big'))
|
int.from_bytes(per_commitment_secret_first, 'big'))
|
||||||
|
|
||||||
# store the temp id now, so that it is recognized for e.g. 'error' messages
|
# store the temp id now, so that it is recognized for e.g. 'error' messages
|
||||||
# TODO: this is never cleaned up; the dict grows unbounded until disconnect
|
|
||||||
self.temp_id_to_id[temp_channel_id] = None
|
self.temp_id_to_id[temp_channel_id] = None
|
||||||
|
self._cleanup_temp_channelids()
|
||||||
self.send_message(
|
self.send_message(
|
||||||
"open_channel",
|
"open_channel",
|
||||||
temporary_channel_id=temp_channel_id,
|
temporary_channel_id=temp_channel_id,
|
||||||
@@ -1305,8 +1312,8 @@ class Peer(Logger, EventListener):
|
|||||||
feerate = payload['feerate_per_kw'] # note: we are not validating this
|
feerate = payload['feerate_per_kw'] # note: we are not validating this
|
||||||
temp_chan_id = payload['temporary_channel_id']
|
temp_chan_id = payload['temporary_channel_id']
|
||||||
# store the temp id now, so that it is recognized for e.g. 'error' messages
|
# store the temp id now, so that it is recognized for e.g. 'error' messages
|
||||||
# TODO: this is never cleaned up; the dict grows unbounded until disconnect
|
|
||||||
self.temp_id_to_id[temp_chan_id] = None
|
self.temp_id_to_id[temp_chan_id] = None
|
||||||
|
self._cleanup_temp_channelids()
|
||||||
channel_opening_fee = open_channel_tlvs.get('channel_opening_fee') if open_channel_tlvs else None
|
channel_opening_fee = open_channel_tlvs.get('channel_opening_fee') if open_channel_tlvs else None
|
||||||
if channel_opening_fee:
|
if channel_opening_fee:
|
||||||
# todo check that the fee is reasonable
|
# todo check that the fee is reasonable
|
||||||
@@ -1448,6 +1455,15 @@ class Peer(Logger, EventListener):
|
|||||||
self.send_channel_ready(chan)
|
self.send_channel_ready(chan)
|
||||||
self.lnworker.add_new_channel(chan)
|
self.lnworker.add_new_channel(chan)
|
||||||
|
|
||||||
|
def _cleanup_temp_channelids(self) -> None:
|
||||||
|
self.temp_id_to_id = {
|
||||||
|
tmp_id: chan_id for (tmp_id, chan_id) in self.temp_id_to_id.items()
|
||||||
|
if chan_id not in self.channels
|
||||||
|
}
|
||||||
|
if len(self.temp_id_to_id) > 25:
|
||||||
|
# which one of us is opening all these chans?! let's disconnect
|
||||||
|
raise Exception("temp_id_to_id is getting too large.")
|
||||||
|
|
||||||
async def request_force_close(self, channel_id: bytes):
|
async def request_force_close(self, channel_id: bytes):
|
||||||
"""Try to trigger the remote peer to force-close."""
|
"""Try to trigger the remote peer to force-close."""
|
||||||
await self.initialized
|
await self.initialized
|
||||||
|
|||||||
Reference in New Issue
Block a user