- if fee estimates are currently high, some outputs are not worth sweeping
- however, fee estimates might be only temporarily very high
- previously, in such a case, lnwatcher would just discard the outputs as dust,
and mark the channel REDEEMED (and hence never watch it or try again)
- now, instead, if the outputs would not be dust at lower fee estimates,
lnwatcher will keep watching the channel
- and if the estimates go down, lnwatcher will sweep the outputs then
- relatedly, previously txbatcher.is_dust() used allow_fallback_to_static_rates=True,
and it erroneously almost always fell back to the static rates (150 sat/byte) during
startup (race: lnwatcher ran before the network had managed to get estimates)
- now, instead, txbatcher.is_dust() does not fall back to static rates,
and the callers are supposed to handle NoDynamicFeeEstimates.
- I think this makes much more sense. The previous meaning of "is_dust"
with the fallback was weird. Now it means: "is dust at current feerates".
fixes https://github.com/spesmilo/electrum/issues/9980
1758 lines
74 KiB
Python
1758 lines
74 KiB
Python
import asyncio
|
|
import json
|
|
import os
|
|
import ssl
|
|
import threading
|
|
from typing import TYPE_CHECKING, Optional, Dict, Sequence, Tuple, Iterable, List
|
|
from decimal import Decimal
|
|
import math
|
|
import time
|
|
|
|
import attr
|
|
import aiohttp
|
|
|
|
from electrum_ecc import ECPrivkey
|
|
|
|
import electrum_aionostr as aionostr
|
|
import electrum_aionostr.key
|
|
from electrum_aionostr.event import Event
|
|
from electrum_aionostr.util import to_nip19
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
from .i18n import _
|
|
from .logging import Logger
|
|
from .crypto import sha256, ripemd
|
|
from .bitcoin import script_to_p2wsh, opcodes, dust_threshold, DummyAddress, construct_witness, construct_script
|
|
from .transaction import (
|
|
PartialTxInput, PartialTxOutput, PartialTransaction, Transaction, TxInput, TxOutpoint, script_GetOp,
|
|
match_script_against_template, OPPushDataGeneric, OPPushDataPubkey
|
|
)
|
|
from .util import (
|
|
log_exceptions, ignore_exceptions, BelowDustLimit, OldTaskGroup, ca_path, gen_nostr_ann_pow,
|
|
get_nostr_ann_pow_amount, make_aiohttp_proxy_connector, get_running_loop, get_asyncio_loop, wait_for2,
|
|
run_sync_function_on_asyncio_thread, trigger_callback, NoDynamicFeeEstimates
|
|
)
|
|
from . import lnutil
|
|
from .lnutil import hex_to_bytes, REDEEM_AFTER_DOUBLE_SPENT_DELAY, Keypair
|
|
from .lnaddr import lndecode
|
|
from .json_db import StoredObject, stored_in
|
|
from . import constants
|
|
from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE
|
|
from .fee_policy import FeePolicy
|
|
from .invoices import Invoice
|
|
from .lnonion import OnionRoutingFailure, OnionFailureCode
|
|
from .lnsweep import SweepInfo
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from .network import Network
|
|
from .wallet import Abstract_Wallet
|
|
from .lnwatcher import LNWatcher
|
|
from .lnworker import LNWallet
|
|
from .lnchannel import Channel
|
|
from .simple_config import SimpleConfig
|
|
from aiohttp_socks import ProxyConnector
|
|
|
|
|
|
# Default tx size, used for mining fee estimation.
SWAP_TX_SIZE = 150

# Locktime deltas; they are added to / compared against the local block height.
MIN_LOCKTIME_DELTA = 60
LOCKTIME_DELTA_REFUND = 70
MAX_LOCKTIME_DELTA = 100
# note: put in invoice, but is not enforced by receiver in lnpeer.py
MIN_FINAL_CLTV_DELTA_FOR_CLIENT = 3 * 144

# Import-time sanity checks on the relative ordering of the deltas above.
assert MIN_LOCKTIME_DELTA <= LOCKTIME_DELTA_REFUND <= MAX_LOCKTIME_DELTA
assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED
assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_FOR_INVOICE
assert MAX_LOCKTIME_DELTA < MIN_FINAL_CLTV_DELTA_FOR_CLIENT
|
|
|
|
|
|
# The script of the reverse swaps has one extra check in it to verify
# that the length of the preimage is 32. This is required because in
# the reverse swaps the preimage is generated by the user and, to
# settle the hold invoice, you need a preimage with 32 bytes. If that
# check wasn't there, the user could generate a preimage with a
# different length, which would still allow for claiming the onchain
# coins but the invoice couldn't be settled.
#
# The trailing "# N" comments give each element's index in the template;
# the rest of this module addresses elements by these indices
# (1: preimage size, 5: hash, 7 and 13: pubkeys, 10: locktime).
WITNESS_TEMPLATE_REVERSE_SWAP = [
    opcodes.OP_SIZE,                        # 0
    OPPushDataGeneric(None),                # 1
    opcodes.OP_EQUAL,                       # 2
    opcodes.OP_IF,                          # 3
    opcodes.OP_HASH160,                     # 4
    OPPushDataGeneric(lambda x: x == 20),   # 5
    opcodes.OP_EQUALVERIFY,                 # 6
    OPPushDataPubkey,                       # 7
    opcodes.OP_ELSE,                        # 8
    opcodes.OP_DROP,                        # 9
    OPPushDataGeneric(None),                # 10
    opcodes.OP_CHECKLOCKTIMEVERIFY,         # 11
    opcodes.OP_DROP,                        # 12
    OPPushDataPubkey,                       # 13
    opcodes.OP_ENDIF,                       # 14
    opcodes.OP_CHECKSIG,                    # 15
]
|
|
|
|
|
|
def check_reverse_redeem_script(
    *,
    redeem_script: bytes,
    lockup_address: str,
    payment_hash: bytes,
    locktime: int,
    refund_pubkey: bytes = None,
    claim_pubkey: bytes = None,
) -> None:
    """Validate a swap redeem script against the values we expect.

    Checks, in order: the script matches WITNESS_TEMPLATE_REVERSE_SWAP, it
    hashes to `lockup_address`, it commits to ripemd(payment_hash), it
    contains `claim_pubkey` / `refund_pubkey` (each only checked when
    given), and it encodes `locktime`.

    Raises:
        Exception: on the first check that fails.
    """
    script_items = list(script_GetOp(redeem_script))
    template_ok = match_script_against_template(redeem_script, WITNESS_TEMPLATE_REVERSE_SWAP)
    if not template_ok:
        raise Exception("rswap check failed: scriptcode does not match template")
    if script_to_p2wsh(redeem_script) != lockup_address:
        raise Exception("rswap check failed: inconsistent scriptcode and address")
    # item 5: the hash the script commits to
    if script_items[5][1] != ripemd(payment_hash):
        raise Exception("rswap check failed: our preimage not in script")
    # item 7: pubkey of the claim (preimage) branch
    if claim_pubkey and script_items[7][1] != claim_pubkey:
        raise Exception("rswap check failed: our pubkey not in script")
    # item 13: pubkey of the refund (timeout) branch
    if refund_pubkey and script_items[13][1] != refund_pubkey:
        raise Exception("rswap check failed: our pubkey not in script")
    # item 10: locktime, stored little-endian in the script
    script_locktime = int.from_bytes(script_items[10][1], byteorder='little')
    if script_locktime != locktime:
        raise Exception("rswap check failed: inconsistent locktime and script")
|
|
|
|
|
|
class SwapServerError(Exception):
    """Error reported by, or while talking to, the swap server.

    `message` is optional; without it, str() yields a generic
    (translated) fallback text.
    """

    def __init__(self, message=None):
        super().__init__(message)
        self.message = message

    def __str__(self):
        if not self.message:
            return _("The swap server errored or is unreachable.")
        return self.message
|
|
|
|
|
|
def now():
    """Return the current Unix timestamp truncated to whole seconds."""
    timestamp = time.time()
    return int(timestamp)
|
|
|
|
|
|
@attr.s(frozen=True)
class SwapFees:
    """Immutable record of a swap provider's fee and amount limits.

    NOTE(review): the units of the individual fields are not visible
    here; presumably sats for the amounts -- confirm against the callers.
    """
    # fee charged as a percentage of the swapped amount
    percentage = attr.ib(type=int)
    # flat mining fee component
    mining_fee = attr.ib(type=int)
    # smallest amount the provider accepts
    min_amount = attr.ib(type=int)
    # upper limits for forward / reverse swaps
    max_forward = attr.ib(type=int)
    max_reverse = attr.ib(type=int)
|
|
|
|
|
|
@attr.frozen
class SwapOffer:
    """Immutable description of one swap server's offer."""
    # fees and limits of the offer
    pairs = attr.ib(type=SwapFees)
    # relay URLs associated with the offer
    relays = attr.ib(type=list[str])
    # proof-of-work bits of the announcement
    pow_bits = attr.ib(type=int)
    # server pubkey (input to the npub encoding below)
    server_pubkey = attr.ib(type=str)
    timestamp = attr.ib(type=int)

    @property
    def server_npub(self):
        """The server pubkey in NIP-19 'npub' encoding."""
        pubkey = self.server_pubkey
        return to_nip19('npub', pubkey)
|
|
|
|
|
|
@stored_in('submarine_swaps')
@attr.s
class SwapData(StoredObject):
    """Persisted state of a single submarine swap.

    The attr.ib fields are the stored ones; the underscore-prefixed class
    attributes below them are plain runtime state with class-level defaults.
    """
    # for whoever is running code (PoV of client or server)
    is_reverse = attr.ib(type=bool)
    locktime = attr.ib(type=int)
    # in sats
    onchain_amount = attr.ib(type=int)
    # in sats
    lightning_amount = attr.ib(type=int)
    redeem_script = attr.ib(type=bytes, converter=hex_to_bytes)
    preimage = attr.ib(type=Optional[bytes], converter=hex_to_bytes)
    prepay_hash = attr.ib(type=Optional[bytes], converter=hex_to_bytes)
    privkey = attr.ib(type=bytes, converter=hex_to_bytes)
    lockup_address = attr.ib(type=str)
    receive_address = attr.ib(type=str)
    funding_txid = attr.ib(type=Optional[str])
    spending_txid = attr.ib(type=Optional[str])
    is_redeemed = attr.ib(type=bool)

    _funding_prevout = None  # type: Optional[TxOutpoint]  # for RBF
    _payment_hash = None
    _payment_pending = False  # for forward swaps

    @property
    def payment_hash(self) -> bytes:
        """Payment hash of the swap (assigned by SwapManager after creation/loading)."""
        return self._payment_hash

    def is_funded(self) -> bool:
        """True if a funding payment is pending or a funding txid is known."""
        if self._payment_pending:
            return self._payment_pending
        return bool(self.funding_txid)
