lnpeer: slow down peers sending too much gossip
This commit is contained in:
@@ -78,6 +78,8 @@ class Peer(Logger, EventListener):
|
||||
'query_short_channel_ids', 'reply_short_channel_ids', 'reply_short_channel_ids_end')
|
||||
|
||||
DELAY_INC_MSG_PROCESSING_SLEEP = 0.01
|
||||
RECV_GOSSIP_QUEUE_SOFT_MAXSIZE = 2000
|
||||
RECV_GOSSIP_QUEUE_HARD_MAXSIZE = 5000
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -106,8 +108,9 @@ class Peer(Logger, EventListener):
|
||||
self.pong_event = asyncio.Event()
|
||||
self.reply_channel_range = asyncio.Queue()
|
||||
# gossip uses a single queue to preserve message order
|
||||
self.gossip_queue = asyncio.Queue()
|
||||
self.gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
||||
self.recv_gossip_queue = asyncio.Queue(maxsize=self.RECV_GOSSIP_QUEUE_HARD_MAXSIZE)
|
||||
self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
||||
self.their_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
||||
self.outgoing_gossip_reply = False # type: bool
|
||||
self.ordered_message_queues = defaultdict(asyncio.Queue) # type: Dict[bytes, asyncio.Queue] # for messages that are ordered
|
||||
self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages
|
||||
@@ -399,17 +402,26 @@ class Peer(Logger, EventListener):
|
||||
self.maybe_set_initialized()
|
||||
|
||||
def on_node_announcement(self, payload):
|
||||
if not self.lnworker.uses_trampoline():
|
||||
self.gossip_queue.put_nowait(('node_announcement', payload))
|
||||
if self.lnworker.uses_trampoline():
|
||||
return
|
||||
if self.our_gossip_timestamp_filter is None:
|
||||
return # why is the peer sending this? should we disconnect?
|
||||
self.recv_gossip_queue.put_nowait(('node_announcement', payload))
|
||||
|
||||
def on_channel_announcement(self, payload):
|
||||
if not self.lnworker.uses_trampoline():
|
||||
self.gossip_queue.put_nowait(('channel_announcement', payload))
|
||||
if self.lnworker.uses_trampoline():
|
||||
return
|
||||
if self.our_gossip_timestamp_filter is None:
|
||||
return # why is the peer sending this? should we disconnect?
|
||||
self.recv_gossip_queue.put_nowait(('channel_announcement', payload))
|
||||
|
||||
def on_channel_update(self, payload):
|
||||
self.maybe_save_remote_update(payload)
|
||||
if not self.lnworker.uses_trampoline():
|
||||
self.gossip_queue.put_nowait(('channel_update', payload))
|
||||
if self.lnworker.uses_trampoline():
|
||||
return
|
||||
if self.our_gossip_timestamp_filter is None:
|
||||
return # why is the peer sending this? should we disconnect?
|
||||
self.recv_gossip_queue.put_nowait(('channel_update', payload))
|
||||
|
||||
def on_query_channel_range(self, payload):
|
||||
if self.lnworker == self.lnworker.network.lngossip or not self._should_forward_gossip():
|
||||
@@ -419,7 +431,7 @@ class Peer(Logger, EventListener):
|
||||
if self.outgoing_gossip_reply:
|
||||
return self.send_warning(bytes(32), "received multiple queries at the same time")
|
||||
self.outgoing_gossip_reply = True
|
||||
self.gossip_queue.put_nowait(('query_channel_range', payload))
|
||||
self.recv_gossip_queue.put_nowait(('query_channel_range', payload))
|
||||
|
||||
def on_query_short_channel_ids(self, payload):
|
||||
if self.lnworker == self.lnworker.network.lngossip or not self._should_forward_gossip():
|
||||
@@ -429,7 +441,7 @@ class Peer(Logger, EventListener):
|
||||
if not self._is_valid_short_channel_id_query(payload):
|
||||
return self.send_warning(bytes(32), "invalid query_short_channel_ids")
|
||||
self.outgoing_gossip_reply = True
|
||||
self.gossip_queue.put_nowait(('query_short_channel_ids', payload))
|
||||
self.recv_gossip_queue.put_nowait(('query_short_channel_ids', payload))
|
||||
|
||||
def on_gossip_timestamp_filter(self, payload):
|
||||
if self._should_forward_gossip():
|
||||
@@ -441,11 +453,11 @@ class Peer(Logger, EventListener):
|
||||
if payload.get('chain_hash') != constants.net.rev_genesis_bytes():
|
||||
return
|
||||
filter = GossipTimestampFilter.from_payload(payload)
|
||||
self.gossip_timestamp_filter = filter
|
||||
self.their_gossip_timestamp_filter = filter
|
||||
self.logger.debug(f"got gossip_ts_filter from peer {self.pubkey.hex()}: "
|
||||
f"{str(self.gossip_timestamp_filter)}")
|
||||
f"{str(self.their_gossip_timestamp_filter)}")
|
||||
if filter and not filter.only_forwarding:
|
||||
self.gossip_queue.put_nowait(('gossip_timestamp_filter', None))
|
||||
self.recv_gossip_queue.put_nowait(('gossip_timestamp_filter', None))
|
||||
|
||||
def maybe_save_remote_update(self, payload):
|
||||
if not self.channels:
|
||||
@@ -521,7 +533,7 @@ class Peer(Logger, EventListener):
|
||||
chan_upds = []
|
||||
node_anns = []
|
||||
while True:
|
||||
name, payload = await self.gossip_queue.get()
|
||||
name, payload = await self.recv_gossip_queue.get()
|
||||
if name == 'channel_announcement':
|
||||
chan_anns.append(payload)
|
||||
elif name == 'channel_update':
|
||||
@@ -536,7 +548,7 @@ class Peer(Logger, EventListener):
|
||||
await self.taskgroup.spawn(self._handle_historical_gossip_request())
|
||||
else:
|
||||
raise Exception('unknown message')
|
||||
if self.gossip_queue.empty():
|
||||
if self.recv_gossip_queue.empty():
|
||||
break
|
||||
if self.network.lngossip:
|
||||
await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
|
||||
@@ -577,7 +589,7 @@ class Peer(Logger, EventListener):
|
||||
last_gossip_batch_ts = 0
|
||||
while True:
|
||||
await asyncio.sleep(10)
|
||||
if not self.gossip_timestamp_filter:
|
||||
if not self.their_gossip_timestamp_filter:
|
||||
continue # peer didn't request gossip
|
||||
|
||||
new_gossip, last_lngossip_refresh_ts = await lngossip.get_forwarding_gossip()
|
||||
@@ -589,7 +601,7 @@ class Peer(Logger, EventListener):
|
||||
|
||||
async def _handle_historical_gossip_request(self):
|
||||
"""Called when a peer requests historical gossip with a gossip_timestamp_filter query."""
|
||||
filter = self.gossip_timestamp_filter
|
||||
filter = self.their_gossip_timestamp_filter
|
||||
if not self._should_forward_gossip() or not filter or filter.only_forwarding:
|
||||
return
|
||||
async with self.network.lngossip.gossip_request_semaphore:
|
||||
@@ -603,7 +615,7 @@ class Peer(Logger, EventListener):
|
||||
async def _send_gossip_messages(self, messages: List[GossipForwardingMessage]) -> int:
|
||||
amount_sent = 0
|
||||
for msg in messages:
|
||||
if self.gossip_timestamp_filter.in_range(msg.timestamp) \
|
||||
if self.their_gossip_timestamp_filter.in_range(msg.timestamp) \
|
||||
and self.pubkey != msg.sender_node_id:
|
||||
await self.transport.send_bytes_and_drain(msg.msg)
|
||||
amount_sent += 1
|
||||
@@ -742,11 +754,17 @@ class Peer(Logger, EventListener):
|
||||
self.logger.info('requesting whole channel graph')
|
||||
else:
|
||||
self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).isoformat()}')
|
||||
timestamp_range = 0xFFFFFFFF
|
||||
self.our_gossip_timestamp_filter = GossipTimestampFilter(
|
||||
first_timestamp=timestamp,
|
||||
timestamp_range=timestamp_range,
|
||||
)
|
||||
self.send_message(
|
||||
'gossip_timestamp_filter',
|
||||
chain_hash=constants.net.rev_genesis_bytes(),
|
||||
first_timestamp=timestamp,
|
||||
timestamp_range=b'\xff'*4)
|
||||
timestamp_range=timestamp_range,
|
||||
)
|
||||
|
||||
def query_channel_range(self, first_block, num_blocks):
|
||||
self.logger.info(f'query channel range {first_block} {num_blocks}')
|
||||
@@ -830,6 +848,15 @@ class Peer(Logger, EventListener):
|
||||
# rate-limit message-processing a bit, to make it harder
|
||||
# for a single peer to bog down the event loop / cpu:
|
||||
await asyncio.sleep(self.DELAY_INC_MSG_PROCESSING_SLEEP)
|
||||
# If receiving too much gossip from this peer, we need to slow them down.
|
||||
# note: if the gossip queue gets full, we will disconnect from them
|
||||
# and throw away unprocessed gossip.
|
||||
if self.recv_gossip_queue.qsize() > self.RECV_GOSSIP_QUEUE_SOFT_MAXSIZE:
|
||||
sleep = self.recv_gossip_queue.qsize() / 1000
|
||||
self.logger.debug(
|
||||
f"message_loop sleeping due to getting much gossip. qsize={self.recv_gossip_queue.qsize()}. "
|
||||
f"waiting for existing gossip data to be processed first.")
|
||||
await asyncio.sleep(sleep)
|
||||
|
||||
def on_reply_short_channel_ids_end(self, payload):
|
||||
self.querying.set()
|
||||
|
||||
Reference in New Issue
Block a user