URL: https://github.com/freeipa/freeipa/pull/162 Author: frasertweedale Title: #162: Certificate processing refactoring Action: opened
PR body: """ This PR contains ready-for-review/test commits that: - support converting the python-cryptography Name type to DN - avoid the need to parse friendlyName from the CSR and remove the code that does that - convert `ipalib.pkcs10` to use python-cryptography instead of NSS for processing CSRs - eliminate our use of the nss.data_to_hex function - switch `ipalib.x509` to use the ASN.1 specifications provided by the *pyasn1-modules* library, and remove our hand-rolled definitions. We discussed targeting subteam staging branches for the ongoing refactoring work, but it seems that these have not been created yet. I can retarget the PR after the cert refactoring branch gets created. """ To pull the PR as a Git branch: git remote add ghfreeipa https://github.com/freeipa/freeipa git fetch ghfreeipa pull/162/head:pr162 git checkout pr162
From 7b22e9d9ed300e4abf7a5679b7f7d0014976c80e Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <ftwee...@redhat.com> Date: Mon, 10 Oct 2016 16:08:52 +1000 Subject: [PATCH 1/5] dn: support conversion from python-cryptography Name The upcoming change to using python-cryptography for certificate process will require a way to convert ``cryptography.x509.name.Name`` values to ``ipapython.dn.DN``. Update the ``DN`` constructor to accept a ``Name``. Part of: https://fedorahosted.org/freeipa/ticket/6398 --- ipapython/dn.py | 43 ++++++++++++++++++++++++++++++++++++-- ipatests/test_ipapython/test_dn.py | 23 ++++++++++++++++++-- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/ipapython/dn.py b/ipapython/dn.py index 3ee35c6..682e0ca 100644 --- a/ipapython/dn.py +++ b/ipapython/dn.py @@ -422,6 +422,7 @@ import sys import functools +import cryptography.x509 from ldap.dn import str2dn, dn2str from ldap import DECODING_ERROR import six @@ -976,6 +977,8 @@ class DN(object): to yield one or more RDN's which will be appended in order to the DN. The parsing recognizes the DN syntax escaping rules. + * A single ``cryptography.x509.name.Name`` object. + * A RDN object, the RDN will copied respecting the constructors keyword configuration parameters and appended in order. 
@@ -1125,9 +1128,17 @@ def _rdns_from_value(self, value): rdns = [[ava]] elif isinstance(value, RDN): rdns = [value.to_openldap()] + elif isinstance(value, cryptography.x509.name.Name): + rdns = list(reversed([ + [get_ava( + _ATTR_NAME_BY_OID.get(ava.oid, ava.oid.dotted_string), + ava.value)] + for ava in value + ])) else: - raise TypeError("must be str, unicode, tuple, or RDN or DN, got %s instead" % - type(value)) + raise TypeError( + "must be str, unicode, tuple, Name, RDN or DN, got %s instead" + % type(value)) return rdns def _rdns_from_sequence(self, seq): @@ -1407,3 +1418,31 @@ def rindex(self, pattern, start=None, end=None): if i == -1: raise ValueError("pattern not found") return i + + +_ATTR_NAME_BY_OID = { + cryptography.x509.oid.NameOID.COMMON_NAME: 'CN', + cryptography.x509.oid.NameOID.COUNTRY_NAME: 'C', + cryptography.x509.oid.NameOID.LOCALITY_NAME: 'L', + cryptography.x509.oid.NameOID.STATE_OR_PROVINCE_NAME: 'ST', + cryptography.x509.oid.NameOID.ORGANIZATION_NAME: 'O', + cryptography.x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME: 'OU', + cryptography.x509.oid.NameOID.SERIAL_NUMBER: 'serialNumber', + cryptography.x509.oid.NameOID.SURNAME: 'SN', + cryptography.x509.oid.NameOID.GIVEN_NAME: 'givenName', + cryptography.x509.oid.NameOID.TITLE: 'title', + cryptography.x509.oid.NameOID.GENERATION_QUALIFIER: 'generationQualifier', + cryptography.x509.oid.NameOID.DN_QUALIFIER: 'dnQualifier', + cryptography.x509.oid.NameOID.PSEUDONYM: 'pseudonym', + cryptography.x509.oid.NameOID.DOMAIN_COMPONENT: 'DC', + cryptography.x509.oid.NameOID.EMAIL_ADDRESS: 'E', + cryptography.x509.oid.NameOID.JURISDICTION_COUNTRY_NAME: + 'incorporationCountry', + cryptography.x509.oid.NameOID.JURISDICTION_LOCALITY_NAME: + 'incorporationLocality', + cryptography.x509.oid.NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME: + 'incorporationState', + cryptography.x509.oid.NameOID.BUSINESS_CATEGORY: 'businessCategory', + cryptography.x509.ObjectIdentifier('2.5.4.9'): 'STREET', + 
cryptography.x509.ObjectIdentifier('0.9.2342.19200300.100.1.1'): 'UID', +} diff --git a/ipatests/test_ipapython/test_dn.py b/ipatests/test_ipapython/test_dn.py index a96bd33..3ca3b57 100644 --- a/ipatests/test_ipapython/test_dn.py +++ b/ipatests/test_ipapython/test_dn.py @@ -2,6 +2,7 @@ import unittest import pytest +from cryptography import x509 import six from ipapython.dn import DN, RDN, AVA @@ -621,7 +622,7 @@ class TestDN(unittest.TestCase): def setUp(self): # ava1 must sort before ava2 self.attr1 = 'cn' - self.value1 = 'Bob' + self.value1 = u'Bob' self.str_ava1 = '%s=%s' % (self.attr1, self.value1) self.ava1 = AVA(self.attr1, self.value1) @@ -629,7 +630,7 @@ def setUp(self): self.rdn1 = RDN((self.attr1, self.value1)) self.attr2 = 'ou' - self.value2 = 'people' + self.value2 = u'people' self.str_ava2 = '%s=%s' % (self.attr2, self.value2) self.ava2 = AVA(self.attr2, self.value2) @@ -656,6 +657,11 @@ def setUp(self): self.base_container_dn = DN((self.attr1, self.value1), self.container_dn, self.base_dn) + self.x500name = x509.Name([ + x509.NameAttribute( + x509.NameOID.ORGANIZATIONAL_UNIT_NAME, self.value2), + x509.NameAttribute(x509.NameOID.COMMON_NAME, self.value1), + ]) def assertExpectedClass(self, klass, obj, component): self.assertIs(obj.__class__, expected_class(klass, component)) @@ -794,6 +800,19 @@ def test_create(self): self.assertEqual(dn1[0], self.rdn1) self.assertEqual(dn1[1], self.rdn2) + # Create with a python-cryptography 'Name' + dn1 = DN(self.x500name) + self.assertEqual(len(dn1), 2) + self.assertExpectedClass(DN, dn1, 'self') + for i in range(0, len(dn1)): + self.assertExpectedClass(DN, dn1[i], 'RDN') + for j in range(0, len(dn1[i])): + self.assertExpectedClass(DN, dn1[i][j], 'AVA') + self.assertIsInstance(dn1[i].attr, unicode) + self.assertIsInstance(dn1[i].value, unicode) + self.assertEqual(dn1[0], self.rdn1) + self.assertEqual(dn1[1], self.rdn2) + # Create with RDN, and 2 DN's (e.g. 
attr + container + base) dn1 = DN((self.attr1, self.value1), self.container_dn, self.base_dn) self.assertEqual(len(dn1), 5) From 1b8447af13251ef7a52930960cdc750a93817730 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <ftwee...@redhat.com> Date: Mon, 10 Oct 2016 14:31:49 +1000 Subject: [PATCH 2/5] pkcs10: use python-cryptography for CSR processing Update ``ipalib.pkcs10`` module to use python-cryptography for CSR processing instead of NSS. Part of: https://fedorahosted.org/freeipa/ticket/6398 --- ipalib/pkcs10.py | 84 +++++++++-------------------------------------- ipalib/x509.py | 39 ++++++++++++++++++++++ ipaserver/plugins/cert.py | 75 ++++++++++++++++++++++++------------------ 3 files changed, 97 insertions(+), 101 deletions(-) diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py index 158ebb3..4df022e 100644 --- a/ipalib/pkcs10.py +++ b/ipalib/pkcs10.py @@ -21,11 +21,11 @@ import sys import base64 -import nss.nss as nss +from cryptography.hazmat.backends import default_backend +import cryptography.x509 from pyasn1.type import univ, namedtype, tag from pyasn1.codec.der import decoder import six -from ipalib import x509 if six.PY3: unicode = str @@ -33,58 +33,6 @@ PEM = 0 DER = 1 -def get_subject(csr, datatype=PEM): - """ - Given a CSR return the subject value. - - This returns an nss.DN object. - """ - request = load_certificate_request(csr, datatype) - try: - return request.subject - finally: - del request - -def get_extensions(csr, datatype=PEM): - """ - Given a CSR return OIDs of certificate extensions. 
- - The return value is a tuple of strings - """ - request = load_certificate_request(csr, datatype) - - # Work around a bug in python-nss where nss.oid_dotted_decimal - # errors on unrecognised OIDs - # - # https://bugzilla.redhat.com/show_bug.cgi?id=1246729 - # - def get_prefixed_oid_str(ext): - """Returns a string like 'OID.1.2...'.""" - if ext.oid_tag == 0: - return repr(ext) - else: - return nss.oid_dotted_decimal(ext.oid) - - return tuple(get_prefixed_oid_str(ext)[4:] - for ext in request.extensions) - - -def get_subjectaltname(csr, datatype=PEM): - """ - Given a CSR return the subjectaltname value, if any. - - The return value is a tuple of strings or None - """ - request = load_certificate_request(csr, datatype) - for extension in request.extensions: - if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: - break - else: - return None - del request - - return x509.decode_generalnames(extension.value) - # Unfortunately, NSS can only parse the extension request attribute, so # we have to parse friendly name ourselves (see RFC 2986) @@ -148,31 +96,29 @@ def strip_header(csr): return csr -def load_certificate_request(csr, datatype=PEM): + +def load_certificate_request(data, datatype=PEM): """ - Given a base64-encoded certificate request, with or without the - header/footer, return a request object. + Load a PKCS #10 certificate request. + + :param datatype: PEM for base64-encoded data (with or without header), + or DER + :return: a python-cryptography ``Certificate`` object. + :raises: ``ValueError`` if unable to load the request + """ - if datatype == PEM: - csr = strip_header(csr) - csr = base64.b64decode(csr) + if (datatype == PEM): + data = strip_header(data) + data = base64.b64decode(data) - # A fail-safe so we can always read a CSR. 
python-nss/NSS will segfault - # otherwise - if not nss.nss_is_initialized(): - nss.nss_init_nodb() + return cryptography.x509.load_der_x509_csr(data, default_backend()) - return nss.CertificateRequest(csr) if __name__ == '__main__': - nss.nss_init_nodb() - # Read PEM request from stdin and print out its components csrlines = sys.stdin.readlines() csr = ''.join(csrlines) print(load_certificate_request(csr)) - print(get_subject(csr)) - print(get_subjectaltname(csr)) print(get_friendlyname(csr)) diff --git a/ipalib/x509.py b/ipalib/x509.py index e986a97..e67aab6 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -39,6 +39,7 @@ import base64 import re +import cryptography.x509 import nss.nss as nss from nss.error import NSPRError from pyasn1.type import univ, char, namedtype, tag @@ -52,6 +53,9 @@ from ipaplatform.paths import paths from ipapython.dn import DN +if six.PY3: + unicode = str + PEM = 0 DER = 1 @@ -513,6 +517,41 @@ def decode_generalnames(secitem): return names +class KRB5PrincipalName(cryptography.x509.general_name.OtherName): + def __init__(self, type_id, value): + super(KRB5PrincipalName, self).__init__(type_id, value) + self.name = _decode_krb5principalname(value) + + +class UPN(cryptography.x509.general_name.OtherName): + def __init__(self, type_id, value): + super(UPN, self).__init__(type_id, value) + self.name = unicode( + decoder.decode(value, asn1Spec=char.UTF8String())[0]) + + +OTHERNAME_CLASS_MAP = { + SAN_KRB5PRINCIPALNAME: KRB5PrincipalName, + SAN_UPN: UPN, +} + + +def process_othernames(gns): + """ + Process python-cryptography GeneralName values, yielding + OtherName values of more specific type if type is known. 
+ + """ + for gn in gns: + if isinstance(gn, cryptography.x509.general_name.OtherName): + cls = OTHERNAME_CLASS_MAP.get( + gn.type_id.dotted_string, + cryptography.x509.general_name.OtherName) + yield cls(gn.type_id, gn.value) + else: + yield gn + + if __name__ == '__main__': # this can be run with: # python ipalib/x509.py < /etc/ipa/ca.crt diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index d13974e..4be96f5 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -20,14 +20,12 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import base64 -import binascii import collections import datetime import os +import cryptography.x509 from nss import nss -from nss.error import NSPRError -from pyasn1.error import PyAsn1Error import six from ipalib import Command, Str, Int, Flag @@ -174,9 +172,7 @@ def validate_csr(ugettext, csr): return try: pkcs10.load_certificate_request(csr) - except (TypeError, binascii.Error) as e: - raise errors.Base64DecodeError(reason=str(e)) - except Exception as e: + except ValueError as e: raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % e) def normalize_csr(csr): @@ -607,17 +603,21 @@ def execute(self, csr, all=False, raw=False, **kw): caacl_check(principal_type, principal, ca, profile_id) try: - subject = pkcs10.get_subject(csr) - extensions = pkcs10.get_extensions(csr) - subjectaltname = pkcs10.get_subjectaltname(csr) or () - except (NSPRError, PyAsn1Error, ValueError) as e: + csr_obj = pkcs10.load_certificate_request(csr) + except ValueError as e: raise errors.CertificateOperationError( error=_("Failure decoding Certificate Signing Request: %s") % e) + try: + ext_san = csr_obj.extensions.get_extension_for_oid( + cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + except cryptography.x509.extensions.ExtensionNotFound: + ext_san = None + # self-service and host principals may bypass SAN permission check if 
(bind_principal_string != principal_string and bind_principal_type != HOST): - if '2.5.29.17' in extensions: + if ext_san is not None: self.check_access('request certificate with subjectaltname') dn = None @@ -650,10 +650,14 @@ def execute(self, csr, all=False, raw=False, **kw): dn = principal_obj['dn'] # Ensure that the DN in the CSR matches the principal - cn = subject.common_name #pylint: disable=E1101 - if not cn: + # + # We only look at the "most specific" CN value + cns = csr_obj.subject.get_attributes_for_oid( + cryptography.x509.oid.NameOID.COMMON_NAME) + if len(cns) == 0: raise errors.ValidationError(name='csr', error=_("No Common Name was found in subject of request.")) + cn = cns[-1].value # "most specific" is end of list if principal_type in (SERVICE, HOST): if cn.lower() != principal.hostname.lower(): @@ -670,8 +674,11 @@ def execute(self, csr, all=False, raw=False, **kw): ) # check email address - mail = subject.email_address #pylint: disable=E1101 - if mail is not None and mail not in principal_obj.get('mail', []): + # + # fail if any email addr from DN does not appear in ldap entry + email_addrs = csr_obj.subject.get_attributes_for_oid( + cryptography.x509.oid.NameOID.EMAIL_ADDRESS) + if len(set(email_addrs) - set(principal_obj.get('mail', []))) > 0: raise errors.ValidationError( name='csr', error=_( @@ -685,9 +692,12 @@ def execute(self, csr, all=False, raw=False, **kw): "to the 'userCertificate' attribute of entry '%s'.") % dn) # Validate the subject alt name, if any - for name_type, desc, name, _der_name in subjectaltname: - if name_type == nss.certDNSName: - name = unicode(name) + generalnames = [] + if ext_san is not None: + generalnames = x509.process_othernames(ext_san.value) + for gn in generalnames: + if isinstance(gn, cryptography.x509.general_name.DNSName): + name = gn.value alt_principal = None alt_principal_obj = None try: @@ -703,8 +713,9 @@ def execute(self, csr, all=False, raw=False, **kw): elif principal_type == USER: raise 
errors.ValidationError( name='csr', - error=_("subject alt name type %s is forbidden " - "for user principals") % desc + error=_( + "subject alt name type %s is forbidden " + "for user principals") % "DNSName" ) except errors.NotFound: # We don't want to issue any certificates referencing @@ -721,17 +732,15 @@ def execute(self, csr, all=False, raw=False, **kw): "with subject alt name '%s'.") % name) if alt_principal is not None and not bypass_caacl: caacl_check(principal_type, alt_principal, ca, profile_id) - elif name_type in [ - (nss.certOtherName, x509.SAN_UPN), - (nss.certOtherName, x509.SAN_KRB5PRINCIPALNAME), - ]: - if name != principal_string: + elif isinstance(gn, (x509.KRB5PrincipalName, x509.UPN)): + if gn.name != principal_string: raise errors.ACIError( - info=_("Principal '%s' in subject alt name does not " - "match requested principal") % name) - elif name_type == nss.certRFC822Name: + info=_( + "Principal '%s' in subject alt name does not " + "match requested principal") % gn.name) + elif isinstance(gn, cryptography.x509.general_name.RFC822Name): if principal_type == USER: - if name not in principal_obj.get('mail', []): + if gn.value not in principal_obj.get('mail', []): raise errors.ValidationError( name='csr', error=_( @@ -741,12 +750,14 @@ def execute(self, csr, all=False, raw=False, **kw): else: raise errors.ValidationError( name='csr', - error=_("subject alt name type %s is forbidden " - "for non-user principals") % desc + error=_( + "subject alt name type %s is forbidden " + "for non-user principals") % "RFC822Name" ) else: raise errors.ACIError( - info=_("Subject alt name type %s is forbidden") % desc) + info=_("Subject alt name type %s is forbidden") + % type(gn).__name__) # Request the certificate try: From 0ab57e1b055723da9cad4f762edbd4252c18a71b Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <ftwee...@redhat.com> Date: Wed, 12 Oct 2016 11:03:56 +1000 Subject: [PATCH 3/5] pkcs10: remove pyasn1 PKCS #10 spec In the 
dogtag-ipa-ca-renew-agent-submit certmonger renewal helper, we currently use our hand-rolled PKCS #10 pyasn1 specification to parse the friendlyName out of CSRs generated by certmonger (it contains the NSSDB nickname of the cert). Use other information from the renewal helper process environment to determine the nickname and remove our PKCS #10 pyasn1 spec. Part of: https://fedorahosted.org/freeipa/ticket/6398 --- .../certmonger/dogtag-ipa-ca-renew-agent-submit | 30 ++++++------- ipalib/pkcs10.py | 50 ---------------------- 2 files changed, 14 insertions(+), 66 deletions(-) diff --git a/install/certmonger/dogtag-ipa-ca-renew-agent-submit b/install/certmonger/dogtag-ipa-ca-renew-agent-submit index 967ce6e..8666ba7 100755 --- a/install/certmonger/dogtag-ipa-ca-renew-agent-submit +++ b/install/certmonger/dogtag-ipa-ca-renew-agent-submit @@ -37,9 +37,9 @@ import json import six -from ipapython import ipautil +from ipapython import certmonger, ipautil from ipapython.dn import DN -from ipalib import api, errors, pkcs10, x509 +from ipalib import api, errors, x509 from ipaplatform.paths import paths from ipaserver.plugins.ldap2 import ldap2 from ipaserver.install import cainstance, certs @@ -65,8 +65,14 @@ if six.PY3: IPA_CA_NICKNAME = 'caSigningCert cert-pki-ca' def get_nickname(): - csr = os.environ.get('CERTMONGER_CSR') - return pkcs10.get_friendlyname(csr) if csr else None + subject = os.environ.get('CERTMONGER_REQ_SUBJECT') + if not subject: + return None + request_id = certmonger.get_request_id({'template-subject': subject}) + if not request_id: + return None + return certmonger.get_request_value(request_id, 'cert-nickname') + def is_lightweight_ca(): nickname = get_nickname() or '' @@ -216,13 +222,9 @@ def store_cert(): else: return (OPERATION_NOT_SUPPORTED_BY_HELPER,) - csr = os.environ.get('CERTMONGER_CSR') - if not csr: - return (UNCONFIGURED, "Certificate request not provided") - - nickname = pkcs10.get_friendlyname(csr) + nickname = get_nickname() if not 
nickname: - return (REJECTED, "No friendly name in the certificate request") + return (REJECTED, "Nickname could not be determined") cert = os.environ.get('CERTMONGER_CERTIFICATE') if not cert: @@ -325,13 +327,9 @@ def retrieve_or_reuse_cert(): Retrieve certificate from LDAP. If the certificate is not available, reuse the old certificate. """ - csr = os.environ.get('CERTMONGER_CSR') - if not csr: - return (UNCONFIGURED, "Certificate request not provided") - - nickname = pkcs10.get_friendlyname(csr) + nickname = get_nickname() if not nickname: - return (REJECTED, "No friendly name in the certificate request") + return (REJECTED, "Nickname could not be determined") cert = os.environ.get('CERTMONGER_CERTIFICATE') if not cert: diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py index 4df022e..78850d8 100644 --- a/ipalib/pkcs10.py +++ b/ipalib/pkcs10.py @@ -23,8 +23,6 @@ import base64 from cryptography.hazmat.backends import default_backend import cryptography.x509 -from pyasn1.type import univ, namedtype, tag -from pyasn1.codec.der import decoder import six if six.PY3: @@ -34,53 +32,6 @@ DER = 1 -# Unfortunately, NSS can only parse the extension request attribute, so -# we have to parse friendly name ourselves (see RFC 2986) -class _Attribute(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('type', univ.ObjectIdentifier()), - namedtype.NamedType('values', univ.Set()), - ) - -class _Attributes(univ.SetOf): - componentType = _Attribute() - -class _CertificationRequestInfo(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('version', univ.Integer()), - namedtype.NamedType('subject', univ.Sequence()), - namedtype.NamedType('subjectPublicKeyInfo', univ.Sequence()), - namedtype.OptionalNamedType('attributes', _Attributes().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))) - ) - -class _CertificationRequest(univ.Sequence): - componentType = namedtype.NamedTypes( - 
namedtype.NamedType('certificationRequestInfo', - _CertificationRequestInfo()), - namedtype.NamedType('signatureAlgorithm', univ.Sequence()), - namedtype.NamedType('signatureValue', univ.BitString()), - ) - -_FRIENDLYNAME = univ.ObjectIdentifier('1.2.840.113549.1.9.20') - -def get_friendlyname(csr, datatype=PEM): - """ - Given a CSR return the value of the friendlyname attribute, if any. - - The return value is a string. - """ - if datatype == PEM: - csr = strip_header(csr) - csr = base64.b64decode(csr) - - csr = decoder.decode(csr, asn1Spec=_CertificationRequest())[0] - for attribute in csr['certificationRequestInfo']['attributes']: - if attribute['type'] == _FRIENDLYNAME: - return unicode(attribute['values'][0]) - - return None - def strip_header(csr): """ Remove the header and footer from a CSR. @@ -121,4 +72,3 @@ def load_certificate_request(data, datatype=PEM): csr = ''.join(csrlines) print(load_certificate_request(csr)) - print(get_friendlyname(csr)) From f6deaadd84a9c31e0b5deda1ef50f67ca9aa9af0 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <ftwee...@redhat.com> Date: Tue, 11 Oct 2016 12:43:22 +1000 Subject: [PATCH 4/5] x509: avoid use of nss.data_to_hex Avoid use of the nss.data_to_hex function for formatting certificate fingerprints. Add our own helper functions to format the fingerprints as hex (with colons). Part of: https://fedorahosted.org/freeipa/ticket/6398 --- ipalib/x509.py | 23 +++++++++++++++++++++++ ipaserver/plugins/cert.py | 8 ++++---- ipaserver/plugins/service.py | 6 ++++-- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/ipalib/x509.py b/ipalib/x509.py index e67aab6..cac5e9c 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -33,6 +33,7 @@ from __future__ import print_function +import binascii import collections import os import sys @@ -552,6 +553,28 @@ def process_othernames(gns): yield gn +def chunk(size, s): + """Yield chunks of the specified size from the given string. 
+ + The input must be a multiple of the chunk size (otherwise + trailing characters are dropped). + + Works on character strings only. + + """ + return (u''.join(span) for span in six.moves.zip(*[iter(s)] * size)) + + +def add_colons(s): + """Add colons between each nibble pair in a hex string.""" + return u':'.join(chunk(2, s)) + + +def to_hex_with_colons(bs): + """Convert bytes to a hex string with colons.""" + return add_colons(binascii.hexlify(bs).decode('utf-8')) + + if __name__ == '__main__': # this can be run with: # python ipalib/x509.py < /etc/ipa/ca.crt diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index 4be96f5..b21f0ad 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -398,10 +398,10 @@ def _parse(self, obj, full=True): if full: obj['valid_not_before'] = unicode(cert.valid_not_before_str) obj['valid_not_after'] = unicode(cert.valid_not_after_str) - obj['md5_fingerprint'] = unicode( - nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) - obj['sha1_fingerprint'] = unicode( - nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) + obj['md5_fingerprint'] = x509.to_hex_with_colons( + nss.md5_digest(cert.der_data)) + obj['sha1_fingerprint'] = x509.to_hex_with_colons( + nss.sha1_digest(cert.der_data)) try: ext_san = cert.get_extension(nss.SEC_OID_X509_SUBJECT_ALT_NAME) diff --git a/ipaserver/plugins/service.py b/ipaserver/plugins/service.py index e57ca52..a39ba32 100644 --- a/ipaserver/plugins/service.py +++ b/ipaserver/plugins/service.py @@ -274,8 +274,10 @@ def set_certificate_attrs(entry_attrs): entry_attrs['issuer'] = unicode(cert.issuer) entry_attrs['valid_not_before'] = unicode(cert.valid_not_before_str) entry_attrs['valid_not_after'] = unicode(cert.valid_not_after_str) - entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) - entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) + entry_attrs['md5_fingerprint'] = 
x509.to_hex_with_colons( + nss.md5_digest(cert.der_data)) + entry_attrs['sha1_fingerprint'] = x509.to_hex_with_colons( + nss.sha1_digest(cert.der_data)) def check_required_principal(ldap, principal): """ From 76f37b04429b85947a894d0b06b9f00ee03f0fc3 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <ftwee...@redhat.com> Date: Tue, 11 Oct 2016 17:02:58 +1000 Subject: [PATCH 5/5] x509: use pyasn1-modules X.509 specs Remove our hand-rolled pyasn1 specifications for X.509 in favour of those provided by the pyasn1-modules library. This also avoids a bug in our _Extension spec wherein parsing fails if the 'critical' field is absent. Part of: https://fedorahosted.org/freeipa/ticket/6398 --- ipalib/x509.py | 101 ++++----------------------------------------------------- 1 file changed, 6 insertions(+), 95 deletions(-) diff --git a/ipalib/x509.py b/ipalib/x509.py index cac5e9c..e206802 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -45,6 +45,7 @@ from nss.error import NSPRError from pyasn1.type import univ, char, namedtype, tag from pyasn1.codec.der import decoder, encoder +from pyasn1_modules import rfc2459 import six from ipalib import api @@ -200,49 +201,11 @@ def is_self_signed(certificate, datatype=PEM, dbdir=None): del nsscert return self_signed -class _Name(univ.Choice): - componentType = namedtype.NamedTypes( - namedtype.NamedType('rdnSequence', - univ.SequenceOf()), - ) - -class _TBSCertificate(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType( - 'version', - univ.Integer().subtype(explicitTag=tag.Tag( - tag.tagClassContext, tag.tagFormatSimple, 0))), - namedtype.NamedType('serialNumber', univ.Integer()), - namedtype.NamedType('signature', univ.Sequence()), - namedtype.NamedType('issuer', _Name()), - namedtype.NamedType('validity', univ.Sequence()), - namedtype.NamedType('subject', _Name()), - namedtype.NamedType('subjectPublicKeyInfo', univ.Sequence()), - namedtype.OptionalNamedType( - 'issuerUniquedID', - 
univ.BitString().subtype(implicitTag=tag.Tag( - tag.tagClassContext, tag.tagFormatSimple, 1))), - namedtype.OptionalNamedType( - 'subjectUniquedID', - univ.BitString().subtype(implicitTag=tag.Tag( - tag.tagClassContext, tag.tagFormatSimple, 2))), - namedtype.OptionalNamedType( - 'extensions', - univ.Sequence().subtype(explicitTag=tag.Tag( - tag.tagClassContext, tag.tagFormatSimple, 3))), - ) - -class _Certificate(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('tbsCertificate', _TBSCertificate()), - namedtype.NamedType('signatureAlgorithm', univ.Sequence()), - namedtype.NamedType('signature', univ.BitString()), - ) def _get_der_field(cert, datatype, dbdir, field): cert = load_certificate(cert, datatype, dbdir) cert = cert.der_data - cert = decoder.decode(cert, _Certificate())[0] + cert = decoder.decode(cert, rfc2459.Certificate())[0] field = cert['tbsCertificate'][field] field = encoder.encode(field) return field @@ -364,77 +327,24 @@ def write_certificate_list(rawcerts, filename): except (IOError, OSError) as e: raise errors.FileError(reason=str(e)) -class _Extension(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('extnID', univ.ObjectIdentifier()), - namedtype.NamedType('critical', univ.Boolean()), - namedtype.NamedType('extnValue', univ.OctetString()), - ) def _encode_extension(oid, critical, value): - ext = _Extension() + ext = rfc2459.Extension() ext['extnID'] = univ.ObjectIdentifier(oid) ext['critical'] = univ.Boolean(critical) ext['extnValue'] = univ.OctetString(value) ext = encoder.encode(ext) return ext -class _ExtKeyUsageSyntax(univ.SequenceOf): - componentType = univ.ObjectIdentifier() def encode_ext_key_usage(ext_key_usage): - eku = _ExtKeyUsageSyntax() + eku = rfc2459.ExtKeyUsageSyntax() for i, oid in enumerate(ext_key_usage): eku[i] = univ.ObjectIdentifier(oid) eku = encoder.encode(eku) return _encode_extension('2.5.29.37', EKU_ANY not in ext_key_usage, eku) -class 
_AnotherName(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('type-id', univ.ObjectIdentifier()), - namedtype.NamedType('value', univ.Any().subtype( - explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)) - ), - ) - - -class _GeneralName(univ.Choice): - componentType = namedtype.NamedTypes( - namedtype.NamedType('otherName', _AnotherName().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)) - ), - namedtype.NamedType('rfc822Name', char.IA5String().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)) - ), - namedtype.NamedType('dNSName', char.IA5String().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)) - ), - namedtype.NamedType('x400Address', univ.Sequence().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)) - ), - namedtype.NamedType('directoryName', _Name().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)) - ), - namedtype.NamedType('ediPartyName', univ.Sequence().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 5)) - ), - namedtype.NamedType('uniformResourceIdentifier', char.IA5String().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6)) - ), - namedtype.NamedType('iPAddress', univ.OctetString().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 7)) - ), - namedtype.NamedType('registeredID', univ.ObjectIdentifier().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 8)) - ), - ) - - -class _SubjectAltName(univ.SequenceOf): - componentType = _GeneralName() - - class _PrincipalName(univ.Sequence): componentType = namedtype.NamedTypes( namedtype.NamedType('name-type', univ.Integer().subtype( @@ -488,7 +398,8 @@ def decode_generalnames(secitem): """ nss_names = nss.x509_alt_name(secitem, repr_kind=nss.AsObject) - asn1_names = decoder.decode(secitem.data, asn1Spec=_SubjectAltName())[0] + asn1_names = 
decoder.decode( + secitem.data, asn1Spec=rfc2459.SubjectAltName())[0] names = [] for nss_name, asn1_name in zip(nss_names, asn1_names): # NOTE: we use the NSS enum to identify the name type.
-- Manage your subscription for the Freeipa-devel mailing list: https://www.redhat.com/mailman/listinfo/freeipa-devel Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code