|
|
|
|
|
|
class SwapManager(Logger):
|
|
|
|
network: Optional['Network'] = None
|
|
lnwatcher: Optional['LNWatcher'] = None
|
|
|
|
def __init__(self, *, wallet: 'Abstract_Wallet', lnworker: 'LNWallet'):
    """Set up swap bookkeeping from the wallet db.

    Loads the stored swaps, rebuilds the in-memory indexes, and
    re-registers the hold-invoice callback for every forward swap that is
    not redeemed yet. No network activity happens here (see start_network).
    """
    Logger.__init__(self)
    # fee parameters and amount limits; unknown until set elsewhere
    self.mining_fee = None
    self.percentage = None
    self._min_amount = None
    self._max_forward = None
    self._max_reverse = None

    self.wallet = wallet
    self.config = wallet.config
    self.lnworker = lnworker
    self.lnwatcher = self.lnworker.lnwatcher
    self.taskgroup = OldTaskGroup()
    self.dummy_address = DummyAddress.SWAP

    # note: accessing swaps dicts (besides simple lookup) needs swaps_lock
    self.swaps_lock = threading.Lock()
    self._swaps = self.wallet.db.get_dict('submarine_swaps')  # type: Dict[str, SwapData]
    self._swaps_by_funding_outpoint = {}  # type: Dict[TxOutpoint, SwapData]
    self._swaps_by_lockup_address = {}  # type: Dict[str, SwapData]
    for hash_hex, stored_swap in self._swaps.items():
        rhash = bytes.fromhex(hash_hex)
        # the payment hash is the db key; it is not a stored field of the swap
        stored_swap._payment_hash = rhash
        self._add_or_reindex_swap(stored_swap)
        if not (stored_swap.is_reverse or stored_swap.is_redeemed):
            self.lnworker.register_hold_invoice(rhash, self.hold_invoice_callback)

    # fee_rhash -> rhash
    self._prepayments = {
        stored_swap.prepay_hash: bytes.fromhex(hash_hex)
        for hash_hex, stored_swap in self._swaps.items()
        if stored_swap.prepay_hash is not None
    }  # type: Dict[bytes, bytes]
    self.is_server = False  # overridden by swapserver plugin if enabled
    self.is_initialized = asyncio.Event()
    self.pairs_updated = asyncio.Event()
|
|
|
|
def start_network(self, network: 'Network'):
    """Attach to `network`, watch all unredeemed swaps and start the main loop.

    Does nothing (besides logging) if a network has already been set.
    """
    assert network
    already_started = self.network is not None
    if already_started:
        self.logger.info('start_network: already started')
        return
    self.logger.info('start_network: starting main loop')
    self.network = network
    # snapshot under the lock; the callbacks are registered outside of it
    with self.swaps_lock:
        known_swaps = list(self._swaps.values())
    for swap in known_swaps:
        if not swap.is_redeemed:
            self.add_lnwatcher_callback(swap)
    asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
|
|
|
|
@log_exceptions
async def run_nostr_server(self):
    """Server side: keep our swap offer published via nostr.

    Waits until the wallet is unlocked, connects the transport, then
    periodically refreshes the pairs and publishes a new announcement
    whenever liquidity or mining fees changed, or once
    OFFER_UPDATE_INTERVAL_SEC has elapsed since the last one.
    """
    await self.set_nostr_proof_of_work()

    while self.wallet.has_password() and self.wallet.get_unlocked_password() is None:
        self.logger.info("This wallet is password-protected. Please unlock it to start the swapserver plugin")
        await asyncio.sleep(10)

    with NostrTransport(self.config, self, self.lnworker.nostr_keypair) as transport:
        await transport.is_connected.wait()
        self.logger.info('nostr is connected')
        # will publish a new announcement if liquidity changed or every OFFER_UPDATE_INTERVAL_SEC
        last_update = time.time()
        while True:
            await asyncio.sleep(transport.LIQUIDITY_UPDATE_INTERVAL_SEC)

            # remember the old values so changes can be detected after the refresh
            old_max_forward = self._max_forward
            old_max_reverse = self._max_reverse
            old_mining_fee = self.mining_fee
            try:
                self.server_update_pairs()
            except Exception:
                self.logger.exception("server_update_pairs failed")
                continue

            # note: these two names are part of the log output below ("{name=}")
            liquidity_changed = self._max_forward != old_max_forward \
                or self._max_reverse != old_max_reverse
            mining_fees_changed = self.mining_fee != old_mining_fee
            if not (liquidity_changed or mining_fees_changed):
                if time.time() - last_update < transport.OFFER_UPDATE_INTERVAL_SEC:
                    # nothing changed and the last offer is still recent
                    continue
            else:
                self.logger.debug(f"updating announcement: {liquidity_changed=}, {mining_fees_changed=}")

            await transport.publish_offer(self)
            last_update = time.time()
|
|
|
|
@log_exceptions
async def main_loop(self):
    """Spawn the long-running background jobs in our taskgroup.

    Always runs pay_pending_invoices(); when acting as a server, also the
    HTTP and/or nostr server, depending on the config.
    """
    jobs = [self.pay_pending_invoices()]
    if self.is_server:
        # nostr and http are not mutually exclusive
        if self.config.SWAPSERVER_PORT:
            jobs.append(self.http_server.run())
        if self.config.NOSTR_RELAYS:
            jobs.append(self.run_nostr_server())

    async with self.taskgroup as group:
        for job in jobs:
            await group.spawn(job)
|
|
|
|
async def stop(self):
    """Cancel whatever is still running in our taskgroup."""
    taskgroup = self.taskgroup
    await taskgroup.cancel_remaining()
|
|
|
|
def create_transport(self) -> 'SwapServerTransport':
    """Return the transport used to talk to the swap server.

    HTTP if SWAPSERVER_URL is configured, otherwise nostr. For nostr, a
    server uses the lnworker's nostr keypair, while a client uses a
    freshly generated random keypair.
    """
    from .lnutil import generate_random_keypair
    if self.config.SWAPSERVER_URL:
        return HttpTransport(self.config, self)
    if self.is_server:
        keypair = self.lnworker.nostr_keypair
    else:
        keypair = generate_random_keypair()
    return NostrTransport(self.config, self, keypair)
|
|
|
|
async def set_nostr_proof_of_work(self) -> None:
    """Make sure the config holds a PoW nonce that meets SWAPSERVER_POW_TARGET.

    The stored nonce is reused if it already reaches the target; otherwise
    a new one is generated and saved to the config.
    """
    pubkey = self.lnworker.nostr_keypair.pubkey[1:]  # pubkey without prefix
    target = self.config.SWAPSERVER_POW_TARGET
    existing_pow = get_nostr_ann_pow_amount(
        pubkey,
        self.config.SWAPSERVER_ANN_POW_NONCE
    )
    if existing_pow >= target:
        self.logger.debug("Reusing existing PoW nonce for nostr announcement.")
        return

    self.logger.info(f"Generating PoW for nostr announcement. Target: {self.config.SWAPSERVER_POW_TARGET}")
    nonce, pow_amount = await gen_nostr_ann_pow(
        self.lnworker.nostr_keypair.pubkey[1:],
        self.config.SWAPSERVER_POW_TARGET,
    )
    self.logger.debug(f"Found {pow_amount} bits of work for Nostr announcement.")
    self.config.SWAPSERVER_ANN_POW_NONCE = nonce
|
|
|
|
async def pay_invoice(self, key):
    """Try to pay the stored invoice `key` and update `invoices_to_pay`.

    While the attempt is running, the entry is set to a far-future
    timestamp so that pay_pending_invoices() does not start a second
    attempt. Afterwards:
      - on success, or if paying raised: the entry is removed (no retry)
      - if the payment failed: the entry is rescheduled 10 minutes ahead
    """
    self.logger.info(f'trying to pay invoice {key}')
    self.invoices_to_pay[key] = 1000000000000  # lock
    try:
        invoice = self.wallet.get_invoice(key)
        success, log = await self.lnworker.pay_invoice(invoice)
    except Exception as e:
        # include the reason: we give up on this invoice for good, and
        # without it the failure could not be diagnosed from the logs
        self.logger.info(f'exception paying {key}, will not retry: {e!r}')
        self.invoices_to_pay.pop(key, None)
        return
    if not success:
        self.logger.info(f'failed to pay {key}, will retry in 10 minutes')
        self.invoices_to_pay[key] = now() + 600
    else:
        self.logger.info(f'paid invoice {key}')
        self.invoices_to_pay.pop(key, None)
|
|
|
|
async def pay_pending_invoices(self):
    """Forever: every 5 seconds, spawn a payment attempt for each due invoice.

    `invoices_to_pay` maps invoice key -> earliest timestamp at which the
    next attempt may start. It is (re)created empty when this starts.
    """
    self.invoices_to_pay = {}
    while True:
        await asyncio.sleep(5)
        # iterate over a copy: pay_invoice() mutates the dict
        pending = list(self.invoices_to_pay.items())
        for key, not_before in pending:
            if not now() < not_before:
                await self.taskgroup.spawn(self.pay_invoice(key))
