use x509 to check if server certificate has expired
This commit is contained in:
@@ -27,48 +27,12 @@ from version import ELECTRUM_VERSION, PROTOCOL_VERSION
|
||||
from util import print_error, print_msg
|
||||
from simple_config import SimpleConfig
|
||||
|
||||
import x509
|
||||
|
||||
DEFAULT_TIMEOUT = 5
|
||||
proxy_modes = ['socks4', 'socks5', 'http']
|
||||
|
||||
|
||||
def check_cert(host, cert):
|
||||
from OpenSSL import crypto as c
|
||||
_cert = c.load_certificate(c.FILETYPE_PEM, cert)
|
||||
|
||||
m = "host: %s\n"%host
|
||||
m += "has_expired: %s\n"% _cert.has_expired()
|
||||
m += "pubkey: %s bits\n" % _cert.get_pubkey().bits()
|
||||
m += "serial number: %s\n"% _cert.get_serial_number()
|
||||
#m += "issuer: %s\n"% _cert.get_issuer()
|
||||
#m += "algo: %s\n"% _cert.get_signature_algorithm()
|
||||
m += "version: %s\n"% _cert.get_version()
|
||||
print_msg(m)
|
||||
|
||||
|
||||
def cert_has_expired(cert_path):
|
||||
try:
|
||||
import OpenSSL
|
||||
except Exception:
|
||||
print_error("Warning: cannot import OpenSSL")
|
||||
return False
|
||||
from OpenSSL import crypto as c
|
||||
with open(cert_path) as f:
|
||||
cert = f.read()
|
||||
_cert = c.load_certificate(c.FILETYPE_PEM, cert)
|
||||
return _cert.has_expired()
|
||||
|
||||
|
||||
def check_certificates():
|
||||
config = SimpleConfig()
|
||||
mydir = os.path.join(config.path, "certs")
|
||||
certs = os.listdir(mydir)
|
||||
for c in certs:
|
||||
print c
|
||||
p = os.path.join(mydir,c)
|
||||
with open(p) as f:
|
||||
cert = f.read()
|
||||
check_cert(c, cert)
|
||||
|
||||
|
||||
def cert_verify_hostname(s):
|
||||
@@ -392,11 +356,23 @@ class Interface(threading.Thread):
|
||||
os.unlink(rej)
|
||||
os.rename(temporary_path, rej)
|
||||
else:
|
||||
if cert_has_expired(cert_path):
|
||||
with open(cert_path) as f:
|
||||
cert = f.read()
|
||||
try:
|
||||
x = x509.X509()
|
||||
x.parse(cert)
|
||||
x.slow_parse()
|
||||
except:
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
print_error("wrong certificate", self.host)
|
||||
return
|
||||
try:
|
||||
x.check_date()
|
||||
except:
|
||||
print_error("certificate has expired:", cert_path)
|
||||
os.unlink(cert_path)
|
||||
else:
|
||||
print_error("wrong certificate", self.host)
|
||||
return
|
||||
print_error("wrong certificate", self.host)
|
||||
return
|
||||
except Exception:
|
||||
print_error("wrap_socket failed", self.host)
|
||||
@@ -614,6 +590,38 @@ class Interface(threading.Thread):
|
||||
return out
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
check_certificates()
|
||||
|
||||
def check_cert(host, cert):
|
||||
try:
|
||||
x = x509.X509()
|
||||
x.parse(cert)
|
||||
x.slow_parse()
|
||||
except:
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
return
|
||||
|
||||
try:
|
||||
x.check_date()
|
||||
expired = False
|
||||
except:
|
||||
expired = True
|
||||
|
||||
m = "host: %s\n"%host
|
||||
m += "has_expired: %s\n"% expired
|
||||
print_msg(m)
|
||||
|
||||
|
||||
def test_certificates():
|
||||
config = SimpleConfig()
|
||||
mydir = os.path.join(config.path, "certs")
|
||||
certs = os.listdir(mydir)
|
||||
for c in certs:
|
||||
print c
|
||||
p = os.path.join(mydir,c)
|
||||
with open(p) as f:
|
||||
cert = f.read()
|
||||
check_cert(c, cert)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_certificates()
|
||||
|
||||
Reference in New Issue
Block a user