Lightning addresses with 'lightning:' do occur in the wild and make sense (how else would e.g. the smartphone know to open a lightning wallet instead of the e-mail client). So we should allow this.
740 lines
30 KiB
Python
740 lines
30 KiB
Python
import asyncio
|
|
import time
|
|
import urllib
|
|
import re
|
|
from decimal import Decimal, InvalidOperation
|
|
from enum import IntEnum
|
|
from typing import NamedTuple, Optional, Callable, List, TYPE_CHECKING, Tuple, Union
|
|
|
|
from . import bitcoin
|
|
from .contacts import AliasNotFoundException
|
|
from .i18n import _
|
|
from .invoices import Invoice
|
|
from .logging import Logger
|
|
from .util import parse_max_spend, InvoiceError
|
|
from .util import get_asyncio_loop, log_exceptions
|
|
from .transaction import PartialTxOutput
|
|
from .lnurl import (decode_lnurl, request_lnurl, callback_lnurl, LNURLError,
|
|
lightning_address_to_url, try_resolve_lnurlpay, LNURL6Data,
|
|
LNURL3Data, LNURLData)
|
|
from .bitcoin import opcodes, construct_script
|
|
from .lnaddr import LnInvoiceException
|
|
from .lnutil import IncompatibleOrInsaneFeatures
|
|
from .bip21 import parse_bip21_URI, InvalidBitcoinURI, LIGHTNING_URI_SCHEME, BITCOIN_BIP21_URI_SCHEME
|
|
from .segwit_addr import bech32_decode
|
|
from . import paymentrequest
|
|
|
|
if TYPE_CHECKING:
|
|
from .wallet import Abstract_Wallet
|
|
from .transaction import Transaction
|
|
|
|
|
|
def maybe_extract_bech32_lightning_payment_identifier(data: str) -> Optional[str]:
    """Return the bech32 lightning identifier (bolt11 invoice or lnurl) in *data*, if any.

    The LIGHTNING_URI_SCHEME prefix is removed first (which also lowercases and
    trims the text, see remove_uri_prefix). None is returned when the remainder
    does not start with 'ln' or when bech32 decoding yields an empty hrp or
    empty data part.
    """
    candidate = remove_uri_prefix(data, prefix=LIGHTNING_URI_SCHEME)
    if candidate.startswith('ln'):
        decoded = bech32_decode(candidate, ignore_long_length=True)
        if decoded.hrp and decoded.data:
            return candidate
    return None
|
|
|
|
|
|
def remove_uri_prefix(data: str, *, prefix: str) -> str:
    """Lowercase and trim *data*, then drop one leading '<prefix>:' if present.

    Only a single occurrence of the scheme is removed; text without the scheme
    is returned lowercased and trimmed but otherwise unchanged.
    """
    assert isinstance(data, str) and isinstance(prefix, str)
    scheme = prefix + ':'
    cleaned = data.lower().strip()
    if cleaned.startswith(scheme):
        return cleaned[len(scheme):]
    return cleaned
|
|
|
|
|
|
# "<label> <address>" notation: group(1) is the free-form label, group(2) the
# alphanumeric token inside the angle brackets.
RE_ALIAS = r'(.*?)\s*\<([0-9A-Za-z]{1,})\>'
# E-mail-like identifier. The final label (TLD) is limited to 2-7 letters.
# The TLD class used to be '[A-Z|a-z]', which also accepted a literal '|'.
RE_EMAIL = r'\b[A-Za-z0-9._%+-]+@([A-Za-z0-9-]+\.)+[A-Za-z]{2,7}\b'
# Bare domain name, same TLD rule as RE_EMAIL.
RE_DOMAIN = r'\b([A-Za-z0-9-]+\.)+[A-Za-z]{2,7}\b'
# 'script(<words>)' notation: group(1) is the text between the parentheses.
RE_SCRIPT_FN = r'script\((.*)\)'
|
|
|
|
|
|
class PaymentIdentifierState(IntEnum):
    """States a payment identifier (PI) can be in.

    Values from ERROR (50) upwards are the error states.
    """

    # Initial state.
    EMPTY = 0
    # Unrecognized PI
    INVALID = 1
    # PI contains a payable destination.
    # Payable means there's enough addressing information to submit to one
    # of the channels Electrum supports (on-chain, lightning)
    AVAILABLE = 2
    # PI contains a recognized destination format, but needs an online resolve step
    NEED_RESOLVE = 3
    # PI contains a resolved LNURLp, but needs amount and comment to resolve to a bolt11
    LNURLP_FINALIZE = 4
    # PI contains resolved LNURLw, user needs to enter amount and initiate withdraw
    LNURLW_FINALIZE = 5
    # PI contains a valid payment request and on-chain destination. It should notify
    # the merchant payment processor of the tx after on-chain broadcast,
    # and supply a refund address (bip70)
    MERCHANT_NOTIFY = 6
    # PI notified merchant. nothing to be done.
    MERCHANT_ACK = 7
    # generic error
    ERROR = 50
    # PI contains a recognized destination format, but resolve step was unsuccessful
    NOT_FOUND = 51
    # PI failed notifying the merchant after broadcasting onchain TX
    MERCHANT_ERROR = 52
    # Specified amount not accepted
    INVALID_AMOUNT = 53