|
|
|
|
def cancel_normal_swap(self, swap: SwapData):
|
|
""" we must not have broadcast the funding tx """
|
|
if swap is None:
|
|
return
|
|
if swap.is_funded():
|
|
self.logger.info(f'cannot cancel swap {swap.payment_hash.hex()}: already funded')
|
|
return
|
|
self._fail_swap(swap, 'user cancelled')
|
|
|
|
def _fail_swap(self, swap: SwapData, reason: str):
|
|
self.logger.info(f'failing swap {swap.payment_hash.hex()}: {reason}')
|
|
if not swap.is_reverse and swap.payment_hash in self.lnworker.hold_invoice_callbacks:
|
|
self.lnworker.unregister_hold_invoice(swap.payment_hash)
|
|
payment_secret = self.lnworker.get_payment_secret(swap.payment_hash)
|
|
payment_key = swap.payment_hash + payment_secret
|
|
e = OnionRoutingFailure(code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, data=b'')
|
|
self.lnworker.save_forwarding_failure(payment_key.hex(), failure_message=e)
|
|
self.lnwatcher.remove_callback(swap.lockup_address)
|
|
if not swap.is_funded():
|
|
with self.swaps_lock:
|
|
if self._swaps.pop(swap.payment_hash.hex(), None) is None:
|
|
self.logger.debug(f"swap {swap.payment_hash.hex()} has already been deleted.")
|
|
# TODO clean-up other swaps dicts, i.e. undo _add_or_reindex_swap()
|
|
|
|
@classmethod
|
|
def extract_preimage(cls, swap: SwapData, claim_tx: Transaction) -> Optional[bytes]:
|
|
for txin in claim_tx.inputs():
|
|
witness = txin.witness_elements()
|
|
if not witness:
|
|
# tx may be unsigned
|
|
continue
|
|
preimage = witness[1]
|
|
if sha256(preimage) == swap.payment_hash:
|
|
return preimage
|
|
return None
|
|
|
|
@log_exceptions
async def _claim_swap(self, swap: SwapData) -> None:
    """lnwatcher callback: advance `swap` based on what is on-chain.

    Rough outline:
      - look for a funding output at the lockup address; if there is none
        and the locktime has passed, fail the swap
      - record the funding/spending txids; stop watching once the spend
        is buried deep enough
      - forward swap (we funded): learn the preimage from the spending tx,
        or wait for the locktime to be able to refund
      - reverse swap (we claim): without the preimage, schedule the
        lightning payment instead (unless the locktime is too close)
      - finally hand the claim/refund input to the txbatcher
    """
    assert self.network
    assert self.lnwatcher
    if not self.lnwatcher.adb.is_up_to_date():
        return
    current_height = self.network.get_local_height()
    remaining_time = swap.locktime - current_height
    txos = self.lnwatcher.adb.get_addr_outputs(swap.lockup_address)

    # first output that is acceptable as funding; for reverse swaps the
    # amount must not be too low, as we must not reveal the preimage then
    txin = next(
        (txo for txo in txos.values()
         if not (swap.is_reverse and txo.value_sats() < swap.onchain_amount)),
        None,
    )
    if txin is None:
        # swap not funded.
        # if it is a normal swap, we might have double spent the funding tx
        # in that case we need to fail the HTLCs
        if remaining_time <= 0:
            self._fail_swap(swap, 'expired')
    if not txin:
        return

    # the swap is funded
    # note: swap.funding_txid can change due to RBF, it will get updated here:
    funding_txid = txin.prevout.txid.hex()
    swap.funding_txid = funding_txid
    swap._funding_prevout = txin.prevout
    self._add_or_reindex_swap(swap)  # to update _swaps_by_funding_outpoint
    funding_height = self.lnwatcher.adb.get_tx_height(funding_txid)
    # set spending_txid (even if tx is local), for GUI grouping
    swap.spending_txid = txin.spent_txid
    # discard local spenders
    spent_height = txin.spent_height
    if spent_height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
        spent_height = None
    spend_is_mined = spent_height is not None and spent_height > 0
    if spend_is_mined and current_height - spent_height > REDEEM_AFTER_DOUBLE_SPENT_DELAY:
        self.logger.info(f'stop watching swap {swap.lockup_address}')
        self.lnwatcher.remove_callback(swap.lockup_address)
        swap.is_redeemed = True

    if swap.is_reverse:
        if swap.preimage is None:
            swap.preimage = self.lnworker.get_preimage(swap.payment_hash)
        if swap.preimage is None:
            # the preimage is obtained by paying the invoice, see pay_pending_invoices
            if funding_height.conf <= 0:
                return
            key = swap.payment_hash.hex()
            if remaining_time <= MIN_LOCKTIME_DELTA:
                if key in self.invoices_to_pay:
                    # fixme: should consider cltv of ln payment
                    self.logger.info(f'locktime too close {key} {remaining_time}')
                    self.invoices_to_pay.pop(key, None)
                return
            if key not in self.invoices_to_pay:
                self.invoices_to_pay[key] = 0
            return
    else:
        if swap.preimage is None and spent_height is not None:
            # extract the preimage, add it to lnwatcher
            claim_tx = self.lnwatcher.adb.get_transaction(txin.spent_txid)
            preimage = self.extract_preimage(swap, claim_tx)
            if preimage:
                swap.preimage = preimage
                self.logger.info(f'found preimage: {preimage.hex()}')
                self.lnworker.save_preimage(swap.payment_hash, preimage)
            elif spent_height > 0:
                # this is our refund tx
                self.logger.info(f'refund tx confirmed: {txin.spent_txid} {spent_height}')
                self._fail_swap(swap, 'refund tx confirmed')
                return
        if remaining_time > 0:
            # too early for refund
            return
        if swap.preimage:
            # we have been paid. do not try to get refund.
            return

    if self.network.config.TEST_SWAPSERVER_REFUND:
        # for testing: do not create claim tx
        return

    if spend_is_mined:
        # the output has already been spent in a mined tx
        return
    txin, locktime = self.create_claim_txin(txin=txin, swap=swap)
    sweep_info = SweepInfo(
        txin=txin,
        cltv_abs=locktime,
        txout=None,
        name='swap claim' if swap.is_reverse else 'swap refund',
        can_be_batched=True,
    )
    try:
        self.wallet.txbatcher.add_sweep_input('swaps', sweep_info)
    except BelowDustLimit:
        self.logger.info('utxo value below dust threshold')
        return
    except NoDynamicFeeEstimates:
        self.logger.info('got NoDynamicFeeEstimates')
        return
|
|
|
|
def get_fee_for_txbatcher(self):
    """Return the swap tx fee estimated with the FEE_POLICY_SWAPS config policy."""
    policy_descriptor = self.config.FEE_POLICY_SWAPS
    return self._get_tx_fee(policy_descriptor)
|
|
|
|
def _get_tx_fee(self, policy_descriptor: str):
    """Estimate the fee of a tx of SWAP_TX_SIZE under the given fee policy.

    Falls back to static rates if no dynamic estimates are available.
    """
    policy = FeePolicy(policy_descriptor)
    fee = policy.estimate_fee(
        SWAP_TX_SIZE,
        network=self.network,
        allow_fallback_to_static_rates=True,
    )
    return fee
|
|
|
|
def get_swap(self, payment_hash: bytes) -> Optional[SwapData]:
|
|
# for history
|
|
swap = self._swaps.get(payment_hash.hex())
|
|
if swap:
|
|
return swap
|
|
payment_hash = self._prepayments.get(payment_hash)
|
|
if payment_hash:
|
|
return self._swaps.get(payment_hash.hex())
|
|
return None
|
|
|
|
def add_lnwatcher_callback(self, swap: SwapData) -> None:
|
|
callback = lambda: self._claim_swap(swap)
|
|
self.lnwatcher.add_callback(swap.lockup_address, callback)
|
|
|
|
async def hold_invoice_callback(self, payment_hash: bytes) -> None:
    """Hold-invoice callback: queue the on-chain funding output of the swap.

    Unknown payment hashes are only logged. A swap that is already funded
    (or whose funding is already pending) is left untouched.
    """
    # note: this assumes the wallet has been unlocked
    key = payment_hash.hex()
    swap = self._swaps.get(key)
    if not swap:
        self.logger.info(f'key not in swaps {key}')
        return
    if swap.is_funded():
        return
    funding_output = self.create_funding_output(swap)
    self.wallet.txbatcher.add_payment_output('swaps', funding_output)
    swap._payment_pending = True
|
|
|
|
def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None):
    """Server method: set up a swap in which we fund the on-chain output.

    Generates a fresh key for our side, builds the redeem script, stores
    the swap (with a prepayment invoice) and registers the hold invoice.

    Returns:
        (swap, invoice, prepay_invoice)
    Raises:
        Exception: if no on-chain amount can be computed for the request.
    """
    assert lightning_amount_sat
    locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
    our_privkey = os.urandom(32)
    our_pubkey = ECPrivkey(our_privkey).get_public_key_bytes(compressed=True)
    # what the client is going to receive
    onchain_amount_sat = self._get_recv_amount(lightning_amount_sat, is_reverse=True)
    if not onchain_amount_sat:
        raise Exception("no onchain amount")
    # template slots: 1 preimage size, 5 hash, 7 claim pubkey, 10 locktime, 13 refund pubkey
    script_values = {
        1: 32,
        5: ripemd(payment_hash),
        7: their_pubkey,
        10: locktime,
        13: our_pubkey,
    }
    redeem_script = construct_script(WITNESS_TEMPLATE_REVERSE_SWAP, values=script_values)
    swap, invoice, prepay_invoice = self.add_normal_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        onchain_amount_sat=onchain_amount_sat,
        lightning_amount_sat=lightning_amount_sat,
        payment_hash=payment_hash,
        our_privkey=our_privkey,
        prepay=True,
    )
    self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback)
    return swap, invoice, prepay_invoice
|
|
|
|
def add_normal_swap(
        self, *,
        redeem_script: bytes,
        locktime: int,  # onchain
        onchain_amount_sat: int,
        lightning_amount_sat: int,
        payment_hash: bytes,
        our_privkey: bytes,
        prepay: bool,
        channels: Optional[Sequence['Channel']] = None,
        min_final_cltv_expiry_delta: Optional[int] = None,
) -> Tuple[SwapData, str, Optional[str]]:
    """Create the hold invoice(s) for a swap, then store and watch the swap.

    With `prepay`, twice the txbatcher fee estimate is split off the
    lightning amount into a separate "mining fees" invoice, which is
    bundled with the main payment.

    Returns:
        (swap, invoice, prepay_invoice); prepay_invoice is None without `prepay`.
    """
    if prepay:
        prepay_amount_sat = self.get_fee_for_txbatcher() * 2
        invoice_amount_sat = lightning_amount_sat - prepay_amount_sat
    else:
        prepay_amount_sat = None
        invoice_amount_sat = lightning_amount_sat

    _, invoice = self.lnworker.get_bolt11_invoice(
        payment_hash=payment_hash,
        amount_msat=invoice_amount_sat * 1000,
        message='Submarine swap',
        expiry=300,
        fallback_address=None,
        channels=channels,
        min_final_cltv_expiry_delta=min_final_cltv_expiry_delta,
    )
    # add payment info to lnworker
    self.lnworker.add_payment_info_for_hold_invoice(payment_hash, invoice_amount_sat)

    prepay_invoice = None
    prepay_hash = None
    if prepay:
        prepay_hash = self.lnworker.create_payment_info(amount_msat=prepay_amount_sat*1000)
        _, prepay_invoice = self.lnworker.get_bolt11_invoice(
            payment_hash=prepay_hash,
            amount_msat=prepay_amount_sat * 1000,
            message='Submarine swap mining fees',
            expiry=300,
            fallback_address=None,
            channels=channels,
            min_final_cltv_expiry_delta=min_final_cltv_expiry_delta,
        )
        self.lnworker.bundle_payments([payment_hash, prepay_hash])
        self._prepayments[prepay_hash] = payment_hash

    lockup_address = script_to_p2wsh(redeem_script)
    receive_address = self.wallet.get_receiving_address()
    swap = SwapData(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=our_privkey,
        preimage=None,
        prepay_hash=prepay_hash,
        lockup_address=lockup_address,
        onchain_amount=onchain_amount_sat,
        receive_address=receive_address,
        lightning_amount=lightning_amount_sat,
        is_reverse=False,
        is_redeemed=False,
        funding_txid=None,
        spending_txid=None,
    )
    swap._payment_hash = payment_hash
    self._add_or_reindex_swap(swap)
    self.add_lnwatcher_callback(swap)
    return swap, invoice, prepay_invoice
|
|
|
|
def create_reverse_swap(self, *, lightning_amount_sat: int, their_pubkey: bytes) -> SwapData:
    """Server method: set up a swap in which we claim the on-chain output.

    We generate the claim key and the preimage ourselves; `their_pubkey`
    goes into the refund branch of the script.

    Raises:
        Exception: if no on-chain amount can be computed for the request.
    """
    assert lightning_amount_sat is not None
    locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
    privkey = os.urandom(32)
    our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
    onchain_amount_sat = self._get_send_amount(lightning_amount_sat, is_reverse=False)
    if not onchain_amount_sat:
        raise Exception("no onchain amount")
    preimage = os.urandom(32)
    payment_hash = sha256(preimage)
    # template slots: 1 preimage size, 5 hash, 7 claim pubkey, 10 locktime, 13 refund pubkey
    script_values = {
        1: 32,
        5: ripemd(payment_hash),
        7: our_pubkey,
        10: locktime,
        13: their_pubkey,
    }
    redeem_script = construct_script(WITNESS_TEMPLATE_REVERSE_SWAP, values=script_values)
    return self.add_reverse_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=privkey,
        preimage=preimage,
        payment_hash=payment_hash,
        prepay_hash=None,
        onchain_amount_sat=onchain_amount_sat,
        lightning_amount_sat=lightning_amount_sat,
    )
|
|
|
|
def add_reverse_swap(
        self,
        *,
        redeem_script: bytes,
        locktime: int,  # onchain
        privkey: bytes,
        lightning_amount_sat: int,
        onchain_amount_sat: int,
        preimage: bytes,
        payment_hash: bytes,
        prepay_hash: Optional[bytes] = None,
) -> SwapData:
    """Create, store and start watching a swap record with is_reverse=True.

    If `prepay_hash` is given, it is also recorded in the prepayment index.
    """
    swap = SwapData(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=privkey,
        preimage=preimage,
        prepay_hash=prepay_hash,
        lockup_address=script_to_p2wsh(redeem_script),
        onchain_amount=onchain_amount_sat,
        receive_address=self.wallet.get_receiving_address(),
        lightning_amount=lightning_amount_sat,
        is_reverse=True,
        is_redeemed=False,
        funding_txid=None,
        spending_txid=None,
    )
    if prepay_hash:
        self._prepayments[prepay_hash] = payment_hash
    swap._payment_hash = payment_hash
    self._add_or_reindex_swap(swap)
    self.add_lnwatcher_callback(swap)
    return swap
