diff --git a/electrum/submarine_swaps.py b/electrum/submarine_swaps.py index 548825053..4d89da8aa 100644 --- a/electrum/submarine_swaps.py +++ b/electrum/submarine_swaps.py @@ -202,6 +202,7 @@ class SwapManager(Logger): self.prepayments[swap.prepay_hash] = bytes.fromhex(k) self.is_server = False # overriden by swapserver plugin if enabled self.is_initialized = asyncio.Event() + self.pairs_updated = asyncio.Event() def start_network(self, network: 'Network'): assert network @@ -922,7 +923,15 @@ class SwapManager(Logger): self.percentage = pairs.percentage self._min_amount = pairs.min_amount self._max_amount = pairs.max_amount - self.is_initialized.set() + self.trigger_pairs_updated_threadsafe() + + def trigger_pairs_updated_threadsafe(self): + def trigger(): + self.is_initialized.set() + self.pairs_updated.set() + self.pairs_updated.clear() + loop = get_asyncio_loop() + loop.call_soon_threadsafe(trigger) def get_max_amount(self) -> int: """in satoshis""" @@ -1065,7 +1074,7 @@ class SwapManager(Logger): *, txin: PartialTxInput, swap: SwapData, - ) -> PartialTransaction: + ) -> Tuple[PartialTxInput, Optional[int]]: if swap.is_reverse: # successful reverse swap locktime = None # preimage will be set in sign_tx @@ -1280,11 +1289,11 @@ class NostrTransport(SwapServerTransport): self.private_key = keypair.privkey self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex()) self.nostr_pubkey = keypair.pubkey.hex()[2:] - self.dm_replies = defaultdict(asyncio.Future) # type: Dict[bytes, asyncio.Future] + self.dm_replies = defaultdict(asyncio.Future) # type: Dict[str, asyncio.Future] self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path) self.relay_manager = None self.taskgroup = OldTaskGroup() - self.server_relays = None + self._last_swapserver_relays = self._load_last_swapserver_relays() # type: Optional[Sequence[str]] def __enter__(self): asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop) @@ -1311,8 +1320,8 @@ class NostrTransport(SwapServerTransport): else: tasks = [ self.check_direct_messages(), - self.receive_offers(), self.get_pairs(), + self.update_relays() ] try: async with self.taskgroup as group: @@ -1333,7 +1342,11 @@ class NostrTransport(SwapServerTransport): @property def relays(self): - return self.network.config.NOSTR_RELAYS.split(',') + our_relays = self.config.NOSTR_RELAYS.split(',') if self.config.NOSTR_RELAYS else [] + if self.sm.is_server: + return our_relays + last_swapserver_relays = self._last_swapserver_relays or [] + return list(set(our_relays + last_swapserver_relays)) def get_relay_manager(self): assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!" @@ -1396,7 +1409,7 @@ class NostrTransport(SwapServerTransport): private_key=self.nostr_private_key) self.logger.info(f"published offer {event_id}") - async def send_direct_message(self, pubkey: str, relays, content: str) -> str: + async def send_direct_message(self, pubkey: str, content: str) -> str: our_private_key = aionostr.key.PrivateKey(self.private_key) recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex) @@ -1411,27 +1424,29 @@ class NostrTransport(SwapServerTransport): @log_exceptions async def send_request_to_server(self, method: str, request_data: dict) -> dict: + self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}") request_data['method'] = method - request_data['relays'] = self.config.NOSTR_RELAYS server_pubkey = self.config.SWAPSERVER_NPUB - event_id = await self.send_direct_message(server_pubkey, self.server_relays, json.dumps(request_data)) + event_id = await self.send_direct_message(server_pubkey, json.dumps(request_data)) response = await self.dm_replies[event_id] return response - async def receive_offers(self): + async def get_pairs(self): await self.is_connected.wait() query = { "kinds": [self.USER_STATUS_NIP38], "limit":10, "#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"], "#r": [f"net:{constants.net.NET_NAME}"], - "since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC + "since": int(time.time()) - 60 * 60, + "until": int(time.time()) + 60 * 60, } async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False): try: content = json.loads(event.content) tags = {k: v for k, v in event.tags} except Exception as e: + self.logger.debug(f"failed to parse event: {e}") continue if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}": continue @@ -1440,8 +1455,9 @@ class NostrTransport(SwapServerTransport): # check if this is the most recent event for this pubkey pubkey = event.pubkey ts = self._offers.get(pubkey, {}).get('timestamp', 0) - if event.created_at <= ts: - #print('skipping old event', pubkey[0:10], event.id) + if (event.created_at <= ts + or event.created_at > time.time() + 60 * 60 + or event.created_at < time.time() - 60 * 60): continue try: pow_bits = get_nostr_ann_pow_amount( @@ -1456,40 +1472,30 @@ class NostrTransport(SwapServerTransport): content['pow_bits'] = pow_bits content['pubkey'] = pubkey content['timestamp'] = event.created_at - self._offers[pubkey] = content - # mirror event to other relays server_relays = content['relays'].split(',') if 'relays' in content else [] + content['relays'] = server_relays[:10] # limit to 10 relays + self._offers[pubkey] = content + if self.config.SWAPSERVER_NPUB == pubkey: + pairs = self._parse_offer(content) + self.sm.update_pairs(pairs) + # mirror event to other relays await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays)) - async def get_pairs(self): - if not self.config.SWAPSERVER_NPUB: - return - query = { - "kinds": [self.USER_STATUS_NIP38], - "authors": [self.config.SWAPSERVER_NPUB], - "#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"], - "#r": [f"net:{constants.net.NET_NAME}"], - "since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC, - "limit": 1 - } - async for event in self.relay_manager.get_events(query, single_event=True, only_stored=False): - try: - content = json.loads(event.content) - tags = {k: v for k, v in event.tags} - except Exception: - continue - if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}": - continue - if tags.get('r') != f"net:{constants.net.NET_NAME}": - continue - # check if this is the most recent event for this pubkey - pubkey = event.pubkey - content['pubkey'] = pubkey - content['timestamp'] = event.created_at - self.logger.info(f'received offer from {age(event.created_at)}') - pairs = self._parse_offer(content) - self.sm.update_pairs(pairs) - self.server_relays = content['relays'].split(',') + async def update_relays(self): + """ + Update the relays when update_pairs is called. + This ensures we try to connect to the same relays as the ones announced by the swap server. + """ + while True: + previous_relays = self._last_swapserver_relays + await self.sm.pairs_updated.wait() + latest_known_relays = self._offers[self.config.SWAPSERVER_NPUB]['relays'] + if latest_known_relays != previous_relays: + self.logger.debug(f"swapserver relays changed, updating relay list.") + # store the latest known relays to a file + self._store_last_swapserver_relays(latest_known_relays) + # update the relay manager + await self.relay_manager.update_relays(self.relays) async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]): """If the relays of the origin server are different from our relays we rebroadcast the @@ -1533,7 +1539,6 @@ class NostrTransport(SwapServerTransport): event_id = request.pop('event_id') event_pubkey = request.pop('event_pubkey') self.logger.info(f'handle_request: id={event_id} {method} {request}') - relays = request.pop('relays').split(',') if method == 'addswapinvoice': r = self.sm.server_add_swap_invoice(request) elif method == 'createswap': @@ -1543,5 +1548,28 @@ class NostrTransport(SwapServerTransport): else: raise Exception(method) r['reply_to'] = event_id - self.logger.info(f'sending response id={event_id}') - await self.send_direct_message(event_pubkey, relays, json.dumps(r)) + self.logger.debug(f'sending response id={event_id}') + await self.send_direct_message(event_pubkey, json.dumps(r)) + + def _store_last_swapserver_relays(self, relays: Sequence[str]): + self._last_swapserver_relays = relays + if not self.config.path or not relays: + return + storage_path = os.path.join(self.config.path, 'recent_swapserver_relays') + try: + with open(storage_path, 'w', encoding="utf-8") as f: + json.dump(relays, f, indent=4, sort_keys=True) # type: ignore + except Exception: + self.logger.exception(f"failed to write last swapserver relays to {storage_path}") + + def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]: + storage_path = os.path.join(self.config.path, 'recent_swapserver_relays') + if not os.path.exists(storage_path): + return None + try: + with open(storage_path, 'r', encoding="utf-8") as f: + relays = json.load(f) + except Exception: + self.logger.exception(f"failed to read last swapserver relays from {storage_path}") + return None + return relays