|
|
|
|
|
|
class PaymentIdentifierType(IntEnum):
    """Kinds of payment identifier distinguished by PaymentIdentifier."""

    UNKNOWN = 0
    SPK = 1
    BIP21 = 2
    BIP70 = 3
    MULTILINE = 4
    BOLT11 = 5
    # before the resolve it's unknown if pi is LNURLP or LNURLW
    LNURL = 6
    LNURLP = 7
    LNURLW = 8
    EMAILLIKE = 9
    OPENALIAS = 10
    LNADDR = 11
    DOMAINLIKE = 12
|
|
|
|
|
|
class FieldsForGUI(NamedTuple):
    """Values returned by PaymentIdentifier.get_fields_for_GUI(); every field may be None."""

    # recipient text (address, pubkey, requestor, or "<label> <address>")
    recipient: Optional[str]
    # amount taken from the identifier, when it specifies one
    amount: Optional[int]
    # description / memo / message carried by the identifier
    description: Optional[str]
    # for BIP70 requests: whether the request has not expired
    validated: Optional[bool]
    # for LNURL-pay: the lnurl 'comment_allowed' value
    comment: Optional[int]
    # for LNURL-pay: (min_sendable_sat, max_sendable_sat) when they differ
    amount_range: Optional[Tuple[int, int]]
|
|
|
|
|
|
class PaymentIdentifier(Logger):
|
|
"""
|
|
Takes:
|
|
* bitcoin addresses or script
|
|
* paytomany csv
|
|
* openalias
|
|
* bip21 URI
|
|
* lightning-URI (containing bolt11 or lnurl)
|
|
* bolt11 invoice
|
|
* lnurl
|
|
* lightning address
|
|
"""
|
|
|
|
def __init__(self, wallet: Optional['Abstract_Wallet'], text: str):
    """Create the identifier in its EMPTY/UNKNOWN state and parse *text* right away.

    *wallet* may be None, in which case no contacts and no config are available.
    """
    Logger.__init__(self)
    self._state = PaymentIdentifierState.EMPTY
    self._type = PaymentIdentifierType.UNKNOWN
    self.wallet = wallet
    if wallet is None:
        self.contacts = None
        self.config = None
    else:
        self.contacts = wallet.contacts
        self.config = wallet.config
    self.text = text.strip()
    # if set, GUI should show error and stop
    self.error = None
    # if set, GUI should ask user if they want to proceed
    self.warning = None

    # payload attributes; more than one of those may be set
    self.multiline_outputs = None
    self._is_max = False
    self.bolt11 = None  # type: Optional[Invoice]
    self.bip21 = None
    self.spk = None
    self.spk_is_address = False

    # identifiers that need an online lookup
    self.emaillike = None
    self.domainlike = None
    self.openalias_data = None

    # bip70 payment request and merchant notification result
    self.bip70 = None
    self.bip70_data = None
    self.merchant_ack_status = None
    self.merchant_ack_message = None

    # lnurl and the data obtained by resolving it
    self.lnurl = None  # type: Optional[str]
    self.lnurl_data = None  # type: Optional[LNURLData]

    self.parse(text)
|
|
|
|
@property
def type(self):
    """The PaymentIdentifierType currently assigned to this identifier (read-only)."""
    current_type = self._type
    return current_type
|
|
|
|
def set_state(self, state: 'PaymentIdentifierState'):
    """Switch to *state*, logging the transition at debug level first."""
    previous = self._state
    self.logger.debug(f'PI state {previous.name} -> {state.name}')
    self._state = state
|
|
|
|
@property
def state(self):
    """The current PaymentIdentifierState (read-only; change it via set_state)."""
    current_state = self._state
    return current_state
|
|
|
|
def need_resolve(self):
    """Return True while the identifier is in the NEED_RESOLVE state."""
    wanted = PaymentIdentifierState.NEED_RESOLVE
    return self._state == wanted
|
|
|
|
def need_finalize(self):
    """Return True while the identifier is in the LNURLP_FINALIZE state."""
    wanted = PaymentIdentifierState.LNURLP_FINALIZE
    return self._state == wanted
|
|
|
|
def need_merchant_notify(self):
    """Return True while the identifier is in the MERCHANT_NOTIFY state."""
    wanted = PaymentIdentifierState.MERCHANT_NOTIFY
    return self._state == wanted
|
|
|
|
def is_valid(self):
    """Return True unless the state is INVALID or EMPTY."""
    rejected = (PaymentIdentifierState.INVALID, PaymentIdentifierState.EMPTY)
    return self._state not in rejected
|
|
|
|
def is_available(self):
    """Return True when the state is AVAILABLE."""
    return self._state == PaymentIdentifierState.AVAILABLE
|
|
|
|
def is_lightning(self):
    """Return True if an lnurl or a bolt11 invoice is set on this identifier."""
    if self.lnurl:
        return True
    return bool(self.bolt11)