|
|
|
|
def server_add_swap_invoice(self, request):
    """Server method: accept the client's invoice for an existing swap.

    The invoice must belong to a known swap and match its lightning
    amount; we must hold the matching preimage and the swap must not be
    spent yet. The invoice is saved and queued for payment.

    Returns an empty dict. Failed checks raise AssertionError.
    """
    invoice = Invoice.from_bech32(request['invoice'])
    key = invoice.rhash
    payment_hash = bytes.fromhex(key)
    with self.swaps_lock:
        assert key in self._swaps
        swap = self._swaps[key]
        assert swap.lightning_amount == int(invoice.get_amount_sat())
    self.wallet.save_invoice(invoice)
    # check that we have the preimage
    assert sha256(swap.preimage) == payment_hash
    assert swap.spending_txid is None
    # 0 == "may be paid right away", see pay_pending_invoices
    self.invoices_to_pay[key] = 0
    return {}
|
|
|
|
async def normal_swap(
        self,
        *,
        transport: 'SwapServerTransport',
        lightning_amount_sat: int,
        expected_onchain_amount_sat: int,
        password,
        tx: PartialTransaction = None,
        channels = None,
) -> Optional[str]:
    """send on-chain BTC, receive on Lightning

    Old (removed) flow:
    - User generates an LN invoice with RHASH, and knows preimage.
    - User creates on-chain output locked to RHASH.
    - Server pays LN invoice. User reveals preimage.
    - Server spends the on-chain output using preimage.
    cltv safety requirement: (onchain_locktime > LN_locktime), otherwise server is vulnerable

    New flow:
    - User requests swap
    - Server creates preimage, sends RHASH to user
    - User creates hold invoice, sends it to server
    - Server sends HTLC, user holds it
    - User creates on-chain output locked to RHASH
    - Server spends the on-chain output using preimage (revealing the preimage)
    - User fulfills HTLC using preimage
    cltv safety requirement: (onchain_locktime < LN_locktime), otherwise client is vulnerable

    Returns the funding txid, or None if the invoice expired before the
    funding tx was broadcast.
    """
    assert self.network
    assert self.lnwatcher
    # step 1: agree on the swap with the server
    swap, invoice = await self.request_normal_swap(
        transport=transport,
        lightning_amount_sat=lightning_amount_sat,
        expected_onchain_amount_sat=expected_onchain_amount_sat,
        channels=channels,
    )
    # step 2: prepare (sign) the funding tx; it is not broadcast yet
    funding_tx = self.create_funding_tx(swap, tx, password=password)
    # step 3: broadcast once the server's HTLCs have arrived
    return await self.wait_for_htlcs_and_broadcast(
        transport=transport,
        swap=swap,
        invoice=invoice,
        tx=funding_tx,
    )
|
|
|
|
async def request_normal_swap(
        self,
        *,
        transport: 'SwapServerTransport',
        lightning_amount_sat: int,
        expected_onchain_amount_sat: int,
        channels: Optional[Sequence['Channel']] = None,
) -> Tuple[SwapData, str]:
    """Ask the server for a forward swap, verify its answer, store the swap.

    Returns:
        (swap, invoice): the stored swap and our hold invoice for it.
    Raises:
        Exception: if the server's redeem script, on-chain amount or
            locktime fail our checks.
    """
    await self.is_initialized.wait()  # TODO: add timeout
    refund_privkey = os.urandom(32)
    refund_pubkey = ECPrivkey(refund_privkey).get_public_key_bytes(compressed=True)
    self.logger.info('requesting preimage hash for swap')
    response = await transport.send_request_to_server('createnormalswap', {
        "invoiceAmount": lightning_amount_sat,
        "refundPublicKey": refund_pubkey.hex()
    })
    payment_hash = bytes.fromhex(response["preimageHash"])
    onchain_amount = response["expectedAmount"]
    locktime = response["timeoutBlockHeight"]
    lockup_address = response["address"]
    redeem_script = bytes.fromhex(response["redeemScript"])
    # verify redeem_script is built with our pubkey and preimage
    check_reverse_redeem_script(
        redeem_script=redeem_script,
        lockup_address=lockup_address,
        payment_hash=payment_hash,
        locktime=locktime,
        refund_pubkey=refund_pubkey,
    )

    # check that onchain_amount is not more than what we estimated
    if onchain_amount > expected_onchain_amount_sat:
        raise Exception(f"fswap check failed: onchain_amount is more than what we estimated: "
                        f"{onchain_amount} > {expected_onchain_amount_sat}")
    # verify that they are not locking up funds for too long
    blocks_until_timeout = locktime - self.network.get_local_height()
    if blocks_until_timeout > MAX_LOCKTIME_DELTA:
        raise Exception("fswap check failed: locktime too far in future")

    swap, invoice, _ = self.add_normal_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        lightning_amount_sat=lightning_amount_sat,
        onchain_amount_sat=onchain_amount,
        payment_hash=payment_hash,
        our_privkey=refund_privkey,
        prepay=False,
        channels=channels,
        # When the client is doing a normal swap, we create a ln-invoice with larger than usual final_cltv_delta.
        # If the user goes offline after broadcasting the funding tx (but before it is mined and
        # the server claims it), they need to come back online before the held ln-htlc expires (see #8940).
        # If the held ln-htlc expires, and the funding tx got confirmed, the server will have claimed the onchain
        # funds, and the ln-htlc will be timed out onchain (and channel force-closed). i.e. the user loses the swap
        # amount. Increasing the final_cltv_delta the user puts in the invoice extends this critical window.
        min_final_cltv_expiry_delta=MIN_FINAL_CLTV_DELTA_FOR_CLIENT,
    )
    return swap, invoice
|
|
|
|
async def wait_for_htlcs_and_broadcast(
        self,
        *,
        transport: 'SwapServerTransport',
        swap: SwapData,
        invoice: str,
        tx: Transaction,
) -> Optional[str]:
    """Send our hold invoice to the server; broadcast `tx` once its HTLCs arrive.

    The broadcast happens in a hold-invoice callback. This coroutine then
    polls until the swap has a funding txid or the invoice has expired.

    Returns swap.funding_txid (None if the invoice expired first).
    """
    await transport.is_connected.wait()
    payment_hash = swap.payment_hash
    refund_pubkey = ECPrivkey(swap.privkey).get_public_key_bytes(compressed=True)

    async def broadcast_when_htlcs_arrive(payment_hash):
        # FIXME what if this raises, e.g. TxBroadcastError?
        # We will never retry the hold-invoice-callback.
        await self.broadcast_funding_tx(swap, tx)

    self.lnworker.register_hold_invoice(payment_hash, broadcast_when_htlcs_arrive)

    # send invoice to server and wait for htlcs
    await transport.send_request_to_server('addswapinvoice', {
        "preimageHash": payment_hash.hex(),
        "invoice": invoice,
        "refundPublicKey": refund_pubkey.hex(),
    })
    # wait for funding tx
    lnaddr = lndecode(invoice)
    while True:
        if swap.funding_txid is not None or lnaddr.is_expired():
            break
        await asyncio.sleep(0.1)
    return swap.funding_txid
|
|
|
|
def create_funding_output(self, swap: SwapData) -> PartialTxOutput:
    """Build the tx output that pays swap.onchain_amount to the swap's lockup address."""
    address = swap.lockup_address
    amount_sat = swap.onchain_amount
    return PartialTxOutput.from_address_and_value(address, amount_sat)
|
|
|
|
def create_funding_tx(
        self,
        swap: SwapData,
        tx: Optional[PartialTransaction],
        *,
        password,
) -> PartialTransaction:
    """Create (or finalize) and sign the funding transaction of a swap.

    If ``tx`` is None a new transaction paying the swap's funding output is
    built by the wallet. Otherwise the DummyAddress.SWAP placeholder output of
    the given ``tx`` is pointed at the swap's lockup address.
    The transaction is signed with ``password`` and returned.
    """
    # use fee policy set by user (not using txbatcher)
    user_fee_policy = FeePolicy(self.config.FEE_POLICY)
    # note: rbf must not decrease payment
    # this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output
    if tx is not None:
        tx.replace_output_address(DummyAddress.SWAP, swap.lockup_address)
        tx.set_rbf(True)
    else:
        tx = self.wallet.make_unsigned_transaction(
            outputs=[self.create_funding_output(swap)],
            rbf=True,
            fee_policy=user_fee_policy,
        )
    self.wallet.sign_transaction(tx, password)
    return tx
|
|
|
|
@log_exceptions
async def request_swap_for_amount(
        self,
        *,
        transport: 'SwapServerTransport',
        onchain_amount: int,
) -> Optional[Tuple[SwapData, str]]:
    """Request a normal swap sized by the on-chain amount to be sent.

    Waits until the manager is initialized, converts ``onchain_amount`` into
    the lightning amount to receive, and requests the swap.
    Raises SwapServerError if the amount is outside the provider limits.
    Returns the ``(swap, invoice)`` pair from request_normal_swap.
    """
    await self.is_initialized.wait()
    ln_amount_sat = self.get_recv_amount(onchain_amount, is_reverse=False)
    if ln_amount_sat is None:
        headline = _("Swap amount outside of providers limits") + ":\n"
        min_line = _("min") + f": {self.get_min_amount()}\n"
        max_line = _("max") + f": {self.get_provider_max_reverse_amount()}"
        raise SwapServerError(headline + min_line + max_line)
    swap, invoice = await self.request_normal_swap(
        transport=transport,
        lightning_amount_sat=ln_amount_sat,
        expected_onchain_amount_sat=onchain_amount,
    )
    return swap, invoice
|
|
|
|
@log_exceptions
async def broadcast_funding_tx(self, swap: SwapData, tx: Transaction) -> None:
    """Record the txid of ``tx`` as the swap's funding txid, then broadcast ``tx``."""
    funding_txid = tx.txid()
    # the txid is stored before broadcasting
    swap.funding_txid = funding_txid
    await self.network.broadcast_transaction(tx)
