Author: Gustavo
Date: Sat Nov 22 08:26:53 2008
New Revision: 5743
URL: http://trac.turbogears.org/changeset/5743

Log:
- Added *tons* of tests to verify the integration of repoze.who and repoze.what.
 - Moved the @require decorator from repoze.what into TG itself.
 - Synchronized TG's use of the repoze.what services with the latest API changes, which make repoze.what independent of TG.

Added:
   trunk/tg/test_stack/auth_base.py   (contents, props changed)
   trunk/tg/test_stack/fixture/
   trunk/tg/test_stack/fixture/__init__.py   (contents, props changed)
   trunk/tg/test_stack/fixture/session/   (props changed)
   trunk/tg/test_stack/test_auth.py   (contents, props changed)
Modified:
   trunk/tg/__init__.py
   trunk/tg/controllers.py
   trunk/tg/decorators.py

Modified: trunk/tg/__init__.py
==============================================================================
--- trunk/tg/__init__.py        (original)
+++ trunk/tg/__init__.py        Sat Nov 22 08:26:53 2008
@@ -53,7 +53,8 @@
 from tg.wsgiapp import TGApp
 from tg.controllers import TGController, redirect, url, use_wsgi_app
 from tg.configuration import config
-from tg.decorators import validate, expose, override_template, paginate, postpone_commits
+from tg.decorators import validate, expose, override_template, paginate, \
+                          postpone_commits, require
 from tg.flash import flash, get_flash, get_status
 
 from pylons import g, request, response, tmpl_context, session
@@ -61,5 +62,5 @@
 __all__ = [
     'expose', 'validate', 'TGController', 'tmpl_context', 'app_globals',
     'overide_template', 'request', 'response', 'session','TurboGearsApplication',
-    'use_wsgi_app', 'TGApp', 'app_globals'
+    'use_wsgi_app', 'TGApp', 'app_globals', 'require'
 ]

Modified: trunk/tg/controllers.py
==============================================================================
--- trunk/tg/controllers.py     (original)
+++ trunk/tg/controllers.py     Sat Nov 22 08:26:53 2008
@@ -514,10 +514,9 @@
     def check_security(self):
         errors = []
         environ = pylons.request.environ
-        identity = environ.get('repoze.who.identity')
         if not hasattr(self, "require") or \
             self.require is None or \
-            self.require.eval_with_object(identity, errors):
+            self.require.eval_with_environ(environ, errors):
             return True
 
         # if we did not return this is an error :)

Modified: trunk/tg/decorators.py
==============================================================================
--- trunk/tg/decorators.py      (original)
+++ trunk/tg/decorators.py      Sat Nov 22 08:26:53 2008
@@ -9,13 +9,17 @@
 from paste.util.mimeparse import best_match
 from decorator import decorator
 
+from webob.exc import HTTPUnauthorized
 from webob.multidict import MultiDict
 from webhelpers.paginate import Page
-from tg.configuration import Bunch
 # this can't be tg, as we are circular importing then!
 from pylons import config, request
 from pylons import tmpl_context as c
 from util import partial
+from repoze.what.authorize import check_authorization, NotAuthorizedError
+
+from tg.configuration import Bunch
+from tg.flash import flash
 
 class Decoration(object):
     """ Simple class to support 'simple registration' type decorators
@@ -208,6 +212,7 @@
             self.content_type, self.engine, self.template, self.exclude_names)
         return func
 
+
 def override_template(controller, template):
     """Use overide_template in a controller in order to change the
     template that will be used to render the response dictionary
@@ -343,5 +348,26 @@
     retval = func(*args, **kwargs)
     s.commit = old_commit
     return retval
+
+
+def require(predicate):
+    """
+    Make repoze.what verify that the predicate is met.
+    
+    @param predicate: A repoze.what predicate.
+    @return: The decorator that checks authorization.
+    
+    """
+    
+    @decorator
+    def check_auth(func, *args, **kwargs):
+        environ = request.environ
+        try:
+            check_authorization(predicate, environ)
+        except NotAuthorizedError, e:
+            flash(e.errors, status="status_error")
+            raise HTTPUnauthorized()
+        return func(*args, **kwargs)
     
+    return check_auth
 

