1
0
Files
electrum/electrum/submarine_swaps.py

1523 lines
64 KiB
Python

import asyncio
import json
import os
import ssl
from typing import TYPE_CHECKING, Optional, Dict, Union, Sequence, Tuple, Iterable
from decimal import Decimal
import math
import time
import attr
import aiohttp
from electrum_ecc import ECPrivkey
import electrum_aionostr as aionostr
from electrum_aionostr.event import Event
from electrum_aionostr.util import to_nip19
from collections import defaultdict
from . import lnutil
from .crypto import sha256, hash_160
from .bitcoin import (script_to_p2wsh, opcodes,
construct_witness)
from .transaction import PartialTxInput, PartialTxOutput, PartialTransaction, Transaction, TxInput, TxOutpoint
from .transaction import script_GetOp, match_script_against_template, OPPushDataGeneric, OPPushDataPubkey
from .util import (log_exceptions, ignore_exceptions, BelowDustLimit, OldTaskGroup, age, ca_path,
gen_nostr_ann_pow, get_nostr_ann_pow_amount)
from .lnutil import REDEEM_AFTER_DOUBLE_SPENT_DELAY
from .bitcoin import dust_threshold, DummyAddress
from .logging import Logger
from .lnutil import hex_to_bytes
from .lnaddr import lndecode
from .json_db import StoredObject, stored_in
from . import constants
from .address_synchronizer import TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE
from .i18n import _
from .fee_policy import FeePolicy
from .bitcoin import construct_script
from .crypto import ripemd
from .invoices import Invoice
from .network import TxBroadcastError
from .lnonion import OnionRoutingFailure, OnionFailureCode
from .lnsweep import SweepInfo
if TYPE_CHECKING:
from .network import Network
from .wallet import Abstract_Wallet
from .lnwatcher import LNWalletWatcher
from .lnworker import LNWallet
from .lnchannel import Channel
from .simple_config import SimpleConfig
SWAP_TX_SIZE = 150 # default tx size, used for mining fee estimation
MIN_LOCKTIME_DELTA = 60
LOCKTIME_DELTA_REFUND = 70
MAX_LOCKTIME_DELTA = 100
MIN_FINAL_CLTV_DELTA_FOR_CLIENT = 3 * 144 # note: put in invoice, but is not enforced by receiver in lnpeer.py
assert MIN_LOCKTIME_DELTA <= LOCKTIME_DELTA_REFUND <= MAX_LOCKTIME_DELTA
assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_ACCEPTED
assert MAX_LOCKTIME_DELTA < lnutil.MIN_FINAL_CLTV_DELTA_FOR_INVOICE
assert MAX_LOCKTIME_DELTA < MIN_FINAL_CLTV_DELTA_FOR_CLIENT
# The script of the reverse swaps has one extra check in it to verify
# that the length of the preimage is 32. This is required because in
# the reverse swaps the preimage is generated by the user and to
# settle the hold invoice, you need a preimage with 32 bytes . If that
# check wasn't there the user could generate a preimage with a
# different length which would still allow for claiming the onchain
# coins but the invoice couldn't be settled
WITNESS_TEMPLATE_REVERSE_SWAP = [
opcodes.OP_SIZE,
OPPushDataGeneric(None),
opcodes.OP_EQUAL,
opcodes.OP_IF,
opcodes.OP_HASH160,
OPPushDataGeneric(lambda x: x == 20),
opcodes.OP_EQUALVERIFY,
OPPushDataPubkey,
opcodes.OP_ELSE,
opcodes.OP_DROP,
OPPushDataGeneric(None),
opcodes.OP_CHECKLOCKTIMEVERIFY,
opcodes.OP_DROP,
OPPushDataPubkey,
opcodes.OP_ENDIF,
opcodes.OP_CHECKSIG
]
def check_reverse_redeem_script(
*,
redeem_script: bytes,
lockup_address: str,
payment_hash: bytes,
locktime: int,
refund_pubkey: bytes = None,
claim_pubkey: bytes = None,
) -> None:
parsed_script = [x for x in script_GetOp(redeem_script)]
if not match_script_against_template(redeem_script, WITNESS_TEMPLATE_REVERSE_SWAP):
raise Exception("rswap check failed: scriptcode does not match template")
if script_to_p2wsh(redeem_script) != lockup_address:
raise Exception("rswap check failed: inconsistent scriptcode and address")
if ripemd(payment_hash) != parsed_script[5][1]:
raise Exception("rswap check failed: our preimage not in script")
if claim_pubkey and claim_pubkey != parsed_script[7][1]:
raise Exception("rswap check failed: our pubkey not in script")
if refund_pubkey and refund_pubkey != parsed_script[13][1]:
raise Exception("rswap check failed: our pubkey not in script")
if locktime != int.from_bytes(parsed_script[10][1], byteorder='little'):
raise Exception("rswap check failed: inconsistent locktime and script")
class SwapServerError(Exception):
def __str__(self):
return _("The swap server errored or is unreachable.")
def now():
return int(time.time())
@attr.s
class SwapFees:
percentage = attr.ib(type=int)
mining_fee = attr.ib(type=int)
min_amount = attr.ib(type=int)
max_amount = attr.ib(type=int)
@stored_in('submarine_swaps')
@attr.s
class SwapData(StoredObject):
is_reverse = attr.ib(type=bool) # for whoever is running code (PoV of client or server)
locktime = attr.ib(type=int)
onchain_amount = attr.ib(type=int) # in sats
lightning_amount = attr.ib(type=int) # in sats
redeem_script = attr.ib(type=bytes, converter=hex_to_bytes)
preimage = attr.ib(type=Optional[bytes], converter=hex_to_bytes)
prepay_hash = attr.ib(type=Optional[bytes], converter=hex_to_bytes)
privkey = attr.ib(type=bytes, converter=hex_to_bytes)
lockup_address = attr.ib(type=str)
receive_address = attr.ib(type=str)
funding_txid = attr.ib(type=Optional[str])
spending_txid = attr.ib(type=Optional[str])
is_redeemed = attr.ib(type=bool)
_funding_prevout = None # type: Optional[TxOutpoint] # for RBF
_payment_hash = None
_zeroconf = False
_payment_pending = False # for forward swaps
@property
def payment_hash(self) -> bytes:
return self._payment_hash
def is_funded(self) -> bool:
return self._payment_pending or bool(self.funding_txid)
class SwapManager(Logger):
network: Optional['Network'] = None
lnwatcher: Optional['LNWalletWatcher'] = None
def __init__(self, *, wallet: 'Abstract_Wallet', lnworker: 'LNWallet'):
Logger.__init__(self)
self.mining_fee = None
self.percentage = None
self._min_amount = None
self._max_amount = None
self.wallet = wallet
self.config = wallet.config
self.lnworker = lnworker
self.lnwatcher = self.lnworker.lnwatcher
self.config = wallet.config
self.taskgroup = OldTaskGroup()
self.dummy_address = DummyAddress.SWAP
self.swaps = self.wallet.db.get_dict('submarine_swaps') # type: Dict[str, SwapData]
self._swaps_by_funding_outpoint = {} # type: Dict[TxOutpoint, SwapData]
self._swaps_by_lockup_address = {} # type: Dict[str, SwapData]
for payment_hash_hex, swap in self.swaps.items():
payment_hash = bytes.fromhex(payment_hash_hex)
swap._payment_hash = payment_hash
self._add_or_reindex_swap(swap)
if not swap.is_reverse and not swap.is_redeemed:
self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback)
self.prepayments = {} # type: Dict[bytes, bytes] # fee_rhash -> rhash
for k, swap in self.swaps.items():
if swap.prepay_hash is not None:
self.prepayments[swap.prepay_hash] = bytes.fromhex(k)
self.is_server = False # overriden by swapserver plugin if enabled
self.is_initialized = asyncio.Event()
def start_network(self, network: 'Network'):
assert network
if self.network is not None:
self.logger.info('start_network: already started')
return
self.logger.info('start_network: starting main loop')
self.network = network
for k, swap in self.swaps.items():
if swap.is_redeemed:
continue
self.add_lnwatcher_callback(swap)
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
@log_exceptions
async def run_nostr_server(self):
await self.set_nostr_proof_of_work()
with NostrTransport(self.config, self, self.lnworker.nostr_keypair) as transport:
await transport.is_connected.wait()
self.logger.info(f'nostr is connected')
while True:
# todo: publish everytime fees have changed
self.server_update_pairs()
await transport.publish_offer(self)
await asyncio.sleep(transport.OFFER_UPDATE_INTERVAL_SEC)
@log_exceptions
async def main_loop(self):
tasks = [self.pay_pending_invoices()]
if self.is_server:
# nostr and http are not mutually exclusive
if self.config.SWAPSERVER_PORT:
tasks.append(self.http_server.run())
if self.config.NOSTR_RELAYS:
tasks.append(self.run_nostr_server())
async with self.taskgroup as group:
for task in tasks:
await group.spawn(task)
async def stop(self):
await self.taskgroup.cancel_remaining()
def create_transport(self) -> 'SwapServerTransport':
from .lnutil import generate_random_keypair
if self.config.SWAPSERVER_URL:
return HttpTransport(self.config, self)
else:
keypair = self.lnworker.nostr_keypair if self.is_server else generate_random_keypair()
return NostrTransport(self.config, self, keypair)
async def set_nostr_proof_of_work(self) -> None:
current_pow = get_nostr_ann_pow_amount(
self.lnworker.nostr_keypair.pubkey[1:],
self.config.SWAPSERVER_ANN_POW_NONCE
)
if current_pow >= self.config.SWAPSERVER_POW_TARGET:
self.logger.debug(f"Reusing existing PoW nonce for nostr announcement.")
return
self.logger.info(f"Generating PoW for nostr announcement. Target: {self.config.SWAPSERVER_POW_TARGET}")
nonce, pow_amount = await gen_nostr_ann_pow(
self.lnworker.nostr_keypair.pubkey[1:], # pubkey without prefix
self.config.SWAPSERVER_POW_TARGET,
)
self.logger.debug(f"Found {pow_amount} bits of work for Nostr announcement.")
self.config.SWAPSERVER_ANN_POW_NONCE = nonce
async def pay_invoice(self, key):
self.logger.info(f'trying to pay invoice {key}')
self.invoices_to_pay[key] = 1000000000000 # lock
try:
invoice = self.wallet.get_invoice(key)
success, log = await self.lnworker.pay_invoice(invoice.lightning_invoice, attempts=10)
except Exception as e:
self.logger.info(f'exception paying {key}, will not retry')
self.invoices_to_pay.pop(key, None)
return
if not success:
self.logger.info(f'failed to pay {key}, will retry in 10 minutes')
self.invoices_to_pay[key] = now() + 600
else:
self.logger.info(f'paid invoice {key}')
self.invoices_to_pay.pop(key, None)
async def pay_pending_invoices(self):
self.invoices_to_pay = {}
while True:
await asyncio.sleep(5)
for key, not_before in list(self.invoices_to_pay.items()):
if now() < not_before:
continue
await self.taskgroup.spawn(self.pay_invoice(key))
def cancel_normal_swap(self, swap: SwapData):
""" we must not have broadcast the funding tx """
if swap is None:
return
if swap.is_funded():
self.logger.info(f'cannot cancel swap {swap.payment_hash.hex()}: already funded')
return
self._fail_swap(swap, 'user cancelled')
def _fail_swap(self, swap: SwapData, reason: str):
self.logger.info(f'failing swap {swap.payment_hash.hex()}: {reason}')
if not swap.is_reverse and swap.payment_hash in self.lnworker.hold_invoice_callbacks:
self.lnworker.unregister_hold_invoice(swap.payment_hash)
payment_secret = self.lnworker.get_payment_secret(swap.payment_hash)
payment_key = swap.payment_hash + payment_secret
e = OnionRoutingFailure(code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, data=b'')
self.lnworker.save_forwarding_failure(payment_key.hex(), failure_message=e)
self.lnwatcher.remove_callback(swap.lockup_address)
if not swap.is_funded():
self.swaps.pop(swap.payment_hash.hex())
def extract_preimage(self, swap: SwapData, claim_tx: Transaction) -> Optional[bytes]:
for txin in claim_tx.inputs():
preimage = txin.witness_elements()[1]
if sha256(preimage) == swap.payment_hash:
return preimage
def _claim_swap(self, swap: SwapData) -> None:
assert self.network
assert self.lnwatcher
if not self.lnwatcher.adb.is_up_to_date():
return
current_height = self.network.get_local_height()
remaining_time = swap.locktime - current_height
txos = self.lnwatcher.adb.get_addr_outputs(swap.lockup_address)
for txin in txos.values():
if swap.is_reverse and txin.value_sats() < swap.onchain_amount:
# amount too low, we must not reveal the preimage
continue
break
else:
# swap not funded.
txin = None
# if it is a normal swap, we might have double spent the funding tx
# in that case we need to fail the HTLCs
if remaining_time <= 0:
self._fail_swap(swap, 'expired')
if txin:
# the swap is funded
# note: swap.funding_txid can change due to RBF, it will get updated here:
swap.funding_txid = txin.prevout.txid.hex()
swap._funding_prevout = txin.prevout
self._add_or_reindex_swap(swap) # to update _swaps_by_funding_outpoint
funding_height = self.lnwatcher.adb.get_tx_height(txin.prevout.txid.hex())
spent_height = txin.spent_height
# set spending_txid (even if tx is local), for GUI grouping
swap.spending_txid = txin.spent_txid
# discard local spenders
if spent_height in [TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE]:
spent_height = None
if spent_height is not None:
if spent_height > 0:
if current_height - spent_height > REDEEM_AFTER_DOUBLE_SPENT_DELAY:
self.logger.info(f'stop watching swap {swap.lockup_address}')
self.lnwatcher.remove_callback(swap.lockup_address)
swap.is_redeemed = True
if not swap.is_reverse:
if swap.preimage is None and spent_height is not None:
# extract the preimage, add it to lnwatcher
claim_tx = self.lnwatcher.adb.get_transaction(txin.spent_txid)
preimage = self.extract_preimage(swap, claim_tx)
if preimage:
swap.preimage = preimage
self.logger.info(f'found preimage: {preimage.hex()}')
self.lnworker.preimages[swap.payment_hash.hex()] = preimage.hex()
else:
# this is our refund tx
if spent_height > 0:
self.logger.info(f'refund tx confirmed: {txin.spent_txid} {spent_height}')
self._fail_swap(swap, 'refund tx confirmed')
return
if remaining_time > 0:
# too early for refund
return
if swap.preimage:
# we have been paid. do not try to get refund.
return
else:
if swap.preimage is None:
swap.preimage = self.lnworker.get_preimage(swap.payment_hash)
if swap.preimage is None:
if funding_height.conf <= 0:
return
key = swap.payment_hash.hex()
if remaining_time <= MIN_LOCKTIME_DELTA:
if key in self.invoices_to_pay:
# fixme: should consider cltv of ln payment
self.logger.info(f'locktime too close {key} {remaining_time}')
self.invoices_to_pay.pop(key, None)
return
if key not in self.invoices_to_pay:
self.invoices_to_pay[key] = 0
return
if self.network.config.TEST_SWAPSERVER_REFUND:
# for testing: do not create claim tx
return
if spent_height is not None and spent_height > 0:
return
txin, locktime = self.create_claim_txin(txin=txin, swap=swap)
# note: there is no csv in the script, we just set this so that txbatcher waits for one confirmation
csv = 1 if (swap.is_reverse and not swap._zeroconf) else 0
name = 'swap claim' if swap.is_reverse else 'swap refund'
can_be_batched = bool(csv) if swap.is_reverse else True
sweep_info = SweepInfo(
txin=txin,
csv_delay=csv,
cltv_abs=locktime,
txout=None,
name=name,
can_be_batched=can_be_batched,
)
try:
self.wallet.txbatcher.add_sweep_input('swaps', sweep_info, self.config.FEE_POLICY_SWAPS)
except BelowDustLimit:
self.logger.info('utxo value below dust threshold')
return
def get_swap_tx_fee(self):
return self._get_tx_fee(self.config.FEE_POLICY)
def get_fee_for_txbatcher(self):
return self._get_tx_fee(self.config.FEE_POLICY_SWAPS)
def _get_tx_fee(self, policy_descriptor: str):
fee_policy = FeePolicy(policy_descriptor)
return fee_policy.estimate_fee(SWAP_TX_SIZE, network=self.network, allow_fallback_to_static_rates=True)
def get_swap(self, payment_hash: bytes) -> Optional[SwapData]:
# for history
swap = self.swaps.get(payment_hash.hex())
if swap:
return swap
payment_hash = self.prepayments.get(payment_hash)
if payment_hash:
return self.swaps.get(payment_hash.hex())
def add_lnwatcher_callback(self, swap: SwapData) -> None:
callback = lambda: self._claim_swap(swap)
self.lnwatcher.add_callback(swap.lockup_address, callback)
async def hold_invoice_callback(self, payment_hash: bytes) -> None:
# note: this assumes the wallet has been unlocked
key = payment_hash.hex()
if key in self.swaps:
swap = self.swaps[key]
if not swap.is_funded():
output = self.create_funding_output(swap)
self.wallet.txbatcher.add_payment_output('swaps', output, self.config.FEE_POLICY_SWAPS)
swap._payment_pending = True
else:
self.logger.info(f'key not in swaps {key}')
def create_normal_swap(self, *, lightning_amount_sat: int, payment_hash: bytes, their_pubkey: bytes = None):
""" server method """
assert lightning_amount_sat
locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
our_privkey = os.urandom(32)
our_pubkey = ECPrivkey(our_privkey).get_public_key_bytes(compressed=True)
onchain_amount_sat = self._get_recv_amount(lightning_amount_sat, is_reverse=True) # what the client is going to receive
redeem_script = construct_script(
WITNESS_TEMPLATE_REVERSE_SWAP,
values={1:32, 5:ripemd(payment_hash), 7:their_pubkey, 10:locktime, 13:our_pubkey}
)
swap, invoice, prepay_invoice = self.add_normal_swap(
redeem_script=redeem_script,
locktime=locktime,
onchain_amount_sat=onchain_amount_sat,
lightning_amount_sat=lightning_amount_sat,
payment_hash=payment_hash,
our_privkey=our_privkey,
prepay=True,
)
self.lnworker.register_hold_invoice(payment_hash, self.hold_invoice_callback)
return swap, invoice, prepay_invoice
def add_normal_swap(
self, *,
redeem_script: bytes,
locktime: int, # onchain
onchain_amount_sat: int,
lightning_amount_sat: int,
payment_hash: bytes,
our_privkey: bytes,
prepay: bool,
channels: Optional[Sequence['Channel']] = None,
min_final_cltv_expiry_delta: Optional[int] = None,
) -> Tuple[SwapData, str, Optional[str]]:
"""creates a hold invoice"""
if prepay:
prepay_amount_sat = self.get_fee_for_txbatcher() * 2
invoice_amount_sat = lightning_amount_sat - prepay_amount_sat
else:
invoice_amount_sat = lightning_amount_sat
_, invoice = self.lnworker.get_bolt11_invoice(
payment_hash=payment_hash,
amount_msat=invoice_amount_sat * 1000,
message='Submarine swap',
expiry=300,
fallback_address=None,
channels=channels,
min_final_cltv_expiry_delta=min_final_cltv_expiry_delta,
)
# add payment info to lnworker
self.lnworker.add_payment_info_for_hold_invoice(payment_hash, invoice_amount_sat)
if prepay:
prepay_hash = self.lnworker.create_payment_info(amount_msat=prepay_amount_sat*1000)
_, prepay_invoice = self.lnworker.get_bolt11_invoice(
payment_hash=prepay_hash,
amount_msat=prepay_amount_sat * 1000,
message='Submarine swap mining fees',
expiry=300,
fallback_address=None,
channels=channels,
min_final_cltv_expiry_delta=min_final_cltv_expiry_delta,
)
self.lnworker.bundle_payments([payment_hash, prepay_hash])
self.prepayments[prepay_hash] = payment_hash
else:
prepay_invoice = None
prepay_hash = None
lockup_address = script_to_p2wsh(redeem_script)
receive_address = self.wallet.get_receiving_address()
swap = SwapData(
redeem_script=redeem_script,
locktime=locktime,
privkey=our_privkey,
preimage=None,
prepay_hash=prepay_hash,
lockup_address=lockup_address,
onchain_amount=onchain_amount_sat,
receive_address=receive_address,
lightning_amount=lightning_amount_sat,
is_reverse=False,
is_redeemed=False,
funding_txid=None,
spending_txid=None,
)
swap._payment_hash = payment_hash
self._add_or_reindex_swap(swap)
self.add_lnwatcher_callback(swap)
return swap, invoice, prepay_invoice
def create_reverse_swap(self, *, lightning_amount_sat: int, their_pubkey: bytes) -> SwapData:
""" server method. """
assert lightning_amount_sat is not None
locktime = self.network.get_local_height() + LOCKTIME_DELTA_REFUND
privkey = os.urandom(32)
our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
onchain_amount_sat = self._get_send_amount(lightning_amount_sat, is_reverse=False)
preimage = os.urandom(32)
payment_hash = sha256(preimage)
redeem_script = construct_script(
WITNESS_TEMPLATE_REVERSE_SWAP,
values={1:32, 5:ripemd(payment_hash), 7:our_pubkey, 10:locktime, 13:their_pubkey}
)
swap = self.add_reverse_swap(
redeem_script=redeem_script,
locktime=locktime,
privkey=privkey,
preimage=preimage,
payment_hash=payment_hash,
prepay_hash=None,
onchain_amount_sat=onchain_amount_sat,
lightning_amount_sat=lightning_amount_sat)
return swap
def add_reverse_swap(
self,
*,
redeem_script: bytes,
locktime: int, # onchain
privkey: bytes,
lightning_amount_sat: int,
onchain_amount_sat: int,
preimage: bytes,
payment_hash: bytes,
prepay_hash: Optional[bytes] = None,
) -> SwapData:
lockup_address = script_to_p2wsh(redeem_script)
receive_address = self.wallet.get_receiving_address()
swap = SwapData(
redeem_script=redeem_script,
locktime=locktime,
privkey=privkey,
preimage=preimage,
prepay_hash=prepay_hash,
lockup_address=lockup_address,
onchain_amount=onchain_amount_sat,
receive_address=receive_address,
lightning_amount=lightning_amount_sat,
is_reverse=True,
is_redeemed=False,
funding_txid=None,
spending_txid=None,
)
if prepay_hash:
self.prepayments[prepay_hash] = payment_hash
swap._payment_hash = payment_hash
self._add_or_reindex_swap(swap)
self.add_lnwatcher_callback(swap)
return swap
def server_add_swap_invoice(self, request):
invoice = request['invoice']
invoice = Invoice.from_bech32(invoice)
key = invoice.rhash
payment_hash = bytes.fromhex(key)
assert key in self.swaps
swap = self.swaps[key]
assert swap.lightning_amount == int(invoice.get_amount_sat())
self.wallet.save_invoice(invoice)
# check that we have the preimage
assert sha256(swap.preimage) == payment_hash
assert swap.spending_txid is None
self.invoices_to_pay[key] = 0
return {}
async def normal_swap(
self,
*,
lightning_amount_sat: int,
expected_onchain_amount_sat: int,
password,
tx: PartialTransaction = None,
channels = None,
) -> Optional[str]:
"""send on-chain BTC, receive on Lightning
Old (removed) flow:
- User generates an LN invoice with RHASH, and knows preimage.
- User creates on-chain output locked to RHASH.
- Server pays LN invoice. User reveals preimage.
- Server spends the on-chain output using preimage.
cltv safety requirement: (onchain_locktime > LN_locktime), otherwise server is vulnerable
New flow:
- User requests swap
- Server creates preimage, sends RHASH to user
- User creates hold invoice, sends it to server
- Server sends HTLC, user holds it
- User creates on-chain output locked to RHASH
- Server spends the on-chain output using preimage (revealing the preimage)
- User fulfills HTLC using preimage
cltv safety requirement: (onchain_locktime < LN_locktime), otherwise client is vulnerable
"""
assert self.network
assert self.lnwatcher
swap, invoice = await self.request_normal_swap(
lightning_amount_sat=lightning_amount_sat,
expected_onchain_amount_sat=expected_onchain_amount_sat,
channels=channels,
)
tx = self.create_funding_tx(swap, tx, password=password)
return await self.wait_for_htlcs_and_broadcast(swap=swap, invoice=invoice, tx=tx)
async def request_normal_swap(
self, transport, *,
lightning_amount_sat: int,
expected_onchain_amount_sat: int,
channels: Optional[Sequence['Channel']] = None,
) -> Tuple[SwapData, str]:
await self.is_initialized.wait() # add timeout
refund_privkey = os.urandom(32)
refund_pubkey = ECPrivkey(refund_privkey).get_public_key_bytes(compressed=True)
self.logger.info('requesting preimage hash for swap')
request_data = {
"invoiceAmount": lightning_amount_sat,
"refundPublicKey": refund_pubkey.hex()
}
data = await transport.send_request_to_server('createnormalswap', request_data)
payment_hash = bytes.fromhex(data["preimageHash"])
zeroconf = data["acceptZeroConf"]
onchain_amount = data["expectedAmount"]
locktime = data["timeoutBlockHeight"]
lockup_address = data["address"]
redeem_script = bytes.fromhex(data["redeemScript"])
# verify redeem_script is built with our pubkey and preimage
check_reverse_redeem_script(
redeem_script=redeem_script,
lockup_address=lockup_address,
payment_hash=payment_hash,
locktime=locktime,
refund_pubkey=refund_pubkey,
)
# check that onchain_amount is not more than what we estimated
if onchain_amount > expected_onchain_amount_sat:
raise Exception(f"fswap check failed: onchain_amount is more than what we estimated: "
f"{onchain_amount} > {expected_onchain_amount_sat}")
# verify that they are not locking up funds for too long
if locktime - self.network.get_local_height() > MAX_LOCKTIME_DELTA:
raise Exception("fswap check failed: locktime too far in future")
swap, invoice, _ = self.add_normal_swap(
redeem_script=redeem_script,
locktime=locktime,
lightning_amount_sat=lightning_amount_sat,
onchain_amount_sat=onchain_amount,
payment_hash=payment_hash,
our_privkey=refund_privkey,
prepay=False,
channels=channels,
# When the client is doing a normal swap, we create a ln-invoice with larger than usual final_cltv_delta.
# If the user goes offline after broadcasting the funding tx (but before it is mined and
# the server claims it), they need to come back online before the held ln-htlc expires (see #8940).
# If the held ln-htlc expires, and the funding tx got confirmed, the server will have claimed the onchain
# funds, and the ln-htlc will be timed out onchain (and channel force-closed). i.e. the user loses the swap
# amount. Increasing the final_cltv_delta the user puts in the invoice extends this critical window.
min_final_cltv_expiry_delta=MIN_FINAL_CLTV_DELTA_FOR_CLIENT,
)
return swap, invoice
async def wait_for_htlcs_and_broadcast(
self, transport, *,
swap: SwapData,
invoice: str,
tx: Transaction,
) -> Optional[str]:
await transport.is_connected.wait()
payment_hash = swap.payment_hash
refund_pubkey = ECPrivkey(swap.privkey).get_public_key_bytes(compressed=True)
async def callback(payment_hash):
# FIXME what if this raises, e.g. TxBroadcastError?
# We will never retry the hold-invoice-callback.
await self.broadcast_funding_tx(swap, tx)
self.lnworker.register_hold_invoice(payment_hash, callback)
# send invoice to server and wait for htlcs
request_data = {
"preimageHash": payment_hash.hex(),
"invoice": invoice,
"refundPublicKey": refund_pubkey.hex(),
}
data = await transport.send_request_to_server('addswapinvoice', request_data)
# wait for funding tx
lnaddr = lndecode(invoice)
while swap.funding_txid is None and not lnaddr.is_expired():
await asyncio.sleep(0.1)
return swap.funding_txid
def create_funding_output(self, swap):
return PartialTxOutput.from_address_and_value(swap.lockup_address, swap.onchain_amount)
def create_funding_tx(
self,
swap: SwapData,
tx: Optional[PartialTransaction],
*,
password,
) -> PartialTransaction:
# create funding tx
# use fee policy set by user (not using txbatcher)
fee_policy = FeePolicy(self.config.FEE_POLICY)
# note: rbf must not decrease payment
# this is taken care of in wallet._is_rbf_allowed_to_touch_tx_output
if tx is None:
funding_output = self.create_funding_output(swap)
tx = self.wallet.create_transaction(
outputs=[funding_output],
rbf=True,
fee_policy=fee_policy,
)
else:
tx.replace_output_address(DummyAddress.SWAP, swap.lockup_address)
tx.set_rbf(True)
self.wallet.sign_transaction(tx, password)
return tx
@log_exceptions
async def request_swap_for_amount(self, transport, onchain_amount) -> Optional[Tuple[SwapData, str]]:
await self.is_initialized.wait()
lightning_amount_sat = self.get_recv_amount(onchain_amount, is_reverse=False)
swap, invoice = await self.request_normal_swap(
transport,
lightning_amount_sat=lightning_amount_sat,
expected_onchain_amount_sat=onchain_amount)
return swap, invoice
@log_exceptions
async def broadcast_funding_tx(self, swap: SwapData, tx: Transaction) -> None:
swap.funding_txid = tx.txid()
await self.network.broadcast_transaction(tx)
async def reverse_swap(
self, transport,
*,
lightning_amount_sat: int,
expected_onchain_amount_sat: int,
zeroconf: bool=False,
channels: Optional[Sequence['Channel']] = None,
) -> Optional[str]:
"""send on Lightning, receive on-chain
- User generates preimage, RHASH. Sends RHASH to server.
- Server creates an LN invoice for RHASH.
- User pays LN invoice - except server needs to hold the HTLC as preimage is unknown.
- if the server requested a fee prepayment (using 'minerFeeInvoice'),
the server will have the preimage for that. The user will send HTLCs for both the main RHASH,
and for the fee prepayment. Once both MPP sets arrive at the server, the server will fulfill
the HTLCs for the fee prepayment (before creating the on-chain output).
- Server creates on-chain output locked to RHASH.
- User spends on-chain output, revealing preimage.
- Server fulfills HTLC using preimage.
Note: expected_onchain_amount_sat is BEFORE deducting the on-chain claim tx fee.
"""
assert self.network
assert self.lnwatcher
privkey = os.urandom(32)
our_pubkey = ECPrivkey(privkey).get_public_key_bytes(compressed=True)
preimage = os.urandom(32)
payment_hash = sha256(preimage)
request_data = {
"type": "reversesubmarine",
"pairId": "BTC/BTC",
"orderSide": "buy",
"invoiceAmount": lightning_amount_sat,
"preimageHash": payment_hash.hex(),
"claimPublicKey": our_pubkey.hex()
}
self.logger.debug(f'rswap: sending request for {lightning_amount_sat}')
data = await transport.send_request_to_server('createswap', request_data)
invoice = data['invoice']
fee_invoice = data.get('minerFeeInvoice')
lockup_address = data['lockupAddress']
redeem_script = bytes.fromhex(data['redeemScript'])
locktime = data['timeoutBlockHeight']
onchain_amount = data["onchainAmount"]
response_id = data['id']
self.logger.debug(f'rswap: {response_id=}')
# verify redeem_script is built with our pubkey and preimage
check_reverse_redeem_script(
redeem_script=redeem_script,
lockup_address=lockup_address,
payment_hash=payment_hash,
locktime=locktime,
refund_pubkey=None,
claim_pubkey=our_pubkey,
)
# check that the onchain amount is what we expected
if onchain_amount < expected_onchain_amount_sat:
raise Exception(f"rswap check failed: onchain_amount is less than what we expected: "
f"{onchain_amount} < {expected_onchain_amount_sat}")
# verify that we will have enough time to get our tx confirmed
if locktime - self.network.get_local_height() <= MIN_LOCKTIME_DELTA:
raise Exception("rswap check failed: locktime too close")
# verify invoice payment_hash
lnaddr = self.lnworker._check_invoice(invoice)
invoice_amount = int(lnaddr.get_amount_sat())
if lnaddr.paymenthash != payment_hash:
raise Exception("rswap check failed: inconsistent RHASH and invoice")
# check that the lightning amount is what we requested
if fee_invoice:
fee_lnaddr = self.lnworker._check_invoice(fee_invoice)
invoice_amount += fee_lnaddr.get_amount_sat()
prepay_hash = fee_lnaddr.paymenthash
else:
prepay_hash = None
if int(invoice_amount) != lightning_amount_sat:
raise Exception(f"rswap check failed: invoice_amount ({invoice_amount}) "
f"not what we requested ({lightning_amount_sat})")
# save swap data to wallet file
swap = self.add_reverse_swap(
redeem_script=redeem_script,
locktime=locktime,
privkey=privkey,
preimage=preimage,
payment_hash=payment_hash,
prepay_hash=prepay_hash,
onchain_amount_sat=onchain_amount,
lightning_amount_sat=lightning_amount_sat)
swap._zeroconf = zeroconf
# initiate fee payment.
if fee_invoice:
asyncio.ensure_future(self.lnworker.pay_invoice(fee_invoice))
# we return if we detect funding
async def wait_for_funding(swap):
while swap.funding_txid is None:
await asyncio.sleep(1)
# initiate main payment
tasks = [asyncio.create_task(self.lnworker.pay_invoice(invoice, channels=channels)), asyncio.create_task(wait_for_funding(swap))]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
return swap.funding_txid
def _add_or_reindex_swap(self, swap: SwapData) -> None:
if swap.payment_hash.hex() not in self.swaps:
self.swaps[swap.payment_hash.hex()] = swap
if swap._funding_prevout:
self._swaps_by_funding_outpoint[swap._funding_prevout] = swap
self._swaps_by_lockup_address[swap.lockup_address] = swap
def server_update_pairs(self) -> None:
""" for server """
self.percentage = float(self.config.SWAPSERVER_FEE_MILLIONTHS) / 10000
self._min_amount = 20000
self._max_amount = 10000000
self.mining_fee = self.get_fee_for_txbatcher()
def update_pairs(self, pairs):
self.logger.info(f'updating fees {pairs}')
self.mining_fee = pairs.mining_fee
self.percentage = pairs.percentage
self._min_amount = pairs.min_amount
self._max_amount = pairs.max_amount
self.is_initialized.set()
def get_max_amount(self) -> int:
"""in satoshis"""
return self._max_amount
def get_min_amount(self) -> int:
"""in satoshis"""
return self._min_amount
def check_invoice_amount(self, x) -> bool:
return self.get_min_amount() <= x <= self.get_max_amount()
def _get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
"""For a given swap direction and amount we send, returns how much we will receive.
Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
In the reverse direction, the result matches what the swap server returns as response["onchainAmount"].
"""
if send_amount is None:
return
x = Decimal(send_amount)
percentage = Decimal(self.percentage)
if is_reverse:
if not self.check_invoice_amount(x):
return
# see/ref:
# https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L948
percentage_fee = math.ceil(percentage * x / 100)
base_fee = self.mining_fee
x -= percentage_fee + base_fee
x = math.floor(x)
if x < dust_threshold():
return
else:
x -= self.mining_fee
percentage_fee = math.ceil(x * percentage / (100 + percentage))
x -= percentage_fee
if not self.check_invoice_amount(x):
return
x = int(x)
return x
def _get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
"""For a given swap direction and amount we want to receive, returns how much we will need to send.
Note: in the reverse direction, the mining fee for the on-chain claim tx is NOT accounted for.
In the forward direction, the result matches what the swap server returns as response["expectedAmount"].
"""
if not recv_amount:
return
x = Decimal(recv_amount)
percentage = Decimal(self.percentage)
if is_reverse:
# see/ref:
# https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L928
# https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L958
base_fee = self.mining_fee
x += base_fee
x = math.ceil(x / ((100 - percentage) / 100))
if not self.check_invoice_amount(x):
return
else:
if not self.check_invoice_amount(x):
return
# see/ref:
# https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/service/Service.ts#L708
# https://github.com/BoltzExchange/boltz-backend/blob/e7e2d30f42a5bea3665b164feb85f84c64d86658/lib/rates/FeeProvider.ts#L90
percentage_fee = math.ceil(percentage * x / 100)
x += percentage_fee + self.mining_fee
x = int(x)
return x
def get_recv_amount(self, send_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
# first, add percentage fee
recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
# sanity check calculation can be inverted
if recv_amount is not None:
inverted_send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
# accept off-by ones as amt_rcv = recv_amt(send_amt(amt_rcv)) only up to +-1
if abs(send_amount - inverted_send_amount) > 1:
raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
f"send_amount={send_amount} -> recv_amount={recv_amount} -> inverted_send_amount={inverted_send_amount}")
# second, add on-chain claim tx fee
if is_reverse and recv_amount is not None:
recv_amount -= self.get_swap_tx_fee()
return recv_amount
def get_send_amount(self, recv_amount: Optional[int], *, is_reverse: bool) -> Optional[int]:
# first, add on-chain claim tx fee
if is_reverse and recv_amount is not None:
recv_amount += self.get_swap_tx_fee()
# second, add percentage fee
send_amount = self._get_send_amount(recv_amount, is_reverse=is_reverse)
# sanity check calculation can be inverted
if send_amount is not None:
inverted_recv_amount = self._get_recv_amount(send_amount, is_reverse=is_reverse)
if recv_amount != inverted_recv_amount:
raise Exception(f"calc-invert-sanity-check failed. is_reverse={is_reverse}. "
f"recv_amount={recv_amount} -> send_amount={send_amount} -> inverted_recv_amount={inverted_recv_amount}")
return send_amount
def get_swaps_by_funding_tx(self, tx: Transaction) -> Iterable[SwapData]:
swaps = []
for txout_idx, _txo in enumerate(tx.outputs()):
prevout = TxOutpoint(txid=bytes.fromhex(tx.txid()), out_idx=txout_idx)
if swap := self._swaps_by_funding_outpoint.get(prevout):
swaps.append(swap)
return swaps
def get_swaps_by_claim_tx(self, tx: Transaction) -> Iterable[SwapData]:
swaps = []
for i, txin in enumerate(tx.inputs()):
if swap := self.get_swap_by_claim_txin(txin):
swaps.append((i, swap))
return swaps
def get_swap_by_claim_txin(self, txin: TxInput) -> Optional[SwapData]:
return self._swaps_by_funding_outpoint.get(txin.prevout)
def is_lockup_address_for_a_swap(self, addr: str) -> bool:
return bool(self._swaps_by_lockup_address.get(addr))
@classmethod
def add_txin_info(cls, swap, txin: PartialTxInput) -> None:
"""Add some info to a claim txin.
note: even without signing, this is useful for tx size estimation.
"""
preimage = swap.preimage if swap.is_reverse else 0
witness_script = swap.redeem_script
txin.script_sig = b''
txin.witness_script = witness_script
sig_dummy = b'\x00' * 71 # DER-encoded ECDSA sig, with low S and low R
witness = [sig_dummy, preimage, witness_script]
txin.witness_sizehint = len(construct_witness(witness))
txin.nsequence = 0xffffffff - 2
@classmethod
def create_claim_txin(
cls,
*,
txin: PartialTxInput,
swap: SwapData,
) -> PartialTransaction:
if swap.is_reverse: # successful reverse swap
locktime = None
# preimage will be set in sign_tx
else: # timing out forward swap
locktime = swap.locktime
cls.add_txin_info(swap, txin)
txin.privkey = swap.privkey
def make_witness(sig):
# preimae not known yet
preimage = swap.preimage if swap.is_reverse else 0
witness_script = swap.redeem_script
return construct_witness([sig, preimage, witness_script])
txin.make_witness = make_witness
return txin, locktime
def max_amount_forward_swap(self) -> Optional[int]:
""" returns None if we cannot swap """
max_swap_amt_ln = self.get_max_amount()
if max_swap_amt_ln is None:
return None
max_recv_amt_ln = int(self.lnworker.num_sats_can_receive())
max_amt_ln = int(min(max_swap_amt_ln, max_recv_amt_ln))
max_amt_oc = self.get_send_amount(max_amt_ln, is_reverse=False) or 0
min_amt_oc = self.get_send_amount(self.get_min_amount(), is_reverse=False) or 0
return max_amt_oc if max_amt_oc >= min_amt_oc else None
def server_create_normal_swap(self, request):
# normal for client, reverse for server
#request = await r.json()
lightning_amount_sat = request['invoiceAmount']
their_pubkey = bytes.fromhex(request['refundPublicKey'])
assert len(their_pubkey) == 33
swap = self.create_reverse_swap(
lightning_amount_sat=lightning_amount_sat,
their_pubkey=their_pubkey,
)
response = {
"id": swap.payment_hash.hex(),
'preimageHash': swap.payment_hash.hex(),
"acceptZeroConf": False,
"expectedAmount": swap.onchain_amount,
"timeoutBlockHeight": swap.locktime,
"address": swap.lockup_address,
"redeemScript": swap.redeem_script.hex(),
}
return response
def server_create_swap(self, request):
# reverse for client, forward for server
# requesting a normal swap (old protocol) will raise an exception
#request = await r.json()
req_type = request['type']
assert request['pairId'] == 'BTC/BTC'
if req_type == 'reversesubmarine':
lightning_amount_sat=request['invoiceAmount']
payment_hash=bytes.fromhex(request['preimageHash'])
their_pubkey=bytes.fromhex(request['claimPublicKey'])
assert len(payment_hash) == 32
assert len(their_pubkey) == 33
swap, invoice, prepay_invoice = self.create_normal_swap(
lightning_amount_sat=lightning_amount_sat,
payment_hash=payment_hash,
their_pubkey=their_pubkey
)
response = {
'id': payment_hash.hex(),
'invoice': invoice,
'minerFeeInvoice': prepay_invoice,
'lockupAddress': swap.lockup_address,
'redeemScript': swap.redeem_script.hex(),
'timeoutBlockHeight': swap.locktime,
"onchainAmount": swap.onchain_amount,
}
elif req_type == 'submarine':
raise Exception('Deprecated API. Please upgrade your version of Electrum')
else:
raise Exception('unsupported request type:' + req_type)
return response
def get_groups_for_onchain_history(self):
current_height = self.wallet.adb.get_local_height()
d = {}
# add info about submarine swaps
settled_payments = self.lnworker.get_payments(status='settled')
for payment_hash_hex, swap in self.swaps.items():
txid = swap.spending_txid if swap.is_reverse else swap.funding_txid
if txid is None:
continue
payment_hash = bytes.fromhex(payment_hash_hex)
if payment_hash in settled_payments:
plist = settled_payments[payment_hash]
info = self.lnworker.get_payment_info(payment_hash)
direction, amount_msat, fee_msat, timestamp = self.lnworker.get_payment_value(info, plist)
else:
amount_msat = 0
if swap.is_reverse:
group_label = 'Reverse swap' + ' ' + self.config.format_amount_and_units(swap.lightning_amount)
else:
group_label = 'Forward swap' + ' ' + self.config.format_amount_and_units(swap.onchain_amount)
label = _('Claim transaction') if swap.is_reverse else _('Funding transaction')
delta = current_height - swap.locktime
if self.wallet.adb.is_mine(swap.lockup_address):
tx_height = self.wallet.adb.get_tx_height(swap.funding_txid)
if swap.is_reverse and tx_height.height <= 0:
label += ' (%s)' % _('waiting for funding tx confirmation')
if not swap.is_reverse and not swap.is_redeemed and swap.spending_txid is None and delta < 0:
label += f' (refundable in {-delta} blocks)' # fixme: only if unspent
d[txid] = {
'group_id': txid,
'label': label,
'group_label': group_label,
}
if not swap.is_reverse:
claim_tx = self.lnwatcher.adb.get_transaction(swap.spending_txid)
if claim_tx and not self.extract_preimage(swap, claim_tx):
# if the spending_tx is in the wallet, this will add it
# to the group (see wallet.get_full_history)
d[swap.spending_txid] = {
'group_id': txid,
'group_label': group_label,
'label': _('Refund transaction'),
}
return d
def get_group_id_for_payment_hash(self, payment_hash):
# add group_id to swap transactions
swap = self.get_swap(payment_hash)
if swap:
return swap.spending_txid if swap.is_reverse else swap.funding_txid
class SwapServerTransport(Logger):
def __init__(self, *, config: 'SimpleConfig', sm: 'SwapManager'):
Logger.__init__(self)
self.sm = sm
self.network = sm.network
self.config = config
self.is_connected = asyncio.Event()
def __enter__(self):
pass
def __exit__(self, ex_type, ex, tb):
pass
async def send_request_to_server(self, method: str, request_data: Optional[dict]) -> dict:
pass
async def get_pairs(self) -> None:
pass
class HttpTransport(SwapServerTransport):
def __init__(self, config, sm):
SwapServerTransport.__init__(self, config=config, sm=sm)
self.api_url = config.SWAPSERVER_URL
self.is_connected.set()
def __enter__(self):
asyncio.run_coroutine_threadsafe(self.get_pairs(), self.network.asyncio_loop)
return self
def __exit__(self, ex_type, ex, tb):
pass
async def send_request_to_server(self, method, request_data):
response = await self.network.async_send_http_on_proxy(
'post' if request_data else 'get',
self.api_url + '/' + method,
json=request_data,
timeout=30)
return json.loads(response)
async def get_pairs(self) -> None:
"""Might raise SwapServerError."""
try:
response = await self.send_request_to_server('getpairs', None)
except aiohttp.ClientError as e:
self.logger.error(f"Swap server errored: {e!r}")
raise SwapServerError() from e
assert response.get('htlcFirst') is True
fees = response['pairs']['BTC/BTC']['fees']
limits = response['pairs']['BTC/BTC']['limits']
pairs = SwapFees(
percentage=fees['percentage'],
mining_fee=fees['minerFees']['baseAsset']['mining_fee'],
min_amount=limits['minimal'],
max_amount=limits['maximal'],
)
self.sm.update_pairs(pairs)
class NostrTransport(SwapServerTransport):
# uses nostr:
# - to advertise servers
# - for client-server RPCs (using DMs)
# (todo: we should use onion messages for that)
NOSTR_DM = 4
USER_STATUS_NIP38 = 30315
NOSTR_EVENT_VERSION = 3
OFFER_UPDATE_INTERVAL_SEC = 60 * 10
def __init__(self, config, sm, keypair):
SwapServerTransport.__init__(self, config=config, sm=sm)
self._offers = {} # type: Dict[str, Dict]
self.private_key = keypair.privkey
self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex())
self.nostr_pubkey = keypair.pubkey.hex()[2:]
self.dm_replies = defaultdict(asyncio.Future) # type: Dict[bytes, asyncio.Future]
ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
self.relay_manager = aionostr.Manager(self.relays, private_key=self.nostr_private_key, log=self.logger, ssl_context=ssl_context)
self.taskgroup = OldTaskGroup()
self.server_relays = None
def __enter__(self):
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
return self
def __exit__(self, ex_type, ex, tb):
fut = asyncio.run_coroutine_threadsafe(self.stop(), self.network.asyncio_loop)
fut.result(timeout=5)
@log_exceptions
async def main_loop(self):
self.logger.info(f'starting nostr transport with pubkey: {self.nostr_pubkey}')
self.logger.info(f'nostr relays: {self.relays}')
await self.relay_manager.connect()
connected_relays = self.relay_manager.relays
self.logger.info(f'connected relays: {[relay.url for relay in connected_relays]}')
if connected_relays:
self.is_connected.set()
if self.sm.is_server:
tasks = [
self.check_direct_messages(),
]
else:
tasks = [
self.check_direct_messages(),
self.receive_offers(),
self.get_pairs(),
]
try:
async with self.taskgroup as group:
for task in tasks:
await group.spawn(task)
except Exception as e:
self.logger.exception("taskgroup died.")
finally:
self.logger.info("taskgroup stopped.")
@log_exceptions
async def stop(self):
self.logger.info("shutting down nostr transport")
self.sm.is_initialized.clear()
await self.taskgroup.cancel_remaining()
await self.relay_manager.close()
self.logger.info("nostr transport shut down")
@property
def relays(self):
return self.network.config.NOSTR_RELAYS.split(',')
def get_offer(self, pubkey):
offer = self._offers.get(pubkey)
return self._parse_offer(offer)
def get_recent_offers(self) -> Sequence[Dict]:
# filter to fresh timestamps
now = int(time.time())
recent_offers = [x for x in self._offers.values() if now - x['timestamp'] < 3600]
# sort by proof-of-work
recent_offers = sorted(recent_offers, key=lambda x: x['pow_bits'], reverse=True)
# cap list size
recent_offers = recent_offers[:20]
return recent_offers
def _parse_offer(self, offer):
return SwapFees(
percentage=offer['percentage_fee'],
mining_fee=offer['mining_fee'],
min_amount=offer['min_amount'],
max_amount=offer['max_amount'],
)
@ignore_exceptions
@log_exceptions
async def publish_offer(self, sm):
assert self.sm.is_server
offer = {
'percentage_fee': sm.percentage,
'mining_fee': sm.mining_fee,
'min_amount': sm._min_amount,
'max_amount': sm._max_amount,
'relays': sm.config.NOSTR_RELAYS,
'pow_nonce': hex(sm.config.SWAPSERVER_ANN_POW_NONCE),
}
# the first value of a single letter tag is indexed and can be filtered for
tags = [['d', f'electrum-swapserver-{self.NOSTR_EVENT_VERSION}'],
['r', 'net:' + constants.net.NET_NAME],
['expiration', str(int(time.time() + self.OFFER_UPDATE_INTERVAL_SEC + 10))]]
event_id = await aionostr._add_event(
self.relay_manager,
kind=self.USER_STATUS_NIP38,
tags=tags,
content=json.dumps(offer),
private_key=self.nostr_private_key)
self.logger.info(f"published offer {event_id}")
async def send_direct_message(self, pubkey: str, relays, content: str) -> str:
event_id = await aionostr._add_event(
self.relay_manager,
kind=self.NOSTR_DM,
content=content,
private_key=self.nostr_private_key,
direct_message=pubkey)
return event_id
@log_exceptions
async def send_request_to_server(self, method: str, request_data: dict) -> dict:
request_data['method'] = method
request_data['relays'] = self.config.NOSTR_RELAYS
server_pubkey = self.config.SWAPSERVER_NPUB
event_id = await self.send_direct_message(server_pubkey, self.server_relays, json.dumps(request_data))
response = await self.dm_replies[event_id]
return response
async def receive_offers(self):
await self.is_connected.wait()
query = {
"kinds": [self.USER_STATUS_NIP38],
"limit":10,
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
"#r": [f"net:{constants.net.NET_NAME}"],
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC
}
async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
try:
content = json.loads(event.content)
tags = {k: v for k, v in event.tags}
except Exception as e:
continue
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
continue
if tags.get('r') != f"net:{constants.net.NET_NAME}":
continue
# check if this is the most recent event for this pubkey
pubkey = event.pubkey
ts = self._offers.get(pubkey, {}).get('timestamp', 0)
if event.created_at <= ts:
#print('skipping old event', pubkey[0:10], event.id)
continue
try:
pow_bits = get_nostr_ann_pow_amount(
bytes.fromhex(pubkey),
int(content.get('pow_nonce', "0"), 16)
)
except ValueError:
continue
if pow_bits < self.config.SWAPSERVER_POW_TARGET:
self.logger.debug(f"too low pow: {pubkey}: pow: {pow_bits} nonce: {content.get('pow_nonce', 0)}")
continue
content['pow_bits'] = pow_bits
content['pubkey'] = pubkey
content['timestamp'] = event.created_at
self._offers[pubkey] = content
# mirror event to other relays
server_relays = content['relays'].split(',') if 'relays' in content else []
await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))
async def get_pairs(self):
if not self.config.SWAPSERVER_NPUB:
return
query = {
"kinds": [self.USER_STATUS_NIP38],
"authors": [self.config.SWAPSERVER_NPUB],
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
"#r": [f"net:{constants.net.NET_NAME}"],
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC,
"limit": 1
}
async for event in self.relay_manager.get_events(query, single_event=True, only_stored=False):
try:
content = json.loads(event.content)
tags = {k: v for k, v in event.tags}
except Exception:
continue
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
continue
if tags.get('r') != f"net:{constants.net.NET_NAME}":
continue
# check if this is the most recent event for this pubkey
pubkey = event.pubkey
content['pubkey'] = pubkey
content['timestamp'] = event.created_at
self.logger.info(f'received offer from {age(event.created_at)}')
pairs = self._parse_offer(content)
self.sm.update_pairs(pairs)
self.server_relays = content['relays'].split(',')
async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
"""If the relays of the origin server are different from our relays we rebroadcast the
event to our relays so it gets spread more widely."""
if not server_relays:
return
rebroadcast_relays = [relay for relay in self.relay_manager.relays if
relay.url not in server_relays]
for relay in rebroadcast_relays:
try:
res = await relay.add_event(event, check_response=True)
except Exception as e:
self.logger.debug(f"failed to rebroadcast event to {relay.url}: {e}")
continue
self.logger.debug(f"rebroadcasted event to {relay.url}: {res}")
@log_exceptions
async def check_direct_messages(self):
privkey = aionostr.key.PrivateKey(self.private_key)
query = {"kinds": [self.NOSTR_DM], "limit":0, "#p": [self.nostr_pubkey]}
async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
try:
content = privkey.decrypt_message(event.content, event.pubkey)
content = json.loads(content)
except Exception:
continue
content['event_id'] = event.id
content['event_pubkey'] = event.pubkey
if 'reply_to' in content:
self.dm_replies[content['reply_to']].set_result(content)
elif self.sm.is_server and 'method' in content:
await self.handle_request(content)
else:
self.logger.info(f'unknown message {content}')
@log_exceptions
async def handle_request(self, request):
assert self.sm.is_server
# todo: remember event_id of already processed requests
method = request.pop('method')
event_id = request.pop('event_id')
event_pubkey = request.pop('event_pubkey')
self.logger.info(f'handle_request: id={event_id} {method} {request}')
relays = request.pop('relays').split(',')
if method == 'addswapinvoice':
r = self.sm.server_add_swap_invoice(request)
elif method == 'createswap':
r = self.sm.server_create_swap(request)
elif method == 'createnormalswap':
r = self.sm.server_create_normal_swap(request)
else:
raise Exception(method)
r['reply_to'] = event_id
self.logger.info(f'sending response id={event_id}')
await self.send_direct_message(event_pubkey, relays, json.dumps(r))