1
0

qt: fix: bip70 pay reqs need x509 verification

regression from https://github.com/spesmilo/electrum/pull/8462

- pr.verify() was called in qml, but not in qt gui
- we now call pr.verify() in get_payment_request(), to make the API less error-prone
- it is now ok to call pr.verify() multiple times, the result is cached
This commit is contained in:
SomberNight
2023-07-10 17:50:53 +00:00
parent 023e8ff0eb
commit 612a8e6424
6 changed files with 49 additions and 26 deletions

View File

@@ -39,13 +39,14 @@ except ImportError:
sys.exit("Error: could not find paymentrequest_pb2.py. Create it with 'contrib/generate_payreqpb2.sh'")
from . import bitcoin, constants, ecc, util, transaction, x509, rsakey
from .util import bfh, make_aiohttp_session, error_text_bytes_to_safe_str
from .util import bfh, make_aiohttp_session, error_text_bytes_to_safe_str, get_running_loop
from .invoices import Invoice, get_id_from_onchain_outputs
from .crypto import sha256
from .bitcoin import address_to_script
from .transaction import PartialTxOutput
from .network import Network
from .logging import get_logger, Logger
from .contacts import Contacts
if TYPE_CHECKING:
from .simple_config import SimpleConfig
@@ -104,6 +105,10 @@ async def get_payment_request(url: str) -> 'PaymentRequest':
data = None
error = f"Unknown scheme for payment request. URL: {url}"
pr = PaymentRequest(data, error=error)
loop = get_running_loop()
# do x509/dnssec verification now (in separate thread, to avoid blocking event loop).
# we still expect the caller to at least check pr.error!
await loop.run_in_executor(None, pr.verify)
return pr
@@ -111,15 +116,17 @@ class PaymentRequest:
def __init__(self, data: bytes, *, error=None):
self.raw = data
self.error = error # FIXME overloaded and also used when 'verify' succeeds
self.parse(data)
self.error = error # type: Optional[str]
self._verified_success = None # caches result of _verify
self._verified_success_msg = None # type: Optional[str]
self._parse(data)
self.requestor = None # known after verify
self.tx = None
def __str__(self):
return str(self.raw)
def parse(self, r: bytes):
def _parse(self, r: bytes):
self.outputs = [] # type: List[PartialTxOutput]
if self.error:
return
@@ -147,8 +154,11 @@ class PaymentRequest:
self.memo = self.details.memo
self.payment_url = self.details.payment_url
def verify(self, contacts):
def verify(self) -> bool:
# FIXME: we should enforce that this method was called before we attempt payment
# note: this method might do network requests (at least for verify_dnssec)
if self._verified_success is True:
return True
if self.error:
return False
if not self.raw:
@@ -167,7 +177,7 @@ class PaymentRequest:
if pr.pki_type in ["x509+sha256", "x509+sha1"]:
return self.verify_x509(pr)
elif pr.pki_type in ["dnssec+btc", "dnssec+ecdsa"]:
return self.verify_dnssec(pr, contacts)
return self.verify_dnssec(pr)
else:
self.error = "ERROR: Unsupported PKI Type for Message Signature"
return False
@@ -209,13 +219,14 @@ class PaymentRequest:
self.error = "ERROR: Invalid Signature for Payment Request Data"
return False
### SIG Verified
self.error = 'Signed by Trusted CA: ' + ca.get_common_name()
self._verified_success_msg = 'Signed by Trusted CA: ' + ca.get_common_name()
self._verified_success = True
return True
def verify_dnssec(self, pr, contacts):
def verify_dnssec(self, pr):
sig = pr.signature
alias = pr.pki_data
info = contacts.resolve(alias)
info = Contacts.resolve_openalias(alias)
if info.get('validated') is not True:
self.error = "Alias verification failed (DNSSEC)"
return False
@@ -225,7 +236,8 @@ class PaymentRequest:
pr.signature = b''
message = pr.SerializeToString()
if ecc.verify_message_with_address(address, sig, message):
self.error = 'Verified with DNSSEC'
self._verified_success_msg = 'Verified with DNSSEC'
self._verified_success = True
return True
else:
self.error = "verify failed"
@@ -257,8 +269,8 @@ class PaymentRequest:
def get_requestor(self):
return self.requestor if self.requestor else self.get_address()
def get_verify_status(self):
return self.error if self.requestor else "No Signature"
def get_verify_status(self) -> str:
return (self.error or self._verified_success_msg) if self.requestor else "No Signature"
def get_memo(self):
return self.memo