|
|
|
|
def is_onchain(self) -> bool:
    """Return True if this identifier offers an on-chain destination.

    * SPK, MULTILINE, BIP70 and OPENALIAS are always on-chain.
    * LNURLP, BOLT11 and LNADDR are on-chain only if a bolt11 invoice is set
      and its get_address() is truthy.
    * BIP21 is on-chain if the URI carries an address, or under the same
      bolt11 condition as above.
    * Every other type is not on-chain.
    """
    if self._type in [PaymentIdentifierType.SPK, PaymentIdentifierType.MULTILINE, PaymentIdentifierType.BIP70,
                      PaymentIdentifierType.OPENALIAS]:
        return True
    if self._type in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.BOLT11, PaymentIdentifierType.LNADDR]:
        return bool(self.bolt11) and bool(self.bolt11.get_address())
    if self._type == PaymentIdentifierType.BIP21:
        return bool(self.bip21.get('address', None)) or (bool(self.bolt11) and bool(self.bolt11.get_address()))
    # Remaining types used to fall off the end and return None; answer with a
    # real bool so the return type is consistent.
    return False
|
|
|
|
def is_multiline(self):
    """Return True if multiline (pay-to-many) outputs were parsed."""
    parsed_outputs = self.multiline_outputs
    return bool(parsed_outputs)
|
|
|
|
def is_multiline_max(self):
    """Return whether this is a multiline identifier with the max-spend flag set.

    When is_multiline() is falsy its result is returned; otherwise the value
    of self._is_max is returned.
    """
    if not (multiline := self.is_multiline()):
        return multiline
    return self._is_max
|
|
|
|
def is_amount_locked(self):
    """Return True if the amount is fixed by the identifier itself.

    * BIP21: locked when the URI carries an amount.
    * BIP70: locked once the request no longer needs resolving.
    * BOLT11: locked when the invoice specifies an amount.
    * LNURLP / LNADDR: not locked before resolve; while waiting for finalize
      it is locked only if min_sendable_sat is not below max_sendable_sat;
      locked afterwards.
    * MULTILINE: always locked. Anything else: never locked.
    """
    kind = self._type
    if kind == PaymentIdentifierType.BIP21:
        return bool(self.bip21.get('amount'))
    if kind == PaymentIdentifierType.BIP70:
        return not self.need_resolve()  # always fixed after resolve?
    if kind == PaymentIdentifierType.BOLT11:
        return bool(self.bolt11.get_amount_sat())
    if kind in [PaymentIdentifierType.LNURLP, PaymentIdentifierType.LNADDR]:
        # amount limits known after resolve, might be specific amount or locked to range
        if self.need_resolve():
            return False
        if self.need_finalize():
            lowest = self.lnurl_data.min_sendable_sat
            highest = self.lnurl_data.max_sendable_sat
            self.logger.debug(f'lnurl f {lowest}-{highest}')
            return not (lowest < highest)
        return True
    return kind == PaymentIdentifierType.MULTILINE
|
|
|
|
def is_error(self) -> bool:
    """Return True if the state is one of the error states (ERROR or above)."""
    return PaymentIdentifierState.ERROR <= self._state
|
|
|
|
def get_error(self) -> str:
    """Return the stored error message (None when no error was recorded)."""
    message = self.error
    return message
