1
0

use libsecp256k1 if available. abstract away ecc stuff. move symmetric crypto and hash functions to crypto.py

This commit is contained in:
SomberNight
2018-05-24 18:57:13 +02:00
parent 756cc323e7
commit 16e4827e8c
28 changed files with 1085 additions and 537 deletions

View File

@@ -1,21 +1,26 @@
import base64
import unittest
import sys
from ecdsa.util import number_to_string
from lib import bitcoin
from lib.bitcoin import (
generator_secp256k1, point_to_ser, public_key_to_p2pkh, EC_KEY,
bip32_root, bip32_public_derivation, bip32_private_derivation, pw_encode,
pw_decode, Hash, public_key_from_private_key, address_from_private_key,
public_key_to_p2pkh,
bip32_root, bip32_public_derivation, bip32_private_derivation,
Hash, address_from_private_key,
is_address, is_private_key, xpub_from_xprv, is_new_seed, is_old_seed,
var_int, op_push, address_to_script, regenerate_key,
verify_message, deserialize_privkey, serialize_privkey, is_segwit_address,
var_int, op_push, address_to_script,
deserialize_privkey, serialize_privkey, is_segwit_address,
is_b58_address, address_to_scripthash, is_minikey, is_compressed, is_xpub,
xpub_type, is_xprv, is_bip32_derivation, seed_type, EncodeBase58Check,
script_num_to_hex, push_script, add_number_to_script)
from lib import ecc, crypto, ecc_fast
from lib.ecc import number_to_string, string_to_number
from lib.transaction import opcodes
from lib.util import bfh, bh2u
from lib import constants
from lib.storage import WalletStorage
from . import SequentialTestCase
from . import TestCaseForTestnet
@@ -26,27 +31,54 @@ except ImportError:
sys.exit("Error: python-ecdsa does not seem to be installed. Try 'sudo pip install ecdsa'")
class Test_bitcoin(unittest.TestCase):
def needs_test_with_all_ecc_implementations(func):
"""Function decorator to run a unit test twice:
once when libsecp256k1 is not available, once when it is.
NOTE: this is inherently sequential;
tests running in parallel would break things
"""
def run_test(*args, **kwargs):
ecc_fast.undo_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1()
try:
# first test without libsecp
func(*args, **kwargs)
finally:
# if libsecp is not available, we are done
if not ecc_fast._libsecp256k1:
return
ecc_fast.do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1()
# if libsecp is available, test again now
func(*args, **kwargs)
return run_test
class Test_bitcoin(SequentialTestCase):
def test_libsecp256k1_is_available(self):
# we want the unit testing framework to test with libsecp256k1 available.
self.assertTrue(bool(ecc_fast._libsecp256k1))
@needs_test_with_all_ecc_implementations
def test_crypto(self):
for message in [b"Chancellor on brink of second bailout for banks", b'\xff'*512]:
self._do_test_crypto(message)
def _do_test_crypto(self, message):
G = generator_secp256k1
G = ecc.generator()
_r = G.order()
pvk = ecdsa.util.randrange( pow(2,256) ) %_r
pvk = ecdsa.util.randrange(_r)
Pub = pvk*G
pubkey_c = point_to_ser(Pub,True)
pubkey_c = Pub.get_public_key_bytes(True)
#pubkey_u = point_to_ser(Pub,False)
addr_c = public_key_to_p2pkh(pubkey_c)
#print "Private key ", '%064x'%pvk
eck = EC_KEY(number_to_string(pvk,_r))
eck = ecc.ECPrivkey(number_to_string(pvk,_r))
#print "Compressed public key ", pubkey_c.encode('hex')
enc = EC_KEY.encrypt_message(message, pubkey_c)
enc = ecc.ECPubkey(pubkey_c).encrypt_message(message)
dec = eck.decrypt_message(enc)
self.assertEqual(message, dec)
@@ -57,15 +89,16 @@ class Test_bitcoin(unittest.TestCase):
signature = eck.sign_message(message, True)
#print signature
EC_KEY.verify_message(eck, signature, message)
eck.verify_message_for_address(signature, message)
@needs_test_with_all_ecc_implementations
def test_msg_signing(self):
msg1 = b'Chancellor on brink of second bailout for banks'
msg2 = b'Electrum'
def sign_message_with_wif_privkey(wif_privkey, msg):
txin_type, privkey, compressed = deserialize_privkey(wif_privkey)
key = regenerate_key(privkey)
key = ecc.ECPrivkey(privkey)
return key.sign_message(msg, compressed)
sig1 = sign_message_with_wif_privkey(
@@ -81,30 +114,61 @@ class Test_bitcoin(unittest.TestCase):
self.assertEqual(sig1_b64, b'H/9jMOnj4MFbH3d7t4yCQ9i7DgZU/VZ278w3+ySv2F4yIsdqjsc5ng3kmN8OZAThgyfCZOQxZCWza9V5XzlVY0Y=')
self.assertEqual(sig2_b64, b'G84dmJ8TKIDKMT9qBRhpX2sNmR0y5t+POcYnFFJCs66lJmAs3T8A6Sbpx7KA6yTQ9djQMabwQXRrDomOkIKGn18=')
self.assertTrue(verify_message(addr1, sig1, msg1))
self.assertTrue(verify_message(addr2, sig2, msg2))
self.assertTrue(ecc.verify_message_with_address(addr1, sig1, msg1))
self.assertTrue(ecc.verify_message_with_address(addr2, sig2, msg2))
self.assertFalse(verify_message(addr1, b'wrong', msg1))
self.assertFalse(verify_message(addr1, sig2, msg1))
self.assertFalse(ecc.verify_message_with_address(addr1, b'wrong', msg1))
self.assertFalse(ecc.verify_message_with_address(addr1, sig2, msg1))
@needs_test_with_all_ecc_implementations
def test_decrypt_message(self):
key = WalletStorage.get_eckey_from_password('pw123')
self.assertEqual(b'me<(s_s)>age', key.decrypt_message(b'QklFMQMDFtgT3zWSQsa+Uie8H/WvfUjlu9UN9OJtTt3KlgKeSTi6SQfuhcg1uIz9hp3WIUOFGTLr4RNQBdjPNqzXwhkcPi2Xsbiw6UCNJncVPJ6QBg=='))
self.assertEqual(b'me<(s_s)>age', key.decrypt_message(b'QklFMQKXOXbylOQTSMGfo4MFRwivAxeEEkewWQrpdYTzjPhqjHcGBJwdIhB7DyRfRQihuXx1y0ZLLv7XxLzrILzkl/H4YUtZB4uWjuOAcmxQH4i/Og=='))
self.assertEqual(b'hey_there' * 100, key.decrypt_message(b'QklFMQLOOsabsXtGQH8edAa6VOUa5wX8/DXmxX9NyHoAx1a5bWgllayGRVPeI2bf0ZdWK0tfal0ap0ZIVKbd2eOJybqQkILqT6E1/Syzq0Zicyb/AA1eZNkcX5y4gzloxinw00ubCA8M7gcUjJpOqbnksATcJ5y2YYXcHMGGfGurWu6uJ/UyrNobRidWppRMW5yR9/6utyNvT6OHIolCMEf7qLcmtneoXEiz51hkRdZS7weNf9mGqSbz9a2NL3sdh1A0feHIjAZgcCKcAvksNUSauf0/FnIjzTyPRpjRDMeDC8Ci3sGiuO3cvpWJwhZfbjcS26KmBv2CHWXfRRNFYOInHZNIXWNAoBB47Il5bGSMd+uXiGr+SQ9tNvcu+BiJNmFbxYqg+oQ8dGAl1DtvY2wJVY8k7vO9BIWSpyIxfGw7EDifhc5vnOmGe016p6a01C3eVGxgl23UYMrP7+fpjOcPmTSF4rk5U5ljEN3MSYqlf1QEv0OqlI9q1TwTK02VBCjMTYxDHsnt04OjNBkNO8v5uJ4NR+UUDBEp433z53I59uawZ+dbk4v4ZExcl8EGmKm3Gzbal/iJ/F7KQuX2b/ySEhLOFVYFWxK73X1nBvCSK2mC2/8fCw8oI5pmvzJwQhcCKTdEIrz3MMvAHqtPScDUOjzhXxInQOCb3+UBj1PPIdqkYLvZss1TEaBwYZjLkVnK2MBj7BaqT6Rp6+5A/fippUKHsnB6eYMEPR2YgDmCHL+4twxHJG6UWdP3ybaKiiAPy2OHNP6PTZ0HrqHOSJzBSDD+Z8YpaRg29QX3UEWlqnSKaan0VYAsV1VeaN0XFX46/TWO0L5tjhYVXJJYGqo6tIQJymxATLFRF6AZaD1Mwd27IAL04WkmoQoXfO6OFfwdp/shudY/1gBkDBvGPICBPtnqkvhGF+ZF3IRkuPwiFWeXmwBxKHsRx/3+aJu32Ml9+za41zVk2viaxcGqwTc5KMexQFLAUwqhv+aIik7U+5qk/gEVSuRoVkihoweFzKolNF+BknH2oB4rZdPixag5Zje3DvgjsSFlOl69W/67t/Gs8htfSAaHlsB8vWRQr9+v/lxTbrAw+O0E+sYGoObQ4qQMyQshNZEHbpPg63eWiHtJJnrVBvOeIbIHzoLDnMDsWVWZSMzAQ1vhX1H5QLgSEbRlKSliVY03kDkh/Nk/KOn+B2q37Ialq4JcRoIYFGJ8AoYEAD0tRuTqFddIclE75HzwaNG7NyKW1plsa72ciOPwsPJsdd5F0qdSQ3OSKtooTn7uf6dXOc4lDkfrVYRlZ0PX'))
@needs_test_with_all_ecc_implementations
def test_encrypt_message(self):
key = WalletStorage.get_eckey_from_password('secret_password77')
msgs = [
bytes([0] * 555),
b'cannot think of anything funny'
]
for plaintext in msgs:
ciphertext1 = key.encrypt_message(plaintext)
ciphertext2 = key.encrypt_message(plaintext)
self.assertEqual(plaintext, key.decrypt_message(ciphertext1))
self.assertEqual(plaintext, key.decrypt_message(ciphertext2))
self.assertNotEqual(ciphertext1, ciphertext2)
@needs_test_with_all_ecc_implementations
def test_sign_transaction(self):
eckey1 = ecc.ECPrivkey(bfh('7e1255fddb52db1729fc3ceb21a46f95b8d9fe94cc83425e936a6c5223bb679d'))
sig1 = eckey1.sign_transaction(bfh('5a548b12369a53faaa7e51b5081829474ebdd9c924b3a8230b69aa0be254cd94'))
self.assertEqual(bfh('3045022100902a288b98392254cd23c0e9a49ac6d7920f171b8249a48e484b998f1874a2010220723d844826828f092cf400cb210c4fa0b8cd1b9d1a7f21590e78e022ff6476b9'), sig1)
eckey2 = ecc.ECPrivkey(bfh('c7ce8c1462c311eec24dff9e2532ac6241e50ae57e7d1833af21942136972f23'))
sig2 = eckey2.sign_transaction(bfh('642a2e66332f507c92bda910158dfe46fc10afbf72218764899d3af99a043fac'))
self.assertEqual(bfh('30440220618513f4cfc87dde798ce5febae7634c23e7b9254a1eabf486be820f6a7c2c4702204fef459393a2b931f949e63ced06888f35e286e446dc46feb24b5b5f81c6ed52'), sig2)
def test_aes_homomorphic(self):
"""Make sure AES is homomorphic."""
payload = u'\u66f4\u7a33\u5b9a\u7684\u4ea4\u6613\u5e73\u53f0'
password = u'secret'
enc = pw_encode(payload, password)
dec = pw_decode(enc, password)
enc = crypto.pw_encode(payload, password)
dec = crypto.pw_decode(enc, password)
self.assertEqual(dec, payload)
def test_aes_encode_without_password(self):
"""When not passed a password, pw_encode is noop on the payload."""
payload = u'\u66f4\u7a33\u5b9a\u7684\u4ea4\u6613\u5e73\u53f0'
enc = pw_encode(payload, None)
enc = crypto.pw_encode(payload, None)
self.assertEqual(payload, enc)
def test_aes_deencode_without_password(self):
"""When not passed a password, pw_decode is noop on the payload."""
payload = u'\u66f4\u7a33\u5b9a\u7684\u4ea4\u6613\u5e73\u53f0'
enc = pw_decode(payload, None)
enc = crypto.pw_decode(payload, None)
self.assertEqual(payload, enc)
def test_aes_decode_with_invalid_password(self):
@@ -112,8 +176,8 @@ class Test_bitcoin(unittest.TestCase):
payload = u"blah"
password = u"uber secret"
wrong_password = u"not the password"
enc = pw_encode(payload, password)
self.assertRaises(Exception, pw_decode, enc, wrong_password)
enc = crypto.pw_encode(payload, password)
self.assertRaises(Exception, crypto.pw_decode, enc, wrong_password)
def test_hash(self):
"""Make sure the Hash function does sha256 twice"""
@@ -238,7 +302,7 @@ class Test_bitcoin_testnet(TestCaseForTestnet):
self.assertEqual(address_to_script('2NE4ZdmxFmUgwu5wtfoN2gVniyMgRDYq1kk'), 'a914e4567743d378957cd2ee7072da74b1203c1a7a0b87')
class Test_xprv_xpub(unittest.TestCase):
class Test_xprv_xpub(SequentialTestCase):
xprv_xpub = (
# Taken from test vectors in https://en.bitcoin.it/wiki/BIP_0032_TestVectors
@@ -269,6 +333,7 @@ class Test_xprv_xpub(unittest.TestCase):
return xpub, xprv
@needs_test_with_all_ecc_implementations
def test_bip32(self):
# see https://en.bitcoin.it/wiki/BIP_0032_TestVectors
xpub, xprv = self._do_test_bip32("000102030405060708090a0b0c0d0e0f", "m/0'/1/2'/2/1000000000")
@@ -279,12 +344,14 @@ class Test_xprv_xpub(unittest.TestCase):
self.assertEqual("xpub6FnCn6nSzZAw5Tw7cgR9bi15UV96gLZhjDstkXXxvCLsUXBGXPdSnLFbdpq8p9HmGsApME5hQTZ3emM2rnY5agb9rXpVGyy3bdW6EEgAtqt", xpub)
self.assertEqual("xprvA2nrNbFZABcdryreWet9Ea4LvTJcGsqrMzxHx98MMrotbir7yrKCEXw7nadnHM8Dq38EGfSh6dqA9QWTyefMLEcBYJUuekgW4BYPJcr9E7j", xprv)
@needs_test_with_all_ecc_implementations
def test_xpub_from_xprv(self):
"""We can derive the xpub key from a xprv."""
for xprv_details in self.xprv_xpub:
result = xpub_from_xprv(xprv_details['xprv'])
self.assertEqual(result, xprv_details['xpub'])
@needs_test_with_all_ecc_implementations
def test_is_xpub(self):
for xprv_details in self.xprv_xpub:
xpub = xprv_details['xpub']
@@ -292,11 +359,13 @@ class Test_xprv_xpub(unittest.TestCase):
self.assertFalse(is_xpub('xpub1nval1d'))
self.assertFalse(is_xpub('xpub661MyMwAqRbcFWohJWt7PHsFEJfZAvw9ZxwQoDa4SoMgsDDM1T7WK3u9E4edkC4ugRnZ8E4xDZRpk8Rnts3Nbt97dPwT52WRONGBADWRONG'))
@needs_test_with_all_ecc_implementations
def test_xpub_type(self):
for xprv_details in self.xprv_xpub:
xpub = xprv_details['xpub']
self.assertEqual(xprv_details['xtype'], xpub_type(xpub))
@needs_test_with_all_ecc_implementations
def test_is_xprv(self):
for xprv_details in self.xprv_xpub:
xprv = xprv_details['xprv']
@@ -388,7 +457,7 @@ class Test_xprv_xpub_testnet(TestCaseForTestnet):
self.assertTrue(xkey_b58.startswith(xpub_headers_b58[xtype]))
class Test_keyImport(unittest.TestCase):
class Test_keyImport(SequentialTestCase):
priv_pub_addr = (
{'priv': 'KzMFjMC2MPadjvX5Cd7b8AKKjjpBSoRKUTpoAtN6B3J9ezWYyXS6',
@@ -475,19 +544,22 @@ class Test_keyImport(unittest.TestCase):
'scripthash': '60ad5a8b922f758cd7884403e90ee7e6f093f8d21a0ff24c9a865e695ccefdf1'},
)
@needs_test_with_all_ecc_implementations
def test_public_key_from_private_key(self):
for priv_details in self.priv_pub_addr:
txin_type, privkey, compressed = deserialize_privkey(priv_details['priv'])
result = public_key_from_private_key(privkey, compressed)
result = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
self.assertEqual(priv_details['pub'], result)
self.assertEqual(priv_details['txin_type'], txin_type)
self.assertEqual(priv_details['compressed'], compressed)
@needs_test_with_all_ecc_implementations
def test_address_from_private_key(self):
for priv_details in self.priv_pub_addr:
addr2 = address_from_private_key(priv_details['priv'])
self.assertEqual(priv_details['address'], addr2)
@needs_test_with_all_ecc_implementations
def test_is_valid_address(self):
for priv_details in self.priv_pub_addr:
addr = priv_details['address']
@@ -503,6 +575,7 @@ class Test_keyImport(unittest.TestCase):
self.assertFalse(is_address("not an address"))
@needs_test_with_all_ecc_implementations
def test_is_private_key(self):
for priv_details in self.priv_pub_addr:
self.assertTrue(is_private_key(priv_details['priv']))
@@ -511,30 +584,34 @@ class Test_keyImport(unittest.TestCase):
self.assertFalse(is_private_key(priv_details['address']))
self.assertFalse(is_private_key("not a privkey"))
@needs_test_with_all_ecc_implementations
def test_serialize_privkey(self):
for priv_details in self.priv_pub_addr:
txin_type, privkey, compressed = deserialize_privkey(priv_details['priv'])
priv2 = serialize_privkey(privkey, compressed, txin_type)
self.assertEqual(priv_details['exported_privkey'], priv2)
@needs_test_with_all_ecc_implementations
def test_address_to_scripthash(self):
for priv_details in self.priv_pub_addr:
sh = address_to_scripthash(priv_details['address'])
self.assertEqual(priv_details['scripthash'], sh)
@needs_test_with_all_ecc_implementations
def test_is_minikey(self):
for priv_details in self.priv_pub_addr:
minikey = priv_details['minikey']
priv = priv_details['priv']
self.assertEqual(minikey, is_minikey(priv))
@needs_test_with_all_ecc_implementations
def test_is_compressed(self):
for priv_details in self.priv_pub_addr:
self.assertEqual(priv_details['compressed'],
is_compressed(priv_details['priv']))
class Test_seeds(unittest.TestCase):
class Test_seeds(SequentialTestCase):
""" Test old and new seeds. """
mnemonics = {