Added: trunk/tg/test_stack/auth_base.py
==============================================================================
--- (empty file)
+++ trunk/tg/test_stack/auth_base.py    Sat Nov 22 08:26:53 2008
@@ -0,0 +1,200 @@
+# -*- coding: utf-8 -*-
+
+"""Stuff required to the authentication and authorization."""
+
+import os, shutil
+from unittest import TestCase
+from xmlrpclib import loads, dumps
+
+import webob
+import beaker
+import pylons
+from paste.registry import Registry, RegistryManager
+from paste.fixture import TestApp
+from paste.wsgiwrappers import WSGIRequest, WSGIResponse
+from paste import httpexceptions
+
+from tg import tmpl_context
+from pylons.util import ContextObj, PylonsContext
+from pylons.controllers.util import Request, Response
+from tg.controllers import TGController
+from pylons.testutil import ControllerWrap, SetupCacheGlobal
+#import pylons.tests
+
+from beaker.middleware import CacheMiddleware
+
+from repoze.what.middleware import setup_auth
+from repoze.what.adapters import BaseSourceAdapter
+from pylons import config
+
+data_dir = os.path.dirname(os.path.abspath(__file__))
+session_dir = os.path.join(data_dir, 'fixture', 'session')
+
+def setup_session_dir():
+    if not os.path.exists(session_dir):
+        os.makedirs(session_dir)
+
+def teardown_session_dir():
+    shutil.rmtree(session_dir, ignore_errors=True)
+
+default_environ = {
+    'pylons.use_webob' : True,
+    'pylons.routes_dict': dict(action='index'),
+    'paste.config': dict(global_conf=dict(debug=True))
+}
+
+
+class FakeAuthenticator(object):
+    """Fake repoze.who authenticator plugin"""
+    credentials = {
+        u'rms': u'freedom',
+        u'linus': u'linux',
+        u'sballmer': u'developers',
+        u'guido': u'pythonic',
+        u'rasmus': u'php'
+        }
+
+    def authenticate(self, environ, identity):
+        login = identity['login']
+        pass_ = identity['password']
+        if login in self.credentials and pass_ == self.credentials[login]:
+            return login
+
+
+class FakeGroupSourceAdapter(BaseSourceAdapter):
+    """Mock group source adapter"""
+
+    def __init__(self):
+        super(FakeGroupSourceAdapter, self).__init__()
+        self.fake_sections = {
+            u'admins': set([u'rms']),
+            u'developers': set([u'rms', u'linus']),
+            u'trolls': set([u'sballmer']),
+            u'python': set(),
+            u'php': set()
+            }
+
+    def _get_all_sections(self):
+        return self.fake_sections
+
+    def _get_section_items(self, section):
+        return self.fake_sections[section]
+
+    def _find_sections(self, identity):
+        username = identity['repoze.who.userid']
+        return set([n for (n, g) in self.fake_sections.items()
+                    if username in g])
+
+    def _include_items(self, section, items):
+        self.fake_sections[section] |= items
+
+    def _exclude_items(self, section, items):
+        for item in items:
+            self.fake_sections[section].remove(item)
+
+    def _item_is_included(self, section, item):
+        return item in self.fake_sections[section]
+
+    def _create_section(self, section):
+        self.fake_sections[section] = set()
+
+    def _edit_section(self, section, new_section):
+        self.fake_sections[new_section] = self.fake_sections[section]
+        del self.fake_sections[section]
+
+    def _delete_section(self, section):
+        del self.fake_sections[section]
+
+    def _section_exists(self, section):
+        return self.fake_sections.has_key(section)
+
+
+class FakePermissionSourceAdapter(FakeGroupSourceAdapter):
+    """Mock permissions source adapter"""
+
+    def __init__(self):
+        super(FakePermissionSourceAdapter, self).__init__()
+        self.fake_sections = {
+            u'see-site': set([u'trolls']),
+            u'edit-site': set([u'admins', u'developers']),
+            u'commit': set([u'developers'])
+            }
+
+    def _find_sections(self, group_name):
+        return set([n for (n, p) in self.fake_sections.items()
+                    if group_name in p])
+
+
+def make_app(controller_klass=None, environ=None):
+    """Creates a `TestApp` instance."""
+    if environ is None:
+        environ = {}
+    environ['pylons.routes_dict'] = {}
+    environ['pylons.routes_dict']['action'] = "routes_placeholder"
+
+    if controller_klass is None:
+        controller_klass = TGController
+
+    app = ControllerWrap(controller_klass)
+    app = SetupCacheGlobal(app, environ, setup_cache=True, setup_session=True)
+    app = RegistryManager(app)
+    app = beaker.middleware.SessionMiddleware(app, {}, data_dir=session_dir)
+    app = CacheMiddleware(app, {}, data_dir=os.path.join(data_dir, 'cache'))
+
+    # Setting up the source adapters:
+    groups_adapters = {'my_groups': FakeGroupSourceAdapter()}
+    permissions_adapters = {'my_permissions': FakePermissionSourceAdapter()}
+    authenticators = (('auth', FakeAuthenticator()), )
+    app = setup_auth(app, groups_adapters, permissions_adapters, authenticators)
+
+    app = httpexceptions.make_middleware(app)
+    return TestApp(app)
+
+
+def create_request(path, environ=None):
+    """Helper used in test cases to quickly setup a request obj.
+
+    ``path``
+        The path will become PATH_INFO
+    ``environ``
+        Additional environment
+
+    Returns an instance of the `webob.Request` object.
+    """
+    # setup the environ
+    if environ is None:
+        environ = {}
+    environ.update(default_environ)
+    # create a "blank" WebOb Request object
+    # using Pylon's Request which is a webob Request plus
+    # some compatibility methods
+    req = Request.blank(path, environ)
+    # setup a Registry
+    reg = environ.setdefault('paste.registry', Registry())
+    reg.prepare()
+    # setup pylons.request to point to our Registry
+    reg.register(pylons.request, req)
+    # setup tmpl context
+    tmpl_context._push_object(ContextObj())
+    return req
+
+class TestWSGIController(TestCase):
+
+    def setUp(self):
+        tmpl_context._push_object(ContextObj())
+        # Without the line below, it will fail because TG2 will try to use the
+        # deprecated Buffet package.
+        config['use_legacy_renderer'] = False
+
+    def tearDown(self):
+        tmpl_context._pop_object()
+
+    def get_response(self, **kargs):
+        url = kargs.pop('_url', '/')
+        self.environ['pylons.routes_dict'].update(kargs)
+        return self.app.get(url, extra_environ=self.environ)
+
+    def post_response(self, **kargs):
+        url = kargs.pop('_url', '/')
+        self.environ['pylons.routes_dict'].update(kargs)
+        return self.app.post(url, extra_environ=self.environ, params=kargs)

