Merge pull request #9740 from f321x/update_swap_liquidity_on_events
swaps: Improve accuracy of swapserver liquidity announcement.
This commit is contained in:
@@ -210,7 +210,6 @@ class SwapManager(Logger):
|
||||
self.is_server = False # overriden by swapserver plugin if enabled
|
||||
self.is_initialized = asyncio.Event()
|
||||
self.pairs_updated = asyncio.Event()
|
||||
self._liquidity_changed = asyncio.Event()
|
||||
|
||||
def start_network(self, network: 'Network'):
|
||||
assert network
|
||||
@@ -229,23 +228,33 @@ class SwapManager(Logger):
|
||||
async def run_nostr_server(self):
|
||||
await self.set_nostr_proof_of_work()
|
||||
with NostrTransport(self.config, self, self.lnworker.nostr_keypair) as transport:
|
||||
# wait a bit so we don't publish 0 liquidity on startup if channels are not yet reestablished
|
||||
await asyncio.sleep(10)
|
||||
await transport.is_connected.wait()
|
||||
self.logger.info(f'nostr is connected')
|
||||
# will publish a new announcement if liquidity changed or every OFFER_UPDATE_INTERVAL_SEC
|
||||
last_update = time.time()
|
||||
while True:
|
||||
# todo: publish everytime fees have changed
|
||||
self.server_update_pairs()
|
||||
await transport.publish_offer(self)
|
||||
await asyncio.sleep(transport.LIQUIDITY_UPDATE_INTERVAL_SEC)
|
||||
|
||||
previous_max_forward = self._max_forward
|
||||
previous_max_reverse = self._max_reverse
|
||||
previous_mining_fee = self.mining_fee
|
||||
try:
|
||||
await wait_for2(
|
||||
self._liquidity_changed.wait(),
|
||||
timeout=transport.OFFER_UPDATE_INTERVAL_SEC
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
self.server_update_pairs()
|
||||
except Exception:
|
||||
self.logger.exception("server_update_pairs failed")
|
||||
continue
|
||||
|
||||
liquidity_changed = self._max_forward != previous_max_forward \
|
||||
or self._max_reverse != previous_max_reverse
|
||||
mining_fees_changed = self.mining_fee != previous_mining_fee
|
||||
if liquidity_changed or mining_fees_changed:
|
||||
self.logger.debug(f"updating announcement: {liquidity_changed=}, {mining_fees_changed=}")
|
||||
elif time.time() - last_update < transport.OFFER_UPDATE_INTERVAL_SEC:
|
||||
continue
|
||||
|
||||
await transport.publish_offer(self)
|
||||
last_update = time.time()
|
||||
|
||||
@log_exceptions
|
||||
async def main_loop(self):
|
||||
tasks = [self.pay_pending_invoices()]
|
||||
@@ -444,7 +453,6 @@ class SwapManager(Logger):
|
||||
except BelowDustLimit:
|
||||
self.logger.info('utxo value below dust threshold')
|
||||
return
|
||||
self.server_maybe_trigger_liquidity_update()
|
||||
|
||||
def get_swap_tx_fee(self):
|
||||
return self._get_tx_fee(self.config.FEE_POLICY)
|
||||
@@ -940,7 +948,10 @@ class SwapManager(Logger):
|
||||
max_reverse: int = min(int(self.lnworker.num_sats_can_send()), 10000000)
|
||||
self._max_forward: int = self._keep_leading_digits(max_forward, 2)
|
||||
self._max_reverse: int = self._keep_leading_digits(max_reverse, 2)
|
||||
self.mining_fee = self.get_fee_for_txbatcher()
|
||||
new_mining_fee = self.get_fee_for_txbatcher()
|
||||
if self.mining_fee is None \
|
||||
or abs(self.mining_fee - new_mining_fee) / self.mining_fee > 0.1:
|
||||
self.mining_fee = new_mining_fee
|
||||
|
||||
@staticmethod
|
||||
def _keep_leading_digits(num: int, digits: int) -> int:
|
||||
@@ -968,23 +979,6 @@ class SwapManager(Logger):
|
||||
|
||||
run_sync_function_on_asyncio_thread(trigger, block=True)
|
||||
|
||||
def server_maybe_trigger_liquidity_update(self) -> None:
|
||||
"""
|
||||
To be called when the available liquidity changes so the new liquidity is announced.
|
||||
(ln in/out, onchain in/out)
|
||||
"""
|
||||
if not self.is_server:
|
||||
return
|
||||
assert get_running_loop() == get_asyncio_loop(), "Events must be set in the asyncio thread"
|
||||
previous_max_forward = self._max_forward
|
||||
previous_max_reverse = self._max_reverse
|
||||
self.server_update_pairs()
|
||||
# if liquidity really changed the event is triggered so a new provider announcement is published
|
||||
if self._max_forward != previous_max_forward or self._max_reverse != previous_max_reverse:
|
||||
self.logger.debug(f"liquidity changed, updating announcement")
|
||||
self._liquidity_changed.set()
|
||||
self._liquidity_changed.clear()
|
||||
|
||||
def get_provider_max_forward_amount(self) -> int:
|
||||
"""in sat"""
|
||||
return self._max_forward
|
||||
@@ -1367,6 +1361,7 @@ class NostrTransport(SwapServerTransport):
|
||||
USER_STATUS_NIP38 = 30315
|
||||
NOSTR_EVENT_VERSION = 5
|
||||
OFFER_UPDATE_INTERVAL_SEC = 60 * 10
|
||||
LIQUIDITY_UPDATE_INTERVAL_SEC = 30
|
||||
|
||||
def __init__(self, config, sm, keypair):
|
||||
SwapServerTransport.__init__(self, config=config, sm=sm)
|
||||
@@ -1655,7 +1650,6 @@ class NostrTransport(SwapServerTransport):
|
||||
r['reply_to'] = event_id
|
||||
self.logger.debug(f'sending response id={event_id}')
|
||||
await self.send_direct_message(event_pubkey, json.dumps(r))
|
||||
self.sm.server_maybe_trigger_liquidity_update()
|
||||
|
||||
def _store_last_swapserver_relays(self, relays: Sequence[str]):
|
||||
self._last_swapserver_relays = relays
|
||||
|
||||
Reference in New Issue
Block a user