1
0
Files
electrum/electrum/plugins/ledger/ledger.py
SomberNight 3e4601c61d base64.b64decode: always set validate=True
Notably verifymessage and decrypt(message) were silently ignoring trailing garbage
or inserted non-base64 characters present in signatures/ciphertext.
(both the CLI commands and in the GUI)
I think it is much cleaner and preferable to treat such signatures/ciphertext as invalid.

In fact I find it surprising that base64.b64decode(validate=False) is the default.
Perhaps we should create a helper function for it that set validate=True and use that.
2025-06-03 18:58:05 +00:00

1366 lines
54 KiB
Python

# Some parts of this code are adapted from bitcoin-core/HWI:
# https://github.com/bitcoin-core/HWI/blob/e731395bde13362950e9f13e01689c475545e4dc/hwilib/devices/ledger.py
from abc import ABC, abstractmethod
import base64
import hashlib
from typing import Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union
import electrum_ecc as ecc
from electrum import bip32, constants
from electrum import descriptor
from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath, normalize_bip32_derivation
from electrum.bitcoin import EncodeBase58Check, is_b58_address, is_segwit_script_type, var_int
from electrum.crypto import hash_160
from electrum.i18n import _
from electrum.keystore import Hardware_KeyStore
from electrum.logging import get_logger
from electrum.plugin import Device, runs_in_hwd_thread
from electrum.transaction import PartialTransaction, Transaction, PartialTxInput
from electrum.util import bfh, UserFacingException, versiontuple
from electrum.wallet import Standard_Wallet
from electrum.hw_wallet import HardwareClientBase, HW_PluginBase
from electrum.hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output, LibraryFoundButUnusable
from electrum.hw_wallet.plugin import HardwareClientDummy
if TYPE_CHECKING:
from electrum.plugin import DeviceInfo
from electrum.wizard import NewWalletWizard
_logger = get_logger(__name__)
try:
import ledger_bitcoin
from ledger_bitcoin import WalletPolicy, MultisigWallet, AddressType, Chain
from ledger_bitcoin.exception.errors import DenyError, NotSupportedError, SecurityStatusNotSatisfiedError
from ledger_bitcoin.key import KeyOriginInfo
from ledgercomm.interfaces.hid_device import HID
# legacy imports
import hid
from ledger_bitcoin.btchip.btchipComm import HIDDongleHIDAPI
from ledger_bitcoin.btchip.btchip import btchip
from ledger_bitcoin.btchip.btchipUtils import compress_public_key
from ledger_bitcoin.btchip.bitcoinTransaction import bitcoinTransaction
from ledger_bitcoin.btchip.btchipException import BTChipException
LEDGER_BITCOIN = True
except ImportError as e:
if not (isinstance(e, ModuleNotFoundError) and e.name == 'ledger_bitcoin'):
_logger.exception('error importing ledger plugin deps')
LEDGER_BITCOIN = False
MSG_NEEDS_FW_UPDATE_GENERIC = _('Firmware version too old. Please update at') + \
' https://www.ledger.com'
MSG_NEEDS_FW_UPDATE_SEGWIT = _('Firmware version (or "Bitcoin" app) too old for Segwit support. Please update at') + \
' https://www.ledger.com'
MULTI_OUTPUT_SUPPORT = '1.1.4'
SEGWIT_SUPPORT = '1.1.10'
SEGWIT_SUPPORT_SPECIAL = '1.0.4'
SEGWIT_TRUSTEDINPUTS = '1.4.0'
def is_policy_standard(wp: 'WalletPolicy', fpr: bytes, exp_coin_type: int) -> bool:
"""Returns True if the wallet policy can be used without registration."""
if wp.name != "" or wp.n_keys != 1:
return False
key_info = wp.keys_info[0]
if key_info[0] != '[':
# no key origin info
return False
try:
key_orig_end = key_info.index(']')
except ValueError:
# invalid key_info
return False
key_fpr, key_path = key_info[1:key_orig_end].split('/', maxsplit=1)
if key_fpr != fpr.hex():
# not an internal key
return False
key_path_parts = key_path.split('/')
# Account key should be exactly 3 hardened derivation steps
if len(key_path_parts) != 3 or any(part[-1] != "'" for part in key_path_parts):
return False
purpose, coin_type, account_index = key_path_parts
if coin_type != f"{exp_coin_type}'" or int(account_index[:-1]) > 100:
return False
if wp.descriptor_template == "pkh(@0/**)":
# BIP-44
return purpose == "44'"
elif wp.descriptor_template == "sh(wpkh(@0/**))":
# BIP-49, nested SegWit
return purpose == "49'"
elif wp.descriptor_template == "wpkh(@0/**)":
# BIP-84, native SegWit
return purpose == "84'"
elif wp.descriptor_template == "tr(@0/**)":
# BIP-86, taproot single key
return purpose == "86'"
else:
# unknown
return False
def convert_xpub(xpub: str, xtype='standard') -> str:
bip32node = BIP32Node.from_xkey(xpub)
return BIP32Node(
xtype=xtype,
eckey=bip32node.eckey,
chaincode=bip32node.chaincode,
depth=bip32node.depth,
fingerprint=bip32node.fingerprint,
child_number=bip32node.child_number).to_xpub()
def test_pin_unlocked(func):
"""Function decorator to test the Ledger for being unlocked, and if not,
raise a human-readable exception.
"""
def catch_exception(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except SecurityStatusNotSatisfiedError:
raise UserFacingException(_('Your Ledger is locked. Please unlock it.'))
return catch_exception
# from HWI
def is_witness(script: bytes) -> Tuple[bool, int, bytes]:
"""
Determine whether a script is a segwit output script.
If so, also returns the witness version and witness program.
:param script: The script
:returns: A tuple of a bool indicating whether the script is a segwit output script,
an int representing the witness version,
and the bytes of the witness program.
"""
if len(script) < 4 or len(script) > 42:
return (False, 0, b"")
if script[0] != 0 and (script[0] < 81 or script[0] > 96):
return (False, 0, b"")
if script[1] + 2 == len(script):
return (True, script[0] - 0x50 if script[0] else 0, script[2:])
return (False, 0, b"")
# from HWI
# Only handles up to 15 of 15. Returns None if this script is not a
# multisig script. Returns (m, pubkeys) otherwise.
def parse_multisig(script: bytes) -> Optional[Tuple[int, Sequence[bytes]]]:
"""
Determine whether a script is a multisig script. If so, determine the parameters of that multisig.
:param script: The script
:returns: ``None`` if the script is not multisig.
If multisig, returns a tuple of the number of signers required,
and a sequence of public key bytes.
"""
# Get m
m = script[0] - 80
if m < 1 or m > 15:
return None
# Get pubkeys
pubkeys = []
offset = 1
while True:
pubkey_len = script[offset]
if pubkey_len != 33:
break
offset += 1
pubkeys.append(script[offset:offset + 33])
offset += 33
# Check things at the end
n = script[offset] - 80
if n != len(pubkeys):
return None
offset += 1
op_cms = script[offset]
if op_cms != 174:
return None
return (m, pubkeys)
HARDENED_FLAG = 1 << 31
def H_(x: int) -> int:
"""
Shortcut function that "hardens" a number in a BIP44 path.
"""
return x | HARDENED_FLAG
def is_hardened(i: int) -> bool:
"""
Returns whether an index is hardened
"""
return i & HARDENED_FLAG != 0
def get_bip44_purpose(addrtype: 'AddressType') -> int:
"""
Determine the BIP 44 purpose based on the given :class:`~hwilib.common.AddressType`.
:param addrtype: The address type
"""
if addrtype == AddressType.LEGACY:
return 44
elif addrtype == AddressType.SH_WIT:
return 49
elif addrtype == AddressType.WIT:
return 84
elif addrtype == AddressType.TAP:
return 86
else:
raise ValueError("Unknown address type")
def get_bip44_chain(chain: 'Chain') -> int:
"""
Determine the BIP 44 coin type based on the Bitcoin chain type.
For the Bitcoin mainnet chain, this returns 0. For the other chains, this returns 1.
:param chain: The chain
"""
if chain == Chain.MAIN:
return 0
else:
return 1
def get_addrtype_from_bip44_purpose(index: int) -> Optional['AddressType']:
purpose = index & ~HARDENED_FLAG
if purpose == 44:
return AddressType.LEGACY
elif purpose == 49:
return AddressType.SH_WIT
elif purpose == 84:
return AddressType.WIT
elif purpose == 86:
return AddressType.TAP
else:
return None
def is_standard_path(
path: Sequence[int],
addrtype: 'AddressType',
chain: 'Chain',
) -> bool:
if len(path) != 5:
return False
if not is_hardened(path[0]) or not is_hardened(path[1]) or not is_hardened(path[2]):
return False
if is_hardened(path[3]) or is_hardened(path[4]):
return False
computed_addrtype = get_addrtype_from_bip44_purpose(path[0])
if computed_addrtype is None:
return False
if computed_addrtype != addrtype:
return False
if path[1] != H_(get_bip44_chain(chain)):
return False
if path[3] not in [0, 1]:
return False
return True
def get_chain() -> 'Chain':
if constants.net.NET_NAME == "mainnet":
return Chain.MAIN
elif constants.net.NET_NAME == "testnet":
return Chain.TEST
elif constants.net.NET_NAME == "signet":
return Chain.SIGNET
elif constants.net.NET_NAME == "regtest":
return Chain.REGTEST
else:
raise ValueError("Unsupported network")
class Ledger_Client(HardwareClientBase, ABC):
is_legacy: bool
@staticmethod
def construct_new(
*args, device: Device, plugin: 'LedgerPlugin', **kwargs,
) -> Union['Ledger_Client', HardwareClientDummy]:
"""The 'real' constructor, that automatically decides which subclass to use."""
if LedgerPlugin.is_hw1(device.product_key):
return HardwareClientDummy(
plugin=plugin,
error_text="ledger hw.1 devices are no longer supported",
)
# for nano S or newer hw, decide which client impl to use based on software/firmware version:
hid_device = HID()
hid_device.path = device.path
hid_device.open()
transport = ledger_bitcoin.TransportClient('hid', hid=hid_device)
try:
cl = ledger_bitcoin.createClient(transport, chain=get_chain())
except (ledger_bitcoin.exception.errors.InsNotSupportedError,
ledger_bitcoin.exception.errors.ClaNotSupportedError) as e:
# This can happen on very old versions.
# E.g. with a "nano s", with bitcoin app 1.1.10, SE 1.3.1, MCU 1.0,
# - on machine one, ghost43 got InsNotSupportedError
# - on machine two, thomasv got ClaNotSupportedError
# unclear why the different exceptions, ledger_bitcoin version 0.2.1 in both cases
_logger.info(f"ledger_bitcoin.createClient() got exc: {e}. falling back to old plugin.")
cl = None
if isinstance(cl, ledger_bitcoin.client.NewClient):
_logger.debug(f"Ledger_Client.construct_new(). creating NewClient for {device=}.")
return Ledger_Client_New(hid_device, *args, plugin=plugin, **kwargs)
else:
_logger.debug(f"Ledger_Client.construct_new(). creating LegacyClient for {device=}.")
return Ledger_Client_Legacy(hid_device, *args, plugin=plugin, **kwargs)
def __init__(self, *, plugin: HW_PluginBase):
HardwareClientBase.__init__(self, plugin=plugin)
def get_master_fingerprint(self) -> bytes:
return self.request_root_fingerprint_from_device()
@abstractmethod
def show_address(self, address_path: str, txin_type: str):
pass
@abstractmethod
def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str):
pass
@abstractmethod
def sign_message(
self,
address_path: str,
message: str,
password,
*,
script_type: Optional[str] = None,
) -> bytes:
pass
class Ledger_Client_Legacy(Ledger_Client):
"""Client based on the bitchip library, targeting versions 2.0.* and below."""
is_legacy = True
def __init__(self, hidDevice: 'HID', *, product_key: Tuple[int, int],
plugin: HW_PluginBase):
Ledger_Client.__init__(self, plugin=plugin)
# Hack, we close the old object and instantiate a new one
hidDevice.close()
dev = hid.device()
dev.open_path(hidDevice.path)
dev.set_nonblocking(True)
self.dongleObject = btchip(HIDDongleHIDAPI(dev, True, False))
self.signing = False
self._product_key = product_key
self._soft_device_id = None
def is_pairable(self):
return True
def set_and_unset_signing(func):
"""Function decorator to set and unset self.signing."""
def wrapper(self, *args, **kwargs):
try:
self.signing = True
return func(self, *args, **kwargs)
finally:
self.signing = False
return wrapper
def give_error(self, message):
_logger.info(message)
if not self.signing:
self.handler.show_error(message)
else:
self.signing = False
raise UserFacingException(message)
@runs_in_hwd_thread
def close(self):
self.dongleObject.dongle.close()
def is_initialized(self):
return True
@runs_in_hwd_thread
def get_soft_device_id(self):
if self._soft_device_id is None:
# modern ledger can provide xpub without user interaction
# (hw1 would prompt for PIN)
if not self.is_hw1():
self._soft_device_id = self.request_root_fingerprint_from_device()
return self._soft_device_id
def is_hw1(self) -> bool:
return LedgerPlugin.is_hw1(self._product_key)
def device_model_name(self):
return LedgerPlugin.device_name_from_product_key(self._product_key)
@runs_in_hwd_thread
def has_usable_connection_with_device(self):
try:
self.dongleObject.getFirmwareVersion()
except BaseException:
return False
return True
@runs_in_hwd_thread
@test_pin_unlocked
def get_xpub(self, bip32_path, xtype):
self.checkDevice()
# bip32_path is of the form 44'/0'/1'
# S-L-O-W - we don't handle the fingerprint directly, so compute
# it manually from the previous node
# This only happens once so it's bearable
# self.get_client() # prompt for the PIN before displaying the dialog if necessary
# self.handler.show_message("Computing master public key")
if xtype in ['p2wpkh', 'p2wsh'] and not self.supports_native_segwit():
raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT)
if xtype in ['p2wpkh-p2sh', 'p2wsh-p2sh'] and not self.supports_segwit():
raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT)
bip32_path = bip32.normalize_bip32_derivation(bip32_path, hardened_char="'")
bip32_intpath = bip32.convert_bip32_strpath_to_intpath(bip32_path)
bip32_path = bip32_path[2:] # cut off "m/"
if len(bip32_intpath) >= 1:
prevPath = bip32.convert_bip32_intpath_to_strpath(bip32_intpath[:-1])[2:]
nodeData = self.dongleObject.getWalletPublicKey(prevPath)
publicKey = compress_public_key(nodeData['publicKey'])
fingerprint_bytes = hash_160(publicKey)[0:4]
childnum_bytes = bip32_intpath[-1].to_bytes(length=4, byteorder="big")
else:
fingerprint_bytes = bytes(4)
childnum_bytes = bytes(4)
nodeData = self.dongleObject.getWalletPublicKey(bip32_path)
publicKey = compress_public_key(nodeData['publicKey'])
depth = len(bip32_intpath)
return BIP32Node(xtype=xtype,
eckey=ecc.ECPubkey(bytes(publicKey)),
chaincode=nodeData['chainCode'],
depth=depth,
fingerprint=fingerprint_bytes,
child_number=childnum_bytes).to_xpub()
def has_detached_pin_support(self, client: 'btchip'):
try:
client.getVerifyPinRemainingAttempts()
return True
except BTChipException as e:
if e.sw == 0x6d00:
return False
raise e
def is_pin_validated(self, client: 'btchip'):
try:
# Invalid SET OPERATION MODE to verify the PIN status
client.dongle.exchange(bytearray([0xe0, 0x26, 0x00, 0x00, 0x01, 0xAB]))
except BTChipException as e:
if (e.sw == 0x6982):
return False
if (e.sw == 0x6A80):
return True
raise e
def supports_multi_output(self):
return self.multiOutputSupported
def supports_segwit(self):
return self.segwitSupported
def supports_native_segwit(self):
return self.nativeSegwitSupported
def supports_segwit_trustedInputs(self):
return self.segwitTrustedInputs
@runs_in_hwd_thread
def checkDevice(self):
firmwareInfo = self.dongleObject.getFirmwareVersion()
firmware = firmwareInfo['version']
self.multiOutputSupported = versiontuple(firmware) >= versiontuple(MULTI_OUTPUT_SUPPORT)
self.nativeSegwitSupported = versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT)
self.segwitSupported = self.nativeSegwitSupported or (firmwareInfo['specialVersion'] == 0x20 and versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT_SPECIAL))
self.segwitTrustedInputs = versiontuple(firmware) >= versiontuple(SEGWIT_TRUSTEDINPUTS)
def password_dialog(self, msg=None):
response = self.handler.get_word(msg)
if response is None:
return False, None, None
return True, response, response
@runs_in_hwd_thread
@test_pin_unlocked
@set_and_unset_signing
def show_address(self, address_path: str, txin_type: str):
self.handler.show_message(_("Showing address ..."))
segwit = is_segwit_script_type(txin_type)
segwitNative = txin_type == 'p2wpkh'
try:
self.dongleObject.getWalletPublicKey(address_path, showOnScreen=True, segwit=segwit, segwitNative=segwitNative)
except BTChipException as e:
if e.sw == 0x6985: # cancelled by user
pass
elif e.sw == 0x6982:
raise # pin lock. decorator will catch it
elif e.sw == 0x6b00: # hw.1 raises this
self.handler.show_error('{}\n{}\n{}'.format(
_('Error showing address') + ':',
e,
_('Your device might not have support for this functionality.')))
else:
_logger.exception('')
self.handler.show_error(e)
except BaseException as e:
_logger.exception('')
self.handler.show_error(e)
finally:
self.handler.finished()
@runs_in_hwd_thread
@test_pin_unlocked
@set_and_unset_signing
def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str):
if tx.is_complete():
return
inputs = []
inputsPaths = []
chipInputs = []
redeemScripts = []
changePath = ""
p2shTransaction = False
segwitTransaction = False
pin = ""
# prompt for the PIN before displaying the dialog if necessary
def is_txin_legacy_multisig(txin: PartialTxInput) -> bool:
desc = txin.script_descriptor
return (isinstance(desc, descriptor.SHDescriptor)
and isinstance(desc.subdescriptors[0], descriptor.MultisigDescriptor))
# Fetch inputs of the transaction to sign
for txin in tx.inputs():
if txin.is_coinbase_input():
self.give_error("Coinbase not supported") # should never happen
if is_txin_legacy_multisig(txin):
p2shTransaction = True
if txin.is_p2sh_segwit():
if not self.supports_segwit():
self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT)
segwitTransaction = True
if txin.is_native_segwit():
if not self.supports_native_segwit():
self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT)
segwitTransaction = True
my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin)
if not full_path:
self.give_error("No matching pubkey for sign_transaction") # should never happen
full_path = convert_bip32_intpath_to_strpath(full_path)[2:]
redeemScript = txin.get_scriptcode_for_sighash().hex()
txin_prev_tx = txin.utxo
if txin_prev_tx is None and not txin.is_segwit():
raise UserFacingException(_('Missing previous tx for legacy input.'))
txin_prev_tx_raw = txin_prev_tx.serialize() if txin_prev_tx else None
inputs.append([txin_prev_tx_raw,
txin.prevout.out_idx,
redeemScript,
txin.prevout.txid.hex(),
my_pubkey,
txin.nsequence,
txin.value_sats()])
inputsPaths.append(full_path)
# Sanity check
if p2shTransaction:
for txin in tx.inputs():
if not is_txin_legacy_multisig(txin):
self.give_error("P2SH / regular input mixed in same transaction not supported") # should never happen
if not self.supports_multi_output():
if len(tx.outputs()) > 2:
self.give_error("Transaction with more than 2 outputs not supported")
for txout in tx.outputs():
if not txout.address:
# note: max_size based on https://github.com/LedgerHQ/ledger-app-btc/commit/3a78dee9c0484821df58975803e40d58fbfc2c38#diff-c61ccd96a6d8b54d48f54a3bc4dfa7e2R26
validate_op_return_output(txout, max_size=190)
# Output "change" detection
# - at most one output can bypass confirmation (~change)
if not p2shTransaction:
has_change = False
any_output_on_change_branch = is_any_tx_output_on_change_branch(tx)
for txout in tx.outputs():
if txout.is_mine and len(tx.outputs()) > 1 \
and not has_change:
# prioritise hiding outputs on the 'change' branch from user
# because no more than one change address allowed
if txout.is_change == any_output_on_change_branch:
my_pubkey, changePath = keystore.find_my_pubkey_in_txinout(txout)
assert changePath
changePath = convert_bip32_intpath_to_strpath(changePath)[2:]
has_change = True
try:
# Get trusted inputs from the original transactions
for input_idx, utxo in enumerate(inputs):
self.handler.show_message(_("Preparing transaction inputs...") + f" (phase1, {input_idx}/{len(inputs)})")
sequence = int.to_bytes(utxo[5], length=4, byteorder="little", signed=False).hex()
if segwitTransaction and not self.supports_segwit_trustedInputs():
tmp = bfh(utxo[3])[::-1]
tmp += int.to_bytes(utxo[1], length=4, byteorder="little", signed=False)
tmp += int.to_bytes(utxo[6], length=8, byteorder="little", signed=False) # txin['value']
chipInputs.append({'value': tmp, 'witness': True, 'sequence': sequence})
redeemScripts.append(bfh(utxo[2]))
elif (not p2shTransaction) or self.supports_multi_output():
txtmp = bitcoinTransaction(bfh(utxo[0]))
trustedInput = self.dongleObject.getTrustedInput(txtmp, utxo[1])
trustedInput['sequence'] = sequence
if segwitTransaction:
trustedInput['witness'] = True
chipInputs.append(trustedInput)
if p2shTransaction or segwitTransaction:
redeemScripts.append(bfh(utxo[2]))
else:
redeemScripts.append(txtmp.outputs[utxo[1]].script)
else:
tmp = bfh(utxo[3])[::-1]
tmp += int.to_bytes(utxo[1], length=4, byteorder="little", signed=False)
chipInputs.append({'value': tmp, 'sequence': sequence})
redeemScripts.append(bfh(utxo[2]))
self.handler.show_message(_("Confirm Transaction on your Ledger device..."))
# Sign all inputs
firstTransaction = True
inputIndex = 0
rawTx = tx.serialize_to_network(include_sigs=False)
if segwitTransaction:
self.dongleObject.startUntrustedTransaction(True, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version)
# we don't set meaningful outputAddress, amount and fees
# as we only care about the alternateEncoding==True branch
outputData = self.dongleObject.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
while inputIndex < len(inputs):
self.handler.show_message(_("Signing transaction...") + f" (phase2, {inputIndex}/{len(inputs)})")
singleInput = [chipInputs[inputIndex]]
self.dongleObject.startUntrustedTransaction(False, 0,
singleInput, redeemScripts[inputIndex], version=tx.version)
inputSignature = self.dongleObject.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
inputSignature[0] = 0x30 # force for 1.4.9+
my_pubkey = inputs[inputIndex][4]
tx.add_signature_to_txin(txin_idx=inputIndex,
signing_pubkey=my_pubkey,
sig=inputSignature)
inputIndex = inputIndex + 1
else:
while inputIndex < len(inputs):
self.handler.show_message(_("Signing transaction...") + f" (phase2, {inputIndex}/{len(inputs)})")
self.dongleObject.startUntrustedTransaction(firstTransaction, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version)
# we don't set meaningful outputAddress, amount and fees
# as we only care about the alternateEncoding==True branch
outputData = self.dongleObject.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
# Sign input with the provided PIN
inputSignature = self.dongleObject.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
inputSignature[0] = 0x30 # force for 1.4.9+
my_pubkey = inputs[inputIndex][4]
tx.add_signature_to_txin(txin_idx=inputIndex,
signing_pubkey=my_pubkey,
sig=inputSignature)
inputIndex = inputIndex + 1
firstTransaction = False
except UserWarning:
self.handler.show_error(_('Cancelled by user'))
return
except BTChipException as e:
if e.sw in (0x6985, 0x6d00): # cancelled by user
return
elif e.sw == 0x6982:
raise # pin lock. decorator will catch it
else:
_logger.exception('')
self.give_error(e)
except BaseException as e:
_logger.exception('')
self.give_error(e)
finally:
self.handler.finished()
@runs_in_hwd_thread
@test_pin_unlocked
@set_and_unset_signing
def sign_message(
self,
address_path: str,
message: str,
password,
*,
script_type: Optional[str] = None,
) -> bytes:
message = message.encode('utf8')
message_hash = hashlib.sha256(message).hexdigest().upper()
self.handler.show_message("Signing message ...\r\nMessage hash: " + message_hash)
try:
info = self.dongleObject.signMessagePrepare(address_path, message)
pin = ""
signature = self.dongleObject.signMessageSign(pin)
except BTChipException as e:
if e.sw == 0x6a80:
self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. "
"Only alphanumerical messages shorter than 140 characters are supported. "
"Please remove any extra characters (tab, carriage return) and retry.")
elif e.sw == 0x6985: # cancelled by user
return b''
elif e.sw == 0x6982:
raise # pin lock. decorator will catch it
else:
self.give_error(e)
except UserWarning:
self.handler.show_error(_('Cancelled by user'))
return b''
except Exception as e:
self.give_error(e)
finally:
self.handler.finished()
# Parse the ASN.1 signature
rLength = signature[3]
r = signature[4: 4 + rLength]
sLength = signature[4 + rLength + 1]
s = signature[4 + rLength + 2:]
if rLength == 33:
r = r[1:]
if sLength == 33:
s = s[1:]
# And convert it
# Pad r and s points with 0x00 bytes when the point is small to get valid signature.
r_padded = bytes([0x00]) * (32 - len(r)) + r
s_padded = bytes([0x00]) * (32 - len(s)) + s
return bytes([27 + 4 + (signature[0] & 0x01)]) + r_padded + s_padded
class Ledger_Client_New(Ledger_Client):
"""Client based on the ledger_bitcoin library, targeting versions 2.1.* and above."""
is_legacy = False
def __init__(self, hidDevice: 'HID', *, product_key: Tuple[int, int],
plugin: HW_PluginBase):
Ledger_Client.__init__(self, plugin=plugin)
transport = ledger_bitcoin.TransportClient('hid', hid=hidDevice)
self.client = ledger_bitcoin.client.NewClient(transport, get_chain())
self._product_key = product_key
self._soft_device_id = None
self.master_fingerprint = None
self._known_xpubs: Dict[str, str] = {} # path ==> xpub
self._registered_policies: Dict[bytes, bytes] = {} # wallet id => wallet hmac
def is_pairable(self):
return True
@runs_in_hwd_thread
def close(self):
self.client.stop()
def is_initialized(self):
return True
@runs_in_hwd_thread
def get_soft_device_id(self):
if self._soft_device_id is None:
self._soft_device_id = self.request_root_fingerprint_from_device()
return self._soft_device_id
def device_model_name(self):
return LedgerPlugin.device_name_from_product_key(self._product_key)
@runs_in_hwd_thread
def has_usable_connection_with_device(self):
try:
self.client.get_version()
except BaseException:
return False
return True
@runs_in_hwd_thread
@test_pin_unlocked
def get_xpub(self, bip32_path: str, xtype):
# try silently first; if not a standard path, repeat with on-screen display
bip32_path = normalize_bip32_derivation(bip32_path, hardened_char="'")
# cache known path/xpubs combinations in order to avoid requesting them many times
if bip32_path in self._known_xpubs:
xpub = self._known_xpubs[bip32_path]
else:
try:
xpub = self.client.get_extended_pubkey(bip32_path)
except NotSupportedError:
xpub = self.client.get_extended_pubkey(bip32_path, True)
self._known_xpubs[bip32_path] = xpub
# Ledger always returns 'standard' xpubs; convert to the right xtype
return convert_xpub(xpub, xtype)
@runs_in_hwd_thread
def request_root_fingerprint_from_device(self) -> str:
return self.client.get_master_fingerprint().hex()
@runs_in_hwd_thread
@test_pin_unlocked
def get_master_fingerprint(self) -> bytes:
if self.master_fingerprint is None:
self.master_fingerprint = self.client.get_master_fingerprint()
return self.master_fingerprint
@runs_in_hwd_thread
@test_pin_unlocked
def get_singlesig_default_wallet_policy(self, addr_type: 'AddressType', account: int) -> 'WalletPolicy':
assert account >= HARDENED_FLAG
if addr_type == AddressType.LEGACY:
template = "pkh(@0/**)"
elif addr_type == AddressType.WIT:
template = "wpkh(@0/**)"
elif addr_type == AddressType.SH_WIT:
template = "sh(wpkh(@0/**))"
elif addr_type == AddressType.TAP:
template = "tr(@0/**)"
else:
raise ValueError("Unknown address type")
fpr = self.get_master_fingerprint()
key_origin_steps = f"{get_bip44_purpose(addr_type)}'/{get_bip44_chain(self.client.chain)}'/{account & ~HARDENED_FLAG}'"
xpub = self.get_xpub(f"m/{key_origin_steps}", 'standard')
key_str = f"[{fpr.hex()}/{key_origin_steps}]{xpub}"
# Make the Wallet object
return WalletPolicy(name="", descriptor_template=template, keys_info=[key_str])
@runs_in_hwd_thread
@test_pin_unlocked
def get_singlesig_policy_for_path(self, path: str, xtype: str, master_fp: bytes) -> Optional['WalletPolicy']:
path = path.replace("h", "'")
path_parts = path.split("/")
if not 5 <= len(path_parts) <= 6:
raise UserFacingException(_('Unsupported derivation path: {}').format(path))
path_root = "/".join(path_parts[:-2])
fpr = self.get_master_fingerprint()
# Ledger always uses standard xpubs in wallet policies
xpub = self.get_xpub(f"m/{path_root}", 'standard')
key_info = f"[{fpr.hex()}/{path_root}]{xpub}"
if xtype == 'p2pkh':
name = "Legacy P2PKH"
descriptor_template = "pkh(@0/**)"
elif xtype == 'p2wpkh-p2sh':
name = "Nested SegWit"
descriptor_template = "sh(wpkh(@0/**))"
elif xtype == 'p2wpkh':
name = "SegWit"
descriptor_template = "wpkh(@0/**)"
elif xtype == 'p2tr':
name = "Taproot"
descriptor_template = "tr(@0/**)"
else:
return None
policy = WalletPolicy("", descriptor_template, [key_info])
if is_policy_standard(policy, master_fp, constants.net.BIP44_COIN_TYPE):
return policy
# Non standard policy, so give it a name
return WalletPolicy(name, descriptor_template, [key_info])
def password_dialog(self, msg=None):
response = self.handler.get_word(msg)
if response is None:
return False, None, None
return True, response, response
def _register_policy_if_needed(self, wallet_policy: 'WalletPolicy') -> Tuple[bytes, bytes]:
# If the policy is not register, registers it and saves its hmac on success
# Returns the pair of wallet id and wallet hmac
if wallet_policy.id not in self._registered_policies:
wallet_id, wallet_hmac = self.client.register_wallet(wallet_policy)
assert wallet_id == wallet_policy.id
self._registered_policies[wallet_id] = wallet_hmac
return wallet_policy.id, self._registered_policies[wallet_policy.id]
@runs_in_hwd_thread
@test_pin_unlocked
def show_address(self, address_path: str, txin_type: str):
client_ledger = self.client
self.handler.show_message(_("Showing address ..."))
# TODO: generalize for multisignature
try:
master_fp = client_ledger.get_master_fingerprint()
wallet_policy = self.get_singlesig_policy_for_path(address_path, txin_type, master_fp)
change, addr_index = [int(i) for i in address_path.split("/")[-2:]]
wallet_hmac = None
if not is_policy_standard(wallet_policy, master_fp, constants.net.BIP44_COIN_TYPE):
wallet_id, wallet_hmac = self._register_policy_if_needed(wallet_policy)
self.client.get_wallet_address(wallet_policy, wallet_hmac, change, addr_index, True)
except DenyError:
pass # cancelled by user
except BaseException as e:
_logger.exception('Error while showing an address')
self.handler.show_error(e)
finally:
self.handler.finished()
@runs_in_hwd_thread
@test_pin_unlocked
def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str):
if tx.is_complete():
return
# mostly adapted from HWI
psbt_bytes = tx.serialize_as_bytes()
psbt = ledger_bitcoin.client.PSBT()
psbt.deserialize(base64.b64encode(psbt_bytes).decode('ascii'))
try:
master_fp = self.client.get_master_fingerprint()
# Figure out which wallets are signing
wallets: Dict[bytes, Tuple[AddressType, WalletPolicy, Optional[bytes]]] = {}
for input_num, (electrum_txin, psbt_in) in enumerate(zip(tx.inputs(), psbt.inputs)):
if electrum_txin.is_coinbase_input():
raise UserFacingException(_('Coinbase not supported')) # should never happen
utxo = None
if psbt_in.witness_utxo:
utxo = psbt_in.witness_utxo
if psbt_in.non_witness_utxo:
if psbt_in.prev_txid != psbt_in.non_witness_utxo.hash:
raise UserFacingException(_('Input {} has a non_witness_utxo with the wrong hash').format(input_num))
assert psbt_in.prev_out is not None
utxo = psbt_in.non_witness_utxo.vout[psbt_in.prev_out]
if utxo is None:
continue
if (desc := electrum_txin.script_descriptor) is None:
raise Exception("script_descriptor missing for txin ")
scriptcode = desc.expand().scriptcode_for_sighash
is_wit, wit_ver, __ = is_witness(psbt_in.redeem_script or utxo.scriptPubKey)
script_addrtype = AddressType.LEGACY
if is_wit:
# if it's a segwit spend (any version), make sure the witness_utxo is also present
psbt_in.witness_utxo = utxo
if electrum_txin.is_p2sh_segwit():
if wit_ver == 0:
script_addrtype = AddressType.SH_WIT
else:
raise UserFacingException(_('Cannot have witness v1+ in p2sh'))
else:
if wit_ver == 0:
script_addrtype = AddressType.WIT
elif wit_ver == 1:
script_addrtype = AddressType.TAP
else:
continue
multisig = parse_multisig(scriptcode)
if multisig is not None:
k, ms_pubkeys = multisig
# Figure out the parent xpubs
key_exprs: List[str] = []
ok = True
our_keys = 0
for pub in ms_pubkeys:
if pub in psbt_in.hd_keypaths:
pk_origin = psbt_in.hd_keypaths[pub]
if pk_origin.fingerprint == master_fp:
our_keys += 1
for xpub_bytes, xpub_origin in psbt.xpub.items():
xpub_str = EncodeBase58Check(xpub_bytes)
if (xpub_origin.fingerprint == pk_origin.fingerprint) and (xpub_origin.path == pk_origin.path[:len(xpub_origin.path)]):
key_origin_full = pk_origin.to_string().replace('h', '\'')
# strip last two steps of derivation
key_origin_parts = key_origin_full.split('/')
if len(key_origin_parts) < 3:
raise UserFacingException(_('Unable to sign this transaction'))
key_origin = '/'.join(key_origin_parts[:-2])
key_exprs.append(f"[{key_origin}]{xpub_str}")
break
else:
# No xpub, Ledger will not accept this multisig
ok = False
if not ok:
continue
# Electrum uses sortedmulti; we make sure that the array of key information is normalized in a consistent order
key_exprs = list(sorted(key_exprs))
# Make and register the MultisigWallet
msw = MultisigWallet(f"{k} of {len(key_exprs)} Multisig", script_addrtype, k, key_exprs)
msw_id = msw.id
if msw_id not in wallets:
__, registered_hmac = self._register_policy_if_needed(msw)
wallets[msw_id] = (
script_addrtype,
msw,
registered_hmac,
)
else:
def process_origin(origin: KeyOriginInfo, *, script_addrtype=script_addrtype) -> None:
if is_standard_path(origin.path, script_addrtype, get_chain()):
# these policies do not need to be registered
policy = self.get_singlesig_default_wallet_policy(script_addrtype, origin.path[2])
wallets[policy.id] = (
script_addrtype,
self.get_singlesig_default_wallet_policy(script_addrtype, origin.path[2]),
None, # Wallet hmac
)
else:
# register the policy
if script_addrtype == AddressType.LEGACY:
name = "Legacy"
template = "pkh(@0/**)"
elif script_addrtype == AddressType.WIT:
name = "Native SegWit"
template = "wpkh(@0/**)"
elif script_addrtype == AddressType.SH_WIT:
name = "Nested SegWit"
template = "sh(wpkh(@0/**))"
elif script_addrtype == AddressType.TAP:
name = "Taproot"
template = "tr(@0/**)"
else:
raise ValueError("Unknown address type")
key_origin_info = origin.to_string()
key_origin_steps = key_origin_info.replace('h', '\'').split('/')[1:]
if len(key_origin_steps) < 3:
# Skip this input, not able to sign
return
# remove the last two steps
account_key_origin = "/".join(key_origin_steps[:-2])
# get the account-level xpub
xpub = self.get_xpub(f"m/{account_key_origin}", 'standard')
key_str = f"[{master_fp.hex()}/{account_key_origin}]{xpub}"
policy = WalletPolicy(name, template, [key_str])
__, registered_hmac = self.client.register_wallet(policy)
wallets[policy.id] = (
script_addrtype,
policy,
registered_hmac,
)
for key, origin in psbt_in.hd_keypaths.items():
if origin.fingerprint == master_fp:
process_origin(origin)
for key, (__, origin) in psbt_in.tap_bip32_paths.items():
# TODO: Support script path signing
if key == psbt_in.tap_internal_key and origin.fingerprint == master_fp:
process_origin(origin)
self.handler.show_message(_("Confirm Transaction on your Ledger device..."))
if len(wallets) == 0:
# Could not find a WalletPolicy to sign with
raise UserFacingException(_('Unable to sign this transaction'))
# For each wallet, sign
for __, (__, wallet, wallet_hmac) in wallets.items():
input_sigs = self.client.sign_psbt(psbt, wallet, wallet_hmac)
for idx, part_sig in input_sigs:
tx.add_signature_to_txin(
txin_idx=idx, signing_pubkey=part_sig.pubkey, sig=part_sig.signature)
except DenyError:
pass # cancelled by user
except BaseException as e:
_logger.exception('Error while signing')
self.handler.show_error(e)
finally:
self.handler.finished()
@runs_in_hwd_thread
@test_pin_unlocked
def sign_message(
self,
address_path: str,
message: str,
password,
*,
script_type: Optional[str] = None,
) -> bytes:
message = message.encode('utf8')
message_hash = hashlib.sha256(message).hexdigest().upper()
# prompt for the PIN before displaying the dialog if necessary
self.handler.show_message("Signing message ...\r\nMessage hash: " + message_hash)
result = b''
try:
sig_str = self.client.sign_message(message, address_path)
result = base64.b64decode(sig_str, validate=True)
except DenyError:
pass # cancelled by user
except BaseException as e:
_logger.exception('')
self.handler.show_error(e)
finally:
self.handler.finished()
return result
class Ledger_KeyStore(Hardware_KeyStore):
"""Ledger keystore. Targets all versions, will have different behavior with different clients."""
hw_type = 'ledger'
device = 'Ledger'
plugin: 'LedgerPlugin'
def __init__(self, d):
Hardware_KeyStore.__init__(self, d)
self.cfg = d.get('cfg', {'mode': 0})
def dump(self):
obj = Hardware_KeyStore.dump(self)
obj['cfg'] = self.cfg
return obj
def get_client_dongle_object(self, *, client: Optional[Ledger_Client] = None) -> Ledger_Client:
if client is None:
client = self.get_client()
return client
def decrypt_message(self, pubkey, message, password):
raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device))
def sign_message(self, sequence, *args, **kwargs):
address_path = self.get_derivation_prefix() + "/%d/%d" % sequence
address_path = normalize_bip32_derivation(address_path, hardened_char="'")
address_path = address_path[2:] # cut m/
return self.get_client_dongle_object().sign_message(address_path, *args, **kwargs)
def sign_transaction(self, *args, **kwargs):
return self.get_client_dongle_object().sign_transaction(self, *args, **kwargs)
def show_address(self, sequence, *args, **kwargs):
address_path = self.get_derivation_prefix() + "/%d/%d" % sequence
address_path = normalize_bip32_derivation(address_path, hardened_char="'")
address_path = address_path[2:] # cut m/
return self.get_client_dongle_object().show_address(address_path, *args, **kwargs)
class LedgerPlugin(HW_PluginBase):
keystore_class = Ledger_KeyStore
minimum_library = (0, 2, 0)
maximum_library = (1, 0)
DEVICE_IDS = [(0x2581, 0x1807), # HW.1 legacy btchip # not supported anymore (but we log an exception)
(0x2581, 0x2b7c), # HW.1 transitional production # not supported anymore
(0x2581, 0x3b7c), # HW.1 ledger production # not supported anymore
(0x2581, 0x4b7c), # HW.1 ledger test # not supported anymore
(0x2c97, 0x0000), # Blue
(0x2c97, 0x0001), # Nano-S
(0x2c97, 0x0004), # Nano-X
(0x2c97, 0x0005), # Nano-S Plus
(0x2c97, 0x0006), # Stax
(0x2c97, 0x0007), # Flex
(0x2c97, 0x0008), # RFU
(0x2c97, 0x0009), # RFU
(0x2c97, 0x000a)] # RFU
VENDOR_IDS = (0x2c97,)
LEDGER_MODEL_IDS = {
0x10: "Ledger Nano S",
0x40: "Ledger Nano X",
0x50: "Ledger Nano S Plus",
0x60: "Ledger Stax",
0x70: "Ledger Flex",
}
SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
def __init__(self, parent, config, name):
HW_PluginBase.__init__(self, parent, config, name)
self.libraries_available = self.check_libraries_available()
if not self.libraries_available:
_logger.info("Library unavailable")
return
# to support legacy devices and legacy firmwares
self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
# to support modern firmware
self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self)
def get_library_version(self):
try:
import ledger_bitcoin
version = ledger_bitcoin.__version__
except ImportError:
raise
except Exception:
version = "unknown"
if LEDGER_BITCOIN:
return version
else:
raise LibraryFoundButUnusable(library_version=version)
@classmethod
def is_hw1(cls, product_key) -> bool:
return product_key[0] == 0x2581
@classmethod
def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]:
"""Returns (can_recognize, model_name) tuple."""
# legacy product_keys
if product_key in cls.DEVICE_IDS:
if cls.is_hw1(product_key):
return True, "Ledger HW.1"
if product_key == (0x2c97, 0x0000):
return True, "Ledger Blue"
if product_key == (0x2c97, 0x0001):
return True, "Ledger Nano S"
if product_key == (0x2c97, 0x0004):
return True, "Ledger Nano X"
if product_key == (0x2c97, 0x0005):
return True, "Ledger Nano S Plus"
if product_key == (0x2c97, 0x0006):
return True, "Ledger Stax"
if product_key == (0x2c97, 0x0007):
return True, "Ledger Flex"
return True, None
# modern product_keys
if product_key[0] == 0x2c97:
product_id = product_key[1]
model_id = product_id >> 8
if model_id in cls.LEDGER_MODEL_IDS:
model_name = cls.LEDGER_MODEL_IDS[model_id]
return True, model_name
# give up
return False, None
def can_recognize_device(self, device: Device) -> bool:
can_recognize = self._recognize_device(device.product_key)[0]
if can_recognize:
# Do a further check, duplicated from:
# https://github.com/LedgerHQ/ledgercomm/blob/bc5ada865980cb63c2b9b71a916e01f2f8e53716/ledgercomm/interfaces/hid_device.py#L79-L82
# Modern ledger devices can have multiple interfaces picked up by hid, only one of which is usable by us.
# If we try communicating with the wrong one, we might not get a reply and block forever.
if device.product_key[0] == 0x2c97:
if not (device.interface_number == 0 or device.usage_page == 0xffa0):
return False
return can_recognize
@classmethod
def device_name_from_product_key(cls, product_key) -> Optional[str]:
return cls._recognize_device(product_key)[1]
def create_device_from_hid_enumeration(self, d, *, product_key):
device = super().create_device_from_hid_enumeration(d, product_key=product_key)
if not self.can_recognize_device(device):
return None
return device
@runs_in_hwd_thread
def create_client(self, device, handler) -> Union[Ledger_Client, None, HardwareClientDummy]:
try:
return Ledger_Client.construct_new(device=device, product_key=device.product_key, plugin=self)
except Exception as e:
self.logger.info(f"cannot connect at {device.path} {e}", exc_info=e)
return None
@runs_in_hwd_thread
def show_address(self, wallet, address, keystore=None):
if keystore is None:
keystore = wallet.get_keystore()
if not self.show_address_helper(wallet, address, keystore):
return
if type(wallet) is not Standard_Wallet:
keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
return
sequence = wallet.get_address_index(address)
txin_type = wallet.get_txin_type(address)
keystore.show_address(sequence, txin_type)
def wizard_entry_for_device(self, device_info: 'DeviceInfo', *, new_wallet=True) -> str:
if new_wallet:
return 'ledger_start' if device_info.initialized else 'ledger_not_initialized'
else:
return 'ledger_unlock'
# insert ledger pages in new wallet wizard
def extend_wizard(self, wizard: 'NewWalletWizard'):
views = {
'ledger_start': {
'next': 'ledger_xpub',
},
'ledger_xpub': {
'next': lambda d: wizard.wallet_password_view(d) if wizard.last_cosigner(d) else 'multisig_cosigner_keystore',
'accept': wizard.maybe_master_pubkey,
'last': lambda d: wizard.is_single_password() and wizard.last_cosigner(d)
},
'ledger_not_initialized': {},
'ledger_unlock': {
'last': True
},
}
wizard.navmap_merge(views)