Log message for revision 77459: Integrate Zope 3-based exception views. Patch by Sidnei da Silva (Enfold); integration work done by Martijn Faassen (Startifact) for Infrae.
Changed: U Zope/trunk/doc/CHANGES.txt U Zope/trunk/lib/python/Zope2/App/startup.py A Zope/trunk/lib/python/Zope2/App/tests/ A Zope/trunk/lib/python/Zope2/App/tests/__init__.py A Zope/trunk/lib/python/Zope2/App/tests/testExceptionHook.py U Zope/trunk/lib/python/zExceptions/__init__.py U Zope/trunk/lib/python/zExceptions/unauthorized.py -=- Modified: Zope/trunk/doc/CHANGES.txt =================================================================== --- Zope/trunk/doc/CHANGES.txt 2007-07-05 13:34:33 UTC (rev 77458) +++ Zope/trunk/doc/CHANGES.txt 2007-07-05 14:35:49 UTC (rev 77459) @@ -100,6 +100,27 @@ - AccessControl: the form behind the "Security" tab has a new form for user-related reporting of permissions and roles + - Zope 3-based exception views can now be registered in ZCML for + various exceptions that can be raised by Zope. Registering an + exception view can be done like this: + + <browser:page + for="zope.publisher.interfaces.INotFound" + class=".view.SomeView" + name="index.html" + permission="zope.Public" /> + + Relevant exceptions that can have views are: + + zope.interface.common.interfaces.IException + zope.publisher.interfaces.INotFound + zope.security.interfaces.IForbidden + zope.security.interfaces.IUnauthorized + + Note that the name has to be 'index.html' for the exception + view to work. (patch by Sidnei da Silva from Enfold, + integration by Martijn Faassen (Startifact) for Infrae) + Bugs Fixed - Collector #1306: Missing acquisition context on local roles screen. 
Modified: Zope/trunk/lib/python/Zope2/App/startup.py =================================================================== --- Zope/trunk/lib/python/Zope2/App/startup.py 2007-07-05 13:34:33 UTC (rev 77458) +++ Zope/trunk/lib/python/Zope2/App/startup.py 2007-07-05 14:35:49 UTC (rev 77459) @@ -13,6 +13,7 @@ """Initialize the Zope2 Package and provide a published module """ +from zope.component import queryMultiAdapter from AccessControl.SecurityManagement import getSecurityManager from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import noSecurityManager @@ -20,7 +21,7 @@ from App.config import getConfiguration from time import asctime from types import StringType, ListType -from zExceptions import Unauthorized +from zExceptions import Unauthorized, Redirect from ZODB.POSException import ConflictError import transaction import AccessControl.User @@ -37,6 +38,8 @@ import Zope2 import ZPublisher +app = None +startup_time = asctime() def startup(): global app @@ -132,100 +135,124 @@ ) Zope2.DB.removeVersionPool(version) raise Unauthorized, "You don't have permission to enter versions." 
- + class RequestContainer(ExtensionClass.Base): def __init__(self,r): self.REQUEST=r -conflict_errors = 0 -unresolved_conflict_errors = 0 +class ZPublisherExceptionHook: -conflict_logger = logging.getLogger('ZPublisher.Conflict') + def __init__(self): + self.conflict_errors = 0 + self.unresolved_conflict_errors = 0 + self.conflict_logger = logging.getLogger('ZPublisher.Conflict') + self.error_message = 'standard_error_message' + self.raise_error_message = 'raise_standardErrorMessage' -def zpublisher_exception_hook(published, REQUEST, t, v, traceback): - global unresolved_conflict_errors - global conflict_errors - try: - if isinstance(t, StringType): - if t.lower() in ('unauthorized', 'redirect'): - raise - else: - if t is SystemExit: - raise - if issubclass(t, ConflictError): - conflict_errors += 1 - level = getConfiguration().conflict_error_log_level - if level: - conflict_logger.log(level, - "%s at %s: %s (%d conflicts (%d unresolved) " - "since startup at %s)", - v.__class__.__name__, - REQUEST.get('PATH_INFO', '<unknown>'), - v, - conflict_errors, - unresolved_conflict_errors, - startup_time) - raise ZPublisher.Retry(t, v, traceback) - if t is ZPublisher.Retry: - try: - v.reraise() - except: - # we catch the re-raised exception so that it gets - # stored in the error log and gets rendered with - # standard_error_message - t, v, traceback = sys.exc_info() - if issubclass(t, ConflictError): - # ouch, a user saw this conflict error :-( - unresolved_conflict_errors += 1 + def logConflicts(self, v, REQUEST): + self.conflict_errors += 1 + level = getattr(getConfiguration(), 'conflict_error_log_level', 0) + if not self.conflict_logger.isEnabledFor(level): + return False + self.conflict_logger.log( + level, + "%s at %s: %s (%d conflicts (%d unresolved) " + "since startup at %s)", + v.__class__.__name__, + REQUEST.get('PATH_INFO', '<unknown>'), + v, + self.conflict_errors, + self.unresolved_conflict_errors, + startup_time) + return True + def __call__(self, published, 
REQUEST, t, v, traceback): try: - log = aq_acquire(published, '__error_log__', containment=1) - except AttributeError: - error_log_url = '' - else: - error_log_url = log.raising((t, v, traceback)) + if isinstance(t, StringType): + if t.lower() in ('unauthorized', 'redirect'): + raise + else: + if t is SystemExit or t is Redirect: + raise - if (getattr(REQUEST.get('RESPONSE', None), '_error_format', '') - !='text/html'): - raise t, v, traceback + if issubclass(t, ConflictError): + self.logConflicts(v, REQUEST) + raise ZPublisher.Retry(t, v, traceback) - if (published is None or published is app or - type(published) is ListType): - # At least get the top-level object - published=app.__bobo_traverse__(REQUEST).__of__( - RequestContainer(REQUEST)) + if t is ZPublisher.Retry: + try: + v.reraise() + except: + # we catch the re-raised exception so that it gets + # stored in the error log and gets rendered with + # standard_error_message + t, v, traceback = sys.exc_info() + if issubclass(t, ConflictError): + # ouch, a user saw this conflict error :-( + self.unresolved_conflict_errors += 1 - published=getattr(published, 'im_self', published) - while 1: - f=getattr(published, 'raise_standardErrorMessage', None) - if f is None: - published=getattr(published, 'aq_parent', None) - if published is None: - raise t, v, traceback + try: + log = aq_acquire(published, '__error_log__', containment=1) + except AttributeError: + error_log_url = '' else: - break + error_log_url = log.raising((t, v, traceback)) - client=published - while 1: - if getattr(client, 'standard_error_message', None) is not None: - break - client=getattr(client, 'aq_parent', None) - if client is None: + if (getattr(REQUEST.get('RESPONSE', None), '_error_format', '') + !='text/html'): raise t, v, traceback - if REQUEST.get('AUTHENTICATED_USER', None) is None: - REQUEST['AUTHENTICATED_USER']=AccessControl.User.nobody + # Lookup a view for the exception and render it, then + # raise the rendered value as the 
exception value + # (basically the same that 'raise_standardErrorMessage' + # does. The view is named 'index.html' because that's what + # Zope 3 uses as well. + view = queryMultiAdapter((v, REQUEST), name=u'index.html') + if view is not None: + v = view() + response = REQUEST.RESPONSE + response.setStatus(t) + response.setBody(v) + return response - try: - f(client, REQUEST, t, v, traceback, error_log_url=error_log_url) - except TypeError: - # Pre 2.6 call signature - f(client, REQUEST, t, v, traceback) + if (published is None or published is app or + type(published) is ListType): + # At least get the top-level object + published=app.__bobo_traverse__(REQUEST).__of__( + RequestContainer(REQUEST)) - finally: - traceback=None + published = getattr(published, 'im_self', published) + while 1: + f = getattr(published, self.raise_error_message, None) + if f is None: + published = getattr(published, 'aq_parent', None) + if published is None: + raise t, v, traceback + else: + break + client = published + while 1: + if getattr(client, self.error_message, None) is not None: + break + client = getattr(client, 'aq_parent', None) + if client is None: + raise t, v, traceback + + if REQUEST.get('AUTHENTICATED_USER', None) is None: + REQUEST['AUTHENTICATED_USER'] = AccessControl.User.nobody + + try: + f(client, REQUEST, t, v, traceback, error_log_url=error_log_url) + except TypeError: + # Pre 2.6 call signature + f(client, REQUEST, t, v, traceback) + + finally: + traceback = None + +zpublisher_exception_hook = ZPublisherExceptionHook() ac_logger = logging.getLogger('event.AccessControl') class TransactionsManager: Added: Zope/trunk/lib/python/Zope2/App/tests/__init__.py =================================================================== --- Zope/trunk/lib/python/Zope2/App/tests/__init__.py (rev 0) +++ Zope/trunk/lib/python/Zope2/App/tests/__init__.py 2007-07-05 14:35:49 UTC (rev 77459) @@ -0,0 +1,15 @@ 
+############################################################################## +# +# Copyright (c) 2007 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## + +"""Tests of the Zope2.App package.""" Added: Zope/trunk/lib/python/Zope2/App/tests/testExceptionHook.py =================================================================== --- Zope/trunk/lib/python/Zope2/App/tests/testExceptionHook.py (rev 0) +++ Zope/trunk/lib/python/Zope2/App/tests/testExceptionHook.py 2007-07-05 14:35:49 UTC (rev 77459) @@ -0,0 +1,412 @@ +############################################################################## +# +# Copyright (c) 2007 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. 
+# +############################################################################## + +import sys +import unittest +import logging + +from zope.publisher.interfaces import INotFound +from zope.security.interfaces import IUnauthorized +from zope.security.interfaces import IForbidden +from zope.interface.common.interfaces import IException + +from zope.app.testing import ztapi +from zope.app.testing.placelesssetup import PlacelessSetup +from zope.publisher.browser import setDefaultSkin + +class ExceptionHookTestCase(unittest.TestCase): + + def _makeOne(self): + from Zope2.App.startup import ZPublisherExceptionHook + return ZPublisherExceptionHook() + + def _makeRequest(self, stdin=None, environ=None, + response=None, clean=1, stdout=None): + from ZPublisher.HTTPRequest import HTTPRequest + from ZPublisher.HTTPResponse import HTTPResponse + + if stdin is None: + from StringIO import StringIO + stdin = StringIO() + + if stdout is None: + from StringIO import StringIO + stdout = StringIO() + + if environ is None: + environ = {} + + if 'SERVER_NAME' not in environ: + environ['SERVER_NAME'] = 'http://localhost' + + if 'SERVER_PORT' not in environ: + environ['SERVER_PORT'] = '8080' + + if response is None: + response = HTTPResponse(stdout=stdout) + + req = HTTPRequest(stdin, environ, response, clean) + setDefaultSkin(req) + return req + + def call(self, published, request, f, args=None, kw=None): + hook = self._makeOne() + try: + if args is None: + args = () + if kw is None: + kw = {} + f(*args, **kw) + except: + return hook(published, request, + sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2], + ) + + def call_no_exc(self, hook, published, request, f, args=None, kw=None): + if hook is None: + hook = self._makeOne() + try: + if args is None: + args = () + if kw is None: + kw = {} + f(*args, **kw) + except: + try: + hook(published, request, + sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2], + ) + except: + pass + return hook + + def 
call_exc_value(self, published, request, f, args=None, kw=None): + hook = self._makeOne() + try: + if args is None: + args = () + if kw is None: + kw = {} + f(*args, **kw) + except: + try: + return hook(published, request, + sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2], + ) + except Exception, e: + return e + +class ExceptionHookTest(ExceptionHookTestCase): + + def testStringException1(self): + def f(): + raise 'unauthorized', 'x' + self.assertRaises('unauthorized', self.call, None, None, f) + + def testStringException2(self): + def f(): + raise 'redirect', 'x' + self.assertRaises('redirect', self.call, None, None, f) + + def testSystemExit(self): + def f(): + raise SystemExit, 1 + self.assertRaises(SystemExit, self.call, None, None, f) + + def testUnauthorized(self): + from AccessControl import Unauthorized + def f(): + raise Unauthorized, 1 + self.assertRaises(Unauthorized, self.call, None, {}, f) + + def testConflictErrorRaisesRetry(self): + from ZPublisher import Retry + from ZODB.POSException import ConflictError + from App.config import getConfiguration + def f(): + raise ConflictError + request = self._makeRequest() + old_value = getattr(getConfiguration(), 'conflict_error_log_level', 0) + self.assertEquals(old_value, 0) # default value + try: + getConfiguration().conflict_error_log_level = logging.CRITICAL + level = getattr(getConfiguration(), 'conflict_error_log_level', 0) + self.assertEquals(level, logging.CRITICAL) + self.assertRaises(Retry, self.call, None, request, f) + finally: + getConfiguration().conflict_error_log_level = old_value + + def testConflictErrorCount(self): + from ZPublisher import Retry + from ZODB.POSException import ConflictError + def f(): + raise ConflictError + hook = self._makeOne() + self.assertEquals(hook.conflict_errors, 0) + self.call_no_exc(hook, None, None, f) + self.assertEquals(hook.conflict_errors, 1) + self.call_no_exc(hook, None, None, f) + self.assertEquals(hook.conflict_errors, 2) + + def 
testRetryRaisesOriginalException(self): + from ZPublisher import Retry + class CustomException(Exception): + pass + def f(): + try: + raise CustomException, 'Zope' + except: + raise Retry(sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2]) + self.assertRaises(CustomException, self.call, None, {}, f) + + def testRetryRaisesConflictError(self): + from ZPublisher import Retry + from ZODB.POSException import ConflictError + def f(): + try: + raise ConflictError + except: + raise Retry(sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2]) + self.assertRaises(ConflictError, self.call, None, {}, f) + + def testRetryUnresolvedConflictErrorCount(self): + from ZPublisher import Retry + from ZODB.POSException import ConflictError + def f(): + try: + raise ConflictError + except: + raise Retry(sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2]) + hook = self._makeOne() + self.assertEquals(hook.unresolved_conflict_errors, 0) + self.call_no_exc(hook, None, None, f) + self.assertEquals(hook.unresolved_conflict_errors, 1) + self.call_no_exc(hook, None, None, f) + self.assertEquals(hook.unresolved_conflict_errors, 2) + +class Client: + + def __init__(self): + self.standard_error_message = True + self.messages = [] + + def dummyMethod(self): + return 'Aye' + +class OldClient(Client): + + def raise_standardErrorMessage(self, c, r, t, v, tb): + from zExceptions.ExceptionFormatter import format_exception + self.messages.append(''.join(format_exception(t, v, tb, as_html=0))) + +class StandardClient(Client): + + def raise_standardErrorMessage(self, c, r, t, v, tb, error_log_url): + from zExceptions.ExceptionFormatter import format_exception + fmt = format_exception(t, v, tb, as_html=0) + self.messages.append(''.join([error_log_url] + fmt)) + +class BrokenClient(Client): + + def raise_standardErrorMessage(self, c, r, t, v, tb, error_log_url): + raise AttributeError, 'ouch' + +class ExceptionMessageRenderTest(ExceptionHookTestCase): + + def 
testRenderUnauthorizedOldClient(self): + from AccessControl import Unauthorized + def f(): + raise Unauthorized, 1 + request = self._makeRequest() + client = OldClient() + self.call(client, request, f) + self.failUnless(client.messages, client.messages) + tb = client.messages[0] + self.failUnless("Unauthorized: You are not allowed" in tb, tb) + + def testRenderUnauthorizedStandardClient(self): + from AccessControl import Unauthorized + def f(): + raise Unauthorized, 1 + request = self._makeRequest() + client = StandardClient() + self.call(client, request, f) + self.failUnless(client.messages, client.messages) + tb = client.messages[0] + self.failUnless("Unauthorized: You are not allowed" in tb, tb) + + def testRenderUnauthorizedStandardClientMethod(self): + from AccessControl import Unauthorized + def f(): + raise Unauthorized, 1 + request = self._makeRequest() + client = StandardClient() + self.call(client.dummyMethod, request, f) + self.failUnless(client.messages, client.messages) + tb = client.messages[0] + self.failUnless("Unauthorized: You are not allowed" in tb, tb) + + def testRenderUnauthorizedBrokenClient(self): + from AccessControl import Unauthorized + def f(): + raise Unauthorized, 1 + request = self._makeRequest() + client = BrokenClient() + self.assertRaises(AttributeError, self.call, client, request, f) + + def testRenderRetryRaisesOriginalException(self): + from ZPublisher import Retry + class CustomException(Exception): + pass + def f(): + try: + raise CustomException, 'Zope' + except: + raise Retry(sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2]) + request = self._makeRequest() + client = StandardClient() + self.call(client, request, f) + self.failUnless(client.messages, client.messages) + tb = client.messages[0] + self.failUnless("CustomException: Zope" in tb, tb) + + def testRenderRetryRaisesConflictError(self): + from ZPublisher import Retry + from ZODB.POSException import ConflictError + def f(): + try: + raise ConflictError + 
except: + raise Retry(sys.exc_info()[0], + sys.exc_info()[1], + sys.exc_info()[2]) + request = self._makeRequest() + client = StandardClient() + self.call(client, request, f) + self.failUnless(client.messages, client.messages) + tb = client.messages[0] + self.failUnless("ConflictError: database conflict error" in tb, tb) + +class CustomExceptionView: + + def __init__(self, context, request): + self.context = context + self.request = request + + def __call__(self): + return "Exception View: %s" % self.context.__class__.__name__ + +class ExceptionViewsTest(PlacelessSetup, ExceptionHookTestCase): + + def testCustomExceptionViewUnauthorized(self): + from ZPublisher.HTTPResponse import HTTPResponse + from AccessControl import Unauthorized + ztapi.browserView(IUnauthorized, u'index.html', CustomExceptionView) + def f(): + raise Unauthorized, 1 + request = self._makeRequest() + client = StandardClient() + v = self.call_exc_value(client, request, f) + self.failUnless(isinstance(v, HTTPResponse), v) + self.failUnless(v.status == 401, (v.status, 401)) + self.failUnless("Exception View: Unauthorized" in str(v)) + + def testCustomExceptionViewForbidden(self): + from ZPublisher.HTTPResponse import HTTPResponse + from zExceptions import Forbidden + ztapi.browserView(IForbidden, u'index.html', CustomExceptionView) + def f(): + raise Forbidden, "argh" + request = self._makeRequest() + client = StandardClient() + v = self.call_exc_value(client, request, f) + self.failUnless(isinstance(v, HTTPResponse), v) + self.failUnless(v.status == 403, (v.status, 403)) + self.failUnless("Exception View: Forbidden" in str(v)) + + def testCustomExceptionViewNotFound(self): + from ZPublisher.HTTPResponse import HTTPResponse + from zExceptions import NotFound + ztapi.browserView(INotFound, u'index.html', CustomExceptionView) + def f(): + raise NotFound, "argh" + request = self._makeRequest() + client = StandardClient() + v = self.call_exc_value(client, request, f) + self.failUnless(isinstance(v, 
HTTPResponse), v) + self.failUnless(v.status == 404, (v.status, 404)) + self.failUnless("Exception View: NotFound" in str(v), v) + + def testCustomExceptionViewBadRequest(self): + from ZPublisher.HTTPResponse import HTTPResponse + from zExceptions import BadRequest + ztapi.browserView(IException, u'index.html', CustomExceptionView) + def f(): + raise BadRequest, "argh" + request = self._makeRequest() + client = StandardClient() + v = self.call_exc_value(client, request, f) + self.failUnless(isinstance(v, HTTPResponse), v) + self.failUnless(v.status == 400, (v.status, 400)) + self.failUnless("Exception View: BadRequest" in str(v), v) + + def testCustomExceptionViewInternalError(self): + from ZPublisher.HTTPResponse import HTTPResponse + from zExceptions import InternalError + ztapi.browserView(IException, u'index.html', CustomExceptionView) + def f(): + raise InternalError, "argh" + request = self._makeRequest() + client = StandardClient() + v = self.call_exc_value(client, request, f) + self.failUnless(isinstance(v, HTTPResponse), v) + self.failUnless(v.status == 500, (v.status, 500)) + self.failUnless("Exception View: InternalError" in str(v), v) + + def testRedirectNoExceptionView(self): + from ZPublisher.HTTPResponse import HTTPResponse + from zExceptions import Redirect + ztapi.browserView(IException, u'index.html', CustomExceptionView) + def f(): + raise Redirect, "http://zope.org/" + request = self._makeRequest() + client = StandardClient() + v = self.call_exc_value(client, request, f) + self.failUnless(isinstance(v, Redirect), v) + self.assertEquals(v.args[0], "http://zope.org/") + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ExceptionHookTest)) + suite.addTest(unittest.makeSuite(ExceptionMessageRenderTest)) + suite.addTest(unittest.makeSuite(ExceptionViewsTest)) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') Modified: Zope/trunk/lib/python/zExceptions/__init__.py 
=================================================================== --- Zope/trunk/lib/python/zExceptions/__init__.py 2007-07-05 13:34:33 UTC (rev 77458) +++ Zope/trunk/lib/python/zExceptions/__init__.py 2007-07-05 14:35:49 UTC (rev 77459) @@ -20,17 +20,22 @@ from unauthorized import Unauthorized +from zope.interface import implements +from zope.interface.common.interfaces import IException +from zope.publisher.interfaces import INotFound +from zope.security.interfaces import IForbidden + class BadRequest(Exception): - pass + implements(IException) class InternalError(Exception): - pass + implements(IException) class NotFound(Exception): - pass + implements(INotFound) class Forbidden(Exception): - pass + implements(IForbidden) class MethodNotAllowed(Exception): pass Modified: Zope/trunk/lib/python/zExceptions/unauthorized.py =================================================================== --- Zope/trunk/lib/python/zExceptions/unauthorized.py 2007-07-05 13:34:33 UTC (rev 77458) +++ Zope/trunk/lib/python/zExceptions/unauthorized.py 2007-07-05 14:35:49 UTC (rev 77459) @@ -15,9 +15,13 @@ """ from types import StringType +from zope.interface import implements +from zope.security.interfaces import IUnauthorized class Unauthorized(Exception): - """Some user wasn't allowed to access a resource""" + """Some user wasn't allowed to access a resource + """ + implements(IUnauthorized) def __init__(self, message=None, value=None, needed=None, name=None, **kw): """Possible signatures: _______________________________________________ Zope-Checkins maillist - Zope-Checkins@zope.org http://mail.zope.org/mailman/listinfo/zope-checkins