plugins/digitalbitbox: compatibility with firmware v5.0.0
This commit is contained in:
@@ -4,7 +4,7 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from electrum.crypto import sha256d, EncodeAES_base64, DecodeAES_base64
|
from electrum.crypto import sha256d, EncodeAES_base64, EncodeAES_bytes, DecodeAES_bytes, hmac_oneshot
|
||||||
from electrum.bitcoin import (TYPE_ADDRESS, push_script, var_int, public_key_to_p2pkh,
|
from electrum.bitcoin import (TYPE_ADDRESS, push_script, var_int, public_key_to_p2pkh,
|
||||||
is_address)
|
is_address)
|
||||||
from electrum.bip32 import serialize_xpub, deserialize_xpub
|
from electrum.bip32 import serialize_xpub, deserialize_xpub
|
||||||
@@ -30,6 +30,8 @@ try:
|
|||||||
import base64
|
import base64
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import re
|
||||||
|
import hmac
|
||||||
DIGIBOX = True
|
DIGIBOX = True
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
DIGIBOX = False
|
DIGIBOX = False
|
||||||
@@ -43,6 +45,14 @@ except ImportError as e:
|
|||||||
def to_hexstr(s):
|
def to_hexstr(s):
|
||||||
return binascii.hexlify(s).decode('ascii')
|
return binascii.hexlify(s).decode('ascii')
|
||||||
|
|
||||||
|
|
||||||
|
def derive_keys(x):
|
||||||
|
h = sha256d(x)
|
||||||
|
h = hashlib.sha512(h).digest()
|
||||||
|
return (h[:32],h[32:])
|
||||||
|
|
||||||
|
MIN_MAJOR_VERSION = 5
|
||||||
|
|
||||||
class DigitalBitbox_Client():
|
class DigitalBitbox_Client():
|
||||||
|
|
||||||
def __init__(self, plugin, hidDevice):
|
def __init__(self, plugin, hidDevice):
|
||||||
@@ -110,7 +120,6 @@ class DigitalBitbox_Client():
|
|||||||
else:
|
else:
|
||||||
raise Exception('no reply')
|
raise Exception('no reply')
|
||||||
|
|
||||||
|
|
||||||
def dbb_has_password(self):
|
def dbb_has_password(self):
|
||||||
reply = self.hid_send_plain(b'{"ping":""}')
|
reply = self.hid_send_plain(b'{"ping":""}')
|
||||||
if 'ping' not in reply:
|
if 'ping' not in reply:
|
||||||
@@ -121,7 +130,6 @@ class DigitalBitbox_Client():
|
|||||||
|
|
||||||
|
|
||||||
def stretch_key(self, key):
|
def stretch_key(self, key):
|
||||||
import hmac
|
|
||||||
return to_hexstr(hashlib.pbkdf2_hmac('sha512', key.encode('utf-8'), b'Digital Bitbox', iterations = 20480))
|
return to_hexstr(hashlib.pbkdf2_hmac('sha512', key.encode('utf-8'), b'Digital Bitbox', iterations = 20480))
|
||||||
|
|
||||||
|
|
||||||
@@ -158,6 +166,12 @@ class DigitalBitbox_Client():
|
|||||||
|
|
||||||
|
|
||||||
def check_device_dialog(self):
|
def check_device_dialog(self):
|
||||||
|
match = re.search(r'v([0-9])+\.[0-9]+\.[0-9]+', self.dbb_hid.get_serial_number_string())
|
||||||
|
if match is None:
|
||||||
|
raise Exception("error detecting firmware version")
|
||||||
|
major_version = int(match.group(1))
|
||||||
|
if major_version < MIN_MAJOR_VERSION:
|
||||||
|
raise Exception("Please upgrade to the newest firmware using the BitBox Desktop app: https://shiftcrypto.ch/start")
|
||||||
# Set password if fresh device
|
# Set password if fresh device
|
||||||
if self.password is None and not self.dbb_has_password():
|
if self.password is None and not self.dbb_has_password():
|
||||||
if not self.setupRunning:
|
if not self.setupRunning:
|
||||||
@@ -393,13 +407,21 @@ class DigitalBitbox_Client():
|
|||||||
|
|
||||||
|
|
||||||
def hid_send_encrypt(self, msg):
|
def hid_send_encrypt(self, msg):
|
||||||
|
sha256_byte_len = 32
|
||||||
reply = ""
|
reply = ""
|
||||||
try:
|
try:
|
||||||
secret = sha256d(self.password)
|
encryption_key, authentication_key = derive_keys(self.password)
|
||||||
msg = EncodeAES_base64(secret, msg)
|
msg = EncodeAES_bytes(encryption_key, msg)
|
||||||
reply = self.hid_send_plain(msg)
|
hmac_digest = hmac_oneshot(authentication_key, msg, hashlib.sha256)
|
||||||
|
authenticated_msg = base64.b64encode(msg + hmac_digest)
|
||||||
|
reply = self.hid_send_plain(authenticated_msg)
|
||||||
if 'ciphertext' in reply:
|
if 'ciphertext' in reply:
|
||||||
reply = DecodeAES_base64(secret, ''.join(reply["ciphertext"]))
|
b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"])))
|
||||||
|
reply_hmac = b64_unencoded[-sha256_byte_len:]
|
||||||
|
hmac_calculated = hmac_oneshot(authentication_key, b64_unencoded[:-sha256_byte_len], hashlib.sha256)
|
||||||
|
if not hmac.compare_digest(reply_hmac, hmac_calculated):
|
||||||
|
raise Exception("Failed to validate HMAC")
|
||||||
|
reply = DecodeAES_bytes(encryption_key, b64_unencoded[:-sha256_byte_len])
|
||||||
reply = to_string(reply, 'utf8')
|
reply = to_string(reply, 'utf8')
|
||||||
reply = json.loads(reply)
|
reply = json.loads(reply)
|
||||||
if 'error' in reply:
|
if 'error' in reply:
|
||||||
|
|||||||
Reference in New Issue
Block a user