|
|
|
|
def parse(self, text: str):
    """Classify *text*; set self._type, the matching payload, self.error and the state.

    Blank text leaves the object untouched. Otherwise the formats are tried in
    this order and the first match wins:

    1. "address, amount" lines (pay-to-many)
    2. bech32 lightning data (lnurl or bolt11), optionally with the lightning URI prefix
    3. BIP21 URI (becomes BIP70 when it has an 'r' parameter)
    4. a plain address or script(...)
    5. a contact name
    6. e-mail-like text, optionally with the lightning URI prefix
    7. domain-like text

    If nothing matches and no error was recorded, an "Unknown payment
    identifier" error is set and the state becomes INVALID.
    """
    # parse text, set self._type and self.error
    text = text.strip()
    if not text:
        return
    if outputs := self._parse_as_multiline(text):
        self._type = PaymentIdentifierType.MULTILINE
        self.multiline_outputs = outputs
        if self.error:
            self.set_state(PaymentIdentifierState.INVALID)
        else:
            self.set_state(PaymentIdentifierState.AVAILABLE)
    elif invoice_or_lnurl := maybe_extract_bech32_lightning_payment_identifier(text):
        if invoice_or_lnurl.startswith('lnurl'):
            self._type = PaymentIdentifierType.LNURL
            try:
                self.lnurl = decode_lnurl(invoice_or_lnurl)
                self.set_state(PaymentIdentifierState.NEED_RESOLVE)
            except Exception as e:
                self.error = _("Error parsing LNURL") + f":\n{e}"
                self.set_state(PaymentIdentifierState.INVALID)
                return
        else:
            self._type = PaymentIdentifierType.BOLT11
            try:
                self.bolt11 = Invoice.from_bech32(invoice_or_lnurl)
            except InvoiceError as e:
                self.error = self._get_error_from_invoiceerror(e)
                self.set_state(PaymentIdentifierState.INVALID)
                self.logger.debug(f'Exception cause {e.args!r}')
                return
            self.set_state(PaymentIdentifierState.AVAILABLE)
    elif text.lower().startswith(BITCOIN_BIP21_URI_SCHEME + ':'):
        try:
            out = parse_bip21_URI(text)
        except InvalidBitcoinURI as e:
            self.error = _("Error parsing URI") + f":\n{e}"
            self.set_state(PaymentIdentifierState.INVALID)
            return
        self.bip21 = out
        self.bip70 = out.get('r')
        if self.bip70:
            self._type = PaymentIdentifierType.BIP70
            self.set_state(PaymentIdentifierState.NEED_RESOLVE)
        else:
            self._type = PaymentIdentifierType.BIP21
            # check optional lightning in bip21, set self.bolt11 if valid
            bolt11 = out.get('lightning')
            if bolt11:
                try:
                    self.bolt11 = Invoice.from_bech32(bolt11)
                    # carry BIP21 onchain address in Invoice.outputs in case bolt11 doesn't contain a fallback
                    # address but the BIP21 URI has one.
                    if bip21_address := self.bip21.get('address'):
                        amount = self.bip21.get('amount', 0)
                        self.bolt11.outputs = [PartialTxOutput.from_address_and_value(bip21_address, amount)]
                except InvoiceError as e:
                    self.logger.debug(self._get_error_from_invoiceerror(e))
            # Checked after the bolt11 attempt (not as an alternative to it), so
            # that a URI whose only destination is an unparsable lightning
            # invoice is rejected instead of being reported as AVAILABLE.
            if self.bolt11 is None and not self.bip21.get('address'):
                # no address and no bolt11, invalid
                self.set_state(PaymentIdentifierState.INVALID)
                return
            self.set_state(PaymentIdentifierState.AVAILABLE)
    elif (parsed_output := self.parse_output(text))[0]:
        # reuse the result instead of parsing the same text a second time
        scriptpubkey, is_address = parsed_output
        self._type = PaymentIdentifierType.SPK
        self.spk = scriptpubkey
        self.spk_is_address = is_address
        self.set_state(PaymentIdentifierState.AVAILABLE)
    elif self.contacts and (contact := self.contacts.by_name(text)):
        if contact['type'] == 'address':
            self._type = PaymentIdentifierType.BIP21
            self.bip21 = {
                'address': contact['address'],
                'label': contact['name']
            }
            self.set_state(PaymentIdentifierState.AVAILABLE)
        elif contact['type'] in ('openalias', 'lnaddress'):
            self._type = PaymentIdentifierType.EMAILLIKE
            self.emaillike = contact['address']
            self.set_state(PaymentIdentifierState.NEED_RESOLVE)
    elif re.match(RE_EMAIL, (maybe_emaillike := remove_uri_prefix(text, prefix=LIGHTNING_URI_SCHEME))):
        # lightning addresses may carry the lightning URI prefix; it is stripped here
        self._type = PaymentIdentifierType.EMAILLIKE
        self.emaillike = maybe_emaillike
        self.set_state(PaymentIdentifierState.NEED_RESOLVE)
    elif re.match(RE_DOMAIN, text):
        self._type = PaymentIdentifierType.DOMAINLIKE
        self.domainlike = text
        self.set_state(PaymentIdentifierState.NEED_RESOLVE)
    elif self.error is None:
        # an error recorded earlier (by the multiline parser) is kept as is
        truncated_text = f"{text[:100]}..." if len(text) > 100 else text
        self.error = f"Unknown payment identifier:\n{truncated_text}"
        self.set_state(PaymentIdentifierState.INVALID)
|
|
|
|
def resolve(self, *, on_finished: Callable[['PaymentIdentifier'], None]) -> None:
    """Schedule the online resolve step on the asyncio loop from get_asyncio_loop().

    Must only be called in the NEED_RESOLVE state. *on_finished* is handed to
    _do_resolve, which calls it with this object when it is done.
    """
    assert self._state == PaymentIdentifierState.NEED_RESOLVE
    asyncio.run_coroutine_threadsafe(self._do_resolve(on_finished=on_finished), get_asyncio_loop())
|
|
|
|
@log_exceptions
async def _do_resolve(self, *, on_finished: Callable[['PaymentIdentifier'], None] = None):
    """Perform the online lookup for the identifier and update type, payload and state.

    * e-mail-like / domain-like: an OpenAlias lookup is started; for
      e-mail-like text an LNURL-pay lookup is tried as well and wins if it
      succeeds (-> LNADDR / LNURLP_FINALIZE). Otherwise the OpenAlias result
      is used (-> OPENALIAS / AVAILABLE), or the state becomes NOT_FOUND.
    * bip70: fetch and verify the payment request (-> MERCHANT_NOTIFY or ERROR).
    * lnurl: request the lnurl data (-> LNURLP_FINALIZE or LNURLW_FINALIZE).

    Any exception is stored in self.error and leads to the ERROR state.
    *on_finished*, if given, is always called with this object at the end.
    """
    try:
        if self.emaillike or self.domainlike:
            openalias_key = self.emaillike if self.emaillike else self.domainlike
            openalias_task = asyncio.create_task(self.resolve_openalias(openalias_key))

            # prefers lnurl over openalias if both are available
            lnurl = lightning_address_to_url(self.emaillike) if self.emaillike else None
            if lnurl is not None and (lnurl_result := await try_resolve_lnurlpay(lnurl)):
                openalias_task.cancel()
                self._type = PaymentIdentifierType.LNADDR
                self.lnurl = lnurl
                self.lnurl_data = lnurl_result
                self.set_state(PaymentIdentifierState.LNURLP_FINALIZE)
            elif openalias_result := await openalias_task:
                self.openalias_data = openalias_result
                address = openalias_result.get('address')
                try:
                    # This error message is shown in the GUI. An explicit raise is
                    # used instead of `assert` so the check survives `python -O`,
                    # and str() keeps the message intact when the record has no
                    # 'address' entry (None cannot be sliced).
                    if not bitcoin.is_address(address):
                        raise ValueError(f"{_('Openalias address invalid')}: {str(address)[:100]}")
                    scriptpubkey = bitcoin.address_to_script(address)
                    self._type = PaymentIdentifierType.OPENALIAS
                    self.spk = scriptpubkey
                    self.set_state(PaymentIdentifierState.AVAILABLE)
                except Exception as e:
                    self.error = str(e)
                    self.set_state(PaymentIdentifierState.NOT_FOUND)
            else:
                self.set_state(PaymentIdentifierState.NOT_FOUND)
        elif self.bip70:
            pr = await paymentrequest.get_payment_request(self.bip70)
            if await pr.verify():
                self.bip70_data = pr
                self.set_state(PaymentIdentifierState.MERCHANT_NOTIFY)
            else:
                self.error = pr.error
                self.set_state(PaymentIdentifierState.ERROR)
        elif self.lnurl:
            data = await request_lnurl(self.lnurl)
            self.lnurl_data = data
            if isinstance(data, LNURL6Data):
                self._type = PaymentIdentifierType.LNURLP
                self.set_state(PaymentIdentifierState.LNURLP_FINALIZE)
            elif isinstance(data, LNURL3Data):
                self._type = PaymentIdentifierType.LNURLW
                self.set_state(PaymentIdentifierState.LNURLW_FINALIZE)
            else:
                raise NotImplementedError(f"Invalid LNURL type? {data=}")
            self.logger.debug(f'LNURL data: {data!r}')
        else:
            # nothing that could be resolved
            self.set_state(PaymentIdentifierState.ERROR)
            return
    except Exception as e:
        self.error = str(e)
        self.logger.error(f"_do_resolve() got error: {e!r}")
        self.set_state(PaymentIdentifierState.ERROR)
    finally:
        if on_finished:
            on_finished(self)
