check SSL certificate in config dialog
This commit is contained in:
@@ -119,75 +119,22 @@ class PaymentRequest:
|
||||
return False
|
||||
|
||||
def verify_x509(self, paymntreq):
|
||||
""" verify chain of certificates. The last certificate is the CA"""
|
||||
if not ca_list:
|
||||
self.error = "Trusted certificate authorities list not found"
|
||||
return False
|
||||
cert = pb2.X509Certificates()
|
||||
cert.ParseFromString(paymntreq.pki_data)
|
||||
cert_num = len(cert.certificate)
|
||||
x509_chain = []
|
||||
for i in range(cert_num):
|
||||
x = x509.X509()
|
||||
x.parseBinary(bytearray(cert.certificate[i]))
|
||||
x509_chain.append(x)
|
||||
if i == 0:
|
||||
try:
|
||||
x.check_date()
|
||||
except Exception as e:
|
||||
self.error = str(e)
|
||||
return
|
||||
self.requestor = x.get_common_name()
|
||||
if self.requestor.startswith('*.'):
|
||||
self.requestor = self.requestor[2:]
|
||||
else:
|
||||
if not x.check_ca():
|
||||
self.error = "ERROR: Supplied CA Certificate Error"
|
||||
return
|
||||
if not cert_num > 1:
|
||||
self.error = "ERROR: CA Certificate Chain Not Provided by Payment Processor"
|
||||
# verify the chain of certificates
|
||||
try:
|
||||
x, ca = verify_cert_chain(cert.certificate)
|
||||
except BaseException as e:
|
||||
self.error = str(e)
|
||||
return False
|
||||
# if the root CA is not supplied, add it to the chain
|
||||
ca = x509_chain[cert_num-1]
|
||||
if ca.getFingerprint() not in ca_list:
|
||||
keyID = ca.get_issuer_keyID()
|
||||
f = ca_keyID.get(keyID)
|
||||
if f:
|
||||
root = ca_list[f]
|
||||
x509_chain.append(root)
|
||||
else:
|
||||
self.error = "Supplied CA Not Found in Trusted CA Store."
|
||||
return False
|
||||
# verify the chain of signatures
|
||||
cert_num = len(x509_chain)
|
||||
for i in range(1, cert_num):
|
||||
x = x509_chain[i]
|
||||
prev_x = x509_chain[i-1]
|
||||
algo, sig, data = prev_x.get_signature()
|
||||
sig = bytearray(sig)
|
||||
|
||||
pubkey = rsakey.RSAKey(x.modulus, x.exponent)
|
||||
|
||||
if algo == x509.ALGO_RSA_SHA1:
|
||||
verify = pubkey.hashAndVerify(sig, data)
|
||||
elif algo == x509.ALGO_RSA_SHA256:
|
||||
hashBytes = bytearray(hashlib.sha256(data).digest())
|
||||
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA256 + hashBytes)
|
||||
elif algo == x509.ALGO_RSA_SHA384:
|
||||
hashBytes = bytearray(hashlib.sha384(data).digest())
|
||||
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA384 + hashBytes)
|
||||
elif algo == x509.ALGO_RSA_SHA512:
|
||||
hashBytes = bytearray(hashlib.sha512(data).digest())
|
||||
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA512 + hashBytes)
|
||||
else:
|
||||
self.error = "Algorithm not supported"
|
||||
util.print_error(self.error, algo.getComponentByName('algorithm'))
|
||||
return False
|
||||
if not verify:
|
||||
self.error = "Certificate not Signed by Provided CA Certificate Chain"
|
||||
return False
|
||||
# get requestor name
|
||||
self.requestor = x.get_common_name()
|
||||
if self.requestor.startswith('*.'):
|
||||
self.requestor = self.requestor[2:]
|
||||
# verify the BIP70 signature
|
||||
x = x509_chain[0]
|
||||
pubkey0 = rsakey.RSAKey(x.modulus, x.exponent)
|
||||
sig = paymntreq.signature
|
||||
paymntreq.signature = ''
|
||||
@@ -330,6 +277,75 @@ def sign_request_with_alias(pr, alias, alias_privkey):
|
||||
|
||||
|
||||
|
||||
def verify_cert_chain(chain):
|
||||
""" Verify a chain of certificates. The last certificate is the CA"""
|
||||
# parse the chain
|
||||
cert_num = len(chain)
|
||||
x509_chain = []
|
||||
for i in range(cert_num):
|
||||
x = x509.X509()
|
||||
x.parseBinary(bytearray(chain[i]))
|
||||
x509_chain.append(x)
|
||||
if i == 0:
|
||||
x.check_date()
|
||||
else:
|
||||
if not x.check_ca():
|
||||
raise BaseException("ERROR: Supplied CA Certificate Error")
|
||||
if not cert_num > 1:
|
||||
raise BaseException("ERROR: CA Certificate Chain Not Provided by Payment Processor")
|
||||
# if the root CA is not supplied, add it to the chain
|
||||
ca = x509_chain[cert_num-1]
|
||||
if ca.getFingerprint() not in ca_list:
|
||||
keyID = ca.get_issuer_keyID()
|
||||
f = ca_keyID.get(keyID)
|
||||
if f:
|
||||
root = ca_list[f]
|
||||
x509_chain.append(root)
|
||||
else:
|
||||
raise BaseException("Supplied CA Not Found in Trusted CA Store.")
|
||||
# verify the chain of signatures
|
||||
cert_num = len(x509_chain)
|
||||
for i in range(1, cert_num):
|
||||
x = x509_chain[i]
|
||||
prev_x = x509_chain[i-1]
|
||||
algo, sig, data = prev_x.get_signature()
|
||||
sig = bytearray(sig)
|
||||
pubkey = rsakey.RSAKey(x.modulus, x.exponent)
|
||||
if algo == x509.ALGO_RSA_SHA1:
|
||||
verify = pubkey.hashAndVerify(sig, data)
|
||||
elif algo == x509.ALGO_RSA_SHA256:
|
||||
hashBytes = bytearray(hashlib.sha256(data).digest())
|
||||
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA256 + hashBytes)
|
||||
elif algo == x509.ALGO_RSA_SHA384:
|
||||
hashBytes = bytearray(hashlib.sha384(data).digest())
|
||||
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA384 + hashBytes)
|
||||
elif algo == x509.ALGO_RSA_SHA512:
|
||||
hashBytes = bytearray(hashlib.sha512(data).digest())
|
||||
verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA512 + hashBytes)
|
||||
else:
|
||||
raise BaseException("Algorithm not supported")
|
||||
util.print_error(self.error, algo.getComponentByName('algorithm'))
|
||||
if not verify:
|
||||
raise BaseException("Certificate not Signed by Provided CA Certificate Chain")
|
||||
|
||||
return x509_chain[0], ca
|
||||
|
||||
|
||||
def check_ssl_config(config):
|
||||
import pem
|
||||
key_path = config.get('ssl_privkey')
|
||||
cert_path = config.get('ssl_chain')
|
||||
with open(key_path, 'r') as f:
|
||||
params = pem.parse_private_key(f.read())
|
||||
privkey = rsakey.RSAKey(*params)
|
||||
with open(cert_path, 'r') as f:
|
||||
s = f.read()
|
||||
bList = pem.dePemList(s, "CERTIFICATE")
|
||||
# verify chain
|
||||
x, ca = verify_cert_chain(bList)
|
||||
# verify pubkey
|
||||
return x.get_common_name()
|
||||
|
||||
def sign_request_with_x509(pr, key_path, cert_path):
|
||||
import pem
|
||||
with open(key_path, 'r') as f:
|
||||
|
||||
Reference in New Issue
Block a user