|
|
|
|
async def reverse_swap(
        self,
        *,
        transport: 'SwapServerTransport',
        lightning_amount_sat: int,
        expected_onchain_amount_sat: int,
        channels: Optional[Sequence['Channel']] = None,
) -> Optional[str]:
    """send on Lightning, receive on-chain

    - User generates preimage, RHASH. Sends RHASH to server.
    - Server creates an LN invoice for RHASH.
    - User pays LN invoice - except server needs to hold the HTLC as preimage is unknown.
        - if the server requested a fee prepayment (using 'minerFeeInvoice'),
          the server will have the preimage for that. The user will send HTLCs for both the main RHASH,
          and for the fee prepayment. Once both MPP sets arrive at the server, the server will fulfill
          the HTLCs for the fee prepayment (before creating the on-chain output).
    - Server creates on-chain output locked to RHASH.
    - User spends on-chain output, revealing preimage.
    - Server fulfills HTLC using preimage.

    Note: expected_onchain_amount_sat is BEFORE deducting the on-chain claim tx fee.

    Returns swap.funding_txid as observed once either the lightning payment
    task finished or funding was detected (None if no funding was seen).
    Raises Exception if any check of the server response fails.
    """
    assert self.network
    assert self.lnwatcher
    privkey = os.urandom(32)
    our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
    preimage = os.urandom(32)
    payment_hash = sha256(preimage)
    request_data = {
        "type": "reversesubmarine",
        "pairId": "BTC/BTC",
        "orderSide": "buy",
        "invoiceAmount": lightning_amount_sat,
        "preimageHash": payment_hash.hex(),
        "claimPublicKey": our_pubkey.hex()
    }
    self.logger.debug(f'rswap: sending request for {lightning_amount_sat}')
    data = await transport.send_request_to_server('createswap', request_data)
    invoice = data['invoice']
    fee_invoice = data.get('minerFeeInvoice')
    lockup_address = data['lockupAddress']
    redeem_script = bytes.fromhex(data['redeemScript'])
    locktime = data['timeoutBlockHeight']
    onchain_amount = data["onchainAmount"]
    response_id = data['id']
    self.logger.debug(f'rswap: {response_id=}')
    # verify redeem_script is built with our pubkey and preimage
    check_reverse_redeem_script(
        redeem_script=redeem_script,
        lockup_address=lockup_address,
        payment_hash=payment_hash,
        locktime=locktime,
        refund_pubkey=None,
        claim_pubkey=our_pubkey,
    )
    # check that the onchain amount is what we expected
    if onchain_amount < expected_onchain_amount_sat:
        raise Exception(f"rswap check failed: onchain_amount is less than what we expected: "
                        f"{onchain_amount} < {expected_onchain_amount_sat}")
    # verify that we will have enough time to get our tx confirmed
    if locktime - self.network.get_local_height() <= MIN_LOCKTIME_DELTA:
        raise Exception("rswap check failed: locktime too close")
    # verify invoice payment_hash
    lnaddr = self.lnworker._check_bolt11_invoice(invoice)
    invoice_amount = int(lnaddr.get_amount_sat())
    if lnaddr.paymenthash != payment_hash:
        raise Exception("rswap check failed: inconsistent RHASH and invoice")
    # check that the lightning amount is what we requested
    if fee_invoice:
        fee_lnaddr = self.lnworker._check_bolt11_invoice(fee_invoice)
        invoice_amount += fee_lnaddr.get_amount_sat()
        prepay_hash = fee_lnaddr.paymenthash
    else:
        prepay_hash = None
    if int(invoice_amount) != lightning_amount_sat:
        raise Exception(f"rswap check failed: invoice_amount ({invoice_amount}) "
                        f"not what we requested ({lightning_amount_sat})")
    # save swap data to wallet file
    swap = self.add_reverse_swap(
        redeem_script=redeem_script,
        locktime=locktime,
        privkey=privkey,
        preimage=preimage,
        payment_hash=payment_hash,
        prepay_hash=prepay_hash,
        onchain_amount_sat=onchain_amount,
        lightning_amount_sat=lightning_amount_sat)
    # initiate fee payment.
    if fee_invoice:
        fee_invoice_obj = Invoice.from_bech32(fee_invoice)
        asyncio.ensure_future(self.lnworker.pay_invoice(fee_invoice_obj))

    # we return if we detect funding
    async def wait_for_funding(swap):
        while swap.funding_txid is None:
            await asyncio.sleep(1)

    # initiate main payment
    invoice_obj = Invoice.from_bech32(invoice)
    payment_task = asyncio.create_task(self.lnworker.pay_invoice(invoice_obj, channels=channels))
    funding_task = asyncio.create_task(wait_for_funding(swap))
    try:
        await asyncio.wait([payment_task, funding_task], return_when=asyncio.FIRST_COMPLETED)
    finally:
        # asyncio.wait() does not cancel the awaitables that are still pending.
        # The funding poll only sleeps and reads swap.funding_txid, so stop it
        # here; otherwise it would keep looping forever if funding never shows up.
        # (cancel() is a no-op on a task that is already done.)
        # The payment task is deliberately left running.
        funding_task.cancel()
    return swap.funding_txid
|
|
|
|
def _add_or_reindex_swap(self, swap: SwapData) -> None:
    """Register ``swap`` in the lookup tables, under the swaps lock.

    The swap is stored under its payment-hash hex only if that key is not
    present yet; the funding-outpoint index (when the swap has a funding
    prevout) and the lockup-address index are always (re)written.
    """
    with self.swaps_lock:
        key = swap.payment_hash.hex()
        if key in self._swaps:
            pass  # keep the already-stored object
        else:
            self._swaps[key] = swap
        prevout = swap._funding_prevout
        if prevout:
            self._swaps_by_funding_outpoint[prevout] = swap
        self._swaps_by_lockup_address[swap.lockup_address] = swap
|
|
|
|
def server_update_pairs(self) -> None:
    """ for server

    Recompute the fee percentage, the minimum amount, the forward/reverse
    maxima and the mining fee that this server uses for its swaps.
    """
    hard_cap_sat = 10000000
    self.percentage = float(self.config.SWAPSERVER_FEE_MILLIONTHS) / 10000  # type: ignore
    self._min_amount = 20000
    oc_balance_sat: int = self.wallet.get_spendable_balance_sat()
    can_receive_sat = int(self.lnworker.num_sats_can_receive())
    max_forward: int = min(can_receive_sat, oc_balance_sat, hard_cap_sat)
    can_send_sat = int(self.lnworker.num_sats_can_send())
    max_reverse: int = min(can_send_sat, hard_cap_sat)
    # only two leading digits of the limits are kept
    self._max_forward: int = self._keep_leading_digits(max_forward, 2)
    self._max_reverse: int = self._keep_leading_digits(max_reverse, 2)
    new_mining_fee = self.get_fee_for_txbatcher()
    previous_fee = self.mining_fee
    if previous_fee is None:
        fee_needs_update = True
    else:
        # replace the stored fee only if it moved by more than 10%
        fee_needs_update = abs(previous_fee - new_mining_fee) / previous_fee > 0.1
    if fee_needs_update:
        self.mining_fee = new_mining_fee
|
|
|
|
@staticmethod
def _keep_leading_digits(num: int, digits: int) -> int:
    """Reduces precision of num to `digits` leading digits.

    The remaining digits are replaced by zeros; non-positive numbers give 0.
    """
    if num <= 0:
        return 0
    text = str(num)
    # pad the kept prefix back to the original length with zeros
    return int(text[:digits].ljust(len(text), '0'))
|
|
|
|
def update_pairs(self, pairs: SwapFees):
    """Adopt the fees and limits from ``pairs`` and signal that pairs were updated."""
    self.logger.info(f'updating fees {pairs}')
    # fees
    self.mining_fee = pairs.mining_fee
    self.percentage = pairs.percentage
    # limits
    self._min_amount = pairs.min_amount
    self._max_forward = pairs.max_forward
    self._max_reverse = pairs.max_reverse
    self.trigger_pairs_updated_threadsafe()
|
|
|
|
def trigger_pairs_updated_threadsafe(self):
    """Set `is_initialized` and pulse `pairs_updated`, via the asyncio thread helper."""
    def _pulse_events():
        self.is_initialized.set()
        # set-then-clear: the event is not left in the set state
        self.pairs_updated.set()
        self.pairs_updated.clear()

    run_sync_function_on_asyncio_thread(_pulse_events, block=True)
|
|
|
|
def get_provider_max_forward_amount(self) -> int:
    """Return the stored forward maximum (`_max_forward`), in sat."""
    max_forward_sat = self._max_forward
    return max_forward_sat
|
|
|
|
def get_provider_max_reverse_amount(self) -> int:
    """Return the stored reverse maximum (`_max_reverse`), in sat."""
    max_reverse_sat = self._max_reverse
    return max_reverse_sat
|
|
|
|
def get_min_amount(self) -> int:
    """Return the stored minimum swap amount (`_min_amount`), in satoshis."""
    min_amount_sat = self._min_amount
    return min_amount_sat
|
|
|
|
def check_invoice_amount(self, x, is_reverse: bool) -> bool:
    """Return whether ``x`` lies within [min amount, provider maximum].

    The maximum used is the provider's forward maximum when ``is_reverse``
    is true, and the provider's reverse maximum otherwise.
    """
    upper = (
        self.get_provider_max_forward_amount()
        if is_reverse
        else self.get_provider_max_reverse_amount()
    )
    lower = self.get_min_amount()
    return lower <= x <= upper
|
|
|
|
def _get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """For a given swap direction and amount we send, returns how much we will receive.

    Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
          In the reverse direction, the result matches what the swap server returns as response["onchainAmount"].

    Returns None if send_amount is None, if the amount fails check_invoice_amount,
    or (reverse direction) if the result is below the dust threshold.
    """
    if send_amount is None:
        return None
    amount = Decimal(send_amount)
    pct = Decimal(self.percentage)
    if not is_reverse:
        # forward: take off the mining fee first, then the percentage fee
        amount = amount - self.mining_fee
        pct_fee = math.ceil(amount * pct / (100 + pct))
        amount = amount - pct_fee
        if self.check_invoice_amount(amount, is_reverse):
            return int(amount)
        return None
    # reverse: the amount sent is what gets checked against the limits
    if not self.check_invoice_amount(amount, is_reverse):
        return None
    # see/ref:
    # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L948
    pct_fee = math.ceil(pct * amount / 100)
    amount -= pct_fee + self.mining_fee
    received = math.floor(amount)
    if received < dust_threshold():
        return None
    return int(received)
|
|
|
|
def _get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """For a given swap direction and amount we want to receive, returns how much we will need to send.

    Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
          In the forward direction, the result matches what the swap server returns as response["expectedAmount"].

    Returns None if recv_amount is falsy (None or 0) or if the amount fails check_invoice_amount.
    """
    if not recv_amount:
        return None
    amount = Decimal(recv_amount)
    pct = Decimal(self.percentage)
    if not is_reverse:
        # forward: the amount received is what gets checked against the limits
        if not self.check_invoice_amount(amount, is_reverse):
            return None
        # see/ref:
        # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L708
        # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/rates/FeeProvider.ts#L90
        pct_fee = math.ceil(pct * amount / 100)
        amount += pct_fee + self.mining_fee
        return int(amount)
    # see/ref:
    # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L928
    # https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L958
    amount += self.mining_fee
    to_send = math.ceil(amount / ((100 - pct) / 100))
    if not self.check_invoice_amount(to_send, is_reverse):
        return None
    return int(to_send)
|
|
|
|
def get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """Return how much we receive for ``send_amount``, or None if not possible.

    In the reverse direction the on-chain claim tx fee (get_fee_for_txbatcher)
    is deducted from the result. Raises Exception if the calculation cannot
    be inverted back to ``send_amount`` (up to +-1).
    """
    # first, add percentage fee
    recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
    if recv_amount is None:
        return recv_amount
    # sanity check calculation can be inverted
    inverted_send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
    # accept off-by ones as amt_rcv = recv_amt(send_amt(amt_rcv)) only up to +-1
    if abs(send_amount - inverted_send_amount) > 1:
        raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
                        f"send_amount={send_amount} -> recv_amount={recv_amount} -> inverted_send_amount={inverted_send_amount}")
    # second, add on-chain claim tx fee
    if is_reverse:
        recv_amount -= self.get_fee_for_txbatcher()
    return recv_amount