|
|
|
|
def finalize(
    self,
    *,
    amount_sat: int = 0,
    comment: str = None,
    on_finished: Callable[['PaymentIdentifier'], None] = None,
):
    """Schedule the LNURL-pay finalize step on the asyncio loop from get_asyncio_loop().

    Must only be called in the LNURLP_FINALIZE state. The arguments are passed
    on to _do_finalize unchanged.
    """
    assert self._state == PaymentIdentifierState.LNURLP_FINALIZE
    asyncio.run_coroutine_threadsafe(
        self._do_finalize(amount_sat=amount_sat, comment=comment, on_finished=on_finished),
        get_asyncio_loop(),
    )
|
|
|
|
@log_exceptions
async def _do_finalize(
    self,
    *,
    amount_sat: int = None,
    comment: str = None,
    on_finished: Callable[['PaymentIdentifier'], None] = None,
):
    """Request the bolt11 invoice for a resolved LNURL-pay identifier.

    The amount must lie within [min_sendable_sat, max_sendable_sat], otherwise
    the state becomes INVALID_AMOUNT. The comment is sent only if non-empty
    and the lnurl data allows comments. On success the returned invoice is
    stored in self.bolt11 and the state becomes AVAILABLE; failures end in the
    ERROR state with self.error set. *on_finished*, if given, is always called
    with this object at the end.
    """
    from .invoices import Invoice
    try:
        if not self.lnurl_data:
            raise Exception("Unexpected missing LNURL data")

        amount_in_range = self.lnurl_data.min_sendable_sat <= amount_sat <= self.lnurl_data.max_sendable_sat
        if not amount_in_range:
            self.error = _('Amount must be between {} and {} sat.').format(
                self.lnurl_data.min_sendable_sat, self.lnurl_data.max_sendable_sat)
            self.set_state(PaymentIdentifierState.INVALID_AMOUNT)
            return

        # the callback takes the amount multiplied by 1000
        params = {'amount': amount_sat * 1000}
        if self.lnurl_data.comment_allowed != 0 and comment:
            params['comment'] = comment

        try:
            invoice_data = await callback_lnurl(self.lnurl_data.callback_url, params=params)
        except LNURLError as e:
            self.error = f"LNURL request encountered error: {e}"
            self.set_state(PaymentIdentifierState.ERROR)
            return

        invoice = Invoice.from_bech32(invoice_data.get('pr'))
        if invoice.get_amount_sat() != amount_sat:
            raise Exception("lnurl returned invoice with wrong amount")
        # this will change what is returned by get_fields_for_GUI
        self.bolt11 = invoice
        self.set_state(PaymentIdentifierState.AVAILABLE)
    except Exception as e:
        self.error = str(e)
        self.logger.error(f"_do_finalize() got error: {e!r}")
        self.set_state(PaymentIdentifierState.ERROR)
    finally:
        if on_finished:
            on_finished(self)
|
|
|
|
def notify_merchant(
    self,
    *,
    tx: 'Transaction',
    refund_address: str,
    on_finished: Callable[['PaymentIdentifier'], None] = None,
):
    """Schedule the merchant notification on the asyncio loop from get_asyncio_loop().

    Must only be called in the MERCHANT_NOTIFY state, with a transaction and a
    refund address. The work itself is done by _do_notify_merchant.
    """
    assert self._state == PaymentIdentifierState.MERCHANT_NOTIFY
    assert tx
    assert refund_address
    asyncio.run_coroutine_threadsafe(
        self._do_notify_merchant(tx, refund_address, on_finished=on_finished),
        get_asyncio_loop(),
    )
