py3
This commit is contained in:
193
lib/x509.py
193
lib/x509.py
@@ -22,56 +22,63 @@
|
||||
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
|
||||
import six
|
||||
from datetime import datetime
|
||||
import sys
|
||||
import util
|
||||
from util import profiler, print_error
|
||||
from . import util
|
||||
from .util import profiler, print_error
|
||||
import ecdsa
|
||||
import hashlib
|
||||
|
||||
|
||||
# algo OIDs
|
||||
ALGO_RSA_SHA1 = '1.2.840.113549.1.1.5'
|
||||
ALGO_RSA_SHA1 = '1.2.840.113549.1.1.5'
|
||||
ALGO_RSA_SHA256 = '1.2.840.113549.1.1.11'
|
||||
ALGO_RSA_SHA384 = '1.2.840.113549.1.1.12'
|
||||
ALGO_RSA_SHA512 = '1.2.840.113549.1.1.13'
|
||||
ALGO_ECDSA_SHA256 = '1.2.840.10045.4.3.2'
|
||||
|
||||
# prefixes, see http://stackoverflow.com/questions/3713774/c-sharp-how-to-calculate-asn-1-der-encoding-of-a-particular-hash-algorithm
|
||||
PREFIX_RSA_SHA256 = bytearray([0x30,0x31,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x01,0x05,0x00,0x04,0x20])
|
||||
PREFIX_RSA_SHA384 = bytearray([0x30,0x41,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x02,0x05,0x00,0x04,0x30])
|
||||
PREFIX_RSA_SHA512 = bytearray([0x30,0x51,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x03,0x05,0x00,0x04,0x40])
|
||||
PREFIX_RSA_SHA256 = bytearray(
|
||||
[0x30, 0x31, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01, 0x05, 0x00, 0x04, 0x20])
|
||||
PREFIX_RSA_SHA384 = bytearray(
|
||||
[0x30, 0x41, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x02, 0x05, 0x00, 0x04, 0x30])
|
||||
PREFIX_RSA_SHA512 = bytearray(
|
||||
[0x30, 0x51, 0x30, 0x0d, 0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03, 0x05, 0x00, 0x04, 0x40])
|
||||
|
||||
# types used in ASN1 structured data
|
||||
ASN1_TYPES = {
|
||||
'BOOLEAN': 0x01,
|
||||
'INTEGER': 0x02,
|
||||
'BIT STRING': 0x03,
|
||||
'OCTET STRING': 0x04,
|
||||
'NULL': 0x05,
|
||||
'BOOLEAN' : 0x01,
|
||||
'INTEGER' : 0x02,
|
||||
'BIT STRING' : 0x03,
|
||||
'OCTET STRING' : 0x04,
|
||||
'NULL' : 0x05,
|
||||
'OBJECT IDENTIFIER': 0x06,
|
||||
'SEQUENCE': 0x70,
|
||||
'SET': 0x71,
|
||||
'PrintableString': 0x13,
|
||||
'IA5String': 0x16,
|
||||
'UTCTime': 0x17,
|
||||
'ENUMERATED': 0x0A,
|
||||
'UTF8String': 0x0C,
|
||||
'PrintableString': 0x13,
|
||||
'SEQUENCE' : 0x70,
|
||||
'SET' : 0x71,
|
||||
'PrintableString' : 0x13,
|
||||
'IA5String' : 0x16,
|
||||
'UTCTime' : 0x17,
|
||||
'ENUMERATED' : 0x0A,
|
||||
'UTF8String' : 0x0C,
|
||||
}
|
||||
|
||||
|
||||
class CertificateError(Exception):
|
||||
pass
|
||||
|
||||
# helper functions
|
||||
|
||||
# helper functions
|
||||
def bitstr_to_bytestr(s):
|
||||
if s[0] != '\x00':
|
||||
raise BaseException('no padding')
|
||||
return s[1:]
|
||||
|
||||
|
||||
def bytestr_to_int(s):
|
||||
i = 0
|
||||
for char in s:
|
||||
@@ -79,6 +86,7 @@ def bytestr_to_int(s):
|
||||
i |= ord(char)
|
||||
return i
|
||||
|
||||
|
||||
def decode_OID(s):
|
||||
s = map(ord, s)
|
||||
r = []
|
||||
@@ -87,103 +95,109 @@ def decode_OID(s):
|
||||
k = 0
|
||||
for i in s[1:]:
|
||||
if i < 128:
|
||||
r.append(i + 128*k)
|
||||
r.append(i + 128 * k)
|
||||
k = 0
|
||||
else:
|
||||
k = (i - 128) + 128*k
|
||||
k = (i - 128) + 128 * k
|
||||
return '.'.join(map(str, r))
|
||||
|
||||
|
||||
def encode_OID(oid):
|
||||
x = map(int, oid.split('.'))
|
||||
s = chr(x[0]*40 + x[1])
|
||||
s = chr(x[0] * 40 + x[1])
|
||||
for i in x[2:]:
|
||||
ss = chr(i % 128)
|
||||
while i > 128:
|
||||
i = i / 128
|
||||
i //= 128
|
||||
ss = chr(128 + i % 128) + ss
|
||||
s += ss
|
||||
return s
|
||||
|
||||
|
||||
|
||||
|
||||
class ASN1_Node(str):
|
||||
|
||||
def get_node(self, ix):
|
||||
# return index of first byte, first content byte and last byte.
|
||||
first = ord(self[ix+1])
|
||||
if (ord(self[ix+1]) & 0x80) == 0:
|
||||
first = ord(self[ix + 1])
|
||||
if (ord(self[ix + 1]) & 0x80) == 0:
|
||||
length = first
|
||||
ixf = ix + 2
|
||||
ixl = ixf + length - 1
|
||||
else:
|
||||
else:
|
||||
lengthbytes = first & 0x7F
|
||||
length = bytestr_to_int(self[ix+2:ix+2+lengthbytes])
|
||||
length = bytestr_to_int(self[ix + 2:ix + 2 + lengthbytes])
|
||||
ixf = ix + 2 + lengthbytes
|
||||
ixl = ixf + length -1
|
||||
return (ix, ixf, ixl)
|
||||
ixl = ixf + length - 1
|
||||
return ix, ixf, ixl
|
||||
|
||||
def root(self):
|
||||
return self.get_node(0)
|
||||
|
||||
def next_node(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self.get_node(ixl + 1)
|
||||
def root(self):
|
||||
return self.get_node(0)
|
||||
|
||||
def first_child(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
if ord(self[ixs]) & 0x20 != 0x20:
|
||||
raise BaseException('Can only open constructed types.', hex(ord(self[ixs])))
|
||||
return self.get_node(ixf)
|
||||
|
||||
def is_child_of(node1, node2):
|
||||
ixs, ixf, ixl = node1
|
||||
jxs, jxf, jxl = node2
|
||||
return ( (ixf <= jxs) and (jxl <= ixl) ) or ( (jxf <= ixs) and (ixl <= jxl) )
|
||||
def next_node(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self.get_node(ixl + 1)
|
||||
|
||||
def get_all(self, node):
|
||||
# return type + length + value
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixs:ixl+1]
|
||||
|
||||
def get_value_of_type(self, node, asn1_type):
|
||||
# verify type byte and return content
|
||||
ixs, ixf, ixl = node
|
||||
if ASN1_TYPES[asn1_type] != ord(self[ixs]):
|
||||
raise BaseException('Wrong type:', hex(ord(self[ixs])), hex(ASN1_TYPES[asn1_type]) )
|
||||
return self[ixf:ixl+1]
|
||||
def first_child(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
if ord(self[ixs]) & 0x20 != 0x20:
|
||||
raise BaseException('Can only open constructed types.', hex(ord(self[ixs])))
|
||||
return self.get_node(ixf)
|
||||
|
||||
def get_value(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixf:ixl+1]
|
||||
|
||||
def get_children(self, node):
|
||||
nodes = []
|
||||
ii = self.first_child(node)
|
||||
def is_child_of(node1, node2):
|
||||
ixs, ixf, ixl = node1
|
||||
jxs, jxf, jxl = node2
|
||||
return ((ixf <= jxs) and (jxl <= ixl)) or ((jxf <= ixs) and (ixl <= jxl))
|
||||
|
||||
|
||||
def get_all(self, node):
|
||||
# return type + length + value
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixs:ixl + 1]
|
||||
|
||||
|
||||
def get_value_of_type(self, node, asn1_type):
|
||||
# verify type byte and return content
|
||||
ixs, ixf, ixl = node
|
||||
if ASN1_TYPES[asn1_type] != ord(self[ixs]):
|
||||
raise BaseException('Wrong type:', hex(ord(self[ixs])), hex(ASN1_TYPES[asn1_type]))
|
||||
return self[ixf:ixl + 1]
|
||||
|
||||
|
||||
def get_value(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixf:ixl + 1]
|
||||
|
||||
|
||||
def get_children(self, node):
|
||||
nodes = []
|
||||
ii = self.first_child(node)
|
||||
nodes.append(ii)
|
||||
while ii[2] < node[2]:
|
||||
ii = self.next_node(ii)
|
||||
nodes.append(ii)
|
||||
while ii[2] < node[2]:
|
||||
ii = self.next_node(ii)
|
||||
nodes.append(ii)
|
||||
return nodes
|
||||
return nodes
|
||||
|
||||
def get_sequence(self):
|
||||
return map(lambda j: self.get_value(j), self.get_children(self.root()))
|
||||
|
||||
def get_dict(self, node):
|
||||
p = {}
|
||||
for ii in self.get_children(node):
|
||||
for iii in self.get_children(ii):
|
||||
iiii = self.first_child(iii)
|
||||
oid = decode_OID(self.get_value_of_type(iiii, 'OBJECT IDENTIFIER'))
|
||||
iiii = self.next_node(iiii)
|
||||
value = self.get_value(iiii)
|
||||
p[oid] = value
|
||||
return p
|
||||
def get_sequence(self):
|
||||
return map(lambda j: self.get_value(j), self.get_children(self.root()))
|
||||
|
||||
|
||||
def get_dict(self, node):
|
||||
p = {}
|
||||
for ii in self.get_children(node):
|
||||
for iii in self.get_children(ii):
|
||||
iiii = self.first_child(iii)
|
||||
oid = decode_OID(self.get_value_of_type(iiii, 'OBJECT IDENTIFIER'))
|
||||
iiii = self.next_node(iiii)
|
||||
value = self.get_value(iiii)
|
||||
p[oid] = value
|
||||
return p
|
||||
|
||||
|
||||
class X509(object):
|
||||
|
||||
def __init__(self, b):
|
||||
|
||||
self.bytes = bytearray(b)
|
||||
@@ -292,24 +306,21 @@ class X509(object):
|
||||
not_before = time.mktime(time.strptime(self.notBefore, TIMESTAMP_FMT))
|
||||
not_after = time.mktime(time.strptime(self.notAfter, TIMESTAMP_FMT))
|
||||
if not_before > now:
|
||||
raise CertificateError('Certificate has not entered its valid date range. (%s)'%self.get_common_name())
|
||||
raise CertificateError('Certificate has not entered its valid date range. (%s)' % self.get_common_name())
|
||||
if not_after <= now:
|
||||
raise CertificateError('Certificate has expired. (%s)'%self.get_common_name())
|
||||
raise CertificateError('Certificate has expired. (%s)' % self.get_common_name())
|
||||
|
||||
def getFingerprint(self):
|
||||
return hashlib.sha1(self.bytes).digest()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@profiler
|
||||
def load_certificates(ca_path):
|
||||
import pem
|
||||
from . import pem
|
||||
ca_list = {}
|
||||
ca_keyID = {}
|
||||
with open(ca_path, 'r') as f:
|
||||
s = f.read()
|
||||
with open(ca_path, 'rb') as f:
|
||||
s = f.read().decode('utf8')
|
||||
bList = pem.dePemList(s, "CERTIFICATE")
|
||||
for b in bList:
|
||||
try:
|
||||
@@ -328,7 +339,7 @@ def load_certificates(ca_path):
|
||||
|
||||
if __name__ == "__main__":
|
||||
import requests
|
||||
|
||||
util.set_verbosity(True)
|
||||
ca_path = requests.certs.where()
|
||||
ca_list, ca_keyID = load_certificates(ca_path)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user