|
|
|
|
def get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
    """Return how much we need to send to receive ``recv_amount``, or None if not possible.

    In the reverse direction the on-chain claim tx fee (get_fee_for_txbatcher)
    is added to ``recv_amount`` first. Raises Exception if the calculation
    cannot be inverted back to that amount.
    """
    # first, add on-chain claim tx fee
    if recv_amount is not None and is_reverse:
        recv_amount += self.get_fee_for_txbatcher()
    # second, add percentage fee
    send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
    if send_amount is None:
        return send_amount
    # sanity check calculation can be inverted
    inverted_recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
    if recv_amount != inverted_recv_amount:
        raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
                        f"recv_amount={recv_amount} -> send_amount={send_amount} -> inverted_recv_amount={inverted_recv_amount}")
    return send_amount
|
|
|
|
def get_swaps_by_funding_tx(self, tx: Transaction) -> Iterable[SwapData]:
    """Return the swaps whose funding outpoint is one of the outputs of ``tx``."""
    return [
        swap
        for txout_idx, _txo in enumerate(tx.outputs())
        if (swap := self._swaps_by_funding_outpoint.get(
            TxOutpoint(txid=bytes.fromhex(tx.txid()), out_idx=txout_idx)))
    ]
|
|
|
|
def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable[Tuple[int, SwapData]]:
    """Return ``(input index, swap)`` for every input of ``tx`` that maps to a swap."""
    return [
        (input_idx, swap)
        for input_idx, txin in enumerate(tx.inputs())
        if (swap := self.get_swap_by_claim_txin(txin))
    ]
|
|
|
|
def get_swap_by_claim_txin(self, txin: TxInput) -> Optional[SwapData]:
    """Look up the swap indexed under the outpoint that ``txin`` spends, if any."""
    spent_outpoint = txin.prevout
    return self._swaps_by_funding_outpoint.get(spent_outpoint)
|
|
|
|
def is_lockup_address_for_a_swap(self, addr: str) -> bool:
    """Return True if a (truthy) swap is indexed under lockup address ``addr``."""
    swap = self._swaps_by_lockup_address.get(addr)
    return bool(swap)
|
|
|
|
@classmethod
def add_txin_info(cls, swap, txin: PartialTxInput) -> None:
    """Add some info to a claim txin.
    note: even without signing, this is useful for tx size estimation.

    Sets the txin's script_sig, witness_script, witness_sizehint and nsequence.
    """
    redeem_script = swap.redeem_script
    # second witness item: the preimage for reverse swaps, 0 otherwise
    preimage_item = swap.preimage if swap.is_reverse else 0
    txin.script_sig = b''
    txin.witness_script = redeem_script
    placeholder_sig = b'\x00' * 71  # DER-encoded ECDSA sig, with low S and low R
    witness_size = len(construct_witness([placeholder_sig, preimage_item, redeem_script]))
    txin.witness_sizehint = witness_size
    if swap.is_reverse:
        txin.nsequence = 1
    else:
        txin.nsequence = 0xffffffff - 2
|
|
|
|
@classmethod
def create_claim_txin(
        cls,
        *,
        txin: PartialTxInput,
        swap: SwapData,
) -> Tuple[PartialTxInput, Optional[int]]:
    """Prepare ``txin`` for claiming the swap's funding output.

    Fills in the size/script info (add_txin_info), the swap's private key and
    a ``make_witness`` callback. Returns the txin together with the locktime
    to use: None for a reverse swap, ``swap.locktime`` for a forward swap.
    """
    # reverse: successful reverse swap (preimage will be set in sign_tx)
    # forward: timing out forward swap
    locktime = None if swap.is_reverse else swap.locktime
    cls.add_txin_info(swap, txin)
    txin.privkey = swap.privkey

    def build_witness(sig):
        # preimage not known yet
        preimage_item = swap.preimage if swap.is_reverse else 0
        return construct_witness([sig, preimage_item, swap.redeem_script])

    txin.make_witness = build_witness
    return txin, locktime
|
|
|
|
def client_max_amount_forward_swap(self) -> Optional[int]:
    """ returns None if we cannot swap

    Otherwise returns the on-chain amount to send for the largest lightning
    amount allowed by both the provider's reverse maximum and what the
    lightning worker can receive.
    """
    provider_limit_ln = self.get_provider_max_reverse_amount()
    if provider_limit_ln is None:
        return None
    receivable_ln = int(self.lnworker.num_sats_can_receive())
    largest_ln = int(min(provider_limit_ln, receivable_ln))
    # get_send_amount may return None; treat that as 0
    largest_oc = self.get_send_amount(largest_ln, is_reverse=False) or 0
    smallest_oc = self.get_send_amount(self.get_min_amount(), is_reverse=False) or 0
    if largest_oc >= smallest_oc:
        return largest_oc
    return None
|
|
|
|
def client_max_amount_reverse_swap(self) -> Optional[int]:
    """Returns None if swap is not possible

    Otherwise returns the smaller of what the lightning worker can send and
    the provider's forward maximum.
    """
    provider_limit = self.get_provider_max_forward_amount()
    sendable_ln = int(self.lnworker.num_sats_can_send())
    largest_swap = min(sendable_ln, provider_limit)
    return None if largest_swap < self.get_min_amount() else largest_swap
|
|
|
|
def server_create_normal_swap(self, request):
    """Server side: create the swap for a client's normal-swap request.

    Reads 'invoiceAmount' and 'refundPublicKey' (hex, 33 bytes) from
    ``request`` and returns the response dict describing the created swap.
    """
    # normal for client, reverse for server
    #request = await r.json()
    amount_sat = request['invoiceAmount']
    client_pubkey = bytes.fromhex(request['refundPublicKey'])
    assert len(client_pubkey) == 33
    swap = self.create_reverse_swap(
        lightning_amount_sat=amount_sat,
        their_pubkey=client_pubkey,
    )
    rhash_hex = swap.payment_hash.hex()
    return {
        "id": rhash_hex,
        'preimageHash': rhash_hex,
        "acceptZeroConf": False,
        "expectedAmount": swap.onchain_amount,
        "timeoutBlockHeight": swap.locktime,
        "address": swap.lockup_address,
        "redeemScript": swap.redeem_script.hex(),
    }
|
|
|
|
def server_create_swap(self, request):
    """Server side: create the swap for a client's 'createswap' request.

    Only the 'reversesubmarine' type is served; 'submarine' and any other
    type raise an Exception. Returns the response dict for the created swap.
    """
    # reverse for client, forward for server
    # requesting a normal swap (old protocol) will raise an exception
    #request = await r.json()
    req_type = request['type']
    assert request['pairId'] == 'BTC/BTC'
    if req_type == 'submarine':
        raise Exception('Deprecated API. Please upgrade your version of Electrum')
    if req_type != 'reversesubmarine':
        raise Exception('unsupported request type:' + req_type)
    amount_sat = request['invoiceAmount']
    rhash = bytes.fromhex(request['preimageHash'])
    client_pubkey = bytes.fromhex(request['claimPublicKey'])
    assert len(rhash) == 32
    assert len(client_pubkey) == 33
    swap, invoice, prepay_invoice = self.create_normal_swap(
        lightning_amount_sat=amount_sat,
        payment_hash=rhash,
        their_pubkey=client_pubkey
    )
    return {
        'id': rhash.hex(),
        'invoice': invoice,
        'minerFeeInvoice': prepay_invoice,
        'lockupAddress': swap.lockup_address,
        'redeemScript': swap.redeem_script.hex(),
        'timeoutBlockHeight': swap.locktime,
        "onchainAmount": swap.onchain_amount,
    }
|
|
|
|
def get_groups_for_onchain_history(self):
    """Build the txid -> group-info mapping for swap transactions.

    For every stored swap that has a relevant txid (the spending txid for
    reverse swaps, the funding txid for forward swaps) an entry with
    'group_id', 'label' and 'group_label' is created. For forward swaps whose
    spending tx is known and does not reveal the preimage, the spending txid
    is added to the same group as a refund transaction.

    NOTE(review): the block nesting below was reconstructed from a source
    whose indentation was lost -- confirm against the original file.
    """
    current_height = self.wallet.adb.get_local_height()
    d = {}
    # add info about submarine swaps
    settled_payments = self.lnworker.get_payments(status='settled')
    # snapshot the swaps under the lock, then work on the copy
    with self.swaps_lock:
        swaps_items = list(self._swaps.items())
    for payment_hash_hex, swap in swaps_items:
        txid = swap.spending_txid if swap.is_reverse else swap.funding_txid
        if txid is None:
            continue
        payment_hash = bytes.fromhex(payment_hash_hex)
        # note: the values unpacked here are not used further down in this function
        if payment_hash in settled_payments:
            plist = settled_payments[payment_hash]
            info = self.lnworker.get_payment_info(payment_hash)
            direction, amount_msat, fee_msat, timestamp = self.lnworker.get_payment_value(info, plist)
        else:
            amount_msat = 0

        if swap.is_reverse:
            group_label = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount)
        else:
            group_label = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount)

        label = _('Claim transaction') if swap.is_reverse else _('Funding transaction')
        # negative delta: the swap's locktime has not been reached yet
        delta = current_height - swap.locktime
        if self.wallet.adb.is_mine(swap.lockup_address):
            tx_height = self.wallet.adb.get_tx_height(swap.funding_txid)
            if swap.is_reverse and tx_height.height <= 0:
                label += ' (%s)' % _('waiting for funding tx confirmation')
            if not swap.is_reverse and not swap.is_redeemed and swap.spending_txid is None and delta < 0:
                label += f' (refundable in {-delta} blocks)' # fixme: only if unspent
        d[txid] = {
            'group_id': txid,
            'label': label,
            'group_label': group_label,
        }
        if not swap.is_reverse:
            claim_tx = self.lnwatcher.adb.get_transaction(swap.spending_txid)
            if claim_tx and not self.extract_preimage(swap, claim_tx):
                # if the spending_tx is in the wallet, this will add it
                # to the group (see wallet.get_full_history)
                d[swap.spending_txid] = {
                    'group_id': txid,
                    'group_label': group_label,
                    'label': _('Refund transaction'),
                }
            self.wallet._accounting_addresses.add(swap.lockup_address)
    return d
|
|
|
|
def get_group_id_for_payment_hash(self, payment_hash: bytes) -> Optional[str]:
    """Return the group id (a txid) of the swap with this payment hash, or None.

    The id is the spending txid for reverse swaps and the funding txid otherwise.
    """
    # add group_id to swap transactions
    swap = self.get_swap(payment_hash)
    if not swap:
        return None
    if swap.is_reverse:
        return swap.spending_txid
    return swap.funding_txid
|
|
|
|
def get_pending_swaps(self) -> List[SwapData]:
    """Returns a list of swaps with unconfirmed funding tx (which require us to stay online)."""
    with self.swaps_lock:
        candidates = list(self._swaps.values())
    result: List[SwapData] = []
    for swap in candidates:
        if swap.is_redeemed:
            # adb data might have been removed after is_redeemed was set.
            # in that case lnwatcher will no longer fetch the spending tx
            # and adb will return TX_HEIGHT_LOCAL
            continue
        # note: adb.get_tx_height returns TX_HEIGHT_LOCAL if the txid is unknown
        funding_height = self.lnworker.wallet.adb.get_tx_height(swap.funding_txid).height
        spending_height = self.lnworker.wallet.adb.get_tx_height(swap.spending_txid).height
        funding_seen = funding_height > TX_HEIGHT_LOCAL
        spending_seen = spending_height > TX_HEIGHT_LOCAL
        if funding_seen and not spending_seen:
            result.append(swap)
    return result