|
|
|
|
@log_exceptions
async def _do_notify_merchant(
    self,
    tx: 'Transaction',
    refund_address: str,
    *,
    on_finished: Callable[['PaymentIdentifier'], None] = None,
):
    """Send the serialized transaction and refund address to the merchant.

    Without bip70 data the state becomes ERROR. Otherwise the payment ack
    status and message are stored and the state becomes MERCHANT_ACK; an
    exception is stored in self.error and leads to MERCHANT_ERROR.
    *on_finished*, if given, is always called with this object at the end.
    """
    try:
        payment_request = self.bip70_data
        if not payment_request:
            self.set_state(PaymentIdentifierState.ERROR)
            return

        ack_status, ack_msg = await payment_request.send_payment_and_receive_paymentack(
            tx.serialize(), refund_address)
        self.logger.info(f"Payment ACK: {ack_status}. Ack message: {ack_msg}")
        self.merchant_ack_status = ack_status
        self.merchant_ack_message = ack_msg
        self.set_state(PaymentIdentifierState.MERCHANT_ACK)
    except Exception as e:
        self.error = str(e)
        self.logger.error(f"_do_notify_merchant() got error: {e!r}")
        self.set_state(PaymentIdentifierState.MERCHANT_ERROR)
    finally:
        if on_finished:
            on_finished(self)
|
|
|
|
def get_onchain_outputs(self, amount):
    """Return the on-chain outputs for this identifier.

    Sources are checked in this order: bip70 payment request, multiline
    outputs, script/address (spk), BIP21 address. *amount* is used as the
    output value for the spk and BIP21 cases only. Raises Exception
    ('not onchain') when none of them is set.
    """
    if self.bip70:
        return self.bip70_data.get_outputs()
    if self.multiline_outputs:
        return self.multiline_outputs
    if self.spk:
        return [PartialTxOutput(scriptpubkey=self.spk, value=amount)]
    if self.bip21:
        bip21_address = self.bip21.get('address')
        scriptpubkey, is_address = self.parse_output(bip21_address)
        assert is_address  # unlikely, but make sure it is an address, not a script
        return [PartialTxOutput(scriptpubkey=scriptpubkey, value=amount)]
    raise Exception('not onchain')
|
|
|
|
def _parse_as_multiline(self, text: str):
    """Parse *text* as one "address, amount" entry per line and return the outputs.

    Empty lines are skipped. Lines that fail to parse are left out of the
    result. Their messages ("line #<index>: <reason>", index counted from 0
    over the non-empty lines) are stored in self.error, but only when the text
    has more than one non-empty line. self._is_max is reset and then set to
    True if any output uses a max-spend amount.
    """
    # filter out empty lines
    lines = [line for line in text.split('\n') if line]
    is_multiline = len(lines) > 1
    outputs = []  # type: List[PartialTxOutput]
    errors = []
    self._is_max = False
    for i, line in enumerate(lines):
        try:
            output = self.parse_address_and_amount(line)
            outputs.append(output)
            if parse_max_spend(output.value):
                self._is_max = True
        except Exception as e:
            errors.append(f'line #{i}: {e}')
    if is_multiline and errors:
        self.error = '\n'.join(errors).strip()
    self.logger.debug(f'multiline: {outputs!r}, {self.error}')
    return outputs
|
|
|
|
def parse_address_and_amount(self, line: str) -> PartialTxOutput:
    """Turn one "address, amount" line into a PartialTxOutput.

    Raises Exception if the line does not consist of exactly two
    comma-separated fields, or if the first field is neither an address nor a
    script; errors from parse_amount propagate.
    """
    fields = line.split(',')
    if len(fields) != 2:
        raise Exception("expected two comma-separated values: (address, amount)") from None
    destination, amount_text = fields
    scriptpubkey, _is_address = self.parse_output(destination)
    if not scriptpubkey:
        raise Exception('Invalid address')
    return PartialTxOutput(scriptpubkey=scriptpubkey, value=self.parse_amount(amount_text))
|
|
|
|
def parse_output(self, x: str) -> Tuple[Optional[bytes], bool]:
    """Interpret *x* as an address or as script(...) notation.

    Returns (scriptpubkey, True) for an address, (script, False) for a
    script, and (None, False) if neither interpretation works. Parsing is
    best-effort: any exception simply makes that interpretation fail.
    """
    # first attempt: a bitcoin address (optionally in "label <address>" form)
    try:
        return bitcoin.address_to_script(self.parse_address(x)), True
    except Exception:
        pass

    # second attempt: script(...) notation
    try:
        match = re.match('^' + RE_SCRIPT_FN + '$', x)
        return self.parse_script(str(match.group(1))), False
    except Exception:
        pass

    return None, False
|
|
|
|
def parse_script(self, x: str) -> bytes:
    """Assemble a script from whitespace-separated words.

    Words starting with 'OP_' are looked up in `opcodes`; every other word
    must be valid hex (bytes.fromhex raises ValueError otherwise) and is
    passed to construct_script as given.
    """
    assembled = bytearray()
    for token in x.split():
        if token.startswith('OP_'):
            item = opcodes[token]
        else:
            bytes.fromhex(token)  # to test it is hex data
            item = token
        assembled += construct_script([item])
    return bytes(assembled)
