1
0
Files
electrum/electrum/plugins/jade/jade.py
SomberNight be159b5b95 bugfix: assert walrus (":=") combo side-eff. breaks w/ asserts disabled
```
$ python3 -O
Python 3.10.6 (main, Mar 10 2023, 10:55:28) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> assert (x := 2)
>>> x
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'x' is not defined
```

pity. it looked to be a neat and concise pattern.
2023-04-20 15:17:36 +00:00

476 lines
19 KiB
Python

import os
import base64
import json
from typing import Optional
from electrum import bip32, constants
from electrum.crypto import sha256
from electrum.i18n import _
from electrum.keystore import Hardware_KeyStore
from electrum.transaction import Transaction
from electrum.wallet import Multisig_Wallet
from electrum.util import UserFacingException
from electrum.base_wizard import ScriptTypeNotSupported
from electrum.logging import get_logger
from electrum.plugin import runs_in_hwd_thread, Device
from electrum.network import Network
from ..hw_wallet import HW_PluginBase, HardwareClientBase
from ..hw_wallet.plugin import OutdatedHwFirmwareException
_logger = get_logger(__name__)
#import logging
#LOGGING = logging.INFO
#if LOGGING:
# logger = logging.getLogger('jade')
# logger.setLevel(LOGGING)
# device_logger = logging.getLogger('jade-device')
# device_logger.setLevel(LOGGING)
try:
# Do imports
from .jadepy.jade import JadeAPI
from serial.tools import list_ports
except ImportError as e:
_logger.exception('error importing Jade plugin deps')
# Ignore -beta and -rc etc labels
def _versiontuple(v):
return tuple(map(int, (v.split('-')[0].split('.'))))
def _is_multisig(wallet):
return type(wallet) is Multisig_Wallet
# Ensure a multisig wallet is registered on Jade hw.
# Derives and returns the deterministic name for that multisig registration
def _register_multisig_wallet(wallet, keystore, address):
wallet_fingerprint_hash = sha256(wallet.get_fingerprint())
multisig_name = 'ele' + wallet_fingerprint_hash.hex()[:12]
# Collect all the signer data in case we need to register the
# multisig wallet on the Jade hw - NOTE: re-register is a no-op.
signers = []
for kstore in wallet.get_keystores():
fingerprint = kstore.get_root_fingerprint()
bip32_path_prefix = kstore.get_derivation_prefix()
derivation_path = bip32.convert_bip32_strpath_to_intpath(bip32_path_prefix)
# Jade only understands standard xtypes, so convert here
node = bip32.BIP32Node.from_xkey(kstore.xpub)
standard_xpub = node._replace(xtype='standard').to_xkey()
signers.append({'fingerprint': bytes.fromhex(fingerprint),
'derivation': derivation_path,
'xpub': standard_xpub,
'path': []})
# Check multisig is registered - re-registering is a no-op
# NOTE: electrum multisigs appear to always be sorted-multisig
txin_type = wallet.get_txin_type(address)
keystore.register_multisig(multisig_name, txin_type, True, wallet.m, signers)
# Return the name used to register the wallet
return multisig_name
# Helper to adapt Jade's http call/data to Network.send_http_on_proxy()
def _http_request(params):
# Use the first non-onion url
url = [url for url in params['urls'] if not url.endswith('.onion')][0]
method = params['method'].lower()
json_payload = params.get('data')
json_response = Network.send_http_on_proxy(method, url, json=json_payload)
return {'body': json.loads(json_response)}
class Jade_Client(HardwareClientBase):
@staticmethod
def _network() -> str:
return 'localtest' if constants.net.NET_NAME == 'regtest' else constants.net.NET_NAME
ADDRTYPES = {'standard': 'pkh(k)',
'p2pkh': 'pkh(k)',
'p2wpkh': 'wpkh(k)',
'p2wpkh-p2sh': 'sh(wpkh(k))'}
MULTI_ADDRTYPES = {'standard': 'sh(multi(k))',
'p2sh': 'sh(multi(k))',
'p2wsh': 'wsh(multi(k))',
'p2wsh-p2sh': 'sh(wsh(multi(k)))'}
@classmethod
def _convertAddrType(cls, addrType: str, multisig: bool) -> str:
return cls.MULTI_ADDRTYPES[addrType] if multisig else cls.ADDRTYPES[addrType]
def __init__(self, device: str, plugin: HW_PluginBase):
HardwareClientBase.__init__(self, plugin=plugin)
# Connect with a small timeout to test connection
self.jade = JadeAPI.create_serial(device, timeout=1)
self.jade.connect()
verinfo = self.jade.get_version_info()
self.fwversion = _versiontuple(verinfo['JADE_VERSION'])
self.efusemac = verinfo['EFUSEMAC']
self.jade.disconnect()
# Reconnect with a the default timeout for all subsequent calls
self.jade = JadeAPI.create_serial(device)
self.jade.connect()
# Push some host entropy into jade
self.jade.add_entropy(os.urandom(32))
@runs_in_hwd_thread
def authenticate(self):
# Ensure Jade unlocked - always call hw unit at least once
# If the hw is already unlocked, this call returns immediately/no-op
# NOTE: uses provided http/networking which respects any user proxy
authenticated = False
while not authenticated:
authenticated = self.jade.auth_user(self._network(), http_request_fn=_http_request)
def is_pairable(self):
return True
@runs_in_hwd_thread
def close(self):
self.jade.disconnect()
self.jade = None
@runs_in_hwd_thread
def is_initialized(self):
verinfo = self.jade.get_version_info()
return verinfo['JADE_STATE'] != 'UNINIT'
def label(self) -> Optional[str]:
return self.efusemac[-6:]
def get_soft_device_id(self):
return f'Jade {self.label()}'
def device_model_name(self):
return 'Blockstream Jade'
@runs_in_hwd_thread
def has_usable_connection_with_device(self):
if self.efusemac is None:
return False
try:
verinfo = self.jade.get_version_info()
return verinfo['EFUSEMAC'] == self.efusemac
except BaseException:
return False
@runs_in_hwd_thread
def get_xpub(self, bip32_path, xtype):
self.authenticate()
# Jade only provides traditional xpubs ...
path = bip32.convert_bip32_strpath_to_intpath(bip32_path)
xpub = self.jade.get_xpub(self._network(), path)
# ... so convert to relevant xtype locally
node = bip32.BIP32Node.from_xkey(xpub)
return node._replace(xtype=xtype).to_xkey()
@runs_in_hwd_thread
def sign_message(self, bip32_path_prefix, sequence, message):
self.authenticate()
path = bip32.convert_bip32_strpath_to_intpath(bip32_path_prefix)
path.extend(sequence)
if isinstance(message, bytes) or isinstance(message, bytearray):
message = message.decode('utf-8')
# Signature verification does not work with anti-exfil, so stick with default (rfc6979)
sig = self.jade.sign_message(path, message)
return base64.b64decode(sig)
@runs_in_hwd_thread
def sign_tx(self, txn_bytes, inputs, change):
self.authenticate()
# Add some host entropy for AE sigs (although we won't verify)
for input in inputs:
if input['path'] is not None:
input['ae_host_entropy'] = os.urandom(32)
input['ae_host_commitment'] = os.urandom(32)
# Map change script type
for output in change:
if output and output.get('variant') is not None:
output['variant'] = self._convertAddrType(output['variant'], False)
# Pass to Jade to generate signatures
sig_data = self.jade.sign_tx(self._network(), txn_bytes, inputs, change, use_ae_signatures=True)
# Extract signatures from returned data (sig[0] is the AE signer-commitment)
return [sig[1] for sig in sig_data]
@runs_in_hwd_thread
def show_address(self, bip32_path_prefix, sequence, txin_type):
self.authenticate()
path = bip32.convert_bip32_strpath_to_intpath(bip32_path_prefix)
path.extend(sequence)
script_variant = self._convertAddrType(txin_type, multisig=False)
address = self.jade.get_receive_address(self._network(), path, variant=script_variant)
return address
@runs_in_hwd_thread
def register_multisig(self, multisig_name, txin_type, sorted, threshold, signers):
self.authenticate()
variant = self._convertAddrType(txin_type, multisig=True)
return self.jade.register_multisig(self._network(), multisig_name, variant, sorted, threshold, signers)
@runs_in_hwd_thread
def show_address_multi(self, multisig_name, paths):
self.authenticate()
return self.jade.get_receive_address(self._network(), paths, multisig_name=multisig_name)
class Jade_KeyStore(Hardware_KeyStore):
hw_type = 'jade'
device = 'Jade'
plugin: 'JadePlugin'
def decrypt_message(self, sequence, message, password):
raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device))
@runs_in_hwd_thread
def sign_message(self, sequence, message, password, *, script_type=None):
self.handler.show_message(_("Please confirm signing the message with your Jade device..."))
try:
client = self.get_client()
bip32_path_prefix = self.get_derivation_prefix()
return client.sign_message(bip32_path_prefix, sequence, message)
finally:
self.handler.finished()
@runs_in_hwd_thread
def sign_transaction(self, tx, password):
if tx.is_complete():
return
self.handler.show_message(_("Preparing to sign transaction ..."))
try:
wallet = self.handler.get_wallet()
is_multisig = _is_multisig(wallet)
# Fetch inputs of the transaction to sign
jade_inputs = []
for txin in tx.inputs():
pubkey, path = self.find_my_pubkey_in_txinout(txin)
witness_input = txin.is_segwit()
redeem_script = Transaction.get_preimage_script(txin)
redeem_script = bytes.fromhex(redeem_script) if redeem_script is not None else None
input_tx = txin.utxo
input_tx = bytes.fromhex(input_tx.serialize()) if input_tx is not None else None
# Build the input and add to the list - include some host entropy for AE sigs (although we won't verify)
jade_inputs.append({'is_witness': witness_input,
'input_tx': input_tx,
'script': redeem_script,
'path': path})
# Change detection
change = [None] * len(tx.outputs())
for index, txout in enumerate(tx.outputs()):
if txout.is_mine and txout.is_change:
desc = txout.script_descriptor
assert desc
if is_multisig:
# Multisig - wallet details must be registered on Jade hw
multisig_name = _register_multisig_wallet(wallet, self, txout.address)
# Jade only needs the path suffix(es) and the multisig registration
# name to generate the address, as the fixed derivation part is
# embedded in the multisig wallet registration record
# NOTE: all cosigners have same path suffix
path_suffix = wallet.get_address_index(txout.address)
paths = [path_suffix] * wallet.n
change[index] = {'multisig_name': multisig_name, 'paths': paths}
else:
# Pass entire path
pubkey, path = self.find_my_pubkey_in_txinout(txout)
change[index] = {'path':path, 'variant': desc.to_legacy_electrum_script_type()}
# The txn itself
txn_bytes = bytes.fromhex(tx.serialize_to_network())
# Request Jade generate the signatures for our inputs.
# Change details are passed to be validated on the hw (user does not confirm)
self.handler.show_message(_("Please confirm the transaction details on your Jade device..."))
client = self.get_client()
signatures = client.sign_tx(txn_bytes, jade_inputs, change)
assert len(signatures) == len(tx.inputs())
# Inject signatures into tx
for index, (txin, signature) in enumerate(zip(tx.inputs(), signatures)):
pubkey, path = self.find_my_pubkey_in_txinout(txin)
if pubkey is not None and signature is not None:
tx.add_signature_to_txin(txin_idx=index,
signing_pubkey=pubkey.hex(),
sig=signature.hex())
finally:
self.handler.finished()
@runs_in_hwd_thread
def show_address(self, sequence, txin_type):
self.handler.show_message(_("Showing address ..."))
try:
client = self.get_client()
bip32_path_prefix = self.get_derivation_prefix()
return client.show_address(bip32_path_prefix, sequence, txin_type)
finally:
self.handler.finished()
@runs_in_hwd_thread
def register_multisig(self, name, txin_type, sorted, threshold, signers):
self.handler.show_message(_("Please confirm the multisig wallet details on your Jade device..."))
try:
client = self.get_client()
return client.register_multisig(name, txin_type, sorted, threshold, signers)
finally:
self.handler.finished()
@runs_in_hwd_thread
def show_address_multi(self, multisig_name, paths):
self.handler.show_message(_("Showing address ..."))
try:
client = self.get_client()
return client.show_address_multi(multisig_name, paths)
finally:
self.handler.finished()
class JadePlugin(HW_PluginBase):
keystore_class = Jade_KeyStore
minimum_library = (0, 0, 1)
DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4), (0x0403, 0x6001)]
SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
MIN_SUPPORTED_FW_VERSION = (0, 1, 32)
# For testing with qemu simulator (experimental)
SIMULATOR_PATH = None # 'tcp:127.0.0.1:2222'
SIMULATOR_TEST_SEED = None # bytes.fromhex('b90e532426d0dc20fffe01037048c018e940300038b165c211915c672e07762c')
def enumerate_serial(self):
# Jade is not really an HID device, it shows as a serial/com port device.
# Scan com ports looking for the relevant vid and pid, and use 'path' to
# hold the path to the serial port device, eg. /dev/ttyUSB0
devices = []
for devinfo in list_ports.comports():
device_product_key = (devinfo.vid, devinfo.pid)
if device_product_key in self.DEVICE_IDS:
device = Device(path=devinfo.device,
interface_number=-1,
id_=devinfo.serial_number,
product_key=device_product_key,
usage_page=-1,
transport_ui_string=devinfo.device)
devices.append(device)
# Maybe look for Jade Qemu simulator if the vars are set (experimental)
if self.SIMULATOR_PATH is not None and self.SIMULATOR_TEST_SEED is not None:
try:
# If we can connect to a simulator and poke a seed in, add that too
client = Jade_Client(self.SIMULATOR_PATH, plugin=self)
device = Device(path=self.SIMULATOR_PATH,
interface_number=-1,
id_='Jade Qemu Simulator',
product_key=self.DEVICE_IDS[0],
usage_page=-1,
transport_ui_string='simulator')
if client.jade.set_seed(self.SIMULATOR_TEST_SEED):
devices.append(device)
client.close()
except Exception as e:
# If we get any sort of error do not add the simulator
_logger.debug("Failed to connect to Jade simulator at {}".format(self.SIMULATOR_PATH))
_logger.debug(e)
return devices
def __init__(self, parent, config, name):
HW_PluginBase.__init__(self, parent, config, name)
self.libraries_available = self.check_libraries_available()
if not self.libraries_available:
return
# Register our own serial/com port scanning function
self.device_manager().register_enumerate_func(self.enumerate_serial)
def get_library_version(self):
try:
from . import jadepy
version = jadepy.__version__
except ImportError:
raise
except:
version = "unknown"
return version
@runs_in_hwd_thread
def create_client(self, device, handler):
client = Jade_Client(device.path, plugin=self)
# Check minimum supported firmware version
if self.MIN_SUPPORTED_FW_VERSION > client.fwversion:
msg = (_('Outdated {} firmware for device labelled {}. Please '
'update using a Blockstream Green companion app')
.format(self.device, client.label()))
self.logger.info(msg)
if handler:
handler.show_error(msg)
raise OutdatedHwFirmwareException(msg)
return client
def setup_device(self, device_info, wizard, purpose):
device_id = device_info.device.id_
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
# Call authenticate on hww to ensure unlocked and suitable for network
# May involve user entering PIN on (or even setting up!) hardware device
wizard.run_task_without_blocking_gui(task=lambda: client.authenticate())
return client
def get_xpub(self, device_id, derivation, xtype, wizard):
if xtype not in self.SUPPORTED_XTYPES:
raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
xpub = client.get_xpub(derivation, xtype)
return xpub
def show_address(self, wallet, address, keystore=None):
if keystore is None:
keystore = wallet.get_keystore()
if not self.show_address_helper(wallet, address, keystore):
return
path_suffix = wallet.get_address_index(address)
if _is_multisig(wallet):
# Multisig - wallet details must be registered on Jade hw
multisig_name = _register_multisig_wallet(wallet, keystore, address)
# Jade only needs the path suffix(es) and the multisig registration
# name to generate the address, as the fixed derivation part is
# embedded in the multisig wallet registration record
# NOTE: all cosigners have same path suffix
paths = [path_suffix] * wallet.n
hw_address = keystore.show_address_multi(multisig_name, paths)
else:
# Single-sig/standard
txin_type = wallet.get_txin_type(address)
hw_address = keystore.show_address(path_suffix, txin_type)
if hw_address != address:
keystore.handler.show_error(_('The address generated by {} does not match!').format(self.device))