URL: https://github.com/freeipa/freeipa/pull/812 Author: felipevolpone Title: #812: Refactoring cert-find to use API call directly instead of using Action: opened
PR body: """ Refactoring cert-find to use API calls directly instead of using raw LDAP search. Upstream ticket: https://pagure.io/freeipa/issue/6948 I removed the raw LDAP search and used the API directly. In the old code, the call `self.obj._owners()` returns `service, host and user`. However, when testing the code, only the service was being used, so I made it only use the service API. If there is another scenario where `user and host` are used, I thought of doing something like: ```python for owner in self.obj._owners(): api_name = owner.name response = api.Command[api_name+'_find'](options[api_name]) ... # continues ``` Is that correct? """ To pull the PR as Git branch: git remote add ghfreeipa https://github.com/freeipa/freeipa git fetch ghfreeipa pull/812/head:pr812 git checkout pr812
From c5397bf416953674937b3e23f4def73e0fb61b03 Mon Sep 17 00:00:00 2001
From: Felipe Volpone <fbarr...@redhat.com>
Date: Wed, 24 May 2017 15:33:34 -0300
Subject: [PATCH] Refactoring cert-find to use API call directly instead of using raw LDAP searchs.
https://pagure.io/freeipa/issue/6948
---
 ipaserver/plugins/cert.py | 92 +++++++++-------------------------------------
 1 file changed, 17 insertions(+), 75 deletions(-)
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index fbda6ca6ca..796f9aad04 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -1500,86 +1500,28 @@ def _ca_search(self, raw, pkey_only, exactly, **options): return result, False, complete - def _ldap_search(self, all, pkey_only, no_members, **options): + def _service_search(self, all, pkey_only, no_members, **options): ldap = self.api.Backend.ldap2 - - filters = [] - for owner in self.obj._owners(): - for prefix, rule in (('', ldap.MATCH_ALL), - ('no_', ldap.MATCH_NONE)): - try: - value = options[prefix + owner.name] - except KeyError: - continue - - filter = ldap.make_filter_from_attr( - 'objectclass', - owner.object_class, - ldap.MATCH_ALL) - if filter not in filters: - filters.append(filter) - - filter = ldap.make_filter_from_attr( - owner.primary_key.name, - value, - rule) - filters.append(filter) - + + principal = unicode(options['service'][0]) + response = api.Command['service_find'](principal) + result = collections.OrderedDict() - complete = bool(filters) - - cert = options.get('certificate') - if cert is not None: - filter = ldap.make_filter_from_attr('usercertificate', cert) - else: - filter = '(usercertificate=*)' - filters.append(filter) - - filter = ldap.combine_filters(filters, ldap.MATCH_ALL) - try: - entries, truncated = ldap.find_entries( - base_dn=self.api.env.basedn, - filter=filter, - attrs_list=['usercertificate'], - time_limit=0, - size_limit=0, - ) - except errors.EmptyResult: - entries = [] - truncated = False - else: - try: - ldap.handle_truncated_result(truncated) - except errors.LimitsExceeded as e: - self.add_message(messages.SearchResultTruncated(reason=e)) - - truncated = bool(truncated) - - for entry in entries: - for attr in ('usercertificate', 'usercertificate;binary'): - for cert in entry.get(attr, []): - try: - issuer, serial_number = self._get_cert_key(cert) - except ValueError: - truncated = True - continue - - try: - obj = result[issuer, serial_number] - except KeyError: - obj = {'serial_number': serial_number} - if not pkey_only and all: - obj['certificate'] = ( - 
base64.b64encode(cert).decode('ascii')) - result[issuer, serial_number] = obj + complete = True if response['count'] >= 1 else False + truncated = False + + if complete: + cert = response['result'][0]['usercertificate'] + key = self._get_cert_key(cert) + content = self._get_cert_obj(cert[0], all, raw, pkey_only) + result[key] = content - if not pkey_only and (all or not no_members): - owners = obj.setdefault('owner', []) - if entry.dn not in owners: - owners.append(entry.dn) + for obj in six.itervalues(result): + self.obj._fill_owners(obj) return result, truncated, complete + def execute(self, criteria=None, all=False, raw=False, pkey_only=False, no_members=True, timelimit=None, sizelimit=None, **options): if 'cacn' in options: @@ -1608,7 +1550,7 @@ def execute(self, criteria=None, all=False, raw=False, pkey_only=False, for sub_search in (self._cert_search, self._ca_search, - self._ldap_search): + self._service_search): sub_result, sub_truncated, sub_complete = sub_search( all=all, raw=raw,
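Review bot: `_service_search` indexes `options['service'][0]` unconditionally and passes `raw` to `self._get_cert_obj(cert[0], all, raw, pkey_only)` although `raw` is not in its signature; `execute` calls every sub-search with `raw=raw`, so the value lands in `**options` and the bare name looks undefined, and a cert-find without a service filter would presumably raise `KeyError` where the removed loop did `except KeyError: continue`. Adding `raw` to the signature and reading the filter with `services = options.get('service')` before indexing would cover both. The new body also keeps only `response['result'][0]` and `cert[0]`, reads `['usercertificate']` without checking that the entry has one, hands the whole list to `self._get_cert_key(cert)` where the removed code passed one certificate at a time, and no longer catches the `ValueError` that used to set `truncated = True`. A service holding several certificates, or a `service_find` call returning several entries, is then under-reported while `complete` is still set to True, which matters when cert-find output is used to decide what to revoke. `ldap` is now unused, and `api.Command` sits right after `self.api.Backend`, so `self.api.Command['service_find'](principal)` would be consistent.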