Log message for revision 110620: Coverage for App.ApplicationManager.ApplicationManager.
Changed: U Zope/trunk/src/App/ApplicationManager.py U Zope/trunk/src/App/tests/test_ApplicationManager.py -=- Modified: Zope/trunk/src/App/ApplicationManager.py =================================================================== --- Zope/trunk/src/App/ApplicationManager.py 2010-04-08 01:21:28 UTC (rev 110619) +++ Zope/trunk/src/App/ApplicationManager.py 2010-04-08 01:21:30 UTC (rev 110620) @@ -318,8 +318,10 @@ """Return to the main management screen""" raise Redirect, URL2+'/manage' - def process_time(self): - s=int(time.time())-self.process_start + def process_time(self, _when=None): + if _when is None: + _when = time.time() + s=int(_when)-self.process_start d=int(s/86400) s=s-(d*86400) h=int(s/3600) @@ -378,11 +380,14 @@ """ @requestmethod('POST') - def manage_pack(self, days=0, REQUEST=None): + def manage_pack(self, days=0, REQUEST=None, _when=None): """Pack the database""" - t=time.time()-days*86400 + if _when is None: + _when = time.time() + t = _when - (days*86400) + db=self._p_jar.db() t=db.pack(t) if REQUEST is not None: Modified: Zope/trunk/src/App/tests/test_ApplicationManager.py =================================================================== --- Zope/trunk/src/App/tests/test_ApplicationManager.py 2010-04-08 01:21:28 UTC (rev 110619) +++ Zope/trunk/src/App/tests/test_ApplicationManager.py 2010-04-08 01:21:30 UTC (rev 110620) @@ -1,5 +1,23 @@ import unittest +class ConfigTestBase: + + def setUp(self): + import App.config + self._old_config = App.config._config + + def tearDown(self): + import App.config + App.config._config = self._old_config + + def _makeConfig(self, **kw): + import App.config + class DummyConfig: + pass + App.config._config = config = DummyConfig() + config.dbtab = DummyDBTab(kw) + return config + class FakeConnectionTests(unittest.TestCase): def _getTargetClass(self): @@ -15,16 +33,8 @@ fc = self._makeOne(db, parent_jar) self.failUnless(fc.db() is db) -class DatabaseChooserTests(unittest.TestCase): +class DatabaseChooserTests(ConfigTestBase, unittest.TestCase): - def setUp(self): - import App.config - self._old_config = App.config._config - - def tearDown(self): - import App.config - App.config._config = self._old_config - def _getTargetClass(self): from App.ApplicationManager import DatabaseChooser return DatabaseChooser @@ -40,13 +50,6 @@ return self return Root() - def _makeConfig(self, **kw): - import App.config - class DummyConfig: - pass - App.config._config = config = DummyConfig() - config.dbtab = DummyDBTab(kw) - def test_getDatabaseNames_sorted(self): self._makeConfig(foo=object(), bar=object(), qux=object()) dc = self._makeOne('test') @@ -242,6 +245,225 @@ self.assertEqual(dm.manage_getSysPath(), list(sys.path)) +class ApplicationManagerTests(ConfigTestBase, unittest.TestCase): + + def setUp(self): + ConfigTestBase.setUp(self) + self._tempdirs = () + + def tearDown(self): + import shutil + for tempdir in self._tempdirs: + shutil.rmtree(tempdir) + ConfigTestBase.tearDown(self) + + def _getTargetClass(self): + from App.ApplicationManager import ApplicationManager + return ApplicationManager + + def _makeOne(self): + return self._getTargetClass()() + + def _makeJar(self, dbname, dbsize): + class Jar: + def db(self): + return self._db + jar = Jar() + jar._db = DummyDB(dbname, dbsize) + return jar + + def _makeTempdir(self): + import tempfile + tmp = tempfile.mkdtemp() + self._tempdirs += (tmp,) + return tmp + + def _makeFile(self, dir, name, text): + import os + os.makedirs(dir) + fqn = os.path.join(dir, name) + f = open(fqn, 'w') + f.write(text) + f.flush() + f.close() + return fqn + + def test_ctor_initializes_Products(self): + from App.Product import ProductFolder + am = self._makeOne() + self.failUnless(isinstance(am.Products, ProductFolder)) + + def test__canCopy(self): + am = self._makeOne() + self.failIf(am._canCopy()) + + def test_manage_app(self): + from zExceptions import Redirect + am = self._makeOne() + try: + am.manage_app('http://example.com/foo') + except Redirect, v: + self.assertEqual(v.args, ('http://example.com/foo/manage',)) + else: + self.fail('Redirect not raised') + + def test_process_time_seconds(self): + am = self._makeOne() + am.process_start = 0 + self.assertEqual(am.process_time(0).strip(), '0 sec') + self.assertEqual(am.process_time(1).strip(), '1 sec') + self.assertEqual(am.process_time(2).strip(), '2 sec') + + def test_process_time_minutes(self): + am = self._makeOne() + am.process_start = 0 + self.assertEqual(am.process_time(60).strip(), '1 min 0 sec') + self.assertEqual(am.process_time(61).strip(), '1 min 1 sec') + self.assertEqual(am.process_time(62).strip(), '1 min 2 sec') + self.assertEqual(am.process_time(120).strip(), '2 min 0 sec') + self.assertEqual(am.process_time(121).strip(), '2 min 1 sec') + self.assertEqual(am.process_time(122).strip(), '2 min 2 sec') + + def test_process_time_hours(self): + am = self._makeOne() + am.process_start = 0 + n1 = 60 * 60 + n2 = n1 * 2 + self.assertEqual(am.process_time(n1).strip(), + '1 hour 0 sec') + self.assertEqual(am.process_time(n1 + 61).strip(), + '1 hour 1 min 1 sec') + self.assertEqual(am.process_time(n2 + 1).strip(), + '2 hours 1 sec') + self.assertEqual(am.process_time(n2 + 122).strip(), + '2 hours 2 min 2 sec') + + def test_process_time_days(self): + am = self._makeOne() + am.process_start = 0 + n1 = 60 * 60 * 24 + n2 = n1 * 2 + self.assertEqual(am.process_time(n1).strip(), + '1 day 0 sec') + self.assertEqual(am.process_time(n1 + 3661).strip(), + '1 day 1 hour 1 min 1 sec') + self.assertEqual(am.process_time(n2 + 1).strip(), + '2 days 1 sec') + self.assertEqual(am.process_time(n2 + 7322).strip(), + '2 days 2 hours 2 min 2 sec') + + def test_thread_get_ident(self): + import thread + am = self._makeOne() + self.assertEqual(am.thread_get_ident(), thread.get_ident()) + + def test_db_name(self): + am = self._makeOne() + am._p_jar = self._makeJar('foo', '') + self.assertEqual(am.db_name(), 'foo') + + def test_db_size_string(self): + am = self._makeOne() + am._p_jar = self._makeJar('foo', 'super') + self.assertEqual(am.db_size(), 'super') + + def test_db_size_lt_1_meg(self): + am = self._makeOne() + am._p_jar = self._makeJar('foo', 4497) + self.assertEqual(am.db_size(), '4.4K') + + def test_db_size_gt_1_meg(self): + am = self._makeOne() + am._p_jar = self._makeJar('foo', (2048 * 1024) + 123240) + self.assertEqual(am.db_size(), '2.1M') + + #def test_manage_restart(self): XXX -- TOO UGLY TO TEST + #def test_manage_restart(self): XXX -- TOO UGLY TO TEST + + def test_manage_pack(self): + am = self._makeOne() + jar = am._p_jar = self._makeJar('foo', '') + am.manage_pack(1, _when=86400*2) + self.assertEqual(jar._db._packed, 86400) + + def test_revert_points(self): + am = self._makeOne() + self.assertEqual(list(am.revert_points()), []) + + def test_version_list(self): + # XXX this method is too stupid to live: returning a bare list + # of versions without even tying them to the products? + # and what about products living outside SOFTWARE_HOME? + # Nobody calls it, either + import os + am = self._makeOne() + config = self._makeConfig() + swdir = config.softwarehome = self._makeTempdir() + foodir = os.path.join(swdir, 'Products', 'foo') + self._makeFile(foodir, 'VERSION.TXT', '1.2') + bardir = os.path.join(swdir, 'Products', 'bar') + self._makeFile(bardir, 'VERSION.txt', '3.4') + bazdir = os.path.join(swdir, 'Products', 'baz') + self._makeFile(bazdir, 'version.txt', '5.6') + versions = am.version_list() + self.assertEqual(versions, ['3.4', '5.6', '1.2']) + + def test_getSOFTWARE_HOME_missing(self): + am = self._makeOne() + config = self._makeConfig() + self.assertEqual(am.getSOFTWARE_HOME(), None) + + def test_getSOFTWARE_HOME_present(self): + am = self._makeOne() + config = self._makeConfig() + swdir = config.softwarehome = self._makeTempdir() + self.assertEqual(am.getSOFTWARE_HOME(), swdir) + + def test_getZOPE_HOME_present(self): + am = self._makeOne() + config = self._makeConfig() + zopedir = config.zopehome = self._makeTempdir() + self.assertEqual(am.getZOPE_HOME(), zopedir) + + def test_getINSTANCE_HOME_present(self): + am = self._makeOne() + config = self._makeConfig() + instdir = config.instancehome = self._makeTempdir() + self.assertEqual(am.getINSTANCE_HOME(), instdir) + + def test_getCLIENT_HOME_present(self): + am = self._makeOne() + config = self._makeConfig() + cldir = config.clienthome = self._makeTempdir() + self.assertEqual(am.getCLIENT_HOME(), cldir) + + def test_getServers(self): + from asyncore import socket_map + class DummySocketServer: + def __init__(self, port): + self.port = port + class AnotherSocketServer(DummySocketServer): + pass + class NotAServer: + pass + am = self._makeOne() + _old_socket_map = socket_map.copy() + socket_map.clear() + socket_map['foo'] = DummySocketServer(45) + socket_map['bar'] = AnotherSocketServer(57) + socket_map['qux'] = NotAServer() + try: + pairs = am.getServers() + finally: + socket_map.clear() + socket_map.update(_old_socket_map) + self.assertEqual(len(pairs), 2) + self.failUnless((str(DummySocketServer), 'Port: 45') in pairs) + self.failUnless((str(AnotherSocketServer), 'Port: 57') in pairs) + + #def test_objectIds(self): XXX -- TOO UGLY TO TEST (BBB for Zope 2.3!!) + + class DummyDBTab: def __init__(self, databases=None): self._databases = databases or {} @@ -255,10 +477,27 @@ def getDatabase(self, name): return self._databases[name] +class DummyDB: + _packed = None + + def __init__(self, name, size): + self._name = name + self._size = size + + def getName(self): + return self._name + + def getSize(self): + return self._size + + def pack(self, when): + self._packed = when + def test_suite(): return unittest.TestSuite(( unittest.makeSuite(FakeConnectionTests), unittest.makeSuite(DatabaseChooserTests), unittest.makeSuite(DebugManagerTests), + unittest.makeSuite(ApplicationManagerTests), )) _______________________________________________ Zope-Checkins maillist - Zope-Checkins@zope.org https://mail.zope.org/mailman/listinfo/zope-checkins