lnpeer: rate-limit reply_channel_range
This commit is contained in:
@@ -106,7 +106,7 @@ class Peer(Logger, EventListener):
|
|||||||
assert self.node_ids[0] != self.node_ids[1]
|
assert self.node_ids[0] != self.node_ids[1]
|
||||||
self.last_message_time = 0
|
self.last_message_time = 0
|
||||||
self.pong_event = asyncio.Event()
|
self.pong_event = asyncio.Event()
|
||||||
self.reply_channel_range = asyncio.Queue()
|
self.reply_channel_range = None # type: Optional[asyncio.Queue]
|
||||||
# gossip uses a single queue to preserve message order
|
# gossip uses a single queue to preserve message order
|
||||||
self.recv_gossip_queue = asyncio.Queue(maxsize=self.RECV_GOSSIP_QUEUE_HARD_MAXSIZE)
|
self.recv_gossip_queue = asyncio.Queue(maxsize=self.RECV_GOSSIP_QUEUE_HARD_MAXSIZE)
|
||||||
self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
self.our_gossip_timestamp_filter = None # type: Optional[GossipTimestampFilter]
|
||||||
@@ -709,6 +709,7 @@ class Peer(Logger, EventListener):
|
|||||||
self.outgoing_gossip_reply = False
|
self.outgoing_gossip_reply = False
|
||||||
|
|
||||||
async def get_channel_range(self):
|
async def get_channel_range(self):
|
||||||
|
self.reply_channel_range = asyncio.Queue()
|
||||||
first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
|
first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
|
||||||
num_blocks = self.lnworker.network.get_local_height() - first_block
|
num_blocks = self.lnworker.network.get_local_height() - first_block
|
||||||
self.query_channel_range(first_block, num_blocks)
|
self.query_channel_range(first_block, num_blocks)
|
||||||
@@ -747,6 +748,7 @@ class Peer(Logger, EventListener):
|
|||||||
a, b = intervals[0]
|
a, b = intervals[0]
|
||||||
if a <= first_block and b >= first_block + num_blocks:
|
if a <= first_block and b >= first_block + num_blocks:
|
||||||
break
|
break
|
||||||
|
self.reply_channel_range = None
|
||||||
return ids, complete
|
return ids, complete
|
||||||
|
|
||||||
def request_gossip(self, timestamp=0):
|
def request_gossip(self, timestamp=0):
|
||||||
@@ -784,7 +786,7 @@ class Peer(Logger, EventListener):
|
|||||||
ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)]
|
ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)]
|
||||||
return ids
|
return ids
|
||||||
|
|
||||||
def on_reply_channel_range(self, payload):
|
async def on_reply_channel_range(self, payload):
|
||||||
first = payload['first_blocknum']
|
first = payload['first_blocknum']
|
||||||
num = payload['number_of_blocks']
|
num = payload['number_of_blocks']
|
||||||
complete = bool(int.from_bytes(payload['sync_complete'], 'big'))
|
complete = bool(int.from_bytes(payload['sync_complete'], 'big'))
|
||||||
@@ -792,6 +794,12 @@ class Peer(Logger, EventListener):
|
|||||||
ids = self.decode_short_ids(encoded)
|
ids = self.decode_short_ids(encoded)
|
||||||
# self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, "
|
# self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, "
|
||||||
# f"num_ids {len(ids)}, complete {complete}")
|
# f"num_ids {len(ids)}, complete {complete}")
|
||||||
|
if self.reply_channel_range is None:
|
||||||
|
raise Exception("received 'reply_channel_range' without corresponding 'query_channel_range'")
|
||||||
|
while self.reply_channel_range.qsize() > 10:
|
||||||
|
# we block process_message until the queue gets consumed
|
||||||
|
self.logger.info("reply_channel_range queue is overflowing. sleeping...")
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
self.reply_channel_range.put_nowait((first, num, complete, ids))
|
self.reply_channel_range.put_nowait((first, num, complete, ids))
|
||||||
|
|
||||||
async def _send_reply_short_channel_ids(self, payload: dict):
|
async def _send_reply_short_channel_ids(self, payload: dict):
|
||||||
|
|||||||
Reference in New Issue
Block a user