#!/usr/bin/env python # Copyright 2017 The Chromium Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """This script is called without any arguments to re-generate all of the *.pem files in the script's parent directory. """ from pyasn1.codec.der import decoder, encoder from pyasn1_modules import rfc2560, rfc2459 from pyasn1.type import univ, useful import hashlib, datetime import subprocess import os from OpenSSL import crypto import base64 NEXT_SERIAL = 0 # 1/1/2017 00:00 GMT CERT_DATE = datetime.datetime(2017, 1, 1, 0, 0) # 1/1/2018 00:00 GMT CERT_EXPIRE = CERT_DATE + datetime.timedelta(days=365) # 2/1/2017 00:00 GMT REVOKE_DATE = datetime.datetime(2017, 2, 1, 0, 0) # 3/1/2017 00:00 GMT THIS_DATE = datetime.datetime(2017, 3, 1, 0, 0) # 3/2/2017 00:00 GMT PRODUCED_DATE = datetime.datetime(2017, 3, 2, 0, 0) # 6/1/2017 00:00 GMT NEXT_DATE = datetime.datetime(2017, 6, 1, 0, 0) sha1oid = univ.ObjectIdentifier('1.3.14.3.2.26') sha1rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.5') sha256oid = univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1') sha256rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.11') def SigAlgOid(sig_alg): if sig_alg == 'sha1': return sha1rsaoid return sha256rsaoid def CreateCert(name, signer=None, ocsp=False): global NEXT_SERIAL pkey = crypto.PKey() pkey.generate_key(crypto.TYPE_RSA, 1024) cert = crypto.X509() cert.set_version(2) cert.get_subject().CN = name cert.set_pubkey(pkey) cert.set_serial_number(NEXT_SERIAL) NEXT_SERIAL += 1 cert.set_notBefore(CERT_DATE.strftime('%Y%m%d%H%M%SZ')) cert.set_notAfter(CERT_EXPIRE.strftime('%Y%m%d%H%M%SZ')) if ocsp: cert.add_extensions( [crypto.X509Extension('extendedKeyUsage', False, 'OCSPSigning')]) if signer: cert.set_issuer(signer[1].get_subject()) cert.sign(signer[2], 'sha1') else: cert.set_issuer(cert.get_subject()) cert.sign(pkey, 'sha1') asn1cert = decoder.decode( crypto.dump_certificate(crypto.FILETYPE_ASN1, cert), asn1Spec=rfc2459.Certificate())[0] if not signer: signer = [asn1cert] return (asn1cert, cert, pkey, signer[0]) def CreateExtension(oid='1.2.3.4', critical=False): ext = rfc2459.Extension() ext.setComponentByName('extnID', univ.ObjectIdentifier(oid)) ext.setComponentByName('extnValue', 'DEADBEEF') if critical: ext.setComponentByName('critical', univ.Boolean('True')) else: ext.setComponentByName('critical', univ.Boolean('False')) return ext ROOT_CA = CreateCert('Test CA', None) CA = CreateCert('Test Intermediate CA', ROOT_CA) CA_LINK = CreateCert('Test OCSP Signer', CA, True) CA_BADLINK = CreateCert('Test False OCSP Signer', CA, False) CERT = CreateCert('Test Cert', CA) JUNK_CERT = CreateCert('Random Cert', None) EXTENSION = CreateExtension() def GetName(c): rid = rfc2560.ResponderID() subject = c[0].getComponentByName('tbsCertificate').getComponentByName( 'subject') rn = rid.componentType.getTypeByPosition(0).clone() for i in range(len(subject)): rn.setComponentByPosition(i, subject.getComponentByPosition(i)) rid.setComponentByName('byName', rn) return rid def GetKeyHash(c): rid = rfc2560.ResponderID() spk = c[0].getComponentByName('tbsCertificate').getComponentByName( 'subjectPublicKeyInfo').getComponentByName('subjectPublicKey') keyHash = hashlib.sha1(encoder.encode(spk)[4:]).digest() rid.setComponentByName('byKey', keyHash) return rid def CreateSingleResponse(cert=CERT, status=0, next=None, revoke_time=None, reason=None, extensions=[]): sr = rfc2560.SingleResponse() cid = sr.setComponentByName('certID').getComponentByName('certID') issuer_tbs = cert[3].getComponentByName('tbsCertificate') tbs = cert[0].getComponentByName('tbsCertificate') name_hash = hashlib.sha1( encoder.encode(issuer_tbs.getComponentByName('subject'))).digest() key_hash = hashlib.sha1( encoder.encode( issuer_tbs.getComponentByName('subjectPublicKeyInfo') .getComponentByName('subjectPublicKey'))[4:]).digest() sn = tbs.getComponentByName('serialNumber') ha = cid.setComponentByName('hashAlgorithm').getComponentByName( 'hashAlgorithm') ha.setComponentByName('algorithm', sha1oid) cid.setComponentByName('issuerNameHash', name_hash) cid.setComponentByName('issuerKeyHash', key_hash) cid.setComponentByName('serialNumber', sn) cs = rfc2560.CertStatus() if status == 0: cs.setComponentByName('good') elif status == 1: ri = cs.componentType.getTypeByPosition(1).clone() if revoke_time == None: revoke_time = REVOKE_DATE ri.setComponentByName('revocationTime', useful.GeneralizedTime( revoke_time.strftime('%Y%m%d%H%M%SZ'))) if reason: ri.setComponentByName('revocationReason', reason) cs.setComponentByName('revoked', ri) else: ui = cs.componentType.getTypeByPosition(2).clone() cs.setComponentByName('unknown', ui) sr.setComponentByName('certStatus', cs) sr.setComponentByName('thisUpdate', useful.GeneralizedTime( THIS_DATE.strftime('%Y%m%d%H%M%SZ'))) if next: sr.setComponentByName('nextUpdate', next.strftime('%Y%m%d%H%M%SZ')) if extensions: elist = sr.setComponentByName('singleExtensions').getComponentByName( 'singleExtensions') for i in range(len(extensions)): elist.setComponentByPosition(i, extensions[i]) return sr def Create(signer=None, response_status=0, response_type='1.3.6.1.5.5.7.48.1.1', signature=None, version=1, responder=None, responses=None, extensions=None, certs=None, sigAlg='sha1'): ocsp = rfc2560.OCSPResponse() ocsp.setComponentByName('responseStatus', response_status) if response_status != 0: return ocsp tbs = rfc2560.ResponseData() if version != 1: tbs.setComponentByName('version', version) if not signer: signer = CA if not responder: responder = GetName(signer) tbs.setComponentByName('responderID', responder) tbs.setComponentByName('producedAt', useful.GeneralizedTime( PRODUCED_DATE.strftime('%Y%m%d%H%M%SZ'))) rlist = tbs.setComponentByName('responses').getComponentByName('responses') if responses == None: responses = [CreateSingleResponse(CERT, 0)] if responses: for i in range(len(responses)): rlist.setComponentByPosition(i, responses[i]) if extensions: elist = tbs.setComponentByName('responseExtensions').getComponentByName( 'responseExtensions') for i in range(len(extensions)): elist.setComponentByPosition(i, extensions[i]) sa = rfc2459.AlgorithmIdentifier() sa.setComponentByName('algorithm', SigAlgOid(sigAlg)) # TODO(mattm): If pyasn1 gives an error # "Component value is tag-incompatible: Null() vs Any()", try hacking # pyasn1_modules/rfc2459.py's AlgorithmIdentifier to specify univ.Null as the # type for 'parameters'. (Which is an ugly hack, but lets the script work.) sa.setComponentByName('parameters', univ.Null()) basic = rfc2560.BasicOCSPResponse() basic.setComponentByName('tbsResponseData', tbs) basic.setComponentByName('signatureAlgorithm', sa) if not signature: signature = crypto.sign(signer[2], encoder.encode(tbs), sigAlg) basic.setComponentByName('signature', univ.BitString("'%s'H" % (signature.encode('hex')))) if certs: cs = basic.setComponentByName('certs').getComponentByName('certs') for i in range(len(certs)): cs.setComponentByPosition(i, certs[i][0]) rbytes = ocsp.componentType.getTypeByPosition(1) rbytes.setComponentByName('responseType', univ.ObjectIdentifier(response_type)) rbytes.setComponentByName('response', encoder.encode(basic)) ocsp.setComponentByName('responseBytes', rbytes) return ocsp def MakePemBlock(der, name): b64 = base64.b64encode(der) wrapped = '\n'.join(b64[pos:pos + 64] for pos in xrange(0, len(b64), 64)) return '-----BEGIN %s-----\n%s\n-----END %s-----' % (name, wrapped, name) def WriteStringToFile(data, path): with open(path, "w") as f: f.write(data) def ReadFileToString(path): with open(path, 'r') as f: return f.read() def CreateOCSPRequestDer(issuer_cert_pem, cert_pem): '''Uses OpenSSL to generate a basic OCSPRequest for |cert_pem|.''' issuer_path = "tmp_issuer.pem" cert_path = "tmp_cert.pem" request_path = "tmp_request.der" WriteStringToFile(issuer_cert_pem, issuer_path) WriteStringToFile(cert_pem, cert_path) p = subprocess.Popen(["openssl", "ocsp", "-no_nonce", "-issuer", issuer_path, "-cert", cert_path, "-reqout", request_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout_data, stderr_data = p.communicate() os.remove(issuer_path) os.remove(cert_path) result = None if p.returncode == 0: result = ReadFileToString(request_path) os.remove(request_path) return result def Store(fname, description, ca, data): ca_cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, ca[1]) cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, CERT[1]) ocsp_request_der = CreateOCSPRequestDer(ca_cert_pem, cert_pem) out = ('%s\n%s\n%s\n\n%s\n%s') % ( description, MakePemBlock(encoder.encode(data), "OCSP RESPONSE"), ca_cert_pem.replace('CERTIFICATE', 'CA CERTIFICATE'), cert_pem, MakePemBlock(ocsp_request_der, "OCSP REQUEST")) open('%s.pem' % fname, 'w').write(out) Store( 'no_response', 'No SingleResponses attached to the response', CA, Create(responses=[])) Store( 'malformed_request', 'Has a status of MALFORMED_REQUEST', CA, Create(response_status=1)) Store( 'bad_status', 'Has an invalid status larger than the defined Status enumeration', CA, Create(response_status=17)) Store( 'bad_ocsp_type', 'Has an invalid OCSP OID', CA, Create(response_type='1.3.6.1.5.5.7.48.1.2')) Store( 'bad_signature', 'Has an invalid signature', CA, Create(signature='\xde\xad\xbe\xef')) Store('ocsp_sign_direct', 'Signed directly by the issuer', CA, Create(signer=CA, certs=[])) Store('ocsp_sign_indirect', 'Signed indirectly through an intermediate', CA, Create(signer=CA_LINK, certs=[CA_LINK])) Store('ocsp_sign_indirect_missing', 'Signed indirectly through a missing intermediate', CA, Create(signer=CA_LINK, certs=[])) Store('ocsp_sign_bad_indirect', 'Signed through an intermediate without the correct key usage', CA, Create(signer=CA_BADLINK, certs=[CA_BADLINK])) Store('ocsp_extra_certs', 'Includes extra certs', CA, Create(signer=CA, certs=[CA, CA_LINK])) Store('has_version', 'Includes a default version V1', CA, Create(version=1)) Store( 'responder_name', 'Uses byName to identify the signer', CA, Create(responder=GetName(CA))) # TODO(eroman): pyasn1 module has a bug in rfc2560.ResponderID() that will use # IMPLICIT rather than EXPLICIT tagging for byKey # (https://github.com/etingof/pyasn1-modules/issues/8). If using an affected # version of the library you will need to patch pyasn1_modules/rfc2560.py and # replace "implicitTag" with "explicitTag" in ResponderID to generate this # test data correctly. Store( 'responder_id', 'Uses byKey to identify the signer', CA, Create(responder=GetKeyHash(CA))) Store( 'has_extension', 'Includes an x509v3 extension', CA, Create(extensions=[EXTENSION])) Store( 'good_response', 'Is a valid response for the cert', CA, Create(responses=[CreateSingleResponse(CERT, 0)])) Store('good_response_sha256', 'Is a valid response for the cert with a SHA256 signature', CA, Create(responses=[CreateSingleResponse(CERT, 0)], sigAlg='sha256')) Store( 'good_response_next_update', 'Is a valid response for the cert until nextUpdate', CA, Create(responses=[CreateSingleResponse(CERT, 0, next=NEXT_DATE)])) Store( 'revoke_response', 'Is a REVOKE response for the cert', CA, Create(responses=[CreateSingleResponse(CERT, 1)])) Store( 'revoke_response_reason', 'Is a REVOKE response for the cert with a reason', CA, Create(responses=[ CreateSingleResponse(CERT, 1, revoke_time=REVOKE_DATE, reason=1) ])) Store( 'unknown_response', 'Is an UNKNOWN response for the cert', CA, Create(responses=[CreateSingleResponse(CERT, 2)])) Store( 'multiple_response', 'Has multiple responses for the cert', CA, Create(responses=[ CreateSingleResponse(CERT, 0), CreateSingleResponse(CERT, 2) ])) Store( 'other_response', 'Is a response for a different cert', CA, Create(responses=[ CreateSingleResponse(JUNK_CERT, 0), CreateSingleResponse(JUNK_CERT, 1) ])) Store( 'has_single_extension', 'Has an extension in the SingleResponse', CA, Create(responses=[ CreateSingleResponse(CERT, 0, extensions=[CreateExtension()]) ])) Store( 'has_critical_single_extension', 'Has a critical extension in the SingleResponse', CA, Create(responses=[ CreateSingleResponse( CERT, 0, extensions=[CreateExtension('1.2.3.4', critical=True)]) ])) Store( 'has_critical_response_extension', 'Has a critical extension in the ResponseData', CA, Create( responses=[CreateSingleResponse(CERT, 0)], extensions=[CreateExtension('1.2.3.4', critical=True)])) Store( 'has_critical_ct_extension', 'Has a critical CT extension in the SingleResponse', CA, Create(responses=[ CreateSingleResponse( CERT, 0, extensions=[ CreateExtension('1.3.6.1.4.1.11129.2.4.5', critical=True) ]) ])) Store('missing_response', 'Missing a response for the cert', CA, Create(response_status=0, responses=[]))