|
|
|
|
|
|
class SwapServerTransport(Logger):
    """Base class of the transports used to talk to a swap server.

    The context-manager hooks and the request methods are no-ops here.
    """

    def __init__(self, *, config: 'SimpleConfig', sm: 'SwapManager'):
        Logger.__init__(self)
        self.sm = sm
        self.network = sm.network
        self.config = config
        self.is_connected = asyncio.Event()
        # a longer connect timeout is used when going through a proxy
        if self.uses_proxy:
            self.connect_timeout = 10
        else:
            self.connect_timeout = 5

    def __enter__(self):
        """No-op in the base class."""

    def __exit__(self, ex_type, ex, tb):
        """No-op in the base class."""

    async def __aenter__(self):
        """No-op in the base class."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """No-op in the base class."""

    async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict:
        """No-op in the base class."""

    async def get_pairs(self) -> None:
        """No-op in the base class."""

    @property
    def uses_proxy(self):
        """Truthy if the network has a proxy configured and that proxy is enabled."""
        proxy = self.network.proxy
        return proxy and proxy.enabled
|
|
|
|
|
|
class HttpTransport(SwapServerTransport):
    """Swap-server transport that sends requests over HTTP to `config.SWAPSERVER_URL`."""

    def __init__(self, config, sm):
        SwapServerTransport.__init__(self, config=config, sm=sm)
        self.api_url = config.SWAPSERVER_URL
        # there is no connection to set up: flagged as connected right away
        self.is_connected.set()

    def __enter__(self):
        # schedule fetching the pairs on the network's event loop
        asyncio.run_coroutine_threadsafe(self.get_pairs(), self.network.asyncio_loop)
        return self

    def __exit__(self, ex_type, ex, tb):
        pass

    async def __aenter__(self):
        # schedule fetching the pairs on the running event loop
        asyncio.create_task(self.get_pairs())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def send_request_to_server(self, method, request_data):
        """Send a request to `<api_url>/<method>` and return the decoded JSON response.

        A POST is used when ``request_data`` is truthy, a GET otherwise.
        """
        response = await self.network.async_send_http_on_proxy(
            'post' if request_data else 'get',
            self.api_url + '/' + method,
            json=request_data,
            timeout=30)
        return json.loads(response)

    async def get_pairs(self) -> None:
        """Fetch fees and limits from the server and pass them to the swap manager.

        Might raise SwapServerError.
        """
        try:
            response = await self.send_request_to_server('getpairs', None)
        except aiohttp.ClientError as e:
            self.logger.error(f"Swap server errored: {e!r}")
            raise SwapServerError() from e
        # The response comes from a remote server, so it is validated with an
        # explicit check: an `assert` would be skipped when running with -O.
        if response.get('htlcFirst') is not True:
            self.logger.error(
                f"Swap server response has unexpected 'htlcFirst' value: {response.get('htlcFirst')!r}")
            raise SwapServerError()
        fees = response['pairs']['BTC/BTC']['fees']
        limits = response['pairs']['BTC/BTC']['limits']
        pairs = SwapFees(
            percentage=fees['percentage'],
            mining_fee=fees['minerFees']['baseAsset']['mining_fee'],
            min_amount=limits['minimal'],
            max_forward=limits['max_forward_amount'],
            max_reverse=limits['max_reverse_amount'],
        )
        self.sm.update_pairs(pairs)
|
|
|
|
|
|
class NostrTransport(SwapServerTransport):
|
|
# uses nostr:
|
|
# - to advertise servers
|
|
# - for client-server RPCs (using DMs)
|
|
# (todo: we should use onion messages for that)
|
|
|
|
EPHEMERAL_REQUEST = 25582
|
|
USER_STATUS_NIP38 = 30315
|
|
NOSTR_EVENT_VERSION = 5
|
|
OFFER_UPDATE_INTERVAL_SEC = 60 * 10
|
|
LIQUIDITY_UPDATE_INTERVAL_SEC = 30
|
|
|
|
def __init__(self, config, sm, keypair: Keypair):
    """Set up keys, TLS context and bookkeeping; no connection is made here."""
    SwapServerTransport.__init__(self, config=config, sm=sm)
    privkey = keypair.privkey
    self._offers = {}  # type: Dict[str, SwapOffer]
    # key material
    self.private_key = privkey
    self.nostr_private_key = to_nip19('nsec', privkey.hex())
    self.nostr_pubkey = keypair.pubkey.hex()[2:]  # hex pubkey without its first two hex chars
    # futures for replies to our direct messages; awaited under the id of the sent event
    self.dm_replies = defaultdict(asyncio.Future)  # type: Dict[str, asyncio.Future]
    self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
    self.relay_manager = None  # type: Optional[aionostr.Manager]
    self.taskgroup = OldTaskGroup()
    self._last_swapserver_relays = self._load_last_swapserver_relays()  # type: Optional[Sequence[str]]
|
|
|
|
def __enter__(self):
    """Schedule main_loop() on the network's event loop and return self."""
    loop = self.network.asyncio_loop
    asyncio.run_coroutine_threadsafe(self.main_loop(), loop)
    return self
|
|
|
|
def __exit__(self, ex_type, ex, tb):
    """Run stop() on the network's event loop and wait up to 5 seconds for it."""
    loop = self.network.asyncio_loop
    stop_future = asyncio.run_coroutine_threadsafe(self.stop(), loop)
    stop_future.result(timeout=5)