Added: trunk/tg/test_stack/fixture/__init__.py
==============================================================================

Added: trunk/tg/test_stack/test_auth.py
==============================================================================
--- (empty file)
+++ trunk/tg/test_stack/test_auth.py    Sat Nov 22 08:26:53 2008
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+"""
+Tests for the integration of repoze.who and repoze.what into TG.
+
+"""
+
+import tg, pylons
+from tg.controllers import TGController
+from tg.decorators import expose, require
+from repoze.what import authorize
+from nose.tools import eq_
+
+from auth_base import TestWSGIController, make_app, setup_session_dir, \
+                      teardown_session_dir
+
+
+def setup():
+    setup_session_dir()
+
+
+def _teardown():
+    teardown_session_dir()
+
+
+class SubController1(TGController):
+    """Mock TG2 subcontroller"""
+    
+    @expose()
+    def index(self):
+        return 'hello sub1'
+    
+    @expose()
+    def in_group(self):
+        return 'in group'
+
+
+class BasicTGController(TGController):
+    """Mock TG2 controller"""
+
+    sub1 = SubController1()
+    
+    @expose()
+    def index(self, **kwargs):
+        return 'hello world'
+
+    @expose()
+    def default(self, remainder):
+        return "Main Default Page called for url /%s" % remainder
+    
+    @expose()
+    @require(authorize.in_group('admins'))
+    def admin(self):
+        return 'got to admin'
+    
+    @expose()
+    @require(authorize.in_all_groups('developers', 'admins'))
+    def all_groups(self):
+        return 'got to all groups'
+
+    @expose()
+    @require(authorize.in_any_group('php', 'trolls'))
+    def any_groups(self):
+        return 'got to any groups'
+
+    @expose()
+    @require(authorize.is_user('rms'))
+    def rms_user(self):
+        return 'got to promote freedomware'
+    
+    @expose()
+    @require(authorize.has_permission('edit-site'))
+    def editsite_perm_only(self):
+        return 'got to edit'
+    
+    @expose()
+    @require(authorize.has_any_permission('commit'))
+    def commit_perm(self):
+        return 'got to commit'
+
+    @expose()
+    @require(authorize.has_all_permissions('commit', 'edit-site'))
+    def all_perm(self):
+        return 'got to all perm'
+
+    @expose()
+    @require(authorize.not_anonymous())
+    def not_anon(self):
+        return 'got to not anon'
+    
+    @expose()
+    def redirect_me(self, target, **kw):
+        tg.redirect(target, kw)
+
+
+class TestTGController(TestWSGIController):
+    """Test case for the mock TG controller and its subcontroller"""
+    
+    def __init__(self, *args, **kargs):
+        TestWSGIController.__init__(self, *args, **kargs)
+        self.app = make_app(BasicTGController)
+    
+    def _test_index(self):
+        resp = self.app.get('/index/')
+        assert 'hello' in resp.body
+    
+    def test_group_no_auth(self):
+        resp = self.app.get('/admin')
+        assert resp.body.startswith('302 Found'), resp.body
+    
+    def test_group_with_auth(self):
+        resp = self.app.get('/login_handler?login=rms&password=freedom')
+        resp = self.app.get('/admin')
+        eq_(resp.body, 'got to admin')
+
+    def test_all_groups_no_auth(self):
+        resp = self.app.get('/login_handler?login=rasmus&password=php')
+        resp = self.app.get('/all_groups')
+        assert resp.body.startswith('302 Found'), resp.body
+    
+    def test_all_groups(self):
+        resp = self.app.get('/login_handler?login=rms&password=freedom')
+        resp = self.app.get('/all_groups')
+        eq_(resp.body, 'got to all groups')
+
+    def test_any_groups_no_auth(self):
+        resp = self.app.get('/login_handler?login=linus&password=freedomware')
+        resp = self.app.get('/any_groups')
+        assert resp.body.startswith('302 Found'), resp.body
+    
+    def test_any_groups(self):
+        resp = self.app.get('/login_handler?login=sballmer&password=developers')
+        resp = self.app.get('/any_groups')
+        eq_(resp.body, 'got to any groups')
+
+    def test_no_auth_not_anon(self):
+        resp = self.app.get('/not_anon')
+        assert resp.body.startswith('302 Found'), resp.body
+
+    def test_not_anon(self):
+        resp = self.app.get('/login_handler?login=linus&password=linux')
+        resp = self.app.get('/not_anon')
+        eq_(resp.body, 'got to not anon')
+
+    def test_no_auth_is_user(self):
+        resp = self.app.get('/login_handler?login=sballmer&password=developers')
+        resp = self.app.get('/rms_user')
+        assert resp.body.startswith('302 Found'), resp.body
+
+    def test_is_user(self):
+        resp = self.app.get('/login_handler?login=rms&password=freedom')
+        resp = self.app.get('/rms_user')
+        eq_(resp.body, 'got to promote freedomware')
+
+    def test_no_auth_perm(self):
+        resp = self.app.get('/login_handler?login=sballmer&password=developers')
+        resp = self.app.get('/editsite_perm_only')
+        assert resp.body.startswith('302 Found'), resp.body
+
+    def test_perm(self):
+        resp = self.app.get('/login_handler?login=rms&password=freedom')
+        resp = self.app.get('/editsite_perm_only')
+        eq_(resp.body, 'got to edit')
+
+    def test_no_auth_any_perm(self):
+        resp = self.app.get('/login_handler?login=linus&password=freedomware')
+        resp = self.app.get('/commit_perm')
+        assert resp.body.startswith('302 Found'), resp.body
+
+    def test_any_perm(self):
+        resp = self.app.get('/login_handler?login=rms&password=freedom')
+        resp = self.app.get('/commit_perm')
+        eq_(resp.body, 'got to commit')
+
+    def test_no_auth_all_perm(self):
+        resp = self.app.get('/login_handler?login=linus&password=freedomware')
+        resp = self.app.get('/all_perm')
+        assert resp.body.startswith('302 Found'), resp.body
+
+    def test_all_perm(self):
+        resp = self.app.get('/login_handler?login=linus&password=linux')
+        resp = self.app.get('/all_perm')
+        eq_(resp.body, 'got to all perm')
+        
+    def test_sub_in_admin(self):
+        resp = self.app.get('/login_handler?login=sballmer&password=developers')
+        resp = self.app.get('/sub1/in_group')
+        eq_(resp.body, 'in group')

Reply via email to