prepare for separation of ecc module:
- move encrypt/sign functions elsewhere - remove local dependencies in ecc.py, ecc_fast.py (except logging)
This commit is contained in:
@@ -769,7 +769,7 @@ def taproot_tweak_pubkey(pubkey32: bytes, h: bytes) -> Tuple[int, bytes]:
|
||||
assert len(pubkey32) == 32, len(pubkey32)
|
||||
int_from_bytes = lambda x: int.from_bytes(x, byteorder="big", signed=False)
|
||||
|
||||
tweak = int_from_bytes(ecc.bip340_tagged_hash(b"TapTweak", pubkey32 + h))
|
||||
tweak = int_from_bytes(bip340_tagged_hash(b"TapTweak", pubkey32 + h))
|
||||
if tweak >= ecc.CURVE_ORDER:
|
||||
raise ValueError
|
||||
P = ecc.ECPubkey(b"\x02" + pubkey32)
|
||||
@@ -786,7 +786,7 @@ def taproot_tweak_seckey(seckey0: bytes, h: bytes) -> bytes:
|
||||
P = ecc.ECPrivkey(seckey0)
|
||||
seckey = P.secret_scalar if P.has_even_y() else ecc.CURVE_ORDER - P.secret_scalar
|
||||
pubkey32 = P.get_public_key_bytes(compressed=True)[1:]
|
||||
tweak = int_from_bytes(ecc.bip340_tagged_hash(b"TapTweak", pubkey32 + h))
|
||||
tweak = int_from_bytes(bip340_tagged_hash(b"TapTweak", pubkey32 + h))
|
||||
if tweak >= ecc.CURVE_ORDER:
|
||||
raise ValueError
|
||||
return int.to_bytes((seckey + tweak) % ecc.CURVE_ORDER, length=32, byteorder="big", signed=False)
|
||||
@@ -798,18 +798,21 @@ def taproot_tweak_seckey(seckey0: bytes, h: bytes) -> bytes:
|
||||
TapTreeLeaf = Tuple[int, bytes]
|
||||
TapTree = Union[TapTreeLeaf, Sequence['TapTree']]
|
||||
|
||||
def bip340_tagged_hash(tag: bytes, msg: bytes) -> bytes:
|
||||
# note: _libsecp256k1.secp256k1_tagged_sha256 benchmarks about 70% slower than this (on my machine)
|
||||
return sha256(sha256(tag) + sha256(tag) + msg)
|
||||
|
||||
def taproot_tree_helper(script_tree: TapTree):
|
||||
if isinstance(script_tree, tuple):
|
||||
leaf_version, script = script_tree
|
||||
h = ecc.bip340_tagged_hash(b"TapLeaf", bytes([leaf_version]) + witness_push(script))
|
||||
h = bip340_tagged_hash(b"TapLeaf", bytes([leaf_version]) + witness_push(script))
|
||||
return ([((leaf_version, script), bytes())], h)
|
||||
left, left_h = taproot_tree_helper(script_tree[0])
|
||||
right, right_h = taproot_tree_helper(script_tree[1])
|
||||
ret = [(l, c + right_h) for l, c in left] + [(l, c + left_h) for l, c in right]
|
||||
if right_h < left_h:
|
||||
left_h, right_h = right_h, left_h
|
||||
return (ret, ecc.bip340_tagged_hash(b"TapBranch", left_h + right_h))
|
||||
return (ret, bip340_tagged_hash(b"TapBranch", left_h + right_h))
|
||||
|
||||
|
||||
def taproot_output_script(internal_pubkey: bytes, *, script_tree: Optional[TapTree]) -> bytes:
|
||||
@@ -838,3 +841,38 @@ def control_block_for_taproot_script_spend(
|
||||
pubkey_data = bytes([output_pubkey_y_parity + leaf_version]) + internal_pubkey
|
||||
control_block = pubkey_data + merkle_path
|
||||
return (leaf_script, control_block)
|
||||
|
||||
|
||||
# user message signing
|
||||
def usermessage_magic(message: bytes) -> bytes:
|
||||
from .bitcoin import var_int
|
||||
length = var_int(len(message))
|
||||
return b"\x18Bitcoin Signed Message:\n" + length + message
|
||||
|
||||
def ecdsa_sign_usermessage(ec_privkey, message: Union[bytes, str], *, is_compressed: bool) -> bytes:
|
||||
message = to_bytes(message, 'utf8')
|
||||
msg32 = sha256d(usermessage_magic(message))
|
||||
return ec_privkey.ecdsa_sign_recoverable(msg32, is_compressed=is_compressed)
|
||||
|
||||
def verify_usermessage_with_address(address: str, sig65: bytes, message: bytes, *, net=None) -> bool:
|
||||
from .bitcoin import pubkey_to_address
|
||||
from .ecc import ECPubkey
|
||||
assert_bytes(sig65, message)
|
||||
if net is None: net = constants.net
|
||||
h = sha256d(usermessage_magic(message))
|
||||
try:
|
||||
public_key, compressed, txin_type_guess = ECPubkey.from_ecdsa_sig65(sig65, h)
|
||||
except Exception as e:
|
||||
return False
|
||||
# check public key using the address
|
||||
pubkey_hex = public_key.get_public_key_hex(compressed)
|
||||
txin_types = (txin_type_guess,) if txin_type_guess else ('p2pkh', 'p2wpkh', 'p2wpkh-p2sh')
|
||||
for txin_type in txin_types:
|
||||
addr = pubkey_to_address(txin_type, pubkey_hex, net=net)
|
||||
if address == addr:
|
||||
break
|
||||
else:
|
||||
return False
|
||||
# check message
|
||||
# note: `$ bitcoin-cli verifymessage` does NOT enforce the low-S rule for ecdsa sigs
|
||||
return public_key.ecdsa_verify(sig65[1:], h, enforce_low_s=False)
|
||||
|
||||
@@ -716,7 +716,7 @@ class Commands:
|
||||
"""Verify a signature."""
|
||||
sig = base64.b64decode(signature)
|
||||
message = util.to_bytes(message)
|
||||
return ecc.verify_usermessage_with_address(address, sig, message)
|
||||
return bitcoin.verify_usermessage_with_address(address, sig, message)
|
||||
|
||||
@command('wp')
|
||||
async def payto(self, destination, amount, fee=None, feerate=None, from_addr=None, from_coins=None, change_addr=None,
|
||||
@@ -910,7 +910,7 @@ class Commands:
|
||||
except TypeError:
|
||||
raise UserFacingException(f"message must be a string-like object instead of {repr(message)}")
|
||||
public_key = ecc.ECPubkey(bfh(pubkey))
|
||||
encrypted = public_key.encrypt_message(message)
|
||||
encrypted = crypto.ecies_encrypt_message(public_key, message)
|
||||
return encrypted.decode('utf-8')
|
||||
|
||||
@command('wp')
|
||||
|
||||
@@ -34,7 +34,7 @@ from typing import Union, Mapping, Optional
|
||||
from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
|
||||
from .i18n import _
|
||||
from .logging import get_logger
|
||||
|
||||
from . import ecc
|
||||
|
||||
_logger = get_logger(__name__)
|
||||
|
||||
@@ -443,3 +443,41 @@ def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
|
||||
decryptor = cipher.decryptor()
|
||||
return decryptor.update(data)
|
||||
raise Exception("no chacha20 backend found")
|
||||
|
||||
|
||||
def ecies_encrypt_message(ec_pubkey, message: bytes, *, magic: bytes = b'BIE1') -> bytes:
|
||||
"""
|
||||
ECIES encryption/decryption methods; AES-128-CBC with PKCS7 is used as the cipher; hmac-sha256 is used as the mac
|
||||
"""
|
||||
assert_bytes(message)
|
||||
ephemeral = ecc.ECPrivkey.generate_random_key()
|
||||
ecdh_key = (ec_pubkey * ephemeral.secret_scalar).get_public_key_bytes(compressed=True)
|
||||
key = hashlib.sha512(ecdh_key).digest()
|
||||
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
|
||||
ciphertext = aes_encrypt_with_iv(key_e, iv, message)
|
||||
ephemeral_pubkey = ephemeral.get_public_key_bytes(compressed=True)
|
||||
encrypted = magic + ephemeral_pubkey + ciphertext
|
||||
mac = hmac_oneshot(key_m, encrypted, hashlib.sha256)
|
||||
return base64.b64encode(encrypted + mac)
|
||||
|
||||
|
||||
def ecies_decrypt_message(ec_privkey, encrypted: Union[str, bytes], *, magic: bytes=b'BIE1') -> bytes:
|
||||
encrypted = base64.b64decode(encrypted) # type: bytes
|
||||
if len(encrypted) < 85:
|
||||
raise Exception('invalid ciphertext: length')
|
||||
magic_found = encrypted[:4]
|
||||
ephemeral_pubkey_bytes = encrypted[4:37]
|
||||
ciphertext = encrypted[37:-32]
|
||||
mac = encrypted[-32:]
|
||||
if magic_found != magic:
|
||||
raise Exception('invalid ciphertext: invalid magic bytes')
|
||||
try:
|
||||
ephemeral_pubkey = ecc.ECPubkey(ephemeral_pubkey_bytes)
|
||||
except ecc.InvalidECPointException as e:
|
||||
raise Exception('invalid ciphertext: invalid ephemeral pubkey') from e
|
||||
ecdh_key = (ephemeral_pubkey * ec_privkey.secret_scalar).get_public_key_bytes(compressed=True)
|
||||
key = hashlib.sha512(ecdh_key).digest()
|
||||
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
|
||||
if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
|
||||
raise InvalidPassword()
|
||||
return aes_decrypt_with_iv(key_e, iv, ciphertext)
|
||||
|
||||
@@ -26,20 +26,17 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import functools
|
||||
import secrets
|
||||
from typing import Union, Tuple, Optional
|
||||
from ctypes import (
|
||||
byref, c_char_p, c_size_t, create_string_buffer, cast,
|
||||
)
|
||||
|
||||
from .util import bfh, assert_bytes, to_bytes, InvalidPassword, profiler, randrange
|
||||
from .crypto import (sha256, sha256d, aes_encrypt_with_iv, aes_decrypt_with_iv, hmac_oneshot)
|
||||
from . import constants
|
||||
from .logging import get_logger
|
||||
from . import ecc_fast
|
||||
from .ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED, LibModuleMissing
|
||||
|
||||
_logger = get_logger(__name__)
|
||||
|
||||
def assert_bytes(x):
|
||||
assert isinstance(x, (bytes, bytearray))
|
||||
|
||||
# Some unit tests need to create ECDSA sigs without grinding the R value (and just use RFC6979).
|
||||
# see https://github.com/bitcoin/bitcoin/pull/13666
|
||||
@@ -382,23 +379,6 @@ class ECPubkey(object):
|
||||
return False
|
||||
return True
|
||||
|
||||
def encrypt_message(self, message: bytes, *, magic: bytes = b'BIE1') -> bytes:
|
||||
"""
|
||||
ECIES encryption/decryption methods; AES-128-CBC with PKCS7 is used as the cipher; hmac-sha256 is used as the mac
|
||||
"""
|
||||
assert_bytes(message)
|
||||
|
||||
ephemeral = ECPrivkey.generate_random_key()
|
||||
ecdh_key = (self * ephemeral.secret_scalar).get_public_key_bytes(compressed=True)
|
||||
key = hashlib.sha512(ecdh_key).digest()
|
||||
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
|
||||
ciphertext = aes_encrypt_with_iv(key_e, iv, message)
|
||||
ephemeral_pubkey = ephemeral.get_public_key_bytes(compressed=True)
|
||||
encrypted = magic + ephemeral_pubkey + ciphertext
|
||||
mac = hmac_oneshot(key_m, encrypted, hashlib.sha256)
|
||||
|
||||
return base64.b64encode(encrypted + mac)
|
||||
|
||||
@classmethod
|
||||
def order(cls) -> int:
|
||||
return CURVE_ORDER
|
||||
@@ -424,34 +404,6 @@ CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D
|
||||
POINT_AT_INFINITY = ECPubkey(None)
|
||||
|
||||
|
||||
def usermessage_magic(message: bytes) -> bytes:
|
||||
from .bitcoin import var_int
|
||||
length = var_int(len(message))
|
||||
return b"\x18Bitcoin Signed Message:\n" + length + message
|
||||
|
||||
|
||||
def verify_usermessage_with_address(address: str, sig65: bytes, message: bytes, *, net=None) -> bool:
|
||||
from .bitcoin import pubkey_to_address
|
||||
assert_bytes(sig65, message)
|
||||
if net is None: net = constants.net
|
||||
h = sha256d(usermessage_magic(message))
|
||||
try:
|
||||
public_key, compressed, txin_type_guess = ECPubkey.from_ecdsa_sig65(sig65, h)
|
||||
except Exception as e:
|
||||
return False
|
||||
# check public key using the address
|
||||
pubkey_hex = public_key.get_public_key_hex(compressed)
|
||||
txin_types = (txin_type_guess,) if txin_type_guess else ('p2pkh', 'p2wpkh', 'p2wpkh-p2sh')
|
||||
for txin_type in txin_types:
|
||||
addr = pubkey_to_address(txin_type, pubkey_hex, net=net)
|
||||
if address == addr:
|
||||
break
|
||||
else:
|
||||
return False
|
||||
# check message
|
||||
# note: `$ bitcoin-cli verifymessage` does NOT enforce the low-S rule for ecdsa sigs
|
||||
return public_key.ecdsa_verify(sig65[1:], h, enforce_low_s=False)
|
||||
|
||||
|
||||
def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
|
||||
if isinstance(secret, bytes):
|
||||
@@ -499,7 +451,7 @@ class ECPrivkey(ECPubkey):
|
||||
|
||||
@classmethod
|
||||
def generate_random_key(cls) -> 'ECPrivkey':
|
||||
randint = randrange(CURVE_ORDER)
|
||||
randint = secrets.randbelow(CURVE_ORDER - 1) + 1
|
||||
ephemeral_exponent = int.to_bytes(randint, length=32, byteorder='big', signed=False)
|
||||
return ECPrivkey(ephemeral_exponent)
|
||||
|
||||
@@ -592,31 +544,6 @@ class ECPrivkey(ECPubkey):
|
||||
sig65, recid = bruteforce_recid(sig64)
|
||||
return sig65
|
||||
|
||||
def ecdsa_sign_usermessage(self, message: Union[bytes, str], *, is_compressed: bool) -> bytes:
|
||||
message = to_bytes(message, 'utf8')
|
||||
msg32 = sha256d(usermessage_magic(message))
|
||||
return self.ecdsa_sign_recoverable(msg32, is_compressed=is_compressed)
|
||||
|
||||
def decrypt_message(self, encrypted: Union[str, bytes], *, magic: bytes=b'BIE1') -> bytes:
|
||||
encrypted = base64.b64decode(encrypted) # type: bytes
|
||||
if len(encrypted) < 85:
|
||||
raise Exception('invalid ciphertext: length')
|
||||
magic_found = encrypted[:4]
|
||||
ephemeral_pubkey_bytes = encrypted[4:37]
|
||||
ciphertext = encrypted[37:-32]
|
||||
mac = encrypted[-32:]
|
||||
if magic_found != magic:
|
||||
raise Exception('invalid ciphertext: invalid magic bytes')
|
||||
try:
|
||||
ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
|
||||
except InvalidECPointException as e:
|
||||
raise Exception('invalid ciphertext: invalid ephemeral pubkey') from e
|
||||
ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
|
||||
key = hashlib.sha512(ecdh_key).digest()
|
||||
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
|
||||
if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
|
||||
raise InvalidPassword()
|
||||
return aes_decrypt_with_iv(key_e, iv, ciphertext)
|
||||
|
||||
|
||||
def construct_ecdsa_sig65(sig64: bytes, recid: int, *, is_compressed: bool) -> bytes:
|
||||
@@ -624,6 +551,3 @@ def construct_ecdsa_sig65(sig64: bytes, recid: int, *, is_compressed: bool) -> b
|
||||
return bytes([27 + recid + comp]) + sig64
|
||||
|
||||
|
||||
def bip340_tagged_hash(tag: bytes, msg: bytes) -> bytes:
|
||||
# note: _libsecp256k1.secp256k1_tagged_sha256 benchmarks about 70% slower than this (on my machine)
|
||||
return sha256(sha256(tag) + sha256(tag) + msg)
|
||||
|
||||
@@ -12,7 +12,7 @@ from electrum.util import WalletFileException, standardize_path, InvalidPassword
|
||||
from electrum.plugin import run_hook
|
||||
from electrum.lnchannel import ChannelState
|
||||
from electrum.bitcoin import is_address
|
||||
from electrum.ecc import verify_usermessage_with_address
|
||||
from electrum.bitcoin import verify_usermessage_with_address
|
||||
from electrum.storage import StorageReadWriteError
|
||||
|
||||
from .auth import AuthMixin, auth_protect
|
||||
|
||||
@@ -1993,7 +1993,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener):
|
||||
try:
|
||||
# This can throw on invalid base64
|
||||
sig = base64.b64decode(str(signature.toPlainText()))
|
||||
verified = ecc.verify_usermessage_with_address(address, sig, message)
|
||||
verified = bitcoin.verify_usermessage_with_address(address, sig, message)
|
||||
except Exception as e:
|
||||
verified = False
|
||||
if verified:
|
||||
@@ -2057,6 +2057,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener):
|
||||
self.thread.add(task, on_success=setText)
|
||||
|
||||
def do_encrypt(self, message_e, pubkey_e, encrypted_e):
|
||||
from electrum import crypto
|
||||
message = message_e.toPlainText()
|
||||
message = message.encode('utf-8')
|
||||
try:
|
||||
@@ -2065,7 +2066,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener):
|
||||
self.logger.exception('Invalid Public key')
|
||||
self.show_warning(_('Invalid Public key'))
|
||||
return
|
||||
encrypted = public_key.encrypt_message(message)
|
||||
encrypted = crypto.ecies_encrypt_message(public_key, message)
|
||||
encrypted_e.setText(encrypted.decode('ascii'))
|
||||
|
||||
def encrypt_message(self, address=''):
|
||||
|
||||
@@ -41,6 +41,7 @@ from .bip32 import (convert_bip32_strpath_to_intpath, BIP32_PRIME,
|
||||
KeyOriginInfo)
|
||||
from .descriptor import PubkeyProvider
|
||||
from .ecc import string_to_number
|
||||
from . import crypto
|
||||
from .crypto import (pw_decode, pw_encode, sha256, sha256d, PW_HASH_VERSION_LATEST,
|
||||
SUPPORTED_PW_HASH_VERSIONS, UnsupportedPasswordHashVersion, hash_160,
|
||||
CiphertextFormatError)
|
||||
@@ -223,12 +224,12 @@ class Software_KeyStore(KeyStore):
|
||||
def sign_message(self, sequence, message, password, *, script_type=None) -> bytes:
|
||||
privkey, compressed = self.get_private_key(sequence, password)
|
||||
key = ecc.ECPrivkey(privkey)
|
||||
return key.ecdsa_sign_usermessage(message, is_compressed=compressed)
|
||||
return bitcoin.ecdsa_sign_usermessage(key, message, is_compressed=compressed)
|
||||
|
||||
def decrypt_message(self, sequence, message, password) -> bytes:
|
||||
privkey, compressed = self.get_private_key(sequence, password)
|
||||
ec = ecc.ECPrivkey(privkey)
|
||||
decrypted = ec.decrypt_message(message)
|
||||
decrypted = crypto.ecies_decrypt_message(ec, message)
|
||||
return decrypted
|
||||
|
||||
def sign_transaction(self, tx, password):
|
||||
|
||||
@@ -235,7 +235,7 @@ class PaymentRequest:
|
||||
address = info.get('address')
|
||||
pr.signature = b''
|
||||
message = pr.SerializeToString()
|
||||
if ecc.verify_usermessage_with_address(address, sig, message):
|
||||
if bitcoin.verify_usermessage_with_address(address, sig, message):
|
||||
self._verified_success_msg = 'Verified with DNSSEC'
|
||||
self._verified_success = True
|
||||
return True
|
||||
@@ -356,7 +356,7 @@ def sign_request_with_alias(pr, alias, alias_privkey):
|
||||
message = pr.SerializeToString()
|
||||
ec_key = ecc.ECPrivkey(alias_privkey)
|
||||
compressed = bitcoin.is_compressed_privkey(alias_privkey)
|
||||
pr.signature = ec_key.ecdsa_sign_usermessage(message, is_compressed=compressed)
|
||||
pr.signature = bitcoin.ecdsa_sign_usermessage(ec_key, message, is_compressed=compressed)
|
||||
|
||||
|
||||
def verify_cert_chain(chain):
|
||||
|
||||
@@ -31,7 +31,7 @@ import zlib
|
||||
from enum import IntEnum
|
||||
from typing import Optional
|
||||
|
||||
from . import ecc
|
||||
from . import ecc, crypto
|
||||
from .util import (profiler, InvalidPassword, WalletFileException, bfh, standardize_path,
|
||||
test_read_write_permissions, os_chmod)
|
||||
|
||||
@@ -192,7 +192,7 @@ class WalletStorage(Logger):
|
||||
ec_key = self.get_eckey_from_password(password)
|
||||
if self.raw:
|
||||
enc_magic = self._get_encryption_magic()
|
||||
s = zlib.decompress(ec_key.decrypt_message(self.raw, magic=enc_magic))
|
||||
s = zlib.decompress(crypto.ecies_decrypt_message(ec_key, self.raw, magic=enc_magic))
|
||||
s = s.decode('utf8')
|
||||
else:
|
||||
s = ''
|
||||
@@ -207,7 +207,7 @@ class WalletStorage(Logger):
|
||||
c = zlib.compress(s, level=zlib.Z_BEST_SPEED)
|
||||
enc_magic = self._get_encryption_magic()
|
||||
public_key = ecc.ECPubkey(bfh(self.pubkey))
|
||||
s = public_key.encrypt_message(c, magic=enc_magic)
|
||||
s = crypto.ecies_encrypt_message(public_key, c, magic=enc_magic)
|
||||
s = s.decode('utf8')
|
||||
return s
|
||||
|
||||
|
||||
@@ -2309,7 +2309,7 @@ class PartialTransaction(Transaction):
|
||||
merkle_root = txin.tap_merkle_root or bytes()
|
||||
output_privkey_bytes = taproot_tweak_seckey(privkey_bytes, merkle_root)
|
||||
output_privkey = ecc.ECPrivkey(output_privkey_bytes)
|
||||
msg_hash = ecc.bip340_tagged_hash(b"TapSighash", pre_hash)
|
||||
msg_hash = bitcoin.bip340_tagged_hash(b"TapSighash", pre_hash)
|
||||
sig = output_privkey.schnorr_sign(msg_hash)
|
||||
sighash = txin.sighash if txin.sighash is not None else Sighash.DEFAULT
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user