1
0
Files
electrum/electrum/plugins/jade/jadepy/jade.py
2022-09-15 11:54:42 +01:00

1748 lines
63 KiB
Python

import cbor
import hashlib
import json
import time
import logging
import collections
import collections.abc
import traceback
import random
import sys
# JadeError
from .jade_error import JadeError
# Low-level comms backends
from .jade_serial import JadeSerialImpl
from .jade_tcp import JadeTCPImpl
# 'jade' logger
logger = logging.getLogger('jade')
device_logger = logging.getLogger('jade-device')
# BLE comms backend is optional
# It relies on the BLE dependencies being available
try:
from .jade_ble import JadeBleImpl
except ImportError as e:
logger.warn(e)
logger.warn('BLE scanning/connectivity will not be available')
# Default serial connection
DEFAULT_SERIAL_DEVICE = '/dev/ttyUSB0'
DEFAULT_BAUD_RATE = 115200
DEFAULT_SERIAL_TIMEOUT = 120
# Default BLE connection
DEFAULT_BLE_DEVICE_NAME = 'Jade'
DEFAULT_BLE_SERIAL_NUMBER = None
DEFAULT_BLE_SCAN_TIMEOUT = 60
def _hexlify(data):
"""
Helper to map bytes-like types into hex-strings
to make for prettier message-logging.
Parameters
----------
data : any
The object to hexlify.
- bytes or bytearrays have 'hex()' method invoked
- list and dicts (values) have this function mapped over them
- Otherwise the input is returned unchanged
"""
if data is None:
return None
elif isinstance(data, bytes) or isinstance(data, bytearray):
return data.hex()
elif isinstance(data, list):
return [_hexlify(item) for item in data]
elif isinstance(data, dict):
return {k: _hexlify(v) for k, v in data.items()}
else:
return data
# NOTE: Removed entirely for electrum - so it is not used silently as a fallback.
# (hard error preferred in that case)
# Jade repo api will be improved to make enabling this function more explicit
# try:
# import requests
#
# def _http_request(params):
# """
# Simple http request function which can be used when a Jade response
# requires an external http call.
# The default implementation used in JadeAPI._jadeRpc() below.
# NOTE: Only available if the 'requests' dependency is available.
#
# Callers can supply their own implmentation of this call where it is required.
#
# Parameters
# ----------
# data : dict
# A dictionary structure describing the http call to make
#
# Returns
# -------
# dict
# with single key 'body', whose value is the json returned from the call
#
# """
# logger.debug('_http_request: {}'.format(params))
#
# # Use the first non-onion url
# url = [url for url in params['urls'] if not url.endswith('.onion')][0]
# if params['method'] == 'GET':
# assert 'data' not in params, 'Cannot pass body to requests.get'
# f = requests.get(url)
# elif params['method'] == 'POST':
# data = json.dumps(params['data'])
# f = requests.post(url, data)
#
# logger.debug("http_request received reply: {}".format(f.text))
#
# if f.status_code != 200:
# logger.error("http error {} : {}".format(f.status_code, f.text))
# raise ValueError(f.status_code)
#
# assert params['accept'] == 'json'
# f = f.json()
#
# return {'body': f}
#
# except ImportError as e:
# logger.warn(e)
# logger.warn('Default _http_requests() function will not be available')
#
class JadeAPI:
"""
High-Level Jade Client API
Builds on a JadeInterface to provide a meaningful API
Either:
a) use with JadeAPI.create_[serial|ble]() as jade:
(recommended)
or:
b) use JadeAPI.create_[serial|ble], then call connect() before
using, and disconnect() when finished
(caveat cranium)
or:
c) use ctor to wrap existing JadeInterface instance
(caveat cranium)
"""
def __init__(self, jade):
assert jade is not None
self.jade = jade
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc, tb):
if (exc_type):
logger.error("Exception causing JadeAPI context exit.")
logger.error(exc_type)
logger.error(exc)
traceback.print_tb(tb)
self.disconnect(exc_type is not None)
@staticmethod
def create_serial(device=None, baud=None, timeout=None):
"""
Create a JadeAPI object using the serial interface described.
Parameters
----------
device : str, optional
The device identifier for the serial device.
Underlying implementation will default (to /dev/ttyUSB0)
baud : int, optional
The communication baud rate.
Underlying implementation will default (to 115200)
timeout : int, optional
The serial read timeout when awaiting messages.
Underlying implementation will default (to 120s)
Returns
-------
JadeAPI
API object configured to use given serial parameters.
NOTE: the api instance has not yet tried to contact the hw
- caller must call 'connect()' before trying to use the Jade.
"""
impl = JadeInterface.create_serial(device, baud, timeout)
return JadeAPI(impl)
@staticmethod
def create_ble(device_name=None, serial_number=None,
scan_timeout=None, loop=None):
"""
Create a JadeAPI object using the BLE interface described.
NOTE: raises JadeError if BLE dependencies not installed.
Parameters
----------
device_name : str, optional
The device name of the desired BLE device.
Underlying implementation will default (to 'Jade')
serial_number : int, optional
The serial number of the desired BLE device
- used to disambiguate multiple beacons with the same 'device name'
Underlying implementation will connect to the first beacon it scans
with the matching 'device name'.
scan_timeout : int, optional
The timeout when scanning for devices which match the device name/serial number.
Underlying implementation will default (to 60s)
loop : optional
The asynchio event loop to use, if required.
Underlying implementation will default (to asyncio.get_event_loop())
Returns
-------
JadeAPI
API object configured to use given BLE parameters.
NOTE: the api instance has not yet tried to contact the hw
- caller must call 'connect()' before trying to use the Jade.
Raises
------
JadeError if BLE backend not available (ie. BLE dependencies not installed)
"""
impl = JadeInterface.create_ble(device_name, serial_number,
scan_timeout, loop)
return JadeAPI(impl)
def connect(self):
"""
Try to connect the underlying transport interface (eg. serial, ble, etc.)
Raises an exception on failure.
"""
self.jade.connect()
def disconnect(self, drain=False):
"""
Disconnect the underlying transport (eg. serial, ble, etc.)
Parameters
----------
drain : bool, optional
When true log any/all remaining messages/data, otherwise silently discard.
NOTE: can prevent disconnection if data is arriving constantly.
Defaults to False.
"""
self.jade.disconnect(drain)
def drain(self):
"""
Log any/all outstanding messages/data.
NOTE: can run indefinitely if data is arriving constantly.
"""
self.jade.drain()
@staticmethod
def _get_result_or_raise_error(reply):
"""
Raise any error message returned from a Jade rpc call as an exception.
Parameters
----------
reply : dict
Dictionary representing a reply from a Jade rpc call.
Returns
-------
dict
Any nested 'result' structure, if the reply is not an error.
Raises
------
JadeError
If the reply represented an error, including all details received.
"""
if 'error' in reply:
e = reply['error']
raise JadeError(e.get('code'), e.get('message'), e.get('data'))
return reply['result']
def _jadeRpc(self, method, params=None, inputid=None, http_request_fn=None, long_timeout=False):
"""
Helper to make a request/reply rpc call over the underlying transport interface.
NOTE: interface must be 'connected'.
If the call returns an 'http_request' structure, this is handled here and the http
call is made, and the result is passed into the rpc method given in 'on reply', by
calling this function recursively.
Parameters
----------
method : str
rpc method to invoke
params : dict, optional
any parameters to pass to the rpc method
Defaults to None.
inputid : str, optional
Any specific 'id' to use in the rpc message.
Defaults to a using a pseudo-random id generated in-situ.
http_request_fn : function, optional
A function which accepts a dict (containing a description of the http request), makes
the described http call, and returns the body data in an element called 'body'.
Defaults to _http_request() above.
long_timeout : bool, optional
Whether the rpc call should use an indefinitely long timeout, rather than that set on
construction.
(Useful if the call involves a non-trivial user interaction with the device.)
Defaults to False.
Returns
-------
dict
The reply from the rpc call.
NOTE: will return the last/final reply after a sequence of calls, where 'http_request'
was returned and remote data was fetched and passed into s subsequent call.
"""
newid = inputid if inputid else str(random.randint(100000, 999999))
request = self.jade.build_request(newid, method, params)
reply = self.jade.make_rpc_call(request, long_timeout)
result = self._get_result_or_raise_error(reply)
# The Jade can respond with a request for interaction with a remote
# http server. This is used for interaction with the pinserver but the
# code below acts as a dumb proxy and simply makes the http request and
# forwards the response back to the Jade.
# Note: the function called to make the http-request can be passed in,
# or it can default to the simple _http_request() function above, if available.
if isinstance(result, collections.abc.Mapping) and 'http_request' in result:
this_module = sys.modules[__name__]
make_http_request = http_request_fn or getattr(this_module, '_http_request', None)
assert make_http_request, 'Default _http_request() function not available'
http_request = result['http_request']
http_response = make_http_request(http_request['params'])
return self._jadeRpc(
http_request['on-reply'],
http_response['body'],
http_request_fn=make_http_request,
long_timeout=long_timeout)
return result
def get_version_info(self):
"""
RPC call to fetch summary details pertaining to the hardware unit and running firmware.
Returns
-------
dict
Contains keys for various info describing the hw and running fw
"""
return self._jadeRpc('get_version_info')
def add_entropy(self, entropy):
"""
RPC call to add client entropy into the unit RNG entropy pool.
Parameters
----------
entropy : bytes
Bytes to fold into the hw entropy pool.
Returns
-------
bool
True on success
"""
params = {'entropy': entropy}
return self._jadeRpc('add_entropy', params)
def set_epoch(self, epoch=None):
"""
RPC call to set the current time epoch value, required for TOTP use.
NOTE: The time is lost on each power-down and must be reset on restart/reconnect before
TOTP can be used.
Parameters
----------
epoch : int, optional
Current epoch value, in seconds. Defaults to int(time.time()) value.
Returns
-------
bool
True on success
"""
params = {'epoch': epoch if epoch is not None else int(time.time())}
return self._jadeRpc('set_epoch', params)
def ota_update(self, fwcmp, fwlen, chunksize, patchlen=None, cb=None):
"""
RPC call to attempt to update the unit's firmware.
Parameters
----------
fwcmp : bytes
The compressed firmware image to upload to the Jade unit. Can be a full firmware or
and incremental diff to be applied to the currently running firmware image.
fwlen : int
The size of the new complete (uncompressed) firmware image (after any delta is applied).
chunksize : int
The size of the chunks used to upload the compressed firmware. Each chunk is uploaded
and ack'd by the hw unit.
The maximum supported chunk size is given in the version info data, under the key
'JADE_OTA_MAX_CHUNK'.
patchlen: int, optional
If the compressed firmware bytes are an incremental diff to be applied to the running
firmware image, this is the size of that patch when uncompressed.
Defaults to None, implying the compressed data is a full firmware image upload.
(Compare with fwlen - the size of the final fw image.)
cb : function, optional
Callback function accepting two integers - the amount of compressed firmware sent thus
far, and the total length of the compressed firmware to send.
If passed, this function is invoked each time a fw chunk is successfully uploaded and
ack'd by the hw, to notify of upload progress.
Defaults to None, and nothing is called to report upload progress.
Returns
-------
bool
True if no errors were reported - on next restart the hw unit will attempt to boot the
new firmware.
"""
# Compute the sha256 hash of the compressed file being uploaded
cmphasher = hashlib.sha256()
cmphasher.update(fwcmp)
cmphash = cmphasher.digest()
cmplen = len(fwcmp)
# Initiate OTA
ota_method = 'ota'
params = {'fwsize': fwlen,
'cmpsize': cmplen,
'cmphash': cmphash}
if patchlen is not None:
ota_method = 'ota_delta'
params['patchsize'] = patchlen
result = self._jadeRpc(ota_method, params)
assert result is True
# Write binary chunks
written = 0
while written < cmplen:
remaining = cmplen - written
length = min(remaining, chunksize)
chunk = bytes(fwcmp[written:written + length])
result = self._jadeRpc('ota_data', chunk)
assert result is True
written += length
if (cb):
cb(written, cmplen)
# All binary data uploaded
return self._jadeRpc('ota_complete')
def run_remote_selfcheck(self):
"""
RPC call to run in-built tests.
NOTE: Only available in a DEBUG build of the firmware.
Returns
-------
bool
True on success.
"""
return self._jadeRpc('debug_selfcheck', long_timeout=True)
def clean_reset(self):
"""
RPC call to clean/reset memory and storage, as much as is practical.
NOTE: Only available in a DEBUG build of the firmware.
Returns
-------
bool
True on success.
"""
return self._jadeRpc('debug_clean_reset')
def set_mnemonic(self, mnemonic, passphrase=None, temporary_wallet=False):
"""
RPC call to set the wallet mnemonic (in RAM only - flash storage is untouched).
NOTE: Only available in a DEBUG build of the firmware.
Parameters
----------
mnemonic : str
The wallet mnemonic to set.
passphrase : str, optional
Any bip39 passphrase to apply.
Defaults to None.
temporary_wallet : bool, optional
Whether to treat this wallet/mnemonic as an 'Emergency Restore' temporary wallet, as
opposed to one successfully loaded from the flash storage.
NOTE: in either case the wallet is only set in RAM, and flash storage is not affected.
Defaults to False.
Returns
-------
bool
True on success.
"""
params = {'mnemonic': mnemonic, 'passphrase': passphrase,
'temporary_wallet': temporary_wallet}
return self._jadeRpc('debug_set_mnemonic', params)
def set_seed(self, seed):
"""
RPC call to set the wallet seed.
NOTE: Only available in a DEBUG build of the firmware.
NOTE: Setting a seed always sets a 'temporary' wallet.
Parameters
----------
seed : bytes
The wallet seed to set as a temporary wallet (cannot be persisted in flash).
Returns
-------
bool
True on success.
"""
params = {'seed': seed}
return self._jadeRpc('debug_set_mnemonic', params)
def set_pinserver(self, urlA=None, urlB=None, pubkey=None, cert=None):
"""
RPC call to explicitly set (override) the details of the blind pinserver used to
authenticate the PIN entered on the Jade unit.
This data is recorded in the hw flash, and returned to the caller when authenticating
(in auth_user(), below).
Parameters
----------
urlA : str, optional
The primary url of the pinserver to use.
urlB : str, optional
Any secondary url of the pinserver to use.
pubkey : bytes, optional
The public key used to verify pinserver signed payloads.
cert : bytes, optional
Any additional certificate required to verify the pinserver identity.
Returns
-------
bool
True on success.
"""
params = {}
if urlA is not None or urlB is not None:
params['urlA'] = urlA
params['urlB'] = urlB
if pubkey is not None:
params['pubkey'] = pubkey
if cert is not None:
params['certificate'] = cert
return self._jadeRpc('update_pinserver', params)
def reset_pinserver(self, reset_details, reset_certificate):
"""
RPC call to reset any formerly overidden pinserver details to their defauts.
Parameters
----------
reset_details : bool, optional
If set, any overidden urls and pubkey are reset to their defaults.
reset_certificate : bool, optional
If set, any additional certificate is reset (to None).
Returns
-------
bool
True on success.
"""
params = {'reset_details': reset_details,
'reset_certificate': reset_certificate}
return self._jadeRpc('update_pinserver', params)
def auth_user(self, network, http_request_fn=None, epoch=None):
"""
RPC call to authenticate the user on the hw device, for using with the network provided.
Parameters
----------
network : str
The name of the network intended for use - eg. 'mainnet', 'liquid', 'testnet' etc.
This is verified against the networks allowed on the hardware.
http_request_fn : function, optional
Optional http-request function to pass http requests to the Jade pinserver.
Default behaviour is to use the '_http_request()' function which defers to the
'requests' module.
If the 'reqests' module is not available, no default http-request function is created,
and one must be supplied here.
epoch : int, optional
Current epoch value, in seconds. Defaults to int(time.time()) value.
Returns
-------
bool
True is returned immediately if the hw is already unlocked for use on the given network.
True if the PIN is entered and verified with the remote blind pinserver.
False if the PIN entered was incorrect.
"""
params = {'network': network, 'epoch': epoch if epoch is not None else int(time.time())}
return self._jadeRpc('auth_user', params,
http_request_fn=http_request_fn,
long_timeout=True)
def register_otp(self, otp_name, otp_uri):
"""
RPC call to register a new OTP record on the hw device.
Parameters
----------
otp_name : str
An identifying name for this OTP record
otp_uri : str
The uri of this OTP record - must begin 'otpauth://'
Returns
-------
bool
True if the OTP uri was validated and persisted on the hw
"""
params = {'name': otp_name, 'uri': otp_uri}
return self._jadeRpc('register_otp', params)
def get_otp_code(self, otp_name, value_override=None):
"""
RPC call to fetch a new OTP code from the hw device.
Parameters
----------
otp_name : str
An identifying name for the OTP record to use
value_override : int
An overriding HOTP counter or TOTP timestamp to use.
NOTE: Only available in a DEBUG build of the firmware.
Returns
-------
bool
True if the OTP uri was validated and persisted on the hw
"""
params = {'name': otp_name}
if value_override is not None:
params['override'] = value_override
return self._jadeRpc('get_otp_code', params)
def get_xpub(self, network, path):
"""
RPC call to fetch an xpub for the given bip32 path for the given network.
Parameters
----------
network : str
Network to which the xpub applies - eg. 'mainnet', 'liquid', 'testnet', etc.
path : [int]
bip32 path for which the xpub should be generated.
Returns
-------
str
base58 encoded xpub
"""
params = {'network': network, 'path': path}
return self._jadeRpc('get_xpub', params)
def get_registered_multisigs(self):
"""
RPC call to fetch brief summaries of any multisig wallets registered to this signer.
Returns
-------
dict
Brief description of registered multisigs, keyed by registration name.
Each entry contains keys:
variant - str, script type, eg. 'sh(wsh(multi(k)))'
sorted - boolean, whether bip67 key sorting is applied
threshold - int, number of signers required,N
num_signers - total number of signatories, M
master_blinding_key - 32-bytes, any liquid master blinding key for this wallet
"""
return self._jadeRpc('get_registered_multisigs')
def register_multisig(self, network, multisig_name, variant, sorted_keys, threshold, signers,
master_blinding_key=None):
"""
RPC call to register a new multisig wallet, which must contain the hw signer.
A registration name is provided - if it already exists that record is overwritten.
Parameters
----------
network : string
Network to which the multisig should apply - eg. 'mainnet', 'liquid', 'testnet', etc.
multisig_name : string
Name to use to identify this multisig wallet registration record.
If a registration record exists with the name given, that record is overwritten.
variant : str
The script type - one of 'sh(multi(k))', 'wsh(multi(k))', 'sh(wsh(multi(k)))'
sorted_keys : bool
Whether this is a 'sortedmulti()' wallet - ie. whether to apply bip67 sorting to the
pubkeys when generating redeem scripts.
threshold : int
Number of signers required.
signers : [dict]
Description of signers - should include keys:
- 'fingerprint' - 4 bytes, origin fingerprint
- 'derivation' - [int], bip32 path from origin to signer xpub provided
- 'xpub' - str, base58 xpub of signer - will be verified for hw unit signer
- 'path' - [int], any fixed path to always apply after the xpub - usually empty.
master_blinding_key : 32-bytes, optional
The master blinding key to use for this multisig wallet on liquid.
Optional, defaults to None.
Logically mandatory when 'network' indicates a liquid network and the Jade is to be
used to generate confidential addresses, blinding keys, blinding nonces, asset blinding
factors or output commitments.
Returns
-------
bool
True on success, implying the mutisig wallet can now be used.
"""
params = {'network': network, 'multisig_name': multisig_name,
'descriptor': {'variant': variant, 'sorted': sorted_keys,
'threshold': threshold, 'signers': signers,
'master_blinding_key': master_blinding_key}}
return self._jadeRpc('register_multisig', params)
def get_receive_address(self, *args, recovery_xpub=None, csv_blocks=0,
variant=None, multisig_name=None, confidential=None):
"""
RPC call to generate, show, and return an address for the given path.
The call has three forms.
Parameters
----------
network: str
Network to which the address should apply - eg. 'mainnet', 'liquid', 'testnet', etc.
Then either:
1. Blockstream Green (multisig shield) addresses
subaccount : int
Blockstream Green subaccount
branch : int
Blockstream Green derivation branch
pointer : int
Blockstream Green address pointer
recovery_xpub : str, optional
xpub of recovery key for 2of3 subaccounts. Otherwise should be omitted.
Defaults to None (ie. not a 2of3 subaccount).
csv_blocks : int, optional
Number of blocks to include in csv redeem script, if this is a csv-enabled account.
Otherwise should be omitted.
Defaults to 0 (ie. does not apply/not a csv-enabled account.)
2. Generic single-sig addresses
path: [int]
bip32 path for which the xpub should be generated.
variant: str
The script type - one of 'pkh(k)', 'wpkh(k)', 'sh(wpkh(k))'
3. Generic multisig addresses
paths: [[int]]
bip32 path suffixes, one for each signer, applied as a suffix to the registered
signer path. Usually these path suffixes will all be identical.
multisig_name : str
The name of the registered multisig wallet record used to generate the address.
Returns
-------
str
The address generated for the given parameters.
"""
if multisig_name is not None:
assert len(args) == 2
keys = ['network', 'paths', 'multisig_name']
args += (multisig_name,)
elif variant is not None:
assert len(args) == 2
keys = ['network', 'path', 'variant']
args += (variant,)
else:
assert len(args) == 4
keys = ['network', 'subaccount', 'branch', 'pointer', 'recovery_xpub', 'csv_blocks']
args += (recovery_xpub, csv_blocks)
params = dict(zip(keys, args))
if confidential is not None:
params['confidential'] = confidential
return self._jadeRpc('get_receive_address', params)
def sign_message(self, path, message, use_ae_signatures=False,
ae_host_commitment=None, ae_host_entropy=None):
"""
RPC call to format and sign the given message, using the given bip32 path.
Supports RFC6979 and anti-exfil signatures.
Parameters
----------
path : [int]
bip32 path for which the signature should be generated.
message : str
Message string to format and sign.
ae_host_commitment : 32-bytes, optional
The host-commitment to use for Antil-Exfil signatures
ae_host_entropy : 32-bytes, optional
The host-entropy to use for Antil-Exfil signatures
Returns
-------
1. Legacy/RFC6979 signatures
str
base64-encoded signature
2. Anti-exfil signatures
(bytes, str)
signer-commitment, base64-encoded signature
"""
if use_ae_signatures:
# Anti-exfil protocol:
# We send the signing request and receive the signer-commitment in
# reply once the user confirms.
# We can then request the actual signature passing the ae-entropy.
params = {'path': path, 'message': message, 'ae_host_commitment': ae_host_commitment}
signer_commitment = self._jadeRpc('sign_message', params)
params = {'ae_host_entropy': ae_host_entropy}
signature = self._jadeRpc('get_signature', params)
return signer_commitment, signature
else:
# Standard EC signature, simple case
params = {'path': path, 'message': message}
return self._jadeRpc('sign_message', params)
def get_identity_pubkey(self, identity, curve, key_type, index=0):
"""
RPC call to fetch a pubkey for the given identity (slip13/slip17).
NOTE: this api returns an uncompressed public key
Parameters
----------
identity : str
Identity string to format and sign. For example ssh://satoshi@bitcoin.org
curve : str
Name of curve to use - currently only 'nist256p1' is supported
key_type : str
Key derivation type - must be either 'slip-0013' for an identity pubkey, or 'slip-0017'
for an ecdh pubkey.
index : int, optional
Index number (if require multiple keys/sigs per identity)
Defaults to 0
Returns
-------
65-bytes
Uncompressed public key for the given identity and index.
Consistent with 'sign_identity' or 'get_identity_shared_key', depending on the
'key_type'.
"""
params = {'identity': identity, 'curve': curve, 'type': key_type, 'index': index}
return self._jadeRpc('get_identity_pubkey', params)
def get_identity_shared_key(self, identity, curve, their_pubkey, index=0):
"""
RPC call to fetch a SLIP-0017 shared ecdh key for the identity and counterparty public key.
NOTE: this api takes an uncompressed public key
Parameters
----------
identity : str
Identity string to format and sign. For example ssh://satoshi@bitcoin.org
curve : str
Name of curve to use - currently only 'nist256p1' is supported
their_pubkey : 65-bytes
The counterparty's uncompressed public key
index : int, optional
Index number (if require multiple keys/sigs per identity)
Defaults to 0
Returns
-------
32-bytes
The shared ecdh key for the given identity and cpty public key
Consistent with 'get_identity_pubkey' with 'key_type=slip-0017'
"""
params = {'identity': identity, 'curve': curve, 'index': index,
'their_pubkey': their_pubkey}
return self._jadeRpc('get_identity_shared_key', params)
def sign_identity(self, identity, curve, challenge, index=0):
"""
RPC call to authenticate the given identity through a challenge.
Supports RFC6979.
Returns the signature and the associated SLIP-0013 pubkey
NOTE: this api returns an uncompressed public key
Parameters
----------
identity : str
Identity string to format and sign. For example ssh://satoshi@bitcoin.org
curve : str
Name of curve to use - currently only 'nist256p1' is supported
challenge : bytes
Challenge bytes to sign
index : int, optional
Index number (if require multiple keys/sigs per identity)
Defaults to 0
Returns
-------
dict
Contains keys:
pubkey - 65-bytes, the uncompressed SLIP-0013 public key, consistent with
'get_identity_pubkey' with 'key_type=slip-0013'
signature - 65-bytes, RFC6979 deterministic signature, prefixed with 0x00
"""
params = {'identity': identity, 'curve': curve, 'index': index, 'challenge': challenge}
return self._jadeRpc('sign_identity', params)
def get_master_blinding_key(self):
"""
RPC call to fetch the master (SLIP-077) blinding key for the hw signer.
NOTE: the master blinding key of any registered multisig wallets can be obtained from
the result of `get_registered_multisigs()`.
Returns
-------
32-bytes
SLIP-077 master blinding key
"""
return self._jadeRpc('get_master_blinding_key')
def get_blinding_key(self, script, multisig_name=None):
"""
RPC call to fetch the public blinding key for the hw signer.
Parameters
----------
script : bytes
The script for which the public blinding key is required.
multisig_name : str, optional
The name of any registered multisig wallet for which to fetch the blinding key.
Defaults to None
Returns
-------
33-bytes
Public blinding key for the passed script.
"""
params = {'script': script, 'multisig_name': multisig_name}
return self._jadeRpc('get_blinding_key', params)
def get_shared_nonce(self, script, their_pubkey, include_pubkey=False, multisig_name=None):
"""
RPC call to get the shared secret to unblind a tx, given the receiving script and
the pubkey of the sender (sometimes called "blinding nonce" in Liquid).
Optionally fetch the hw signer's public blinding key also.
Parameters
----------
script : bytes
The script for which the blinding nonce is required.
their_pubkey : 33-bytes
The counterparty public key.
include_pubkey : bool, optional
Whether to also return the wallet's public blinding key.
Defaults to False.
multisig_name : str, optional
The name of any registered multisig wallet for which to fetch the blinding nonce.
Defaults to None
Returns
-------
1. include_pubkey is False
33-bytes
Public blinding nonce for the passed script and counterparty public key.
2. include_pubkey is True
dict
Contains keys:
shared_nonce - 32-bytes, public blinding nonce for the passed script as above.
blinding_key - 33-bytes, public blinding key for the passed script.
"""
params = {'script': script, 'their_pubkey': their_pubkey,
'include_pubkey': include_pubkey, 'multisig_name': multisig_name}
return self._jadeRpc('get_shared_nonce', params)
def get_blinding_factor(self, hash_prevouts, output_index, bftype, multisig_name=None):
"""
RPC call to get a deterministic "trusted" blinding factor to blind an output.
Normally the blinding factors are generated and returned in the `get_commitments` call,
but for the last output the vbf must be generated on the host, so this call allows the
host to get a valid abf to compute the generator and then the "final" vbf.
Nonetheless, this call is kept generic, and can also generate vbfs, hence the "bftype"
parameter.
Parameters
----------
hash_prevouts : 32-bytes
This value is computed as specified in bip143.
It is verified immediately since at this point Jade doesn't have the tx in question.
It will be checked later during `sign_liquid_tx()`.
output_index : int
The index of the output we are trying to blind
bftype : str
Can be eitehr "ASSET" or "VALUE", to generate abfs or vbfs.
multisig_name : str, optional
The name of any registered multisig wallet for which to fetch the blinding factor.
Defaults to None
Returns
-------
32-bytes
The requested blinding factor
"""
params = {'hash_prevouts': hash_prevouts,
'output_index': output_index,
'type': bftype,
'multisig_name': multisig_name}
return self._jadeRpc('get_blinding_factor', params)
def get_commitments(self,
asset_id,
value,
hash_prevouts,
output_index,
vbf=None,
multisig_name=None):
"""
RPC call to generate deterministic blinding factors and commitments for a given output.
Can optionally get a "custom" VBF, normally used for the last input where the vbf is not
computed here, but generated on the host according to all the other values.
The commitments generated here should be passed back into `sign_liquid_tx()`.
Parameters
----------
asset_id : 32-bytes
asset_id as usually displayed - ie. reversed compared to network/consensus order
value : int
value in 'satoshi' or equivalent atomic integral unit
hash_prevouts : 32-bytes
This value is computed as specified in bip143.
It is verified immediately since at this point Jade doesn't have the tx in question.
It will be checked later during `sign_liquid_tx()`.
output_index : int
The index of the output we are trying to blind
vbf : 32-bytes, optional
The vbf to use, in preference to deterministically generating one in this call.
multisig_name : str, optional
The name of any registered multisig wallet for which to fetch the blinding factor.
Defaults to None
Returns
-------
dict
Containing the following the blinding factors and output commitments.
"""
params = {'asset_id': asset_id,
'value': value,
'hash_prevouts': hash_prevouts,
'output_index': output_index,
'vbf': vbf,
'multisig_name': multisig_name}
return self._jadeRpc('get_commitments', params)
def _send_tx_inputs(self, base_id, inputs, use_ae_signatures):
"""
Helper call to send the tx inputs to Jade for signing.
Handles legacy RFC6979 signatures, as well as the Anti-Exfil protocol.
Parameters
----------
base_id : int
The ids of the messages sent will be increments from this base id.
inputs : [dict]
The tx inputs - see `sign_tx()` / `sign_liquid_tx()` for details.
use_ae_signatures : bool
Whether to use the anti-exfil protocol to generate the signatures
Returns
-------
1. if use_ae_signatures is False
[bytes]
An array of signatures corresponding to the array of inputs passed.
The signatures are in DER format with the sighash appended.
'None' placeholder elements are used for inputs not requiring a signature.
2. if use_ae_signatures is True
[(32-bytes, bytes)]
An array of pairs of signer-commitments and signatures corresponding to the inputs.
The signatures are in DER format with the sighash appended.
(None, None) placeholder elements are used for inputs not requiring a signature.
"""
if use_ae_signatures:
# Anti-exfil protocol:
# We send one message per input (which includes host-commitment *but
# not* the host entropy) and receive the signer-commitment in reply.
# Once all n input messages are sent, we can request the actual signatures
# (as the user has a chance to confirm/cancel at this point).
# We request the signatures passing the ae-entropy for each one.
# Send inputs one at a time, receiving 'signer-commitment' in reply
signer_commitments = []
host_ae_entropy_values = []
for txinput in inputs:
# ae-protocol - do not send the host entropy immediately
txinput = txinput.copy() # shallow copy
host_ae_entropy_values.append(txinput.pop('ae_host_entropy', None))
base_id += 1
input_id = str(base_id)
reply = self._jadeRpc('tx_input', txinput, input_id)
signer_commitments.append(reply)
# Request the signatures one at a time, sending the entropy
signatures = []
for (i, host_ae_entropy) in enumerate(host_ae_entropy_values, 1):
base_id += 1
sig_id = str(base_id)
params = {'ae_host_entropy': host_ae_entropy}
reply = self._jadeRpc('get_signature', params, sig_id)
signatures.append(reply)
assert len(signatures) == len(inputs)
return list(zip(signer_commitments, signatures))
else:
# Legacy protocol:
# We send one message per input - without expecting replies.
# Once all n input messages are sent, the hw then sends all n replies
# (as the user has a chance to confirm/cancel at this point).
# Then receive all n replies for the n signatures.
# NOTE: *NOT* a sequence of n blocking rpc calls.
# NOTE: at some point this flow should be removed in favour of the one
# above, albeit without passing anti-exfil entropy or commitment data.
# Send all n inputs
requests = []
for txinput in inputs:
base_id += 1
msg_id = str(base_id)
request = self.jade.build_request(msg_id, 'tx_input', txinput)
self.jade.write_request(request)
requests.append(request)
time.sleep(0.1)
# Receive all n signatures
signatures = []
for request in requests:
reply = self.jade.read_response()
self.jade.validate_reply(request, reply)
signature = self._get_result_or_raise_error(reply)
signatures.append(signature)
assert len(signatures) == len(inputs)
return signatures
def sign_liquid_tx(self, network, txn, inputs, commitments, change, use_ae_signatures=False,
asset_info=None):
"""
RPC call to sign a liquid transaction.
Parameters
----------
network : str
Network to which the address should apply - eg. 'liquid', 'liquid-testnet', etc.
txn : bytes
The transaction to sign
inputs : [dict]
The tx inputs. Should contain keys:
is_witness, bool - whether this is a segwit input
value_commitment, 33-bytes - The value commitment of ths input
These are only required if signing this input:
script, bytes- the redeem script
path, [int] - the bip32 path to sign with
These are only required for Anti-Exfil signatures:
ae_host_commitment, 32-bytes - The host-commitment for Anti-Exfil signatures
ae_host_entropy, 32-bytes - The host-entropy for Anti-Exfil signatures
commitments : [dict]
An array sized for the number of outputs.
Unblinded outputs should have a 'null' placeholder element.
The commitments as retrieved from `get_commitments()`, with the addition of:
'blinding_key', <bytes> - the output's public blinding key
(as retrieved from `get_blinding_key()`)
change : [dict]
An array sized for the number of outputs.
Outputs which are not change should have a 'null' placeholder element.
Change elements with data will be automatically verified by Jade, and not by the user.
Populated elements should contain sufficient data to generate the change address.
See `get_receive_address()`
use_ae_signatures : bool, optional
Whether to use the anti-exfil protocol to generate the signatures.
Defaults to False.
asset_info : [dict]
Any asset-registry data relevant to the assets being transacted, such that Jade can
display a meaningful name, issuer, ticker etc. rather than just asset-id.
At the very least must contain 'asset_id', 'contract' and 'issuance_prevout' items,
exactly as in the registry data. NOTE: asset_info for the network policy-asset is
not required.
Defaults to None.
Returns
-------
1. if use_ae_signatures is False
[bytes]
An array of signatures corresponding to the array of inputs passed.
The signatures are in DER format with the sighash appended.
'None' placeholder elements are used for inputs not requiring a signature.
2. if use_ae_signatures is True
[(32-bytes, bytes)]
An array of pairs of signer-commitments and signatures corresponding to the inputs.
The signatures are in DER format with the sighash appended.
(None, None) placeholder elements are used for inputs not requiring a signature.
"""
# 1st message contains txn and number of inputs we are going to send.
# Reply ok if that corresponds to the expected number of inputs (n).
base_id = 100 * random.randint(1000, 9999)
params = {'network': network,
'txn': txn,
'num_inputs': len(inputs),
'trusted_commitments': commitments,
'use_ae_signatures': use_ae_signatures,
'change': change,
'asset_info': asset_info}
reply = self._jadeRpc('sign_liquid_tx', params, str(base_id))
assert reply
# Send inputs and receive signatures
return self._send_tx_inputs(base_id, inputs, use_ae_signatures)
def sign_tx(self, network, txn, inputs, change, use_ae_signatures=False):
"""
RPC call to sign a btc transaction.
Parameters
----------
network : str
Network to which the address should apply - eg. 'mainnet', 'testnet', etc.
txn : bytes
The transaction to sign
inputs : [dict]
The tx inputs. Should contain keys:
is_witness, bool - whether this is a segwit input
These are only required if signing this input:
script, bytes- the redeem script
path, [int] - the bip32 path to sign with
One of these is required:
input_tx, bytes - The prior transaction which created the utxo of this input
satoshi, int - The satoshi amount of this input - can be used in place of
'input_tx' for a tx with a single segwit input
These are only required for Anti-Exfil signatures:
ae_host_commitment, 32-bytes - The host-commitment for Anti-Exfil signatures
ae_host_entropy, 32-bytes - The host-entropy for Anti-Exfil signatures
change : [dict]
An array sized for the number of outputs.
Outputs which are not change should have a 'null' placeholder element.
Change elements with data will be automatically verified by Jade, and not by the user.
Populated elements should contain sufficient data to generate the change address.
See `get_receive_address()`
use_ae_signatures : bool
Whether to use the anti-exfil protocol to generate the signatures
Returns
-------
1. if use_ae_signatures is False
[bytes]
An array of signatures corresponding to the array of inputs passed.
The signatures are in DER format with the sighash appended.
'None' placeholder elements are used for inputs not requiring a signature.
2. if use_ae_signatures is True
[(32-bytes, bytes)]
An array of pairs of signer-commitments and signatures corresponding to the inputs.
The signatures are in DER format with the sighash appended.
(None, None) placeholder elements are used for inputs not requiring a signature.
"""
# 1st message contains txn and number of inputs we are going to send.
# Reply ok if that corresponds to the expected number of inputs (n).
base_id = 100 * random.randint(1000, 9999)
params = {'network': network,
'txn': txn,
'num_inputs': len(inputs),
'use_ae_signatures': use_ae_signatures,
'change': change}
reply = self._jadeRpc('sign_tx', params, str(base_id))
assert reply
# Send inputs and receive signatures
return self._send_tx_inputs(base_id, inputs, use_ae_signatures)
class JadeInterface:
"""
Mid-level interface to Jade
Wraps either a serial or a ble connection
Calls to send and receive bytes and cbor messages over the interface.
Either:
a) use wrapped with JadeAPI
(recommended)
or:
b) use with JadeInterface.create_[serial|ble]() as jade:
...
or:
c) use JadeInterface.create_[serial|ble], then call connect() before
using, and disconnect() when finished
(caveat cranium)
or:
d) use ctor to wrap existing low-level implementation instance
(caveat cranium)
"""
def __init__(self, impl):
assert impl is not None
self.impl = impl
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc, tb):
if (exc_type):
logger.error("Exception causing JadeInterface context exit.")
logger.error(exc_type)
logger.error(exc)
traceback.print_tb(tb)
self.disconnect(exc_type is not None)
@staticmethod
def create_serial(device=None, baud=None, timeout=None):
"""
Create a JadeInterface object using the serial interface described.
Parameters
----------
device : str, optional
The device identifier for the serial device.
Underlying implementation will default (to /dev/ttyUSB0)
baud : int, optional
The communication baud rate.
Underlying implementation will default (to 115200)
timeout : int, optional
The serial read timeout when awaiting messages.
Underlying implementation will default (to 120s)
Returns
-------
JadeInterface
Inerface object configured to use given serial parameters.
NOTE: the instance has not yet tried to contact the hw
- caller must call 'connect()' before trying to use the Jade.
"""
if device and JadeTCPImpl.isSupportedDevice(device):
impl = JadeTCPImpl(device)
else:
impl = JadeSerialImpl(device or DEFAULT_SERIAL_DEVICE,
baud or DEFAULT_BAUD_RATE,
timeout or DEFAULT_SERIAL_TIMEOUT)
return JadeInterface(impl)
@staticmethod
def create_ble(device_name=None, serial_number=None,
scan_timeout=None, loop=None):
"""
Create a JadeInterface object using the BLE interface described.
NOTE: raises JadeError if BLE dependencies not installed.
Parameters
----------
device_name : str, optional
The device name of the desired BLE device.
Underlying implementation will default (to 'Jade')
serial_number : int, optional
The serial number of the desired BLE device
- used to disambiguate multiple beacons with the same 'device name'
Underlying implementation will connect to the first beacon it scans
with the matching 'device name'.
scan_timeout : int, optional
The timeout when scanning for devices which match the device name/serial number.
Underlying implementation will default (to 60s)
loop : optional
The asynchio event loop to use, if required.
Underlying implementation will default (to asyncio.get_event_loop())
Returns
-------
JadeInterface
Inerface object configured to use given BLE parameters.
NOTE: the instance has not yet tried to contact the hw
- caller must call 'connect()' before trying to use the Jade.
Raises
------
JadeError if BLE backend not available (ie. BLE dependencies not installed)
"""
this_module = sys.modules[__name__]
if not hasattr(this_module, "JadeBleImpl"):
raise JadeError(1, "BLE support not installed", None)
impl = JadeBleImpl(device_name or DEFAULT_BLE_DEVICE_NAME,
serial_number or DEFAULT_BLE_SERIAL_NUMBER,
scan_timeout or DEFAULT_BLE_SCAN_TIMEOUT,
loop=loop)
return JadeInterface(impl)
def connect(self):
"""
Try to connect the underlying transport interface (eg. serial, ble, etc.)
Raises an exception on failure.
"""
self.impl.connect()
def disconnect(self, drain=False):
"""
Disconnect the underlying transport (eg. serial, ble, etc.)
Parameters
----------
drain : bool, optional
When true log any/all remaining messages/data, otherwise silently discard.
NOTE: can prevent disconnection if data is arriving constantly.
Defaults to False.
"""
if drain:
self.drain()
self.impl.disconnect()
def drain(self):
"""
Log any/all outstanding messages/data.
NOTE: can run indefinitely if data is arriving constantly.
"""
logger.warn("Draining interface...")
drained = bytearray()
finished = False
while not finished:
byte_ = self.impl.read(1)
drained.extend(byte_)
finished = byte_ == b''
if finished or byte_ == b'\n' or len(drained) > 256:
try:
device_logger.warn(drained.decode('utf-8'))
except Exception as e:
# Dump the bytes raw and as hex if decoding as utf-8 failed
device_logger.warn("Raw:")
device_logger.warn(drained)
device_logger.warn("----")
device_logger.warn("Hex dump:")
device_logger.warn(drained.hex())
# Clear and loop to continue collecting
drained.clear()
@staticmethod
def build_request(input_id, method, params=None):
"""
Build a request dict from passed parameters
Parameters
----------
input_id : str
The id of the request message to construct
method : str
rpc method to invoke
params : dict, optional
any parameters to pass to the rpc method
Defaults to None.
Returns
-------
dict
The request object as a dict
"""
request = {"method": method, "id": input_id}
if params is not None:
request["params"] = params
return request
@staticmethod
def serialise_cbor_request(request):
"""
Method to format a request dict as a cbor message
Parameters
----------
request : dict
The request dict
Returns
-------
bytes
The request formatted as cbor message bytes
"""
dump = cbor.dumps(request)
len_dump = len(dump)
if 'method' in request and 'ota_data' in request['method']:
msg = 'Sending ota_data message {} as cbor of size {}'.format(request['id'], len_dump)
logger.info(msg)
else:
logger.info('Sending: {} as cbor of size {}'.format(_hexlify(request), len_dump))
return dump
def write(self, bytes_):
"""
Write bytes over the underlying interface
Parameters
----------
bytes_ : bytes
The bytes to write
Returns
-------
int
The number of bytes written
"""
logger.debug("Sending: {} bytes".format(len(bytes_)))
wrote = self.impl.write(bytes_)
logger.debug("Sent: {} bytes".format(len(bytes_)))
return wrote
def write_request(self, request):
"""
Write a request dict over the underlying interface, formatted as cbor.
Parameters
----------
request : dict
The request dict to write
"""
msg = self.serialise_cbor_request(request)
written = 0
while written < len(msg):
written += self.write(msg[written:])
def read(self, n):
"""
Try to read bytes from the underlying interface.
Returns
-------
bytes
The bytes received
"""
logger.debug("Reading {} bytes...".format(n))
bytes_ = self.impl.read(n)
logger.debug("Received: {} bytes".format(len(bytes_)))
return bytes_
def read_cbor_message(self):
"""
Try to read a single cbor (response) message from the underlying interface.
Respects the any read timeout.
If any 'log' messages are received, logs them locally at the nearest corresponding level
and awaits the next message. Returns when it receives what appears to be a reply message.
Returns
-------
dict
The message received, as a dict
"""
while True:
# 'self' is sufficiently 'file-like' to act as a load source.
# Throws EOFError on end of stream/timeout/lost-connection etc.
message = cbor.load(self)
if isinstance(message, collections.abc.Mapping):
# A message response (to a prior request)
if 'id' in message:
logger.info("Received msg: {}".format(_hexlify(message)))
return message
# A log message - handle as normal
if 'log' in message:
response = message['log']
log_method = device_logger.error
try:
response = message['log'].decode("utf-8")
log_methods = {
'E': device_logger.error,
'W': device_logger.warn,
'I': device_logger.info,
'D': device_logger.debug,
'V': device_logger.debug,
}
if len(response) > 1 and response[1] == ' ':
lvl = response[0]
log_method = log_methods.get(lvl, device_logger.error)
except Exception as e:
logger.error('Error processing log message: {}'.format(e))
log_method('>> {}'.format(response))
continue
# Unknown/unhandled/unexpected message
logger.error("Unhandled message received")
device_logger.error(message)
def read_response(self, long_timeout=False):
"""
Try to read a single cbor (response) message from the underlying interface.
If any 'log' messages are received, logs them locally at the nearest corresponding level
and awaits the next message. Returns when it receives what appears to be a reply message.
If `long_timeout` is false, any read-timeout is respected. If True, the call will block
indefinitely awaiting a response message.
Parameters
----------
long_timeout : bool
Whether to wait indefinitely for the next (response) message.
Returns
-------
dict
The message received, as a dict
"""
while True:
try:
return self.read_cbor_message()
except EOFError as e:
if not long_timeout:
raise
@staticmethod
def validate_reply(request, reply):
"""
Helper to minimally validate a reply message, in the context of a request.
Asserts if the reply does contain the expected minimal fields.
"""
assert isinstance(reply, dict) and 'id' in reply
assert ('result' in reply) != ('error' in reply)
assert reply['id'] == request['id'] or \
reply['id'] == '00' and 'error' in reply
def make_rpc_call(self, request, long_timeout=False):
"""
Method to send a request over the underlying interface, and await a response.
The request is minimally validated before it is sent, and the response is simialrly
validated before being returned.
Any read-timeout is respected unless 'long_timeout' is passed, in which case the call
blocks indefinitely awaiting a response.
Parameters
----------
long_timeout : bool
Whether to wait indefinitely for the response.
Returns
-------
dict
The (minimally validated) response message received, as a dict
"""
# Write outgoing request message
assert isinstance(request, dict)
assert 'id' in request and len(request['id']) > 0
assert 'method' in request and len(request['method']) > 0
assert len(request['id']) < 16 and len(request['method']) < 32
self.write_request(request)
# Read and validate incoming message
reply = self.read_response(long_timeout)
self.validate_reply(request, reply)
return reply