py3 in qtgui
This commit is contained in:
133
lib/x509.py
133
lib/x509.py
@@ -31,7 +31,7 @@ import six
|
||||
from datetime import datetime
|
||||
import sys
|
||||
from . import util
|
||||
from .util import profiler, print_error
|
||||
from .util import profiler, print_error, bh2u
|
||||
import ecdsa
|
||||
import hashlib
|
||||
|
||||
@@ -74,7 +74,7 @@ class CertificateError(Exception):
|
||||
|
||||
# helper functions
|
||||
def bitstr_to_bytestr(s):
|
||||
if s[0] != '\x00':
|
||||
if s[0] != 0x00:
|
||||
raise BaseException('no padding')
|
||||
return s[1:]
|
||||
|
||||
@@ -83,14 +83,13 @@ def bytestr_to_int(s):
|
||||
i = 0
|
||||
for char in s:
|
||||
i <<= 8
|
||||
i |= ord(char)
|
||||
i |= char
|
||||
return i
|
||||
|
||||
|
||||
def decode_OID(s):
|
||||
s = map(ord, s)
|
||||
r = []
|
||||
r.append(s[0] / 40)
|
||||
r.append(s[0] // 40)
|
||||
r.append(s[0] % 40)
|
||||
k = 0
|
||||
for i in s[1:]:
|
||||
@@ -103,7 +102,7 @@ def decode_OID(s):
|
||||
|
||||
|
||||
def encode_OID(oid):
|
||||
x = map(int, oid.split('.'))
|
||||
x = [int(i) for i in oid.split('.')]
|
||||
s = chr(x[0] * 40 + x[1])
|
||||
for i in x[2:]:
|
||||
ss = chr(i % 128)
|
||||
@@ -114,11 +113,11 @@ def encode_OID(oid):
|
||||
return s
|
||||
|
||||
|
||||
class ASN1_Node(str):
|
||||
class ASN1_Node(bytes):
|
||||
def get_node(self, ix):
|
||||
# return index of first byte, first content byte and last byte.
|
||||
first = ord(self[ix + 1])
|
||||
if (ord(self[ix + 1]) & 0x80) == 0:
|
||||
first = self[ix + 1]
|
||||
if (self[ix + 1] & 0x80) == 0:
|
||||
length = first
|
||||
ixf = ix + 2
|
||||
ixl = ixf + length - 1
|
||||
@@ -129,72 +128,62 @@ class ASN1_Node(str):
|
||||
ixl = ixf + length - 1
|
||||
return ix, ixf, ixl
|
||||
|
||||
def root(self):
|
||||
return self.get_node(0)
|
||||
|
||||
def root(self):
|
||||
return self.get_node(0)
|
||||
def next_node(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self.get_node(ixl + 1)
|
||||
|
||||
def first_child(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
if self[ixs] & 0x20 != 0x20:
|
||||
raise BaseException('Can only open constructed types.', hex(self[ixs]))
|
||||
return self.get_node(ixf)
|
||||
|
||||
def next_node(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self.get_node(ixl + 1)
|
||||
def is_child_of(node1, node2):
|
||||
ixs, ixf, ixl = node1
|
||||
jxs, jxf, jxl = node2
|
||||
return ((ixf <= jxs) and (jxl <= ixl)) or ((jxf <= ixs) and (ixl <= jxl))
|
||||
|
||||
def get_all(self, node):
|
||||
# return type + length + value
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixs:ixl + 1]
|
||||
|
||||
def first_child(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
if ord(self[ixs]) & 0x20 != 0x20:
|
||||
raise BaseException('Can only open constructed types.', hex(ord(self[ixs])))
|
||||
return self.get_node(ixf)
|
||||
def get_value_of_type(self, node, asn1_type):
|
||||
# verify type byte and return content
|
||||
ixs, ixf, ixl = node
|
||||
if ASN1_TYPES[asn1_type] != self[ixs]:
|
||||
raise BaseException('Wrong type:', hex(self[ixs]), hex(ASN1_TYPES[asn1_type]))
|
||||
return self[ixf:ixl + 1]
|
||||
|
||||
def get_value(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixf:ixl + 1]
|
||||
|
||||
def is_child_of(node1, node2):
|
||||
ixs, ixf, ixl = node1
|
||||
jxs, jxf, jxl = node2
|
||||
return ((ixf <= jxs) and (jxl <= ixl)) or ((jxf <= ixs) and (ixl <= jxl))
|
||||
|
||||
|
||||
def get_all(self, node):
|
||||
# return type + length + value
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixs:ixl + 1]
|
||||
|
||||
|
||||
def get_value_of_type(self, node, asn1_type):
|
||||
# verify type byte and return content
|
||||
ixs, ixf, ixl = node
|
||||
if ASN1_TYPES[asn1_type] != ord(self[ixs]):
|
||||
raise BaseException('Wrong type:', hex(ord(self[ixs])), hex(ASN1_TYPES[asn1_type]))
|
||||
return self[ixf:ixl + 1]
|
||||
|
||||
|
||||
def get_value(self, node):
|
||||
ixs, ixf, ixl = node
|
||||
return self[ixf:ixl + 1]
|
||||
|
||||
|
||||
def get_children(self, node):
|
||||
nodes = []
|
||||
ii = self.first_child(node)
|
||||
nodes.append(ii)
|
||||
while ii[2] < node[2]:
|
||||
ii = self.next_node(ii)
|
||||
def get_children(self, node):
|
||||
nodes = []
|
||||
ii = self.first_child(node)
|
||||
nodes.append(ii)
|
||||
return nodes
|
||||
while ii[2] < node[2]:
|
||||
ii = self.next_node(ii)
|
||||
nodes.append(ii)
|
||||
return nodes
|
||||
|
||||
def get_sequence(self):
|
||||
return list(map(lambda j: self.get_value(j), self.get_children(self.root())))
|
||||
|
||||
def get_sequence(self):
|
||||
return map(lambda j: self.get_value(j), self.get_children(self.root()))
|
||||
|
||||
|
||||
def get_dict(self, node):
|
||||
p = {}
|
||||
for ii in self.get_children(node):
|
||||
for iii in self.get_children(ii):
|
||||
iiii = self.first_child(iii)
|
||||
oid = decode_OID(self.get_value_of_type(iiii, 'OBJECT IDENTIFIER'))
|
||||
iiii = self.next_node(iiii)
|
||||
value = self.get_value(iiii)
|
||||
p[oid] = value
|
||||
return p
|
||||
def get_dict(self, node):
|
||||
p = {}
|
||||
for ii in self.get_children(node):
|
||||
for iii in self.get_children(ii):
|
||||
iiii = self.first_child(iii)
|
||||
oid = decode_OID(self.get_value_of_type(iiii, 'OBJECT IDENTIFIER'))
|
||||
iiii = self.next_node(iiii)
|
||||
value = self.get_value(iiii)
|
||||
p[oid] = value
|
||||
return p
|
||||
|
||||
|
||||
class X509(object):
|
||||
@@ -202,14 +191,14 @@ class X509(object):
|
||||
|
||||
self.bytes = bytearray(b)
|
||||
|
||||
der = ASN1_Node(str(b))
|
||||
der = ASN1_Node(b)
|
||||
root = der.root()
|
||||
cert = der.first_child(root)
|
||||
# data for signature
|
||||
self.data = der.get_all(cert)
|
||||
|
||||
# optional version field
|
||||
if der.get_value(cert)[0] == chr(0xa0):
|
||||
if der.get_value(cert)[0] == 0xa0:
|
||||
version = der.first_child(cert)
|
||||
serial_number = der.next_node(version)
|
||||
else:
|
||||
@@ -269,10 +258,10 @@ class X509(object):
|
||||
# Subject Key Identifier
|
||||
r = value.root()
|
||||
value = value.get_value_of_type(r, 'OCTET STRING')
|
||||
self.SKI = value.encode('hex')
|
||||
self.SKI = bh2u(value)
|
||||
elif oid == '2.5.29.35':
|
||||
# Authority Key Identifier
|
||||
self.AKI = value.get_sequence()[0].encode('hex')
|
||||
self.AKI = bh2u(value.get_sequence()[0])
|
||||
else:
|
||||
pass
|
||||
|
||||
@@ -303,8 +292,8 @@ class X509(object):
|
||||
import time
|
||||
now = time.time()
|
||||
TIMESTAMP_FMT = '%y%m%d%H%M%SZ'
|
||||
not_before = time.mktime(time.strptime(self.notBefore, TIMESTAMP_FMT))
|
||||
not_after = time.mktime(time.strptime(self.notAfter, TIMESTAMP_FMT))
|
||||
not_before = time.mktime(time.strptime(self.notBefore.decode('ascii'), TIMESTAMP_FMT))
|
||||
not_after = time.mktime(time.strptime(self.notAfter.decode('ascii'), TIMESTAMP_FMT))
|
||||
if not_before > now:
|
||||
raise CertificateError('Certificate has not entered its valid date range. (%s)' % self.get_common_name())
|
||||
if not_after <= now:
|
||||
@@ -320,7 +309,7 @@ def load_certificates(ca_path):
|
||||
ca_list = {}
|
||||
ca_keyID = {}
|
||||
with open(ca_path, 'rb') as f:
|
||||
s = f.read().decode('utf8')
|
||||
s = f.read().decode('ascii')
|
||||
bList = pem.dePemList(s, "CERTIFICATE")
|
||||
for b in bList:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user