Merge pull request #9640 from f321x/swap_dm_relays
swaps: Update submarine swap nostr relays dynamically and remove redundant query
This commit is contained in:
@@ -201,6 +201,7 @@ class SwapManager(Logger):
|
|||||||
self.prepayments[swap.prepay_hash] = bytes.fromhex(k)
|
self.prepayments[swap.prepay_hash] = bytes.fromhex(k)
|
||||||
self.is_server = False # overriden by swapserver plugin if enabled
|
self.is_server = False # overriden by swapserver plugin if enabled
|
||||||
self.is_initialized = asyncio.Event()
|
self.is_initialized = asyncio.Event()
|
||||||
|
self.pairs_updated = asyncio.Event()
|
||||||
|
|
||||||
def start_network(self, network: 'Network'):
|
def start_network(self, network: 'Network'):
|
||||||
assert network
|
assert network
|
||||||
@@ -917,7 +918,15 @@ class SwapManager(Logger):
|
|||||||
self.percentage = pairs.percentage
|
self.percentage = pairs.percentage
|
||||||
self._min_amount = pairs.min_amount
|
self._min_amount = pairs.min_amount
|
||||||
self._max_amount = pairs.max_amount
|
self._max_amount = pairs.max_amount
|
||||||
self.is_initialized.set()
|
self.trigger_pairs_updated_threadsafe()
|
||||||
|
|
||||||
|
def trigger_pairs_updated_threadsafe(self):
|
||||||
|
def trigger():
|
||||||
|
self.is_initialized.set()
|
||||||
|
self.pairs_updated.set()
|
||||||
|
self.pairs_updated.clear()
|
||||||
|
loop = get_asyncio_loop()
|
||||||
|
loop.call_soon_threadsafe(trigger)
|
||||||
|
|
||||||
def get_max_amount(self) -> int:
|
def get_max_amount(self) -> int:
|
||||||
"""in satoshis"""
|
"""in satoshis"""
|
||||||
@@ -1060,7 +1069,7 @@ class SwapManager(Logger):
|
|||||||
*,
|
*,
|
||||||
txin: PartialTxInput,
|
txin: PartialTxInput,
|
||||||
swap: SwapData,
|
swap: SwapData,
|
||||||
) -> PartialTransaction:
|
) -> Tuple[PartialTxInput, Optional[int]]:
|
||||||
if swap.is_reverse: # successful reverse swap
|
if swap.is_reverse: # successful reverse swap
|
||||||
locktime = None
|
locktime = None
|
||||||
# preimage will be set in sign_tx
|
# preimage will be set in sign_tx
|
||||||
@@ -1274,11 +1283,11 @@ class NostrTransport(SwapServerTransport):
|
|||||||
self.private_key = keypair.privkey
|
self.private_key = keypair.privkey
|
||||||
self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex())
|
self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex())
|
||||||
self.nostr_pubkey = keypair.pubkey.hex()[2:]
|
self.nostr_pubkey = keypair.pubkey.hex()[2:]
|
||||||
self.dm_replies = defaultdict(asyncio.Future) # type: Dict[bytes, asyncio.Future]
|
self.dm_replies = defaultdict(asyncio.Future) # type: Dict[str, asyncio.Future]
|
||||||
self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
|
self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
|
||||||
self.relay_manager = None
|
self.relay_manager = None
|
||||||
self.taskgroup = OldTaskGroup()
|
self.taskgroup = OldTaskGroup()
|
||||||
self.server_relays = None
|
self._last_swapserver_relays = self._load_last_swapserver_relays() # type: Optional[Sequence[str]]
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
||||||
@@ -1305,8 +1314,8 @@ class NostrTransport(SwapServerTransport):
|
|||||||
else:
|
else:
|
||||||
tasks = [
|
tasks = [
|
||||||
self.check_direct_messages(),
|
self.check_direct_messages(),
|
||||||
self.receive_offers(),
|
|
||||||
self.get_pairs(),
|
self.get_pairs(),
|
||||||
|
self.update_relays()
|
||||||
]
|
]
|
||||||
try:
|
try:
|
||||||
async with self.taskgroup as group:
|
async with self.taskgroup as group:
|
||||||
@@ -1327,7 +1336,11 @@ class NostrTransport(SwapServerTransport):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def relays(self):
|
def relays(self):
|
||||||
return self.network.config.NOSTR_RELAYS.split(',')
|
our_relays = self.config.NOSTR_RELAYS.split(',') if self.config.NOSTR_RELAYS else []
|
||||||
|
if self.sm.is_server:
|
||||||
|
return our_relays
|
||||||
|
last_swapserver_relays = self._last_swapserver_relays or []
|
||||||
|
return list(set(our_relays + last_swapserver_relays))
|
||||||
|
|
||||||
def get_relay_manager(self):
|
def get_relay_manager(self):
|
||||||
assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!"
|
assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!"
|
||||||
@@ -1390,7 +1403,7 @@ class NostrTransport(SwapServerTransport):
|
|||||||
private_key=self.nostr_private_key)
|
private_key=self.nostr_private_key)
|
||||||
self.logger.info(f"published offer {event_id}")
|
self.logger.info(f"published offer {event_id}")
|
||||||
|
|
||||||
async def send_direct_message(self, pubkey: str, relays, content: str) -> str:
|
async def send_direct_message(self, pubkey: str, content: str) -> str:
|
||||||
our_private_key = aionostr.key.PrivateKey(self.private_key)
|
our_private_key = aionostr.key.PrivateKey(self.private_key)
|
||||||
recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey
|
recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey
|
||||||
encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex)
|
encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex)
|
||||||
@@ -1405,27 +1418,29 @@ class NostrTransport(SwapServerTransport):
|
|||||||
|
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def send_request_to_server(self, method: str, request_data: dict) -> dict:
|
async def send_request_to_server(self, method: str, request_data: dict) -> dict:
|
||||||
|
self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}")
|
||||||
request_data['method'] = method
|
request_data['method'] = method
|
||||||
request_data['relays'] = self.config.NOSTR_RELAYS
|
|
||||||
server_pubkey = self.config.SWAPSERVER_NPUB
|
server_pubkey = self.config.SWAPSERVER_NPUB
|
||||||
event_id = await self.send_direct_message(server_pubkey, self.server_relays, json.dumps(request_data))
|
event_id = await self.send_direct_message(server_pubkey, json.dumps(request_data))
|
||||||
response = await self.dm_replies[event_id]
|
response = await self.dm_replies[event_id]
|
||||||
return response
|
return response
|
||||||
|
|
||||||
async def receive_offers(self):
|
async def get_pairs(self):
|
||||||
await self.is_connected.wait()
|
await self.is_connected.wait()
|
||||||
query = {
|
query = {
|
||||||
"kinds": [self.USER_STATUS_NIP38],
|
"kinds": [self.USER_STATUS_NIP38],
|
||||||
"limit":10,
|
"limit":10,
|
||||||
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
|
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
|
||||||
"#r": [f"net:{constants.net.NET_NAME}"],
|
"#r": [f"net:{constants.net.NET_NAME}"],
|
||||||
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC
|
"since": int(time.time()) - 60 * 60,
|
||||||
|
"until": int(time.time()) + 60 * 60,
|
||||||
}
|
}
|
||||||
async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
|
async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
|
||||||
try:
|
try:
|
||||||
content = json.loads(event.content)
|
content = json.loads(event.content)
|
||||||
tags = {k: v for k, v in event.tags}
|
tags = {k: v for k, v in event.tags}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
self.logger.debug(f"failed to parse event: {e}")
|
||||||
continue
|
continue
|
||||||
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
|
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
|
||||||
continue
|
continue
|
||||||
@@ -1434,8 +1449,9 @@ class NostrTransport(SwapServerTransport):
|
|||||||
# check if this is the most recent event for this pubkey
|
# check if this is the most recent event for this pubkey
|
||||||
pubkey = event.pubkey
|
pubkey = event.pubkey
|
||||||
ts = self._offers.get(pubkey, {}).get('timestamp', 0)
|
ts = self._offers.get(pubkey, {}).get('timestamp', 0)
|
||||||
if event.created_at <= ts:
|
if (event.created_at <= ts
|
||||||
#print('skipping old event', pubkey[0:10], event.id)
|
or event.created_at > time.time() + 60 * 60
|
||||||
|
or event.created_at < time.time() - 60 * 60):
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
pow_bits = get_nostr_ann_pow_amount(
|
pow_bits = get_nostr_ann_pow_amount(
|
||||||
@@ -1450,40 +1466,30 @@ class NostrTransport(SwapServerTransport):
|
|||||||
content['pow_bits'] = pow_bits
|
content['pow_bits'] = pow_bits
|
||||||
content['pubkey'] = pubkey
|
content['pubkey'] = pubkey
|
||||||
content['timestamp'] = event.created_at
|
content['timestamp'] = event.created_at
|
||||||
self._offers[pubkey] = content
|
|
||||||
# mirror event to other relays
|
|
||||||
server_relays = content['relays'].split(',') if 'relays' in content else []
|
server_relays = content['relays'].split(',') if 'relays' in content else []
|
||||||
|
content['relays'] = server_relays[:10] # limit to 10 relays
|
||||||
|
self._offers[pubkey] = content
|
||||||
|
if self.config.SWAPSERVER_NPUB == pubkey:
|
||||||
|
pairs = self._parse_offer(content)
|
||||||
|
self.sm.update_pairs(pairs)
|
||||||
|
# mirror event to other relays
|
||||||
await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))
|
await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))
|
||||||
|
|
||||||
async def get_pairs(self):
|
async def update_relays(self):
|
||||||
if not self.config.SWAPSERVER_NPUB:
|
"""
|
||||||
return
|
Update the relays when update_pairs is called.
|
||||||
query = {
|
This ensures we try to connect to the same relays as the ones announced by the swap server.
|
||||||
"kinds": [self.USER_STATUS_NIP38],
|
"""
|
||||||
"authors": [self.config.SWAPSERVER_NPUB],
|
while True:
|
||||||
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
|
previous_relays = self._last_swapserver_relays
|
||||||
"#r": [f"net:{constants.net.NET_NAME}"],
|
await self.sm.pairs_updated.wait()
|
||||||
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC,
|
latest_known_relays = self._offers[self.config.SWAPSERVER_NPUB]['relays']
|
||||||
"limit": 1
|
if latest_known_relays != previous_relays:
|
||||||
}
|
self.logger.debug(f"swapserver relays changed, updating relay list.")
|
||||||
async for event in self.relay_manager.get_events(query, single_event=True, only_stored=False):
|
# store the latest known relays to a file
|
||||||
try:
|
self._store_last_swapserver_relays(latest_known_relays)
|
||||||
content = json.loads(event.content)
|
# update the relay manager
|
||||||
tags = {k: v for k, v in event.tags}
|
await self.relay_manager.update_relays(self.relays)
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
|
|
||||||
continue
|
|
||||||
if tags.get('r') != f"net:{constants.net.NET_NAME}":
|
|
||||||
continue
|
|
||||||
# check if this is the most recent event for this pubkey
|
|
||||||
pubkey = event.pubkey
|
|
||||||
content['pubkey'] = pubkey
|
|
||||||
content['timestamp'] = event.created_at
|
|
||||||
self.logger.info(f'received offer from {age(event.created_at)}')
|
|
||||||
pairs = self._parse_offer(content)
|
|
||||||
self.sm.update_pairs(pairs)
|
|
||||||
self.server_relays = content['relays'].split(',')
|
|
||||||
|
|
||||||
async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
|
async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
|
||||||
"""If the relays of the origin server are different from our relays we rebroadcast the
|
"""If the relays of the origin server are different from our relays we rebroadcast the
|
||||||
@@ -1527,7 +1533,6 @@ class NostrTransport(SwapServerTransport):
|
|||||||
event_id = request.pop('event_id')
|
event_id = request.pop('event_id')
|
||||||
event_pubkey = request.pop('event_pubkey')
|
event_pubkey = request.pop('event_pubkey')
|
||||||
self.logger.info(f'handle_request: id={event_id} {method} {request}')
|
self.logger.info(f'handle_request: id={event_id} {method} {request}')
|
||||||
relays = request.pop('relays').split(',')
|
|
||||||
if method == 'addswapinvoice':
|
if method == 'addswapinvoice':
|
||||||
r = self.sm.server_add_swap_invoice(request)
|
r = self.sm.server_add_swap_invoice(request)
|
||||||
elif method == 'createswap':
|
elif method == 'createswap':
|
||||||
@@ -1537,5 +1542,28 @@ class NostrTransport(SwapServerTransport):
|
|||||||
else:
|
else:
|
||||||
raise Exception(method)
|
raise Exception(method)
|
||||||
r['reply_to'] = event_id
|
r['reply_to'] = event_id
|
||||||
self.logger.info(f'sending response id={event_id}')
|
self.logger.debug(f'sending response id={event_id}')
|
||||||
await self.send_direct_message(event_pubkey, relays, json.dumps(r))
|
await self.send_direct_message(event_pubkey, json.dumps(r))
|
||||||
|
|
||||||
|
def _store_last_swapserver_relays(self, relays: Sequence[str]):
|
||||||
|
self._last_swapserver_relays = relays
|
||||||
|
if not self.config.path or not relays:
|
||||||
|
return
|
||||||
|
storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
|
||||||
|
try:
|
||||||
|
with open(storage_path, 'w', encoding="utf-8") as f:
|
||||||
|
json.dump(relays, f, indent=4, sort_keys=True) # type: ignore
|
||||||
|
except Exception:
|
||||||
|
self.logger.exception(f"failed to write last swapserver relays to {storage_path}")
|
||||||
|
|
||||||
|
def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]:
|
||||||
|
storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
|
||||||
|
if not os.path.exists(storage_path):
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
with open(storage_path, 'r', encoding="utf-8") as f:
|
||||||
|
relays = json.load(f)
|
||||||
|
except Exception:
|
||||||
|
self.logger.exception(f"failed to read last swapserver relays from {storage_path}")
|
||||||
|
return None
|
||||||
|
return relays
|
||||||
|
|||||||
Reference in New Issue
Block a user