file reorganization with top-level module
This commit is contained in:
151
electrum/crypto.py
Normal file
151
electrum/crypto.py
Normal file
@@ -0,0 +1,151 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Electrum - lightweight Bitcoin client
|
||||
# Copyright (C) 2018 The Electrum developers
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation files
|
||||
# (the "Software"), to deal in the Software without restriction,
|
||||
# including without limitation the rights to use, copy, modify, merge,
|
||||
# publish, distribute, sublicense, and/or sell copies of the Software,
|
||||
# and to permit persons to whom the Software is furnished to do so,
|
||||
# subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
import base64
|
||||
import os
|
||||
import hashlib
|
||||
import hmac
|
||||
|
||||
import pyaes
|
||||
|
||||
from .util import assert_bytes, InvalidPassword, to_bytes, to_string
|
||||
|
||||
|
||||
try:
|
||||
from Cryptodome.Cipher import AES
|
||||
except:
|
||||
AES = None
|
||||
|
||||
|
||||
class InvalidPadding(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def append_PKCS7_padding(data):
|
||||
assert_bytes(data)
|
||||
padlen = 16 - (len(data) % 16)
|
||||
return data + bytes([padlen]) * padlen
|
||||
|
||||
|
||||
def strip_PKCS7_padding(data):
|
||||
assert_bytes(data)
|
||||
if len(data) % 16 != 0 or len(data) == 0:
|
||||
raise InvalidPadding("invalid length")
|
||||
padlen = data[-1]
|
||||
if padlen > 16:
|
||||
raise InvalidPadding("invalid padding byte (large)")
|
||||
for i in data[-padlen:]:
|
||||
if i != padlen:
|
||||
raise InvalidPadding("invalid padding byte (inconsistent)")
|
||||
return data[0:-padlen]
|
||||
|
||||
|
||||
def aes_encrypt_with_iv(key, iv, data):
|
||||
assert_bytes(key, iv, data)
|
||||
data = append_PKCS7_padding(data)
|
||||
if AES:
|
||||
e = AES.new(key, AES.MODE_CBC, iv).encrypt(data)
|
||||
else:
|
||||
aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
|
||||
aes = pyaes.Encrypter(aes_cbc, padding=pyaes.PADDING_NONE)
|
||||
e = aes.feed(data) + aes.feed() # empty aes.feed() flushes buffer
|
||||
return e
|
||||
|
||||
|
||||
def aes_decrypt_with_iv(key, iv, data):
|
||||
assert_bytes(key, iv, data)
|
||||
if AES:
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
data = cipher.decrypt(data)
|
||||
else:
|
||||
aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
|
||||
aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
|
||||
data = aes.feed(data) + aes.feed() # empty aes.feed() flushes buffer
|
||||
try:
|
||||
return strip_PKCS7_padding(data)
|
||||
except InvalidPadding:
|
||||
raise InvalidPassword()
|
||||
|
||||
|
||||
def EncodeAES(secret, s):
|
||||
assert_bytes(s)
|
||||
iv = bytes(os.urandom(16))
|
||||
ct = aes_encrypt_with_iv(secret, iv, s)
|
||||
e = iv + ct
|
||||
return base64.b64encode(e)
|
||||
|
||||
def DecodeAES(secret, e):
|
||||
e = bytes(base64.b64decode(e))
|
||||
iv, e = e[:16], e[16:]
|
||||
s = aes_decrypt_with_iv(secret, iv, e)
|
||||
return s
|
||||
|
||||
def pw_encode(s, password):
|
||||
if password:
|
||||
secret = Hash(password)
|
||||
return EncodeAES(secret, to_bytes(s, "utf8")).decode('utf8')
|
||||
else:
|
||||
return s
|
||||
|
||||
def pw_decode(s, password):
|
||||
if password is not None:
|
||||
secret = Hash(password)
|
||||
try:
|
||||
d = to_string(DecodeAES(secret, s), "utf8")
|
||||
except Exception:
|
||||
raise InvalidPassword()
|
||||
return d
|
||||
else:
|
||||
return s
|
||||
|
||||
|
||||
def sha256(x: bytes) -> bytes:
|
||||
x = to_bytes(x, 'utf8')
|
||||
return bytes(hashlib.sha256(x).digest())
|
||||
|
||||
|
||||
def Hash(x: bytes) -> bytes:
|
||||
x = to_bytes(x, 'utf8')
|
||||
out = bytes(sha256(sha256(x)))
|
||||
return out
|
||||
|
||||
|
||||
def hash_160(x: bytes) -> bytes:
|
||||
try:
|
||||
md = hashlib.new('ripemd160')
|
||||
md.update(sha256(x))
|
||||
return md.digest()
|
||||
except BaseException:
|
||||
from . import ripemd
|
||||
md = ripemd.new(sha256(x))
|
||||
return md.digest()
|
||||
|
||||
|
||||
def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
|
||||
if hasattr(hmac, 'digest'):
|
||||
# requires python 3.7+; faster
|
||||
return hmac.digest(key, msg, digest)
|
||||
else:
|
||||
return hmac.new(key, msg, digest).digest()
|
||||
Reference in New Issue
Block a user