URL: https://github.com/freeipa/freeipa/pull/812
Author: felipevolpone
 Title: #812: Refactoring cert-find to use API call directly instead of using
Action: opened

PR body:
"""
Refactoring cert-find to use API calls directly instead of using raw LDAP 
search.

Upstream ticket: https://pagure.io/freeipa/issue/6948

I removed the raw LDAP search and used the API directly. In the old code, the 
call ` self.obj._owners()` returns `service, hots and user`. However, when 
testing the code, only the service was being used, so I made it only use the 
service API. 

If there another scenario where `user and host` are used, I thought to do 
something like:

```python
for owner in self.obj._owners():
    api_name = owner.name
    response = api.Command[api_name+'_find'](options[api_name])
    ...  # continues
```
Is that correct?
"""

To pull the PR as Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/812/head:pr812
git checkout pr812
From c5397bf416953674937b3e23f4def73e0fb61b03 Mon Sep 17 00:00:00 2001
From: Felipe Volpone <fbarr...@redhat.com>
Date: Wed, 24 May 2017 15:33:34 -0300
Subject: [PATCH] Refactoring cert-find to use API call directly instead of
 using raw LDAP searchs.

https://pagure.io/freeipa/issue/6948
---
 ipaserver/plugins/cert.py | 92 +++++++++--------------------------------------
 1 file changed, 17 insertions(+), 75 deletions(-)

diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index fbda6ca6ca..796f9aad04 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -1500,86 +1500,28 @@ def _ca_search(self, raw, pkey_only, exactly, **options):
 
         return result, False, complete
 
-    def _ldap_search(self, all, pkey_only, no_members, **options):
+    def _service_search(self, all, pkey_only, no_members, **options):
         ldap = self.api.Backend.ldap2
-
-        filters = []
-        for owner in self.obj._owners():
-            for prefix, rule in (('', ldap.MATCH_ALL),
-                                 ('no_', ldap.MATCH_NONE)):
-                try:
-                    value = options[prefix + owner.name]
-                except KeyError:
-                    continue
-
-                filter = ldap.make_filter_from_attr(
-                    'objectclass',
-                    owner.object_class,
-                    ldap.MATCH_ALL)
-                if filter not in filters:
-                    filters.append(filter)
-
-                filter = ldap.make_filter_from_attr(
-                    owner.primary_key.name,
-                    value,
-                    rule)
-                filters.append(filter)
-
+        
+	principal = unicode(options['service'][0])
+        response = api.Command['service_find'](principal)
+        
         result = collections.OrderedDict()
-        complete = bool(filters)
-
-        cert = options.get('certificate')
-        if cert is not None:
-            filter = ldap.make_filter_from_attr('usercertificate', cert)
-        else:
-            filter = '(usercertificate=*)'
-        filters.append(filter)
-
-        filter = ldap.combine_filters(filters, ldap.MATCH_ALL)
-        try:
-            entries, truncated = ldap.find_entries(
-                base_dn=self.api.env.basedn,
-                filter=filter,
-                attrs_list=['usercertificate'],
-                time_limit=0,
-                size_limit=0,
-            )
-        except errors.EmptyResult:
-            entries = []
-            truncated = False
-        else:
-            try:
-                ldap.handle_truncated_result(truncated)
-            except errors.LimitsExceeded as e:
-                self.add_message(messages.SearchResultTruncated(reason=e))
-
-            truncated = bool(truncated)
-
-        for entry in entries:
-            for attr in ('usercertificate', 'usercertificate;binary'):
-                for cert in entry.get(attr, []):
-                    try:
-                        issuer, serial_number = self._get_cert_key(cert)
-                    except ValueError:
-                        truncated = True
-                        continue
-
-                    try:
-                        obj = result[issuer, serial_number]
-                    except KeyError:
-                        obj = {'serial_number': serial_number}
-                        if not pkey_only and all:
-                            obj['certificate'] = (
-                                base64.b64encode(cert).decode('ascii'))
-                        result[issuer, serial_number] = obj
+        complete = True if response['count'] >= 1 else False
+        truncated = False
+    
+	if complete:
+	    cert = response['result'][0]['usercertificate']
+	    key = self._get_cert_key(cert)
+	    content = self._get_cert_obj(cert[0], all, raw, pkey_only)
+	    result[key] = content
 
-                    if not pkey_only and (all or not no_members):
-                        owners = obj.setdefault('owner', [])
-                        if entry.dn not in owners:
-                            owners.append(entry.dn)
+	    for obj in six.itervalues(result):
+		self.obj._fill_owners(obj)
 
         return result, truncated, complete
 
+
     def execute(self, criteria=None, all=False, raw=False, pkey_only=False,
                 no_members=True, timelimit=None, sizelimit=None, **options):
         if 'cacn' in options:
@@ -1608,7 +1550,7 @@ def execute(self, criteria=None, all=False, raw=False, pkey_only=False,
 
         for sub_search in (self._cert_search,
                            self._ca_search,
-                           self._ldap_search):
+                           self._service_search):
             sub_result, sub_truncated, sub_complete = sub_search(
                 all=all,
                 raw=raw,
_______________________________________________
FreeIPA-devel mailing list -- freeipa-devel@lists.fedorahosted.org
To unsubscribe send an email to freeipa-devel-le...@lists.fedorahosted.org

Reply via email to