|
|
|
|
async def __aenter__(self):
    """Start main_loop() as a task on the running loop and return self."""
    main_coro = self.main_loop()
    asyncio.create_task(main_coro)
    return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Run stop(), bounded by a 5 second timeout."""
    stop_coro = self.stop()
    await wait_for2(stop_coro, timeout=5)
|
|
|
|
@log_exceptions
async def main_loop(self):
    """Connect to the relays and run the transport's background tasks.

    A server only checks direct messages; a client additionally fetches the
    pairs and keeps the relay list updated.
    """
    self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}')
    self.logger.info(f'nostr relays: {self.relays}')
    self.relay_manager = self.get_relay_manager()
    await self.relay_manager.connect()
    connected_relays = self.relay_manager.relays
    self.logger.info(f'connected relays: {[relay.url for relay in connected_relays]}')
    if connected_relays:
        self.is_connected.set()
    tasks = [self.check_direct_messages()]
    if not self.sm.is_server:
        tasks += [self.get_pairs(), self.update_relays()]
    try:
        async with self.taskgroup as group:
            for coro in tasks:
                await group.spawn(coro)
    except Exception:
        self.logger.exception("taskgroup died.")
    finally:
        self.logger.info("taskgroup stopped.")
|
|
|
|
@log_exceptions
async def stop(self):
    """Shut the transport down: clear `sm.is_initialized`, cancel tasks, close relays."""
    self.logger.info("shutting down nostr transport")
    self.sm.is_initialized.clear()
    await self.taskgroup.cancel_remaining()
    # relay_manager is None until main_loop() has created it, so stop() may
    # run before there is anything to close
    if self.relay_manager is not None:
        await self.relay_manager.close()
    self.logger.info("nostr transport shut down")
|
|
|
|
@property
def relays(self):
    """Relay URLs to use.

    A server uses only the relays from `config.NOSTR_RELAYS` (comma-separated).
    A client uses those plus the last known relays of the swap server, with
    duplicates removed (the order of the result is not defined).
    """
    configured = self.config.NOSTR_RELAYS
    our_relays = configured.split(',') if configured else []
    if self.sm.is_server:
        return our_relays
    combined = our_relays + (self._last_swapserver_relays or [])
    return list(set(combined))
|
|
|
|
def get_relay_manager(self) -> aionostr.Manager:
    """Return the existing relay manager, or build a new one if none is set.

    Must be called on the asyncio thread. A newly built manager is returned
    but not stored on self by this method.
    """
    assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!"
    if self.relay_manager:
        return self.relay_manager
    proxy = (
        make_aiohttp_proxy_connector(self.network.proxy, self.ssl_context)
        if self.uses_proxy
        else None
    )
    aionostr_logger = self.logger.getChild('aionostr')
    aionostr_logger.setLevel('INFO')  # DEBUG is very verbose with aionostr
    return aionostr.Manager(
        self.relays,
        private_key=self.nostr_private_key,
        log=aionostr_logger,
        ssl_context=self.ssl_context,
        proxy=proxy,
        connect_timeout=self.connect_timeout
    )
|
|
|
|
def get_offer(self, pubkey: str) -> Optional[SwapOffer]:
    """Return the offer stored under ``pubkey`` in the offers table, or None."""
    known_offers = self._offers
    return known_offers.get(pubkey)
|
|
|
|
def get_recent_offers(self) -> Sequence[SwapOffer]:
    """Return up to 20 offers younger than one hour, highest proof-of-work first."""
    now = int(time.time())
    max_age_sec = 3600
    # filter to fresh timestamps
    fresh = [offer for offer in self._offers.values() if now - offer.timestamp < max_age_sec]
    # sort by proof-of-work
    fresh.sort(key=lambda offer: offer.pow_bits, reverse=True)
    # cap list size
    return fresh[:20]
|
|
|
|
@ignore_exceptions
@log_exceptions
async def publish_offer(self, sm: 'SwapManager') -> None:
    """Server only: publish the current fees and limits of ``sm`` as a nostr event.

    Nothing is published when both the forward and the reverse maximum are
    below the minimum amount.
    """
    assert self.sm.is_server
    forward_possible = sm._max_forward >= sm._min_amount
    reverse_possible = sm._max_reverse >= sm._min_amount
    if not (forward_possible or reverse_possible):
        self.logger.warning(f"not publishing swap offer, no liquidity available: {sm._max_forward=}, {sm._max_reverse=}")
        return
    offer = {
        'percentage_fee': sm.percentage,
        'mining_fee': sm.mining_fee,
        'min_amount': sm._min_amount,
        'max_forward_amount': sm._max_forward,
        'max_reverse_amount': sm._max_reverse,
        'relays': sm.config.NOSTR_RELAYS,
        'pow_nonce': hex(sm.config.SWAPSERVER_ANN_POW_NONCE),
    }
    # the event expires 10 seconds after the offer update interval has passed
    expires_at = int(time.time() + self.OFFER_UPDATE_INTERVAL_SEC + 10)
    # the first value of a single letter tag is indexed and can be filtered for
    tags = [
        ['d', f'electrum-swapserver-{self.NOSTR_EVENT_VERSION}'],
        ['r', 'net:' + constants.net.NET_NAME],
        ['expiration', str(expires_at)],
    ]
    event_id = await aionostr._add_event(
        self.relay_manager,
        kind=self.USER_STATUS_NIP38,
        tags=tags,
        content=json.dumps(offer),
        private_key=self.nostr_private_key)
    self.logger.info(f"published offer {event_id}")
|
|
|
|
async def send_direct_message(self, pubkey: str, content: str) -> str:
    """Encrypt ``content`` for ``pubkey`` and publish it as an EPHEMERAL_REQUEST event.

    ``pubkey`` may be given in 'npub...' form or as plain hex.
    Returns the id of the published event.
    """
    sender_key = aionostr.key.PrivateKey(self.private_key)
    if pubkey.startswith('npub'):
        recipient_hex = aionostr.util.from_nip19(pubkey)['object'].hex()
    else:
        recipient_hex = pubkey
    ciphertext = sender_key.encrypt_message(content, recipient_hex)
    return await aionostr._add_event(
        self.relay_manager,
        kind=self.EPHEMERAL_REQUEST,
        content=ciphertext,
        private_key=self.nostr_private_key,
        tags=[['p', recipient_hex]],
    )
|
|
|
|
@log_exceptions
async def send_request_to_server(self, method: str, request_data: dict) -> dict:
    """Send ``request_data`` to the configured swap server as a DM and await the reply.

    The method name is stored in ``request_data['method']`` (the dict is modified).
    Raises SwapServerError if the reply contains an 'error' key.
    """
    self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}")
    request_data['method'] = method
    recipient = self.config.SWAPSERVER_NPUB
    payload = json.dumps(request_data)
    event_id = await self.send_direct_message(recipient, payload)
    # the reply future is looked up under the id of the event we just sent
    reply = await self.dm_replies[event_id]
    if 'error' not in reply:
        return reply
    self.logger.warning(f"error from swap server [DO NOT TRUST THIS MESSAGE]: {reply['error']}")
    raise SwapServerError()
|
|
|
|
async def get_pairs(self):
    """Collect swap-server announcements from the relays.

    For every acceptable announcement the offer is stored in `self._offers`,
    a 'swap_offers_changed' callback is triggered and the event is mirrored to
    other relays. If the announcement is from the configured swap server
    (`config.SWAPSERVER_NPUB`), its fees are passed to the swap manager.

    Announcements are untrusted: malformed ones are skipped, they must not be
    able to raise out of this loop.
    """
    await self.is_connected.wait()
    query = {
        "kinds": [self.USER_STATUS_NIP38],
        "limit": 10,
        "#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
        "#r": [f"net:{constants.net.NET_NAME}"],
        "since": int(time.time()) - 60 * 60,
        "until": int(time.time()) + 60 * 60,
    }
    async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
        try:
            content = json.loads(event.content)
            tags = {k: v for k, v in event.tags}
        except Exception as e:
            self.logger.debug(f"failed to parse event: {e}")
            continue
        if not isinstance(content, dict):
            # valid JSON, but not an object: the lookups below need a dict
            self.logger.debug(f"failed to parse event: content is not a JSON object")
            continue
        if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
            continue
        if tags.get('r') != f"net:{constants.net.NET_NAME}":
            continue
        if (event.created_at > int(time.time()) + 60 * 60
                or event.created_at < int(time.time()) - 60 * 60):
            continue
        # check if this is the most recent event for this pubkey
        pubkey = event.pubkey
        prev_offer = self._offers.get(to_nip19('npub', pubkey))
        if prev_offer and event.created_at <= prev_offer.timestamp:
            continue
        try:
            pow_bits = get_nostr_ann_pow_amount(
                bytes.fromhex(pubkey),
                int(content.get('pow_nonce', "0"), 16)
            )
        except (ValueError, TypeError):
            # TypeError: int(x, 16) is given a pow_nonce that is not a string
            continue
        if pow_bits < self.config.SWAPSERVER_POW_TARGET:
            self.logger.debug(f"too low pow: {pubkey}: pow: {pow_bits} nonce: {content.get('pow_nonce', 0)}")
            continue
        # 'relays' is expected to be a comma-separated string; anything else
        # (missing, null, a list, ...) is treated as "no relays announced"
        announced_relays = content.get('relays')
        server_relays = announced_relays.split(',') if isinstance(announced_relays, str) else []
        try:
            pairs = SwapFees(
                percentage=content['percentage_fee'],
                mining_fee=content['mining_fee'],
                min_amount=content['min_amount'],
                max_forward=content['max_forward_amount'],
                max_reverse=content['max_reverse_amount'],
            )
        except Exception:
            self.logger.debug(f"swap fees couldn't be parsed", exc_info=True)
            continue
        offer = SwapOffer(
            pairs=pairs,
            relays=server_relays[:10],
            timestamp=event.created_at,
            server_pubkey=pubkey,
            pow_bits=pow_bits,
        )
        if self.config.SWAPSERVER_NPUB == offer.server_npub:
            self.sm.update_pairs(pairs)
        self._offers[offer.server_npub] = offer
        trigger_callback('swap_offers_changed', self.get_recent_offers())
        # mirror event to other relays
        await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))
|
|
|
|
async def update_relays(self):
    """
    Keep the relay manager in sync with the relays announced by the configured swap server.

    Loops forever. Each round snapshots `self._last_swapserver_relays`, waits on
    `self.sm.pairs_updated`, and then reads the relays of the offer stored under
    `self.config.SWAPSERVER_NPUB`. If they differ from the snapshot, the new list is
    persisted through `_store_last_swapserver_relays` and the relay manager is
    updated with `self.relays`.
    """
    while True:
        relays_before_update = self._last_swapserver_relays
        await self.sm.pairs_updated.wait()
        announced_relays = self._offers[self.config.SWAPSERVER_NPUB].relays
        if announced_relays == relays_before_update:
            # announcement carried the relay list we already know; wait for the next update
            continue
        self.logger.debug(f"swapserver relays changed, updating relay list.")
        # persist first, so the stored relay list is current before the manager is updated
        self._store_last_swapserver_relays(announced_relays)
        # then hand the current relay set to the relay manager
        await self.relay_manager.update_relays(self.relays)
|
|
|
|
async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
    """Mirror `event` to those of our relays that the origin server did not announce.

    Does nothing when `server_relays` is empty. Otherwise every relay of
    `self.relay_manager` whose url is not in `server_relays` gets the event via
    `add_event(..., check_response=True)`. A failure on one relay is logged at debug
    level and does not stop the remaining relays from being tried.
    """
    if not server_relays:
        return
    # Snapshot the targets up front: the awaits below yield control, so we do not
    # iterate the relay manager's own collection while publishing.
    targets = []
    for candidate in self.relay_manager.relays:
        if candidate.url not in server_relays:
            targets.append(candidate)
    for relay in targets:
        try:
            res = await relay.add_event(event, check_response=True)
        except Exception as e:
            self.logger.debug(f"failed to rebroadcast event to {relay.url}: {e}")
        else:
            self.logger.debug(f"rebroadcasted event to {relay.url}: {res}")
|
|
|
|
@log_exceptions
async def check_direct_messages(self):
    """Consume encrypted direct messages addressed to us and dispatch them.

    Subscribes (via `self.relay_manager.get_events`) to events of kind
    `self.EPHEMERAL_REQUEST` that carry our `self.nostr_pubkey` in a `#p` tag.
    Each event is decrypted with `self.private_key` and parsed as JSON, then:

    - messages containing `reply_to` resolve the matching entry in `self.dm_replies`;
    - messages containing `method` are passed to `handle_request` when we act as a
      server; if handling raises, an error reply (message truncated to 100 chars)
      is sent back to the sender;
    - anything else is logged as unknown.

    Messages that cannot be decrypted, are not valid JSON, or are not a JSON object
    are skipped silently, so that malformed input from a remote party cannot
    terminate this loop.
    """
    privkey = aionostr.key.PrivateKey(self.private_key)
    query = {"kinds": [self.EPHEMERAL_REQUEST], "limit": 0, "#p": [self.nostr_pubkey]}
    async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
        try:
            content = privkey.decrypt_message(event.content, event.pubkey)
            content = json.loads(content)
        except Exception:
            continue
        if not isinstance(content, dict):
            # valid JSON but not an object (e.g. a list or number): the item
            # assignments below would raise and abort the whole listener
            continue
        content['event_id'] = event.id
        content['event_pubkey'] = event.pubkey
        if 'reply_to' in content:
            # NOTE(review): assumes dm_replies values are future-like
            # (done()/set_result()) — confirm against where dm_replies is created
            reply_future = self.dm_replies[content['reply_to']]
            if not reply_future.done():
                reply_future.set_result(content)
            else:
                # setting a result twice raises; a repeated reply must not kill the loop
                self.logger.debug(f"ignoring duplicate reply to {content['reply_to']}")
        elif self.sm.is_server and 'method' in content:
            try:
                await self.handle_request(content)
            except Exception as e:
                self.logger.exception(f"failed to handle request: {content}")
                error_response = json.dumps({
                    "error": str(e)[:100],
                    "reply_to": event.id,
                })
                # same (pubkey, content) call shape as the reply sent by handle_request
                await self.send_direct_message(event.pubkey, error_response)
        else:
            self.logger.info(f'unknown message {content}')
|
|
|
|
@log_exceptions
async def handle_request(self, request):
    """Serve one swap request received as a direct message and send back the result.

    `request` is modified in place: the keys `method`, `event_id` and `event_pubkey`
    are popped, and the remainder is passed to the swap manager method selected by
    `method` (`addswapinvoice`, `createswap` or `createnormalswap`). The result is
    tagged with `reply_to = event_id`, JSON-encoded and sent to `event_pubkey`.

    Raises:
        KeyError: if one of the three popped keys is missing.
        Exception: carrying the method name, if `method` is not one of the above.
    """
    assert self.sm.is_server
    # todo: remember event_id of already processed requests
    method = request.pop('method')
    event_id = request.pop('event_id')
    event_pubkey = request.pop('event_pubkey')
    self.logger.info(f'handle_request: id={event_id} {method} {request}')
    # pick the swap manager entry point first, then invoke it once below
    if method == 'addswapinvoice':
        handler = self.sm.server_add_swap_invoice
    elif method == 'createswap':
        handler = self.sm.server_create_swap
    elif method == 'createnormalswap':
        handler = self.sm.server_create_normal_swap
    else:
        raise Exception(method)
    response = handler(request)
    response['reply_to'] = event_id
    self.logger.debug(f'sending response id={event_id}')
    await self.send_direct_message(event_pubkey, json.dumps(response))
|
|
|
|
def _store_last_swapserver_relays(self, relays: Sequence[str]):
    """Remember `relays` in memory and, when possible, persist them to disk.

    `self._last_swapserver_relays` is always updated. The list is additionally
    written as JSON to the file `recent_swapserver_relays` inside `self.config.path`,
    but only if both the config path and `relays` are non-empty. Write failures are
    logged and otherwise ignored.
    """
    self._last_swapserver_relays = relays
    if not (self.config.path and relays):
        # no directory to write to, or nothing worth persisting
        return
    storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
    try:
        with open(storage_path, 'w', encoding="utf-8") as relay_file:
            json.dump(relays, relay_file, indent=4, sort_keys=True)  # type: ignore
    except Exception:
        self.logger.exception(f"failed to write last swapserver relays to {storage_path}")
|
|
|
|
def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]:
    """Read the relay list persisted in `recent_swapserver_relays`, if any.

    Returns:
        The stored list of relay urls, or None when there is no config path, the
        file does not exist, it cannot be read or parsed (logged), or its content
        is not a JSON array of strings (logged).
    """
    if not self.config.path:
        # mirrors the guard in _store_last_swapserver_relays: without a config
        # directory nothing was ever written, and os.path.join(None, ...) would raise
        return None
    storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
    try:
        with open(storage_path, 'r', encoding="utf-8") as f:
            relays = json.load(f)
    except FileNotFoundError:
        # nothing stored yet; not worth a log entry
        return None
    except Exception:
        self.logger.exception(f"failed to read last swapserver relays from {storage_path}")
        return None
    # the file may have been edited or corrupted: only hand back a list of strings
    if not isinstance(relays, list) or not all(isinstance(relay, str) for relay in relays):
        self.logger.info(f"ignoring malformed swapserver relay list in {storage_path}")
        return None
    return relays
|