diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index 814cf12dd..616dae332 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -1518,6 +1518,7 @@ class NostrTransport(SwapServerTransport): self.relay_manager = None # type: Optional[aionostr.Manager] self.taskgroup = OldTaskGroup() self._last_swapserver_relays = self._load_last_swapserver_relays() # type: Optional[Sequence[str]] + self._swap_server_requests = asyncio.Queue(maxsize=5) # type: asyncio.Queue[dict] def __enter__(self): asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) @@ -1547,6 +1548,7 @@ class NostrTransport(SwapServerTransport): if self.sm.is_server: tasks = [ self.check_direct_messages(), + self._handle_requests(), ] else: tasks = [ @@ -1807,37 +1809,42 @@ class NostrTransport(SwapServerTransport): if fut: fut.set_result(content) elif self.sm.is_server and 'method' in content: - try: - await self._handle_request(content) - except Exception as e: - self.logger.exception(f"failed to handle request: {content}") - error_response = json.dumps({ - "error": str(e)[:100], - "reply_to": event.id, - }) - await self.taskgroup.spawn(self.send_direct_message(event.pubkey, error_response)) + if self._swap_server_requests.full(): + self.logger.warning(f"too many swap requests, dropping incoming request: {event.id[:10]}...") + continue + await self._swap_server_requests.put(content) else: self.logger.info(f'unknown message {content}') @log_exceptions - async def _handle_request(self, request: dict) -> None: + async def _handle_requests(self) -> None: assert self.sm.is_server - # todo: remember event_id of already processed requests - method = request.pop('method') - event_id = request.pop('event_id') - event_pubkey = request.pop('event_pubkey') - self.logger.info(f'handle_request: id={event_id} {method} {request}') - if method == 'addswapinvoice': - r = self.sm.server_add_swap_invoice(request) - elif method == 'createswap': - r = self.sm.server_create_swap(request) - elif method == 'createnormalswap': - r = self.sm.server_create_normal_swap(request) - else: - raise Exception(method) - r['reply_to'] = event_id - self.logger.debug(f'sending response id={event_id}') - await self.taskgroup.spawn(self.send_direct_message(event_pubkey, json.dumps(r), retries=2)) + while True: + await asyncio.sleep(5) + request = await self._swap_server_requests.get() + event_id = request.pop('event_id') + event_pubkey = request.pop('event_pubkey') + try: + method = request.pop('method') + self.logger.info(f'handle_request: id={event_id} {method} {request}') + if method == 'addswapinvoice': + r = self.sm.server_add_swap_invoice(request) + elif method == 'createswap': + r = self.sm.server_create_swap(request) + elif method == 'createnormalswap': + r = self.sm.server_create_normal_swap(request) + else: + raise Exception(method) + r['reply_to'] = event_id + self.logger.debug(f'sending response id={event_id}') + await self.taskgroup.spawn(self.send_direct_message(event_pubkey, json.dumps(r), retries=2)) + except Exception as e: + self.logger.exception(f"failed to handle {request=}") + error_response = json.dumps({ + "error": str(e)[:100], + "reply_to": event_id, + }) + await self.taskgroup.spawn(self.send_direct_message(event_pubkey, error_response)) def _store_last_swapserver_relays(self, relays: Sequence[str]): self._last_swapserver_relays = relays