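Log message for revision 112875: Add test coverage for ZPublisher.WSGIPublisher.publish. To make it testable, publish() drops its 'after_list' and 'debug' arguments, gains a testing-only '_get_module_info' hook, and records the module's 'bobo_after' cleanup callable on 'response.after_list', which publish_module() now iterates after closing stdout.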
Changed: U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py -=- Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py =================================================================== --- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py 2010-05-31 17:27:45 UTC (rev 112874) +++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py 2010-05-31 18:26:04 UTC (rev 112875) @@ -56,6 +56,9 @@ # HTTP/1.1 should use chunked encoding http_chunk = 0 + # Append any "cleanup" functions to this list. + after_list = () + def finalize(self): headers = self.headers @@ -144,7 +147,12 @@ # return '' raise NotImplementedError -def publish(request, module_name, after_list, debug=0): +def publish(request, module_name, + _get_module_info=None, # only for testing + ): + if _get_module_info is None: + _get_module_info = get_module_info + (bobo_before, bobo_after, object, @@ -153,12 +161,13 @@ err_hook, validated_hook, transactions_manager, - )= get_module_info(module_name) + )= _get_module_info(module_name) request.processInputs() response = request.response - after_list[0] = bobo_after + if bobo_after is not None: + response.after_list += (bobo_after,) if debug_mode: response.debug_mode = debug_mode @@ -197,7 +206,6 @@ def publish_module(environ, start_response): status = 200 - after_list = [None] stdout = StringIO() stderr = StringIO() response = WSGIResponse(stdout=stdout, stderr=stderr) @@ -208,13 +216,9 @@ request = HTTPRequest(environ['wsgi.input'], environ, response) if ISkinnable.providedBy(request): setDefaultSkin(request) - - # Let's support post-mortem debugging - handle_errors = environ.get('wsgi.handleErrors', True) try: - response = publish(request, 'Zope2', after_list=[None], - debug=handle_errors) + response = publish(request, 'Zope2') except Unauthorized, v: pass except Redirect, v: @@ -241,8 +245,8 @@ stdout.close() - if after_list[0] is not 
None: - after_list[0]() + for callable in response.after_list: + callable() # Return the result body iterable. return result Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py =================================================================== --- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py 2010-05-31 17:27:45 UTC (rev 112874) +++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py 2010-05-31 18:26:04 UTC (rev 112875) @@ -154,6 +154,71 @@ self.assertRaises(NotImplementedError, lambda: str(response)) +class Test_publish(unittest.TestCase): + + def _callFUT(self, request, module_name, _get_module_info=None): + from ZPublisher.WSGIPublisher import publish + if _get_module_info is None: + return publish(request, module_name) + + return publish(request, module_name, _get_module_info) + + def test_invalid_module_doesnt_catch_error(self): + _gmi = DummyCallable() + _gmi._raise = ImportError('testing') + self.assertRaises(ImportError, self._callFUT, None, 'nonesuch', _gmi) + self.assertEqual(_gmi._called_with, (('nonesuch',), {})) + + def test_wo_REMOTE_USER(self): + request = DummyRequest(PATH_INFO='/') + response = request.response = DummyResponse() + _before = DummyCallable() + _after = object() + _object = DummyCallable() + _object._result = 'RESULT' + request._traverse_to = _object + _realm = 'TESTING' + _debug_mode = True + _err_hook = DummyCallable() + _validated_hook = object() + _tm = DummyTM() + _gmi = DummyCallable() + _gmi._result = (_before, _after, _object, _realm, _debug_mode, + _err_hook, _validated_hook, _tm) + returned = self._callFUT(request, 'okmodule', _gmi) + self.failUnless(returned is response) + self.assertEqual(_gmi._called_with, (('okmodule',), {})) + self.failUnless(request._processedInputs) + self.assertEqual(response.after_list, (_after,)) + self.failUnless(response.debug_mode) + self.assertEqual(response.realm, 'TESTING') + 
self.assertEqual(_before._called_with, ((), {})) + self.assertEqual(request['PARENTS'], [_object]) + self.assertEqual(request._traversed, ('/', None, _validated_hook)) + self.assertEqual(_tm._recorded, (_object, request)) + self.assertEqual(_object._called_with, ((), {})) + self.assertEqual(response._body, 'RESULT') + self.assertEqual(_err_hook._called_with, None) + + def test_w_REMOTE_USER(self): + request = DummyRequest(PATH_INFO='/', REMOTE_USER='phred') + response = request.response = DummyResponse() + _before = DummyCallable() + _after = object() + _object = DummyCallable() + _object._result = 'RESULT' + request._traverse_to = _object + _realm = 'TESTING' + _debug_mode = True + _err_hook = DummyCallable() + _validated_hook = object() + _tm = DummyTM() + _gmi = DummyCallable() + _gmi._result = (_before, _after, _object, _realm, _debug_mode, + _err_hook, _validated_hook, _tm) + self._callFUT(request, 'okmodule', _gmi) + self.assertEqual(response.realm, None) + class Test_publish_module(unittest.TestCase): def setUp(self): @@ -223,12 +288,50 @@ ('', 'foobar')) +class DummyRequest(dict): + _processedInputs = False + _traversed = None + _traverse_to = None + args = () + + def processInputs(self): + self._processedInputs = True + + def traverse(self, path, response=None, validated_hook=None): + self._traversed = (path, response, validated_hook) + return self._traverse_to + +class DummyResponse(object): + debug_mode = False + after_list = () + realm = None + _body = None + + def setBody(self, body): + self._body = body + +class DummyCallable(object): + _called_with = _raise = _result = None + + def __call__(self, *args, **kw): + self._called_with = (args, kw) + if self._raise: + raise self._raise + return self._result + +class DummyTM(object): + _recorded = _raise = _result = None + + def recordMetaData(self, *args): + self._recorded = args + def noopStartResponse(status, headers): pass def test_suite(): return unittest.TestSuite(( - 
unittest.makeSuite(WSGIResponseTests, 'test'), - unittest.makeSuite(Test_publish_module, 'test'), + unittest.makeSuite(WSGIResponseTests), + unittest.makeSuite(Test_publish), + unittest.makeSuite(Test_publish_module), )) _______________________________________________ Zope-Checkins maillist - Zope-Checkins@zope.org https://mail.zope.org/mailman/listinfo/zope-checkins