x509 fixes and plugins
This commit is contained in:
53
lib/x509.py
53
lib/x509.py
@@ -63,6 +63,7 @@ ASN1_TYPES = {
|
||||
'PrintableString' : 0x13,
|
||||
'IA5String' : 0x16,
|
||||
'UTCTime' : 0x17,
|
||||
'GeneralizedTime' : 0x18,
|
||||
'ENUMERATED' : 0x0A,
|
||||
'UTF8String' : 0x0C,
|
||||
}
|
||||
@@ -75,7 +76,7 @@ class CertificateError(Exception):
|
||||
# helper functions
|
||||
def bitstr_to_bytestr(s):
|
||||
if s[0] != 0x00:
|
||||
raise BaseException('no padding')
|
||||
raise TypeError('no padding')
|
||||
return s[1:]
|
||||
|
||||
|
||||
@@ -117,7 +118,7 @@ class ASN1_Node(bytes):
|
||||
def get_node(self, ix):
|
||||
# return index of first byte, first content byte and last byte.
|
||||
first = self[ix + 1]
|
||||
if (self[ix + 1] & 0x80) == 0:
|
||||
if (first & 0x80) == 0:
|
||||
length = first
|
||||
ixf = ix + 2
|
||||
ixl = ixf + length - 1
|
||||
@@ -138,7 +139,7 @@ class ASN1_Node(bytes):
|
||||
def first_child(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
if self[ixs] & 0x20 != 0x20:
|
||||
raise BaseException('Can only open constructed types.', hex(self[ixs]))
|
||||
raise TypeError('Can only open constructed types.', hex(self[ixs]))
|
||||
return self.get_node(ixf)
|
||||
|
||||
def is_child_of(node1, node2):
|
||||
@@ -155,7 +156,7 @@ class ASN1_Node(bytes):
|
||||
# verify type byte and return content
|
||||
ixs, ixf, ixl = node
|
||||
if ASN1_TYPES[asn1_type] != self[ixs]:
|
||||
raise BaseException('Wrong type:', hex(self[ixs]), hex(ASN1_TYPES[asn1_type]))
|
||||
raise TypeError('Wrong type:', hex(self[ixs]), hex(ASN1_TYPES[asn1_type]))
|
||||
return self[ixf:ixl + 1]
|
||||
|
||||
def get_value(self, node):
|
||||
@@ -217,9 +218,15 @@ class X509(object):
|
||||
# validity
|
||||
validity = der.next_node(issuer)
|
||||
ii = der.first_child(validity)
|
||||
self.notBefore = der.get_value_of_type(ii, 'UTCTime')
|
||||
try:
|
||||
self.notBefore = der.get_value_of_type(ii, 'UTCTime')
|
||||
except TypeError:
|
||||
self.notBefore = der.get_value_of_type(ii, 'GeneralizedTime')[2:] # strip year
|
||||
ii = der.next_node(ii)
|
||||
self.notAfter = der.get_value_of_type(ii, 'UTCTime')
|
||||
try:
|
||||
self.notAfter = der.get_value_of_type(ii, 'UTCTime')
|
||||
except TypeError:
|
||||
self.notAfter = der.get_value_of_type(ii, 'GeneralizedTime')[2:] # strip year
|
||||
|
||||
# subject
|
||||
subject = der.next_node(validity)
|
||||
@@ -229,17 +236,22 @@ class X509(object):
|
||||
ii = der.first_child(public_key_algo)
|
||||
self.public_key_algo = decode_OID(der.get_value_of_type(ii, 'OBJECT IDENTIFIER'))
|
||||
|
||||
# pubkey modulus and exponent
|
||||
subject_public_key = der.next_node(public_key_algo)
|
||||
spk = der.get_value_of_type(subject_public_key, 'BIT STRING')
|
||||
spk = ASN1_Node(bitstr_to_bytestr(spk))
|
||||
r = spk.root()
|
||||
modulus = spk.first_child(r)
|
||||
exponent = spk.next_node(modulus)
|
||||
rsa_n = spk.get_value_of_type(modulus, 'INTEGER')
|
||||
rsa_e = spk.get_value_of_type(exponent, 'INTEGER')
|
||||
self.modulus = ecdsa.util.string_to_number(rsa_n)
|
||||
self.exponent = ecdsa.util.string_to_number(rsa_e)
|
||||
if self.public_key_algo != '1.2.840.10045.2.1': # for non EC public key
|
||||
# pubkey modulus and exponent
|
||||
subject_public_key = der.next_node(public_key_algo)
|
||||
spk = der.get_value_of_type(subject_public_key, 'BIT STRING')
|
||||
spk = ASN1_Node(bitstr_to_bytestr(spk))
|
||||
r = spk.root()
|
||||
modulus = spk.first_child(r)
|
||||
exponent = spk.next_node(modulus)
|
||||
rsa_n = spk.get_value_of_type(modulus, 'INTEGER')
|
||||
rsa_e = spk.get_value_of_type(exponent, 'INTEGER')
|
||||
self.modulus = ecdsa.util.string_to_number(rsa_n)
|
||||
self.exponent = ecdsa.util.string_to_number(rsa_e)
|
||||
else:
|
||||
subject_public_key = der.next_node(public_key_algo)
|
||||
spk = der.get_value_of_type(subject_public_key, 'BIT STRING')
|
||||
self.ec_public_key = spk
|
||||
|
||||
# extensions
|
||||
self.CA = False
|
||||
@@ -308,14 +320,17 @@ def load_certificates(ca_path):
|
||||
from . import pem
|
||||
ca_list = {}
|
||||
ca_keyID = {}
|
||||
with open(ca_path, 'rb') as f:
|
||||
s = f.read().decode('ascii')
|
||||
# ca_path = '/tmp/tmp.txt'
|
||||
with open(ca_path, 'r') as f:
|
||||
s = f.read()
|
||||
bList = pem.dePemList(s, "CERTIFICATE")
|
||||
for b in bList:
|
||||
try:
|
||||
x = X509(b)
|
||||
x.check_date()
|
||||
except BaseException as e:
|
||||
# with open('/tmp/tmp.txt', 'w') as f:
|
||||
# f.write(pem.pem(b, 'CERTIFICATE').decode('ascii'))
|
||||
util.print_error("cert error:", e)
|
||||
continue
|
||||
|
||||
|
||||
Reference in New Issue
Block a user