1
0

dynamically update relays and remove redundant nostr query, store last

swapserver relays in file
This commit is contained in:
f321x
2025-02-14 16:28:11 +01:00
parent 937b157d29
commit 21e3fd91dd

View File

@@ -202,6 +202,7 @@ class SwapManager(Logger):
self.prepayments[swap.prepay_hash] = bytes.fromhex(k)
self.is_server = False # overriden by swapserver plugin if enabled
self.is_initialized = asyncio.Event()
self.pairs_updated = asyncio.Event()
def start_network(self, network: 'Network'):
assert network
@@ -922,7 +923,15 @@ class SwapManager(Logger):
self.percentage = pairs.percentage
self._min_amount = pairs.min_amount
self._max_amount = pairs.max_amount
self.is_initialized.set()
self.trigger_pairs_updated_threadsafe()
def trigger_pairs_updated_threadsafe(self):
def trigger():
self.is_initialized.set()
self.pairs_updated.set()
self.pairs_updated.clear()
loop = get_asyncio_loop()
loop.call_soon_threadsafe(trigger)
def get_max_amount(self) -> int:
"""in satoshis"""
@@ -1065,7 +1074,7 @@ class SwapManager(Logger):
*,
txin: PartialTxInput,
swap: SwapData,
) -> PartialTransaction:
) -> Tuple[PartialTxInput, Optional[int]]:
if swap.is_reverse: # successful reverse swap
locktime = None
# preimage will be set in sign_tx
@@ -1280,11 +1289,11 @@ class NostrTransport(SwapServerTransport):
self.private_key = keypair.privkey
self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex())
self.nostr_pubkey = keypair.pubkey.hex()[2:]
self.dm_replies = defaultdict(asyncio.Future) # type: Dict[bytes, asyncio.Future]
self.dm_replies = defaultdict(asyncio.Future) # type: Dict[str, asyncio.Future]
self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
self.relay_manager = None
self.taskgroup = OldTaskGroup()
self.server_relays = None
self._last_swapserver_relays = self._load_last_swapserver_relays() # type: Optional[Sequence[str]]
def __enter__(self):
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
@@ -1311,8 +1320,8 @@ class NostrTransport(SwapServerTransport):
else:
tasks = [
self.check_direct_messages(),
self.receive_offers(),
self.get_pairs(),
self.update_relays()
]
try:
async with self.taskgroup as group:
@@ -1333,7 +1342,11 @@ class NostrTransport(SwapServerTransport):
@property
def relays(self):
return self.network.config.NOSTR_RELAYS.split(',')
our_relays = self.config.NOSTR_RELAYS.split(',') if self.config.NOSTR_RELAYS else []
if self.sm.is_server:
return our_relays
last_swapserver_relays = self._last_swapserver_relays or []
return list(set(our_relays + last_swapserver_relays))
def get_relay_manager(self):
assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!"
@@ -1396,7 +1409,7 @@ class NostrTransport(SwapServerTransport):
private_key=self.nostr_private_key)
self.logger.info(f"published offer {event_id}")
async def send_direct_message(self, pubkey: str, relays, content: str) -> str:
async def send_direct_message(self, pubkey: str, content: str) -> str:
our_private_key = aionostr.key.PrivateKey(self.private_key)
recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey
encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex)
@@ -1411,27 +1424,29 @@ class NostrTransport(SwapServerTransport):
@log_exceptions
async def send_request_to_server(self, method: str, request_data: dict) -> dict:
self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}")
request_data['method'] = method
request_data['relays'] = self.config.NOSTR_RELAYS
server_pubkey = self.config.SWAPSERVER_NPUB
event_id = await self.send_direct_message(server_pubkey, self.server_relays, json.dumps(request_data))
event_id = await self.send_direct_message(server_pubkey, json.dumps(request_data))
response = await self.dm_replies[event_id]
return response
async def receive_offers(self):
async def get_pairs(self):
await self.is_connected.wait()
query = {
"kinds": [self.USER_STATUS_NIP38],
"limit":10,
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
"#r": [f"net:{constants.net.NET_NAME}"],
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC
"since": int(time.time()) - 60 * 60,
"until": int(time.time()) + 60 * 60,
}
async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
try:
content = json.loads(event.content)
tags = {k: v for k, v in event.tags}
except Exception as e:
self.logger.debug(f"failed to parse event: {e}")
continue
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
continue
@@ -1440,8 +1455,9 @@ class NostrTransport(SwapServerTransport):
# check if this is the most recent event for this pubkey
pubkey = event.pubkey
ts = self._offers.get(pubkey, {}).get('timestamp', 0)
if event.created_at <= ts:
#print('skipping old event', pubkey[0:10], event.id)
if (event.created_at <= ts
or event.created_at > time.time() + 60 * 60
or event.created_at < time.time() - 60 * 60):
continue
try:
pow_bits = get_nostr_ann_pow_amount(
@@ -1456,40 +1472,30 @@ class NostrTransport(SwapServerTransport):
content['pow_bits'] = pow_bits
content['pubkey'] = pubkey
content['timestamp'] = event.created_at
self._offers[pubkey] = content
# mirror event to other relays
server_relays = content['relays'].split(',') if 'relays' in content else []
content['relays'] = server_relays[:10] # limit to 10 relays
self._offers[pubkey] = content
if self.config.SWAPSERVER_NPUB == pubkey:
pairs = self._parse_offer(content)
self.sm.update_pairs(pairs)
# mirror event to other relays
await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))
async def get_pairs(self):
if not self.config.SWAPSERVER_NPUB:
return
query = {
"kinds": [self.USER_STATUS_NIP38],
"authors": [self.config.SWAPSERVER_NPUB],
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
"#r": [f"net:{constants.net.NET_NAME}"],
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC,
"limit": 1
}
async for event in self.relay_manager.get_events(query, single_event=True, only_stored=False):
try:
content = json.loads(event.content)
tags = {k: v for k, v in event.tags}
except Exception:
continue
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
continue
if tags.get('r') != f"net:{constants.net.NET_NAME}":
continue
# check if this is the most recent event for this pubkey
pubkey = event.pubkey
content['pubkey'] = pubkey
content['timestamp'] = event.created_at
self.logger.info(f'received offer from {age(event.created_at)}')
pairs = self._parse_offer(content)
self.sm.update_pairs(pairs)
self.server_relays = content['relays'].split(',')
async def update_relays(self):
"""
Update the relays when update_pairs is called.
This ensures we try to connect to the same relays as the ones announced by the swap server.
"""
while True:
previous_relays = self._last_swapserver_relays
await self.sm.pairs_updated.wait()
latest_known_relays = self._offers[self.config.SWAPSERVER_NPUB]['relays']
if latest_known_relays != previous_relays:
self.logger.debug(f"swapserver relays changed, updating relay list.")
# store the latest known relays to a file
self._store_last_swapserver_relays(latest_known_relays)
# update the relay manager
await self.relay_manager.update_relays(self.relays)
async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
"""If the relays of the origin server are different from our relays we rebroadcast the
@@ -1533,7 +1539,6 @@ class NostrTransport(SwapServerTransport):
event_id = request.pop('event_id')
event_pubkey = request.pop('event_pubkey')
self.logger.info(f'handle_request: id={event_id} {method} {request}')
relays = request.pop('relays').split(',')
if method == 'addswapinvoice':
r = self.sm.server_add_swap_invoice(request)
elif method == 'createswap':
@@ -1543,5 +1548,28 @@ class NostrTransport(SwapServerTransport):
else:
raise Exception(method)
r['reply_to'] = event_id
self.logger.info(f'sending response id={event_id}')
await self.send_direct_message(event_pubkey, relays, json.dumps(r))
self.logger.debug(f'sending response id={event_id}')
await self.send_direct_message(event_pubkey, json.dumps(r))
def _store_last_swapserver_relays(self, relays: Sequence[str]):
self._last_swapserver_relays = relays
if not self.config.path or not relays:
return
storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
try:
with open(storage_path, 'w', encoding="utf-8") as f:
json.dump(relays, f, indent=4, sort_keys=True) # type: ignore
except Exception:
self.logger.exception(f"failed to write last swapserver relays to {storage_path}")
def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]:
storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
if not os.path.exists(storage_path):
return None
try:
with open(storage_path, 'r', encoding="utf-8") as f:
relays = json.load(f)
except Exception:
self.logger.exception(f"failed to read last swapserver relays from {storage_path}")
return None
return relays