|
|
|
|
def parse_amount(self, x: str) -> Union[str, int]:
    """Parse an amount typed by the user.

    Returns the stripped text unchanged if parse_max_spend accepts it;
    otherwise the decimal value multiplied by
    10 ** config.BTC_AMOUNTS_DECIMAL_POINT, truncated to an int.

    Raises Exception("Amount is empty") for blank input and
    Exception("Invalid amount") for text that is not a finite decimal number.
    """
    x = x.strip()
    if not x:
        raise Exception("Amount is empty")
    if parse_max_spend(x):
        return x
    p = pow(10, self.config.BTC_AMOUNTS_DECIMAL_POINT)
    try:
        return int(p * Decimal(x))
    except (InvalidOperation, ValueError, OverflowError):
        # Decimal() accepts 'nan' and 'inf'; converting those to int raises
        # ValueError / OverflowError rather than InvalidOperation.
        raise Exception("Invalid amount") from None
|
|
|
|
def parse_address(self, line: str):
    """Return the bitcoin address contained in *line*.

    *line* is either a bare address or "label <address>" (RE_ALIAS); in the
    latter case the part inside the angle brackets is used. An AssertionError
    is raised if the result is not a valid address.
    """
    stripped = line.strip()
    alias_match = re.match('^' + RE_ALIAS + '$', stripped)
    if alias_match:
        address = str(alias_match.group(2))
    else:
        address = str(stripped)
    assert bitcoin.is_address(address)
    return address
|
|
|
|
def _get_error_from_invoiceerror(self, e: 'InvoiceError') -> str:
    """Build the user-facing message for an InvoiceError.

    The wording depends on the type of the first argument of *e*:
    LnInvoiceException -> parse error with str(e); IncompatibleOrInsaneFeatures
    -> unsupported-feature message with repr(e); anything else -> parse error
    with repr(e).
    """
    cause = e.args[0] if e.args else None
    if isinstance(cause, LnInvoiceException):
        return _("Error parsing Lightning invoice") + f":\n{e}"
    if isinstance(cause, IncompatibleOrInsaneFeatures):
        return _("Invoice requires unknown or incompatible Lightning feature") + f":\n{e!r}"
    return _("Error parsing Lightning invoice") + f":\n{e!r}"
|
|
|
|
def get_fields_for_GUI(self) -> FieldsForGUI:
    """Collect the values the GUI shows for this identifier.

    The first applicable source is used, in this order: resolved OpenAlias
    data, bolt11 invoice, resolved LNURL-pay data, bip70 payment request,
    BIP21 URI. Plain scripts/addresses and multiline outputs contribute no
    fields. Fields that do not apply stay None. For a bip70 request that
    carries an error, the error is copied to self.error instead.
    """
    recipient = amount = description = None
    validated = comment = amount_range = None

    if (self.emaillike or self.domainlike) and self.openalias_data:
        alias_key = self.emaillike if self.emaillike else self.domainlike
        alias_address = self.openalias_data.get('address')
        description = self.openalias_data.get('name')
        recipient = alias_key + ' <' + alias_address + '>'

    elif self.bolt11:
        recipient, amount, description = self._get_bolt11_fields()

    elif self.lnurl and self.lnurl_data:
        assert isinstance(self.lnurl_data, LNURL6Data), f"{self.lnurl_data=}"
        pay_data = self.lnurl_data
        domain = urllib.parse.urlparse(self.lnurl).netloc
        recipient = f"{pay_data.metadata_plaintext} <{domain}>"
        description = pay_data.metadata_plaintext
        if pay_data.comment_allowed:
            comment = pay_data.comment_allowed
        if pay_data.min_sendable_sat:
            amount = pay_data.min_sendable_sat
            if pay_data.min_sendable_sat != pay_data.max_sendable_sat:
                amount_range = (pay_data.min_sendable_sat, pay_data.max_sendable_sat)

    elif self.bip70 and self.bip70_data:
        pr = self.bip70_data
        if pr.error:
            self.error = pr.error
        else:
            recipient = pr.get_requestor()
            amount = pr.get_amount()
            description = pr.get_memo()
            validated = not pr.has_expired()

    elif self.spk or self.multiline_outputs:
        # nothing to show for these
        pass

    elif self.bip21:
        label = self.bip21.get('label')
        address = self.bip21.get('address')
        recipient = f'{label} <{address}>' if label else address
        amount = self.bip21.get('amount')
        description = self.bip21.get('message')
        # TODO: use label as description? (not BIP21 compliant)
        # if label and not description:
        #     description = label

    return FieldsForGUI(
        recipient=recipient,
        amount=amount,
        description=description,
        validated=validated,
        comment=comment,
        amount_range=amount_range,
    )
|
|
|
|
def _get_bolt11_fields(self):
    """Return (pubkey hex, amount in sat, description) of the bolt11 invoice.

    The description is the value of the first 'd' tag, or '' if there is none.
    """
    lnaddr = self.bolt11._lnaddr  # TODO: improve access to lnaddr
    pubkey_hex = lnaddr.pubkey.serialize().hex()
    description = next((value for key, value in lnaddr.tags if key == 'd'), '')
    return pubkey_hex, lnaddr.get_amount_sat(), description
