base64.b64decode: always set validate=True
Notably verifymessage and decrypt(message) were silently ignoring trailing garbage or inserted non-base64 characters present in signatures/ciphertext. (both the CLI commands and in the GUI) I think it is much cleaner and preferable to treat such signatures/ciphertext as invalid. In fact I find it surprising that base64.b64decode(validate=False) is the default. Perhaps we should create a helper function for it that set validate=True and use that.
This commit is contained in:
@@ -29,6 +29,7 @@ import time
|
||||
import argparse
|
||||
import json
|
||||
import ast
|
||||
import binascii
|
||||
import base64
|
||||
import asyncio
|
||||
import inspect
|
||||
@@ -906,9 +907,12 @@ class Commands(Logger):
|
||||
|
||||
arg:str:address:Bitcoin address
|
||||
arg:str:message:Clear text message. Use quotes if it contains spaces.
|
||||
arg:str:signature:The signature
|
||||
arg:str:signature:The signature, base64-encoded.
|
||||
"""
|
||||
sig = base64.b64decode(signature)
|
||||
try:
|
||||
sig = base64.b64decode(signature, validate=True)
|
||||
except binascii.Error:
|
||||
return False
|
||||
message = util.to_bytes(message)
|
||||
return bitcoin.verify_usermessage_with_address(address, sig, message)
|
||||
|
||||
|
||||
@@ -473,7 +473,7 @@ def ecies_decrypt_message(
|
||||
*,
|
||||
magic: bytes = b'BIE1',
|
||||
) -> bytes:
|
||||
encrypted = base64.b64decode(encrypted) # type: bytes
|
||||
encrypted = base64.b64decode(encrypted, validate=True) # type: bytes
|
||||
if len(encrypted) < 85:
|
||||
raise Exception('invalid ciphertext: length')
|
||||
magic_found = encrypted[:4]
|
||||
|
||||
@@ -221,7 +221,7 @@ class AuthenticatedServer(Logger):
|
||||
if basic != 'Basic':
|
||||
raise AuthenticationInvalidOrMissing('UnsupportedType')
|
||||
encoded = to_bytes(encoded, 'utf8')
|
||||
credentials = to_string(b64decode(encoded), 'utf8')
|
||||
credentials = to_string(b64decode(encoded, validate=True), 'utf8')
|
||||
username, _, password = credentials.partition(':')
|
||||
if not (constant_time_compare(username, self.rpc_user)
|
||||
and constant_time_compare(password, self.rpc_password)):
|
||||
|
||||
@@ -383,7 +383,7 @@ class QEDaemon(AuthMixin, QObject):
|
||||
return False
|
||||
try:
|
||||
# This can throw on invalid base64
|
||||
sig = base64.b64decode(str(signature.strip()))
|
||||
sig = base64.b64decode(str(signature.strip()), validate=True)
|
||||
verified = verify_usermessage_with_address(address, sig, message)
|
||||
except Exception as e:
|
||||
verified = False
|
||||
|
||||
@@ -2154,7 +2154,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener):
|
||||
return
|
||||
try:
|
||||
# This can throw on invalid base64
|
||||
sig = base64.b64decode(str(signature.toPlainText()))
|
||||
sig = base64.b64decode(str(signature.toPlainText()), validate=True)
|
||||
verified = bitcoin.verify_usermessage_with_address(address, sig, message)
|
||||
except Exception as e:
|
||||
verified = False
|
||||
|
||||
@@ -122,7 +122,7 @@ class UpdateCheckThread(QThread, Logger):
|
||||
for address, sig in sigs.items():
|
||||
if address not in UpdateCheck.VERSION_ANNOUNCEMENT_SIGNING_KEYS:
|
||||
continue
|
||||
sig = base64.b64decode(sig)
|
||||
sig = base64.b64decode(sig, validate=True)
|
||||
msg = version_num.encode('utf-8')
|
||||
if verify_usermessage_with_address(
|
||||
address=address, sig65=sig, message=msg,
|
||||
|
||||
@@ -424,7 +424,7 @@ class DigitalBitbox_Client(HardwareClientBase):
|
||||
authenticated_msg = base64.b64encode(msg + hmac_digest)
|
||||
reply = self.hid_send_plain(authenticated_msg)
|
||||
if 'ciphertext' in reply:
|
||||
b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"])))
|
||||
b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"]), validate=True))
|
||||
reply_hmac = b64_unencoded[-sha256_byte_len:]
|
||||
hmac_calculated = hmac_oneshot(authentication_key, b64_unencoded[:-sha256_byte_len], hashlib.sha256)
|
||||
if not hmac.compare_digest(reply_hmac, hmac_calculated):
|
||||
@@ -702,7 +702,7 @@ class DigitalBitboxPlugin(HW_PluginBase):
|
||||
def comserver_post_notification(self, payload, *, handler: 'HardwareHandlerBase'):
|
||||
assert self.is_mobile_paired(), "unexpected mobile pairing error"
|
||||
url = 'https://digitalbitbox.com/smartverification/index.php'
|
||||
key_s = base64.b64decode(self.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY])
|
||||
key_s = base64.b64decode(self.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY], validate=True)
|
||||
ciphertext = EncodeAES_bytes(key_s, json.dumps(payload).encode('ascii'))
|
||||
args = 'c=data&s=0&dt=0&uuid=%s&pl=%s' % (
|
||||
self.digitalbitbox_config[CHANNEL_ID_KEY],
|
||||
|
||||
@@ -192,7 +192,7 @@ class Jade_Client(HardwareClientBase):
|
||||
|
||||
# Signature verification does not work with anti-exfil, so stick with default (rfc6979)
|
||||
sig = self.jade.sign_message(path, message)
|
||||
return base64.b64decode(sig)
|
||||
return base64.b64decode(sig, validate=True)
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def sign_psbt(self, psbt_bytes):
|
||||
|
||||
@@ -45,7 +45,7 @@ class LabelsPlugin(BasePlugin):
|
||||
|
||||
def decode(self, wallet: 'Abstract_Wallet', message: str) -> str:
|
||||
password, iv, wallet_id = self.wallets[wallet]
|
||||
decoded = base64.b64decode(message)
|
||||
decoded = base64.b64decode(message, validate=True)
|
||||
decrypted = aes_decrypt_with_iv(password, iv, decoded)
|
||||
return decrypted.decode('utf8')
|
||||
|
||||
|
||||
@@ -1155,7 +1155,8 @@ class Ledger_Client_New(Ledger_Client):
|
||||
|
||||
result = b''
|
||||
try:
|
||||
result = base64.b64decode(self.client.sign_message(message, address_path))
|
||||
sig_str = self.client.sign_message(message, address_path)
|
||||
result = base64.b64decode(sig_str, validate=True)
|
||||
except DenyError:
|
||||
pass # cancelled by user
|
||||
except BaseException as e:
|
||||
|
||||
@@ -160,7 +160,7 @@ class WalletStorage(Logger):
|
||||
|
||||
def _init_encryption_version(self):
|
||||
try:
|
||||
magic = base64.b64decode(self.raw)[0:4]
|
||||
magic = base64.b64decode(self.raw, validate=True)[0:4]
|
||||
if magic == b'BIE1':
|
||||
return StorageEncryptionVersion.USER_PASSWORD
|
||||
elif magic == b'BIE2':
|
||||
|
||||
@@ -1515,7 +1515,7 @@ def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str:
|
||||
# try base64
|
||||
if raw[0:6] in ('cHNidP', b'cHNidP'): # base64 psbt
|
||||
try:
|
||||
return base64.b64decode(raw).hex()
|
||||
return base64.b64decode(raw, validate=True).hex()
|
||||
except Exception:
|
||||
pass
|
||||
# raw bytes (do not strip whitespaces in this case)
|
||||
@@ -2226,7 +2226,7 @@ class PartialTransaction(Transaction):
|
||||
if raw[0:10].lower() in (b'70736274ff', '70736274ff'): # hex
|
||||
raw = bytes.fromhex(raw)
|
||||
elif raw[0:6] in (b'cHNidP', 'cHNidP'): # base64
|
||||
raw = base64.b64decode(raw)
|
||||
raw = base64.b64decode(raw, validate=True)
|
||||
if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff':
|
||||
raise BadHeaderMagic("bad magic")
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import binascii
|
||||
import unittest
|
||||
from unittest import mock
|
||||
from decimal import Decimal
|
||||
@@ -137,6 +138,28 @@ class TestCommands(ElectrumTestCase):
|
||||
self.assertEqual(['p2wpkh:L15oxP24NMNAXxq5r2aom24pHPtt3Fet8ZutgL155Bad93GSubM2', 'p2wpkh:L4rYY5QpfN6wJEF4SEKDpcGhTPnCe9zcGs6hiSnhpprZqVywFifN'],
|
||||
await cmds.getprivatekeys(['bc1q3g5tmkmlvxryhh843v4dz026avatc0zzr6h3af', 'bc1q9pzjpjq4nqx5ycnywekcmycqz0wjp2nq604y2n'], wallet=wallet))
|
||||
|
||||
async def test_verifymessage_enforces_strict_base64(self):
|
||||
cmds = Commands(config=self.config)
|
||||
msg = "hello there"
|
||||
addr = "bc1qq2tmmcngng78nllq2pvrkchcdukemtj56uyue0"
|
||||
sig = "HznHvCsY//Zr5JvPIR3rN/RbCkttvrUs8Yt+vw+e1c29BLMSlcrN4+Y4Pq8e/UJuh2bDrUboTfsFhBJap+fPmNY="
|
||||
self.assertTrue(await cmds.verifymessage(addr, sig, msg))
|
||||
self.assertFalse(await cmds.verifymessage(addr, sig+"trailinggarbage", msg))
|
||||
|
||||
@mock.patch.object(wallet.Abstract_Wallet, 'save_db')
|
||||
async def test_decrypt_enforces_strict_base64(self, mock_save_db):
|
||||
cmds = Commands(config=self.config)
|
||||
wallet = restore_wallet_from_text('9dk',
|
||||
gap_limit=2,
|
||||
path='if_this_exists_mocking_failed_648151893',
|
||||
config=self.config)['wallet'] # type: Abstract_Wallet
|
||||
plaintext = "hello there"
|
||||
ciphertext = "QklFMQJEFgxfkXj+UNblbHR+4y6ZA2rGEeEhWo7h84lBFjlRY5JOPfV1zyC1fw5YmhIr7+3ceIV11lpf/Yv7gSqQCQ5Wuf1aGXceHZO0GjKVxBsuew=="
|
||||
pubkey = "02a0507c8bb3d96dfd7731bafb0ae30e6ed10bbadd6a9f9f88eaf0602b9cc99adc"
|
||||
self.assertEqual(plaintext, await cmds.decrypt(pubkey, ciphertext, wallet=wallet))
|
||||
with self.assertRaises(binascii.Error): # perhaps it should raise some nice UserFacingException instead
|
||||
await cmds.decrypt(pubkey, ciphertext+"trailinggarbage", wallet=wallet)
|
||||
|
||||
|
||||
class TestCommandsTestnet(ElectrumTestCase):
|
||||
TESTNET = True
|
||||
|
||||
Reference in New Issue
Block a user