Author: Gustavo
Date: Sat Nov 22 08:26:53 2008
New Revision: 5743
URL: http://trac.turbogears.org/changeset/5743
Log:
- Added *tons* of tests to verify the integration of repoze.who and repoze.what.
- Moved the @require decorator from repoze.what into TG itself.
- Synchronized the used services of repoze.what according to the latest API
changes to make it independent of TG.
Added:
trunk/tg/test_stack/auth_base.py (contents, props changed)
trunk/tg/test_stack/fixture/
trunk/tg/test_stack/fixture/__init__.py (contents, props changed)
trunk/tg/test_stack/fixture/session/ (props changed)
trunk/tg/test_stack/test_auth.py (contents, props changed)
Modified:
trunk/tg/__init__.py
trunk/tg/controllers.py
trunk/tg/decorators.py
Modified: trunk/tg/__init__.py
==============================================================================
--- trunk/tg/__init__.py (original)
+++ trunk/tg/__init__.py Sat Nov 22 08:26:53 2008
@@ -53,7 +53,8 @@
from tg.wsgiapp import TGApp
from tg.controllers import TGController, redirect, url, use_wsgi_app
from tg.configuration import config
-from tg.decorators import validate, expose, override_template, paginate,
postpone_commits
+from tg.decorators import validate, expose, override_template, paginate, \
+ postpone_commits, require
from tg.flash import flash, get_flash, get_status
from pylons import g, request, response, tmpl_context, session
@@ -61,5 +62,5 @@
__all__ = [
'expose', 'validate', 'TGController', 'tmpl_context', 'app_globals',
'overide_template', 'request', 'response',
'session','TurboGearsApplication',
- 'use_wsgi_app', 'TGApp', 'app_globals'
+ 'use_wsgi_app', 'TGApp', 'app_globals', 'require'
]
Modified: trunk/tg/controllers.py
==============================================================================
--- trunk/tg/controllers.py (original)
+++ trunk/tg/controllers.py Sat Nov 22 08:26:53 2008
@@ -514,10 +514,9 @@
def check_security(self):
errors = []
environ = pylons.request.environ
- identity = environ.get('repoze.who.identity')
if not hasattr(self, "require") or \
self.require is None or \
- self.require.eval_with_object(identity, errors):
+ self.require.eval_with_environ(environ, errors):
return True
# if we did not return this is an error :)
Modified: trunk/tg/decorators.py
==============================================================================
--- trunk/tg/decorators.py (original)
+++ trunk/tg/decorators.py Sat Nov 22 08:26:53 2008
@@ -9,13 +9,17 @@
from paste.util.mimeparse import best_match
from decorator import decorator
+from webob.exc import HTTPUnauthorized
from webob.multidict import MultiDict
from webhelpers.paginate import Page
-from tg.configuration import Bunch
# this can't be tg, as we are circular importing then!
from pylons import config, request
from pylons import tmpl_context as c
from util import partial
+from repoze.what.authorize import check_authorization, NotAuthorizedError
+
+from tg.configuration import Bunch
+from tg.flash import flash
class Decoration(object):
""" Simple class to support 'simple registration' type decorators
@@ -208,6 +212,7 @@
self.content_type, self.engine, self.template, self.exclude_names)
return func
+
def override_template(controller, template):
"""Use overide_template in a controller in order to change the
template that will be used to render the response dictionary
@@ -343,5 +348,26 @@
retval = func(*args, **kwargs)
s.commit = old_commit
return retval
+
+
+def require(predicate):
+ """
+ Make repoze.what verify that the predicate is met.
+
+ @param predicate: A repoze.what predicate.
+ @return: The decorator that checks authorization.
+
+ """
+
+ @decorator
+ def check_auth(func, *args, **kwargs):
+ environ = request.environ
+ try:
+ check_authorization(predicate, environ)
+ except NotAuthorizedError, e:
+ flash(e.errors, status="status_error")
+ raise HTTPUnauthorized()
+ return func(*args, **kwargs)
+ return check_auth
Added: trunk/tg/test_stack/auth_base.py
==============================================================================
--- (empty file)
+++ trunk/tg/test_stack/auth_base.py Sat Nov 22 08:26:53 2008
@@ -0,0 +1,200 @@
+# -*- coding: utf-8 -*-
+
+"""Stuff required to the authentication and authorization."""
+
+import os, shutil
+from unittest import TestCase
+from xmlrpclib import loads, dumps
+
+import webob
+import beaker
+import pylons
+from paste.registry import Registry, RegistryManager
+from paste.fixture import TestApp
+from paste.wsgiwrappers import WSGIRequest, WSGIResponse
+from paste import httpexceptions
+
+from tg import tmpl_context
+from pylons.util import ContextObj, PylonsContext
+from pylons.controllers.util import Request, Response
+from tg.controllers import TGController
+from pylons.testutil import ControllerWrap, SetupCacheGlobal
+#import pylons.tests
+
+from beaker.middleware import CacheMiddleware
+
+from repoze.what.middleware import setup_auth
+from repoze.what.adapters import BaseSourceAdapter
+from pylons import config
+
+data_dir = os.path.dirname(os.path.abspath(__file__))
+session_dir = os.path.join(data_dir, 'fixture', 'session')
+
+def setup_session_dir():
+ if not os.path.exists(session_dir):
+ os.makedirs(session_dir)
+
+def teardown_session_dir():
+ shutil.rmtree(session_dir, ignore_errors=True)
+
+default_environ = {
+ 'pylons.use_webob' : True,
+ 'pylons.routes_dict': dict(action='index'),
+ 'paste.config': dict(global_conf=dict(debug=True))
+}
+
+
+class FakeAuthenticator(object):
+ """Fake repoze.who authenticator plugin"""
+ credentials = {
+ u'rms': u'freedom',
+ u'linus': u'linux',
+ u'sballmer': u'developers',
+ u'guido': u'pythonic',
+ u'rasmus': u'php'
+ }
+
+ def authenticate(self, environ, identity):
+ login = identity['login']
+ pass_ = identity['password']
+ if login in self.credentials and pass_ == self.credentials[login]:
+ return login
+
+
+class FakeGroupSourceAdapter(BaseSourceAdapter):
+ """Mock group source adapter"""
+
+ def __init__(self):
+ super(FakeGroupSourceAdapter, self).__init__()
+ self.fake_sections = {
+ u'admins': set([u'rms']),
+ u'developers': set([u'rms', u'linus']),
+ u'trolls': set([u'sballmer']),
+ u'python': set(),
+ u'php': set()
+ }
+
+ def _get_all_sections(self):
+ return self.fake_sections
+
+ def _get_section_items(self, section):
+ return self.fake_sections[section]
+
+ def _find_sections(self, identity):
+ username = identity['repoze.who.userid']
+ return set([n for (n, g) in self.fake_sections.items()
+ if username in g])
+
+ def _include_items(self, section, items):
+ self.fake_sections[section] |= items
+
+ def _exclude_items(self, section, items):
+ for item in items:
+ self.fake_sections[section].remove(item)
+
+ def _item_is_included(self, section, item):
+ return item in self.fake_sections[section]
+
+ def _create_section(self, section):
+ self.fake_sections[section] = set()
+
+ def _edit_section(self, section, new_section):
+ self.fake_sections[new_section] = self.fake_sections[section]
+ del self.fake_sections[section]
+
+ def _delete_section(self, section):
+ del self.fake_sections[section]
+
+ def _section_exists(self, section):
+ return self.fake_sections.has_key(section)
+
+
+class FakePermissionSourceAdapter(FakeGroupSourceAdapter):
+ """Mock permissions source adapter"""
+
+ def __init__(self):
+ super(FakePermissionSourceAdapter, self).__init__()
+ self.fake_sections = {
+ u'see-site': set([u'trolls']),
+ u'edit-site': set([u'admins', u'developers']),
+ u'commit': set([u'developers'])
+ }
+
+ def _find_sections(self, group_name):
+ return set([n for (n, p) in self.fake_sections.items()
+ if group_name in p])
+
+
+def make_app(controller_klass=None, environ=None):
+ """Creates a `TestApp` instance."""
+ if environ is None:
+ environ = {}
+ environ['pylons.routes_dict'] = {}
+ environ['pylons.routes_dict']['action'] = "routes_placeholder"
+
+ if controller_klass is None:
+ controller_klass = TGController
+
+ app = ControllerWrap(controller_klass)
+ app = SetupCacheGlobal(app, environ, setup_cache=True, setup_session=True)
+ app = RegistryManager(app)
+ app = beaker.middleware.SessionMiddleware(app, {}, data_dir=session_dir)
+ app = CacheMiddleware(app, {}, data_dir=os.path.join(data_dir, 'cache'))
+
+ # Setting up the source adapters:
+ groups_adapters = {'my_groups': FakeGroupSourceAdapter()}
+ permissions_adapters = {'my_permissions': FakePermissionSourceAdapter()}
+ authenticators = (('auth', FakeAuthenticator()), )
+ app = setup_auth(app, groups_adapters, permissions_adapters,
authenticators)
+
+ app = httpexceptions.make_middleware(app)
+ return TestApp(app)
+
+
+def create_request(path, environ=None):
+ """Helper used in test cases to quickly setup a request obj.
+
+ ``path``
+ The path will become PATH_INFO
+ ``environ``
+ Additional environment
+
+ Returns an instance of the `webob.Request` object.
+ """
+ # setup the environ
+ if environ is None:
+ environ = {}
+ environ.update(default_environ)
+ # create a "blank" WebOb Request object
+ # using Pylon's Request which is a webob Request plus
+ # some compatibility methods
+ req = Request.blank(path, environ)
+ # setup a Registry
+ reg = environ.setdefault('paste.registry', Registry())
+ reg.prepare()
+ # setup pylons.request to point to our Registry
+ reg.register(pylons.request, req)
+ # setup tmpl context
+ tmpl_context._push_object(ContextObj())
+ return req
+
+class TestWSGIController(TestCase):
+
+ def setUp(self):
+ tmpl_context._push_object(ContextObj())
+ # Without the line below, it will fail because TG2 will try to use the
+ # deprecated Buffet package.
+ config['use_legacy_renderer'] = False
+
+ def tearDown(self):
+ tmpl_context._pop_object()
+
+ def get_response(self, **kargs):
+ url = kargs.pop('_url', '/')
+ self.environ['pylons.routes_dict'].update(kargs)
+ return self.app.get(url, extra_environ=self.environ)
+
+ def post_response(self, **kargs):
+ url = kargs.pop('_url', '/')
+ self.environ['pylons.routes_dict'].update(kargs)
+ return self.app.post(url, extra_environ=self.environ, params=kargs)
Added: trunk/tg/test_stack/fixture/__init__.py
==============================================================================
Added: trunk/tg/test_stack/test_auth.py
==============================================================================
--- (empty file)
+++ trunk/tg/test_stack/test_auth.py Sat Nov 22 08:26:53 2008
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+"""
+Tests for the integration of repoze.who and repoze.what into TG.
+
+"""
+
+import tg, pylons
+from tg.controllers import TGController
+from tg.decorators import expose, require
+from repoze.what import authorize
+from nose.tools import eq_
+
+from auth_base import TestWSGIController, make_app, setup_session_dir, \
+ teardown_session_dir
+
+
+def setup():
+ setup_session_dir()
+
+
+def _teardown():
+ teardown_session_dir()
+
+
+class SubController1(TGController):
+ """Mock TG2 subcontroller"""
+
+ @expose()
+ def index(self):
+ return 'hello sub1'
+
+ @expose()
+ def in_group(self):
+ return 'in group'
+
+
+class BasicTGController(TGController):
+ """Mock TG2 controller"""
+
+ sub1 = SubController1()
+
+ @expose()
+ def index(self, **kwargs):
+ return 'hello world'
+
+ @expose()
+ def default(self, remainder):
+ return "Main Default Page called for url /%s" % remainder
+
+ @expose()
+ @require(authorize.in_group('admins'))
+ def admin(self):
+ return 'got to admin'
+
+ @expose()
+ @require(authorize.in_all_groups('developers', 'admins'))
+ def all_groups(self):
+ return 'got to all groups'
+
+ @expose()
+ @require(authorize.in_any_group('php', 'trolls'))
+ def any_groups(self):
+ return 'got to any groups'
+
+ @expose()
+ @require(authorize.is_user('rms'))
+ def rms_user(self):
+ return 'got to promote freedomware'
+
+ @expose()
+ @require(authorize.has_permission('edit-site'))
+ def editsite_perm_only(self):
+ return 'got to edit'
+
+ @expose()
+ @require(authorize.has_any_permission('commit'))
+ def commit_perm(self):
+ return 'got to commit'
+
+ @expose()
+ @require(authorize.has_all_permissions('commit', 'edit-site'))
+ def all_perm(self):
+ return 'got to all perm'
+
+ @expose()
+ @require(authorize.not_anonymous())
+ def not_anon(self):
+ return 'got to not anon'
+
+ @expose()
+ def redirect_me(self, target, **kw):
+ tg.redirect(target, kw)
+
+
+class TestTGController(TestWSGIController):
+ """Test case for the mock TG controller and its subcontroller"""
+
+ def __init__(self, *args, **kargs):
+ TestWSGIController.__init__(self, *args, **kargs)
+ self.app = make_app(BasicTGController)
+
+ def _test_index(self):
+ resp = self.app.get('/index/')
+ assert 'hello' in resp.body
+
+ def test_group_no_auth(self):
+ resp = self.app.get('/admin')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_group_with_auth(self):
+ resp = self.app.get('/login_handler?login=rms&password=freedom')
+ resp = self.app.get('/admin')
+ eq_(resp.body, 'got to admin')
+
+ def test_all_groups_no_auth(self):
+ resp = self.app.get('/login_handler?login=rasmus&password=php')
+ resp = self.app.get('/all_groups')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_all_groups(self):
+ resp = self.app.get('/login_handler?login=rms&password=freedom')
+ resp = self.app.get('/all_groups')
+ eq_(resp.body, 'got to all groups')
+
+ def test_any_groups_no_auth(self):
+ resp = self.app.get('/login_handler?login=linus&password=freedomware')
+ resp = self.app.get('/any_groups')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_any_groups(self):
+ resp =
self.app.get('/login_handler?login=sballmer&password=developers')
+ resp = self.app.get('/any_groups')
+ eq_(resp.body, 'got to any groups')
+
+ def test_no_auth_not_anon(self):
+ resp = self.app.get('/not_anon')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_not_anon(self):
+ resp = self.app.get('/login_handler?login=linus&password=linux')
+ resp = self.app.get('/not_anon')
+ eq_(resp.body, 'got to not anon')
+
+ def test_no_auth_is_user(self):
+ resp =
self.app.get('/login_handler?login=sballmer&password=developers')
+ resp = self.app.get('/rms_user')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_is_user(self):
+ resp = self.app.get('/login_handler?login=rms&password=freedom')
+ resp = self.app.get('/rms_user')
+ eq_(resp.body, 'got to promote freedomware')
+
+ def test_no_auth_perm(self):
+ resp =
self.app.get('/login_handler?login=sballmer&password=developers')
+ resp = self.app.get('/editsite_perm_only')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_perm(self):
+ resp = self.app.get('/login_handler?login=rms&password=freedom')
+ resp = self.app.get('/editsite_perm_only')
+ eq_(resp.body, 'got to edit')
+
+ def test_no_auth_any_perm(self):
+ resp = self.app.get('/login_handler?login=linus&password=freedomware')
+ resp = self.app.get('/commit_perm')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_any_perm(self):
+ resp = self.app.get('/login_handler?login=rms&password=freedom')
+ resp = self.app.get('/commit_perm')
+ eq_(resp.body, 'got to commit')
+
+ def test_no_auth_all_perm(self):
+ resp = self.app.get('/login_handler?login=linus&password=freedomware')
+ resp = self.app.get('/all_perm')
+ assert resp.body.startswith('302 Found'), resp.body
+
+ def test_all_perm(self):
+ resp = self.app.get('/login_handler?login=linus&password=linux')
+ resp = self.app.get('/all_perm')
+ eq_(resp.body, 'got to all perm')
+
+ def test_sub_in_admin(self):
+ resp =
self.app.get('/login_handler?login=sballmer&password=developers')
+ resp = self.app.get('/sub1/in_group')
+ eq_(resp.body, 'in group')