swaps: rate limit swapserver requests
This commit is contained in:
@@ -1518,6 +1518,7 @@ class NostrTransport(SwapServerTransport):
|
||||
self.relay_manager = None # type: Optional[aionostr.Manager]
|
||||
self.taskgroup = OldTaskGroup()
|
||||
self._last_swapserver_relays = self._load_last_swapserver_relays() # type: Optional[Sequence[str]]
|
||||
self._swap_server_requests = asyncio.Queue(maxsize=5) # type: asyncio.Queue[dict]
|
||||
|
||||
def __enter__(self):
|
||||
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
||||
@@ -1547,6 +1548,7 @@ class NostrTransport(SwapServerTransport):
|
||||
if self.sm.is_server:
|
||||
tasks = [
|
||||
self.check_direct_messages(),
|
||||
self._handle_requests(),
|
||||
]
|
||||
else:
|
||||
tasks = [
|
||||
@@ -1807,37 +1809,42 @@ class NostrTransport(SwapServerTransport):
|
||||
if fut:
|
||||
fut.set_result(content)
|
||||
elif self.sm.is_server and 'method' in content:
|
||||
try:
|
||||
await self._handle_request(content)
|
||||
except Exception as e:
|
||||
self.logger.exception(f"failed to handle request: {content}")
|
||||
error_response = json.dumps({
|
||||
"error": str(e)[:100],
|
||||
"reply_to": event.id,
|
||||
})
|
||||
await self.taskgroup.spawn(self.send_direct_message(event.pubkey, error_response))
|
||||
if self._swap_server_requests.full():
|
||||
self.logger.warning(f"too many swap requests, dropping incoming request: {event.id[:10]}...")
|
||||
continue
|
||||
await self._swap_server_requests.put(content)
|
||||
else:
|
||||
self.logger.info(f'unknown message {content}')
|
||||
|
||||
@log_exceptions
|
||||
async def _handle_request(self, request: dict) -> None:
|
||||
async def _handle_requests(self) -> None:
|
||||
assert self.sm.is_server
|
||||
# todo: remember event_id of already processed requests
|
||||
method = request.pop('method')
|
||||
event_id = request.pop('event_id')
|
||||
event_pubkey = request.pop('event_pubkey')
|
||||
self.logger.info(f'handle_request: id={event_id} {method} {request}')
|
||||
if method == 'addswapinvoice':
|
||||
r = self.sm.server_add_swap_invoice(request)
|
||||
elif method == 'createswap':
|
||||
r = self.sm.server_create_swap(request)
|
||||
elif method == 'createnormalswap':
|
||||
r = self.sm.server_create_normal_swap(request)
|
||||
else:
|
||||
raise Exception(method)
|
||||
r['reply_to'] = event_id
|
||||
self.logger.debug(f'sending response id={event_id}')
|
||||
await self.taskgroup.spawn(self.send_direct_message(event_pubkey, json.dumps(r), retries=2))
|
||||
while True:
|
||||
await asyncio.sleep(5)
|
||||
request = await self._swap_server_requests.get()
|
||||
event_id = request.pop('event_id')
|
||||
event_pubkey = request.pop('event_pubkey')
|
||||
try:
|
||||
method = request.pop('method')
|
||||
self.logger.info(f'handle_request: id={event_id} {method} {request}')
|
||||
if method == 'addswapinvoice':
|
||||
r = self.sm.server_add_swap_invoice(request)
|
||||
elif method == 'createswap':
|
||||
r = self.sm.server_create_swap(request)
|
||||
elif method == 'createnormalswap':
|
||||
r = self.sm.server_create_normal_swap(request)
|
||||
else:
|
||||
raise Exception(method)
|
||||
r['reply_to'] = event_id
|
||||
self.logger.debug(f'sending response id={event_id}')
|
||||
await self.taskgroup.spawn(self.send_direct_message(event_pubkey, json.dumps(r), retries=2))
|
||||
except Exception as e:
|
||||
self.logger.exception(f"failed to handle {request=}")
|
||||
error_response = json.dumps({
|
||||
"error": str(e)[:100],
|
||||
"reply_to": event_id,
|
||||
})
|
||||
await self.taskgroup.spawn(self.send_direct_message(event_pubkey, error_response))
|
||||
|
||||
def _store_last_swapserver_relays(self, relays: Sequence[str]):
|
||||
self._last_swapserver_relays = relays
|
||||
|
||||
Reference in New Issue
Block a user