lnworker: limit max number of incoming channel-less peers
This commit is contained in:
@@ -202,6 +202,7 @@ class LNWorker(Logger, EventListener, NetworkRetryManager[LNPeerAddr]):
|
|||||||
self.lock = threading.RLock()
|
self.lock = threading.RLock()
|
||||||
self.node_keypair = node_keypair
|
self.node_keypair = node_keypair
|
||||||
self._peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer # needs self.lock
|
self._peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer # needs self.lock
|
||||||
|
self._channelless_incoming_peers = set() # type: Set[bytes] # node_ids # needs self.lock
|
||||||
self.taskgroup = OldTaskGroup()
|
self.taskgroup = OldTaskGroup()
|
||||||
self.listen_server = None # type: Optional[asyncio.AbstractServer]
|
self.listen_server = None # type: Optional[asyncio.AbstractServer]
|
||||||
self.features = features
|
self.features = features
|
||||||
@@ -252,13 +253,15 @@ class LNWorker(Logger, EventListener, NetworkRetryManager[LNPeerAddr]):
|
|||||||
return
|
return
|
||||||
addr = str(netaddr.host)
|
addr = str(netaddr.host)
|
||||||
|
|
||||||
async def cb(reader, writer):
|
async def cb(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||||
transport = LNResponderTransport(self.node_keypair.privkey, reader, writer)
|
transport = LNResponderTransport(self.node_keypair.privkey, reader, writer)
|
||||||
try:
|
try:
|
||||||
node_id = await transport.handshake()
|
node_id = await transport.handshake()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.info(f'handshake failure from incoming connection: {e!r}')
|
self.logger.info(f'handshake failure from incoming connection: {e!r}')
|
||||||
return
|
return
|
||||||
|
peername = writer.get_extra_info('peername')
|
||||||
|
self.logger.debug(f"handshake done for incoming peer: {peername=}, node_id={node_id.hex()}")
|
||||||
await self._add_peer_from_transport(node_id=node_id, transport=transport)
|
await self._add_peer_from_transport(node_id=node_id, transport=transport)
|
||||||
try:
|
try:
|
||||||
self.listen_server = await asyncio.start_server(cb, addr, netaddr.port)
|
self.listen_server = await asyncio.start_server(cb, addr, netaddr.port)
|
||||||
@@ -315,11 +318,29 @@ class LNWorker(Logger, EventListener, NetworkRetryManager[LNPeerAddr]):
|
|||||||
# both keep trying to reconnect, resulting in neither being usable.
|
# both keep trying to reconnect, resulting in neither being usable.
|
||||||
if existing_peer.is_initialized():
|
if existing_peer.is_initialized():
|
||||||
# give priority to the existing connection
|
# give priority to the existing connection
|
||||||
return
|
transport.close()
|
||||||
|
return None
|
||||||
else:
|
else:
|
||||||
# Use the new connection. (e.g. old peer might be an outgoing connection
|
# Use the new connection. (e.g. old peer might be an outgoing connection
|
||||||
# for an outdated host/port that will never connect)
|
# for an outdated host/port that will never connect)
|
||||||
existing_peer.close_and_cleanup()
|
existing_peer.close_and_cleanup()
|
||||||
|
# limit max number of incoming channel-less peers.
|
||||||
|
# what to do if limit is reached?
|
||||||
|
# - chosen strategy: we don't allow new connections.
|
||||||
|
# - drawback: attacker can use up all our slots
|
||||||
|
# - alternative: kick oldest channel-less peer
|
||||||
|
# - drawback: if many legit peers want to connect to us, we will keep kicking them
|
||||||
|
# in round-robin, and they will keep reconnecting. no stable state -> we self-DOS
|
||||||
|
# TODO make slots IP-based?
|
||||||
|
if isinstance(transport, LNResponderTransport):
|
||||||
|
assert node_id not in self._channelless_incoming_peers
|
||||||
|
chans = [chan for chan in self.channels_for_peer(node_id).values() if chan.is_funded()]
|
||||||
|
if not chans:
|
||||||
|
if len(self._channelless_incoming_peers) > 100:
|
||||||
|
transport.close()
|
||||||
|
return None
|
||||||
|
self._channelless_incoming_peers.add(node_id)
|
||||||
|
# checks done: we are adding this peer.
|
||||||
peer = Peer(self, node_id, transport)
|
peer = Peer(self, node_id, transport)
|
||||||
assert node_id not in self._peers
|
assert node_id not in self._peers
|
||||||
self._peers[node_id] = peer
|
self._peers[node_id] = peer
|
||||||
@@ -331,6 +352,7 @@ class LNWorker(Logger, EventListener, NetworkRetryManager[LNPeerAddr]):
|
|||||||
peer2 = self._peers.get(peer.pubkey)
|
peer2 = self._peers.get(peer.pubkey)
|
||||||
if peer2 is peer:
|
if peer2 is peer:
|
||||||
self._peers.pop(peer.pubkey)
|
self._peers.pop(peer.pubkey)
|
||||||
|
self._channelless_incoming_peers.discard(peer.pubkey)
|
||||||
|
|
||||||
def num_peers(self) -> int:
|
def num_peers(self) -> int:
|
||||||
return sum([p.is_initialized() for p in self.peers.values()])
|
return sum([p.is_initialized() for p in self.peers.values()])
|
||||||
|
|||||||
Reference in New Issue
Block a user