URL: https://github.com/freeipa/freeipa/pull/717 Author: LiptonB Title: #717: csrgen: Finish NSS support Action: opened
PR body: """ I took the approach of generating a new key for each request, as keys already stored in a database are difficult to name precisely. I also had to add another hook to `CSRLibraryAdaptor` that is called after the cert is returned from the server, so that we could add the cert to the database as desired. """ To pull the PR as a Git branch:
    git remote add ghfreeipa https://github.com/freeipa/freeipa
    git fetch ghfreeipa pull/717/head:pr717
    git checkout pr717
From 171ead1704ef9f5d28b361cb6555dc31540fa4b5 Mon Sep 17 00:00:00 2001 From: Ben Lipton <ben.lip...@gmail.com> Date: Thu, 30 Mar 2017 22:39:40 -0400 Subject: [PATCH] csrgen: Finish NSS support https://pagure.io/freeipa/issue/4899 --- ipaclient/csrgen.py | 130 +++++++++++++++++++++++++++++----------------- ipaclient/plugins/cert.py | 21 +++++++- 2 files changed, 102 insertions(+), 49 deletions(-) diff --git a/ipaclient/csrgen.py b/ipaclient/csrgen.py index 0f52a8b..7724099 100644 --- a/ipaclient/csrgen.py +++ b/ipaclient/csrgen.py @@ -10,16 +10,17 @@ import os.path import pipes import subprocess +import tempfile import traceback import pkg_resources from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.serialization import ( - load_pem_private_key, Encoding, PublicFormat) -from cryptography.x509 import load_pem_x509_certificate + load_pem_private_key, Encoding, NoEncryption, PrivateFormat, PublicFormat) +from cryptography.x509 import load_der_x509_certificate import jinja2 import jinja2.ext import jinja2.sandbox @@ -389,39 +390,28 @@ def csr_config(self, principal, config, profile_id): class CSRLibraryAdaptor(object): - def get_subject_public_key_info(self): - raise NotImplementedError('Use a subclass of CSRLibraryAdaptor') - - def sign_csr(self, certification_request_info): - """Sign a CertificationRequestInfo. + def key(self): + """Return the private key to be used in the cert. - Returns: str, a DER-encoded signed CSR. + Returns: cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey + representing the private key. 
""" raise NotImplementedError('Use a subclass of CSRLibraryAdaptor') - -class OpenSSLAdaptor(object): - def __init__(self, key_filename, password_filename): - self.key_filename = key_filename - self.password_filename = password_filename - - def key(self): - with open(self.key_filename, 'r') as key_file: - key_bytes = key_file.read() - password = None - if self.password_filename is not None: - with open(self.password_filename, 'r') as password_file: - password = password_file.read().strip() - - key = load_pem_private_key(key_bytes, password, default_backend()) - return key - def get_subject_public_key_info(self): + """Return the public key info for the cert. + + Returns: str, a DER-encoded SubjectPublicKeyInfo structure. + """ pubkey_info = self.key().public_key().public_bytes( Encoding.DER, PublicFormat.SubjectPublicKeyInfo) return pubkey_info def sign_csr(self, certification_request_info): + """Sign a CertificationRequestInfo. + + Returns: str, a DER-encoded signed CSR. + """ reqinfo = decoder.decode( certification_request_info, rfc2314.CertificationRequestInfo())[0] csr = rfc2314.CertificationRequest() @@ -442,32 +432,78 @@ def sign_csr(self, certification_request_info): csr.setComponentByName('signature', asn1sig) return encoder.encode(csr) + def process_cert(self, cert): + """Perform any required post-processing on the certificate.""" -class NSSAdaptor(object): - def __init__(self, database, password_filename): - self.database = database + +class OpenSSLAdaptor(CSRLibraryAdaptor): + def __init__(self, key_filename, password_filename): + self.key_filename = key_filename self.password_filename = password_filename - self.nickname = base64.b32encode(os.urandom(40)) + self._key = None - def get_subject_public_key_info(self): - temp_cn = base64.b32encode(os.urandom(40)) + def key(self): + if self._key is None: + with open(self.key_filename, 'r') as key_file: + key_bytes = key_file.read() + password = None + if self.password_filename is not None: + with 
open(self.password_filename, 'r') as password_file: + password = password_file.read().strip() - password_args = [] - if self.password_filename is not None: - password_args = ['-f', self.password_filename] + self._key = load_pem_private_key( + key_bytes, password, default_backend()) + return self._key - subprocess.check_call( - ['certutil', '-S', '-n', self.nickname, '-s', 'CN=%s' % temp_cn, - '-x', '-t', ',,', '-d', self.database] + password_args) - cert_pem = subprocess.check_output( - ['certutil', '-L', '-n', self.nickname, '-a', - '-d', self.database] + password_args) - cert = load_pem_x509_certificate(cert_pem, default_backend()) - pubkey_info = cert.public_key().public_bytes( - Encoding.DER, PublicFormat.SubjectPublicKeyInfo) +class NSSAdaptor(CSRLibraryAdaptor): + """Adaptor that stores certificates and keys in an NSS DB. - return pubkey_info + A new key is generated from scratch. Once the certificate is requested, key + and certificate are stored in the database. + """ + def __init__(self, database, nickname, password_filename): + super(NSSAdaptor, self).__init__() + self.database = database + self.nickname = nickname + self.password_filename = password_filename + self._key = None - def sign_csr(self, certification_request_info): - raise NotImplementedError('NSS is not yet supported') + def key(self): + if self._key is None: + self._key = rsa.generate_private_key( + 65537, 2048, default_backend()) + return self._key + + def process_cert(self, cert_der): + cert = load_der_x509_certificate(cert_der, default_backend()) + cert_pem = cert.public_bytes(Encoding.PEM) + key_pem = self.key().private_bytes( + Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()) + + p12_pass = base64.b32encode(os.urandom(40)) + + popen = subprocess.Popen( + ['openssl', 'pkcs12', '-export', + '-passout', 'pass:%s' % p12_pass, '-name', self.nickname], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + p12, _stderr = popen.communicate(key_pem + cert_pem) + if 
popen.returncode != 0: + raise errors.CertificateOperationError( + error=_('Unable to convert to PKCS #12 format')) + + password_args = [] + if self.password_filename is not None: + password_args = ['-k', self.password_filename] + + with tempfile.NamedTemporaryFile() as p12_file: + p12_file.write(p12) + p12_file.flush() + try: + subprocess.check_call( + ['pk12util', '-i', p12_file.name, '-d', self.database, + '-W', p12_pass] + password_args) + except subprocess.CalledProcessError: + raise errors.CertificateOperationError( + error=_('Unable to save certificate to NSS database')) diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py index a4ee9a9..0a285d0 100644 --- a/ipaclient/plugins/cert.py +++ b/ipaclient/plugins/cert.py @@ -77,6 +77,11 @@ class cert_request(CertRetrieveOverride): doc=_('Path to NSS database to use for private key'), ), Str( + 'nickname?', + label=_('Nickname for new cert'), + doc=_('Nickname for new cert in NSS database'), + ), + Str( 'private_key?', label=_('Path to private key file'), doc=_('Path to PEM file containing a private key'), @@ -101,13 +106,19 @@ def get_args(self): def forward(self, csr=None, **options): database = options.pop('database', None) + nickname = options.pop('nickname', None) private_key = options.pop('private_key', None) csr_profile_id = options.pop('csr_profile_id', None) password_file = options.pop('password_file', None) + adaptor = None + if csr is None: if database: - adaptor = csrgen.NSSAdaptor(database, password_file) + if nickname is None: + raise errors.InvocationError( + "'database' was specified, 'nickname' must be as well") + adaptor = csrgen.NSSAdaptor(database, nickname, password_file) elif private_key: adaptor = csrgen.OpenSSLAdaptor(private_key, password_file) else: @@ -147,7 +158,13 @@ def forward(self, csr=None, **options): "Options 'database' and 'private_key' are not compatible" " with 'csr'")) - return super(cert_request, self).forward(csr, **options) + result = super(cert_request, 
self).forward(csr, **options) + + if adaptor is not None and 'certificate' in result['result']: + adaptor.process_cert(base64.b64decode( + result['result']['certificate'])) + + return result @register(override=True, no_fail=True)
-- Manage your subscription for the Freeipa-devel mailing list: https://www.redhat.com/mailman/listinfo/freeipa-devel Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code