Merge pull request #9902 from SomberNight/202506_base64_trailing_garbage
base64.b64decode: always set validate=True
This commit is contained in:
@@ -29,6 +29,7 @@ import time
|
|||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
import ast
|
import ast
|
||||||
|
import binascii
|
||||||
import base64
|
import base64
|
||||||
import asyncio
|
import asyncio
|
||||||
import inspect
|
import inspect
|
||||||
@@ -906,9 +907,12 @@ class Commands(Logger):
|
|||||||
|
|
||||||
arg:str:address:Bitcoin address
|
arg:str:address:Bitcoin address
|
||||||
arg:str:message:Clear text message. Use quotes if it contains spaces.
|
arg:str:message:Clear text message. Use quotes if it contains spaces.
|
||||||
arg:str:signature:The signature
|
arg:str:signature:The signature, base64-encoded.
|
||||||
"""
|
"""
|
||||||
sig = base64.b64decode(signature)
|
try:
|
||||||
|
sig = base64.b64decode(signature, validate=True)
|
||||||
|
except binascii.Error:
|
||||||
|
return False
|
||||||
message = util.to_bytes(message)
|
message = util.to_bytes(message)
|
||||||
return bitcoin.verify_usermessage_with_address(address, sig, message)
|
return bitcoin.verify_usermessage_with_address(address, sig, message)
|
||||||
|
|
||||||
|
|||||||
@@ -473,7 +473,7 @@ def ecies_decrypt_message(
|
|||||||
*,
|
*,
|
||||||
magic: bytes = b'BIE1',
|
magic: bytes = b'BIE1',
|
||||||
) -> bytes:
|
) -> bytes:
|
||||||
encrypted = base64.b64decode(encrypted) # type: bytes
|
encrypted = base64.b64decode(encrypted, validate=True) # type: bytes
|
||||||
if len(encrypted) < 85:
|
if len(encrypted) < 85:
|
||||||
raise Exception('invalid ciphertext: length')
|
raise Exception('invalid ciphertext: length')
|
||||||
magic_found = encrypted[:4]
|
magic_found = encrypted[:4]
|
||||||
|
|||||||
@@ -221,7 +221,7 @@ class AuthenticatedServer(Logger):
|
|||||||
if basic != 'Basic':
|
if basic != 'Basic':
|
||||||
raise AuthenticationInvalidOrMissing('UnsupportedType')
|
raise AuthenticationInvalidOrMissing('UnsupportedType')
|
||||||
encoded = to_bytes(encoded, 'utf8')
|
encoded = to_bytes(encoded, 'utf8')
|
||||||
credentials = to_string(b64decode(encoded), 'utf8')
|
credentials = to_string(b64decode(encoded, validate=True), 'utf8')
|
||||||
username, _, password = credentials.partition(':')
|
username, _, password = credentials.partition(':')
|
||||||
if not (constant_time_compare(username, self.rpc_user)
|
if not (constant_time_compare(username, self.rpc_user)
|
||||||
and constant_time_compare(password, self.rpc_password)):
|
and constant_time_compare(password, self.rpc_password)):
|
||||||
|
|||||||
@@ -383,7 +383,7 @@ class QEDaemon(AuthMixin, QObject):
|
|||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
# This can throw on invalid base64
|
# This can throw on invalid base64
|
||||||
sig = base64.b64decode(str(signature.strip()))
|
sig = base64.b64decode(str(signature.strip()), validate=True)
|
||||||
verified = verify_usermessage_with_address(address, sig, message)
|
verified = verify_usermessage_with_address(address, sig, message)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
verified = False
|
verified = False
|
||||||
|
|||||||
@@ -2154,7 +2154,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger, QtEventListener):
|
|||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
# This can throw on invalid base64
|
# This can throw on invalid base64
|
||||||
sig = base64.b64decode(str(signature.toPlainText()))
|
sig = base64.b64decode(str(signature.toPlainText()), validate=True)
|
||||||
verified = bitcoin.verify_usermessage_with_address(address, sig, message)
|
verified = bitcoin.verify_usermessage_with_address(address, sig, message)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
verified = False
|
verified = False
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ class UpdateCheckThread(QThread, Logger):
|
|||||||
for address, sig in sigs.items():
|
for address, sig in sigs.items():
|
||||||
if address not in UpdateCheck.VERSION_ANNOUNCEMENT_SIGNING_KEYS:
|
if address not in UpdateCheck.VERSION_ANNOUNCEMENT_SIGNING_KEYS:
|
||||||
continue
|
continue
|
||||||
sig = base64.b64decode(sig)
|
sig = base64.b64decode(sig, validate=True)
|
||||||
msg = version_num.encode('utf-8')
|
msg = version_num.encode('utf-8')
|
||||||
if verify_usermessage_with_address(
|
if verify_usermessage_with_address(
|
||||||
address=address, sig65=sig, message=msg,
|
address=address, sig65=sig, message=msg,
|
||||||
|
|||||||
@@ -424,7 +424,7 @@ class DigitalBitbox_Client(HardwareClientBase):
|
|||||||
authenticated_msg = base64.b64encode(msg + hmac_digest)
|
authenticated_msg = base64.b64encode(msg + hmac_digest)
|
||||||
reply = self.hid_send_plain(authenticated_msg)
|
reply = self.hid_send_plain(authenticated_msg)
|
||||||
if 'ciphertext' in reply:
|
if 'ciphertext' in reply:
|
||||||
b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"])))
|
b64_unencoded = bytes(base64.b64decode(''.join(reply["ciphertext"]), validate=True))
|
||||||
reply_hmac = b64_unencoded[-sha256_byte_len:]
|
reply_hmac = b64_unencoded[-sha256_byte_len:]
|
||||||
hmac_calculated = hmac_oneshot(authentication_key, b64_unencoded[:-sha256_byte_len], hashlib.sha256)
|
hmac_calculated = hmac_oneshot(authentication_key, b64_unencoded[:-sha256_byte_len], hashlib.sha256)
|
||||||
if not hmac.compare_digest(reply_hmac, hmac_calculated):
|
if not hmac.compare_digest(reply_hmac, hmac_calculated):
|
||||||
@@ -702,7 +702,7 @@ class DigitalBitboxPlugin(HW_PluginBase):
|
|||||||
def comserver_post_notification(self, payload, *, handler: 'HardwareHandlerBase'):
|
def comserver_post_notification(self, payload, *, handler: 'HardwareHandlerBase'):
|
||||||
assert self.is_mobile_paired(), "unexpected mobile pairing error"
|
assert self.is_mobile_paired(), "unexpected mobile pairing error"
|
||||||
url = 'https://digitalbitbox.com/smartverification/index.php'
|
url = 'https://digitalbitbox.com/smartverification/index.php'
|
||||||
key_s = base64.b64decode(self.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY])
|
key_s = base64.b64decode(self.digitalbitbox_config[ENCRYPTION_PRIVKEY_KEY], validate=True)
|
||||||
ciphertext = EncodeAES_bytes(key_s, json.dumps(payload).encode('ascii'))
|
ciphertext = EncodeAES_bytes(key_s, json.dumps(payload).encode('ascii'))
|
||||||
args = 'c=data&s=0&dt=0&uuid=%s&pl=%s' % (
|
args = 'c=data&s=0&dt=0&uuid=%s&pl=%s' % (
|
||||||
self.digitalbitbox_config[CHANNEL_ID_KEY],
|
self.digitalbitbox_config[CHANNEL_ID_KEY],
|
||||||
|
|||||||
@@ -192,7 +192,7 @@ class Jade_Client(HardwareClientBase):
|
|||||||
|
|
||||||
# Signature verification does not work with anti-exfil, so stick with default (rfc6979)
|
# Signature verification does not work with anti-exfil, so stick with default (rfc6979)
|
||||||
sig = self.jade.sign_message(path, message)
|
sig = self.jade.sign_message(path, message)
|
||||||
return base64.b64decode(sig)
|
return base64.b64decode(sig, validate=True)
|
||||||
|
|
||||||
@runs_in_hwd_thread
|
@runs_in_hwd_thread
|
||||||
def sign_psbt(self, psbt_bytes):
|
def sign_psbt(self, psbt_bytes):
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ class LabelsPlugin(BasePlugin):
|
|||||||
|
|
||||||
def decode(self, wallet: 'Abstract_Wallet', message: str) -> str:
|
def decode(self, wallet: 'Abstract_Wallet', message: str) -> str:
|
||||||
password, iv, wallet_id = self.wallets[wallet]
|
password, iv, wallet_id = self.wallets[wallet]
|
||||||
decoded = base64.b64decode(message)
|
decoded = base64.b64decode(message, validate=True)
|
||||||
decrypted = aes_decrypt_with_iv(password, iv, decoded)
|
decrypted = aes_decrypt_with_iv(password, iv, decoded)
|
||||||
return decrypted.decode('utf8')
|
return decrypted.decode('utf8')
|
||||||
|
|
||||||
|
|||||||
@@ -1155,7 +1155,8 @@ class Ledger_Client_New(Ledger_Client):
|
|||||||
|
|
||||||
result = b''
|
result = b''
|
||||||
try:
|
try:
|
||||||
result = base64.b64decode(self.client.sign_message(message, address_path))
|
sig_str = self.client.sign_message(message, address_path)
|
||||||
|
result = base64.b64decode(sig_str, validate=True)
|
||||||
except DenyError:
|
except DenyError:
|
||||||
pass # cancelled by user
|
pass # cancelled by user
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
|
|||||||
@@ -160,7 +160,7 @@ class WalletStorage(Logger):
|
|||||||
|
|
||||||
def _init_encryption_version(self):
|
def _init_encryption_version(self):
|
||||||
try:
|
try:
|
||||||
magic = base64.b64decode(self.raw)[0:4]
|
magic = base64.b64decode(self.raw, validate=True)[0:4]
|
||||||
if magic == b'BIE1':
|
if magic == b'BIE1':
|
||||||
return StorageEncryptionVersion.USER_PASSWORD
|
return StorageEncryptionVersion.USER_PASSWORD
|
||||||
elif magic == b'BIE2':
|
elif magic == b'BIE2':
|
||||||
|
|||||||
@@ -1515,7 +1515,7 @@ def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str:
|
|||||||
# try base64
|
# try base64
|
||||||
if raw[0:6] in ('cHNidP', b'cHNidP'): # base64 psbt
|
if raw[0:6] in ('cHNidP', b'cHNidP'): # base64 psbt
|
||||||
try:
|
try:
|
||||||
return base64.b64decode(raw).hex()
|
return base64.b64decode(raw, validate=True).hex()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
# raw bytes (do not strip whitespaces in this case)
|
# raw bytes (do not strip whitespaces in this case)
|
||||||
@@ -2226,7 +2226,7 @@ class PartialTransaction(Transaction):
|
|||||||
if raw[0:10].lower() in (b'70736274ff', '70736274ff'): # hex
|
if raw[0:10].lower() in (b'70736274ff', '70736274ff'): # hex
|
||||||
raw = bytes.fromhex(raw)
|
raw = bytes.fromhex(raw)
|
||||||
elif raw[0:6] in (b'cHNidP', 'cHNidP'): # base64
|
elif raw[0:6] in (b'cHNidP', 'cHNidP'): # base64
|
||||||
raw = base64.b64decode(raw)
|
raw = base64.b64decode(raw, validate=True)
|
||||||
if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff':
|
if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff':
|
||||||
raise BadHeaderMagic("bad magic")
|
raise BadHeaderMagic("bad magic")
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import binascii
|
||||||
import unittest
|
import unittest
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
@@ -137,6 +138,28 @@ class TestCommands(ElectrumTestCase):
|
|||||||
self.assertEqual(['p2wpkh:L15oxP24NMNAXxq5r2aom24pHPtt3Fet8ZutgL155Bad93GSubM2', 'p2wpkh:L4rYY5QpfN6wJEF4SEKDpcGhTPnCe9zcGs6hiSnhpprZqVywFifN'],
|
self.assertEqual(['p2wpkh:L15oxP24NMNAXxq5r2aom24pHPtt3Fet8ZutgL155Bad93GSubM2', 'p2wpkh:L4rYY5QpfN6wJEF4SEKDpcGhTPnCe9zcGs6hiSnhpprZqVywFifN'],
|
||||||
await cmds.getprivatekeys(['bc1q3g5tmkmlvxryhh843v4dz026avatc0zzr6h3af', 'bc1q9pzjpjq4nqx5ycnywekcmycqz0wjp2nq604y2n'], wallet=wallet))
|
await cmds.getprivatekeys(['bc1q3g5tmkmlvxryhh843v4dz026avatc0zzr6h3af', 'bc1q9pzjpjq4nqx5ycnywekcmycqz0wjp2nq604y2n'], wallet=wallet))
|
||||||
|
|
||||||
|
async def test_verifymessage_enforces_strict_base64(self):
|
||||||
|
cmds = Commands(config=self.config)
|
||||||
|
msg = "hello there"
|
||||||
|
addr = "bc1qq2tmmcngng78nllq2pvrkchcdukemtj56uyue0"
|
||||||
|
sig = "HznHvCsY//Zr5JvPIR3rN/RbCkttvrUs8Yt+vw+e1c29BLMSlcrN4+Y4Pq8e/UJuh2bDrUboTfsFhBJap+fPmNY="
|
||||||
|
self.assertTrue(await cmds.verifymessage(addr, sig, msg))
|
||||||
|
self.assertFalse(await cmds.verifymessage(addr, sig+"trailinggarbage", msg))
|
||||||
|
|
||||||
|
@mock.patch.object(wallet.Abstract_Wallet, 'save_db')
|
||||||
|
async def test_decrypt_enforces_strict_base64(self, mock_save_db):
|
||||||
|
cmds = Commands(config=self.config)
|
||||||
|
wallet = restore_wallet_from_text('9dk',
|
||||||
|
gap_limit=2,
|
||||||
|
path='if_this_exists_mocking_failed_648151893',
|
||||||
|
config=self.config)['wallet'] # type: Abstract_Wallet
|
||||||
|
plaintext = "hello there"
|
||||||
|
ciphertext = "QklFMQJEFgxfkXj+UNblbHR+4y6ZA2rGEeEhWo7h84lBFjlRY5JOPfV1zyC1fw5YmhIr7+3ceIV11lpf/Yv7gSqQCQ5Wuf1aGXceHZO0GjKVxBsuew=="
|
||||||
|
pubkey = "02a0507c8bb3d96dfd7731bafb0ae30e6ed10bbadd6a9f9f88eaf0602b9cc99adc"
|
||||||
|
self.assertEqual(plaintext, await cmds.decrypt(pubkey, ciphertext, wallet=wallet))
|
||||||
|
with self.assertRaises(binascii.Error): # perhaps it should raise some nice UserFacingException instead
|
||||||
|
await cmds.decrypt(pubkey, ciphertext+"trailinggarbage", wallet=wallet)
|
||||||
|
|
||||||
|
|
||||||
class TestCommandsTestnet(ElectrumTestCase):
|
class TestCommandsTestnet(ElectrumTestCase):
|
||||||
TESTNET = True
|
TESTNET = True
|
||||||
|
|||||||
Reference in New Issue
Block a user