|
|
|
|
async def resolve_openalias(self, key: str) -> Optional[dict]:
    """Resolve *key* through self.contacts.resolve and return the resulting data.

    Returns None if the text before the first comma is already a bitcoin
    address, if the alias is not found, or if the lookup fails for any other
    reason (failures are logged at info level).
    """
    first_field = key.split(sep=',')[0]  # assuming single line
    if bitcoin.is_address(first_field):
        return None
    try:
        # TODO: don't use contacts as delegate to resolve openalias, separate.
        resolved = await self.contacts.resolve(key)
        self.logger.debug(f'OA: {resolved!r}')
        return resolved
    except AliasNotFoundException as e:
        self.logger.info(f'OpenAlias not found: {repr(e)}')
    except Exception as e:
        self.logger.info(f'error resolving address/alias: {repr(e)}')
    return None
|
|
|
|
def has_expired(self):
    """Return True if the payment request behind this identifier has expired.

    bip70 data and bolt11 invoices answer through their own has_expired().
    A BIP21 dict has expired when 'time' + 'exp' lies in the past. Without
    'exp', or with 'exp' but no 'time' to measure it from, it is treated as
    not expired. Identifiers without any of these never expire.
    """
    if self.bip70 and self.bip70_data:
        return self.bip70_data.has_expired()
    elif self.bolt11:
        return self.bolt11.has_expired()
    elif self.bip21:
        exp = self.bip21.get('exp')
        created = self.bip21.get('time')
        # 'exp' without 'time' used to raise TypeError (int + None)
        if not exp or created is None:
            return False
        expires = exp + created
        return bool(expires) and expires < time.time()
    return False
|
|
|
|
|
|
def invoice_from_payment_identifier(
    pi: 'PaymentIdentifier',
    wallet: 'Abstract_Wallet',
    amount_sat: Union[int, str],
    message: str = None
) -> Optional[Invoice]:
    """Create an Invoice from a payment identifier in AVAILABLE or MERCHANT_NOTIFY state.

    For a lightning identifier (and an amount other than '!') the bolt11
    invoice is returned, with *amount_sat* filled in if the invoice has no
    amount; None is returned if there is no bolt11 invoice. Otherwise an
    on-chain invoice is created through wallet.create_invoice. When the
    identifier has BIP21 data, its 'message' entry replaces *message*.
    """
    spend_max = amount_sat == '!'
    assert pi.state in [PaymentIdentifierState.AVAILABLE, PaymentIdentifierState.MERCHANT_NOTIFY]
    # MAX should only be allowed if pi has onchain destination
    assert pi.is_onchain() if spend_max else True

    if pi.is_lightning() and not spend_max:
        lightning_invoice = pi.bolt11
        if not lightning_invoice:
            return
        if lightning_invoice._lnaddr.get_amount_msat() is None:
            lightning_invoice.set_amount_msat(int(amount_sat * 1000))
        return lightning_invoice

    outputs = pi.get_onchain_outputs(amount_sat)
    if pi.bip21:
        message = pi.bip21.get('message')
    payment_request = pi.bip70_data if pi.bip70 else None
    return wallet.create_invoice(
        outputs=outputs,
        message=message,
        pr=payment_request,
        URI=pi.bip21)
|
|
|
|
|
|
# Note: this is only really used for bip70 to handle MERCHANT_NOTIFY state from
|
|
# a saved bip70 invoice.
|
|
# TODO: reflect bip70-only in function name, or implement other types as well.
|
|
def payment_identifier_from_invoice(
    wallet: 'Abstract_Wallet',
    invoice: Invoice
) -> Optional[PaymentIdentifier]:
    """Build a PaymentIdentifier for a saved invoice.

    Returns None for a falsy invoice. For an invoice with bip70 data the
    identifier is set up as BIP70 in the MERCHANT_NOTIFY state, with the
    payment request rebuilt from the hex-encoded data. For any other invoice
    the identifier is returned as created from empty text.
    """
    if not invoice:
        return
    identifier = PaymentIdentifier(wallet, '')
    if invoice.bip70:
        raw_request = bytes.fromhex(invoice.bip70)
        identifier._type = PaymentIdentifierType.BIP70
        identifier.bip70_data = paymentrequest.PaymentRequest(raw_request)
        identifier.set_state(PaymentIdentifierState.MERCHANT_NOTIFY)
    return identifier
|
|
# else:
|
|
# if invoice.outputs:
|
|
# if len(invoice.outputs) > 1:
|
|
# pi._type = PaymentIdentifierType.MULTILINE
|
|
# pi.multiline_outputs = invoice.outputs
|
|
# pi.set_state(PaymentIdentifierState.AVAILABLE)
|
|
# else:
|
|
# pi._type = PaymentIdentifierType.BIP21
|
|
# params = {}
|
|
# if invoice.exp:
|
|
# params['exp'] = str(invoice.exp)
|
|
# if invoice.time:
|
|
# params['time'] = str(invoice.time)
|
|
# pi.bip21 = create_bip21_uri(invoice.outputs[0].address, invoice.get_amount_sat(), invoice.message,
|
|
# extra_query_params=params)
|
|
# pi.set_state(PaymentIdentifierState.AVAILABLE)
|
|
# elif invoice.is_lightning():
|
|
# pi._type = PaymentIdentifierType.BOLT11
|
|
# pi.bolt11 = invoice
|
|
# pi.set_state(PaymentIdentifierState.AVAILABLE)
|
|
# else:
|
|
# return None
|
|
# return pi
|