Hi everyone,
Attached is the "final" patch addressing the following issues:
1) Reloading the config for an fcgi process group did not close the fcgi
socket. Now the socket is closed whenever the group is stopped as a unit
(including during a config update). However, if you stop all the processes
in a group individually, the socket will remain open to allow for graceful
restarts of FCGI daemons.
2) Rereading the config did not pick up changes to the socket parameter in an
fcgi-program section. This was a simple fix requiring a little
customization of the __eq__() method.
3) Made the exception message friendlier when an FCGI socket cannot be
created.
Let me know if you find any issues. Thanks,
Roger
On Fri, Aug 28, 2009 at 6:38 AM, Marco Vittorini Orgeas <[email protected]>wrote:
> All right,
>
> just drop a mail when the final patch is out.
> thanks again,
>
> --
> Marco
>
> On Thu, August 27, 2009 9:07 pm, Roger Hoover wrote:
> > Hi Marco,
> >
> > My original patch is functional and tested just not as ideal as I want.
> I
> > got most of way toward a final patch before getting swamped with a hard
> > deadline at work. I'll probably be able to get it finished early next
> > week.
> >
> > Roger
> >
> > On Thu, Aug 27, 2009 at 7:24 AM, Marco Vittorini Orgeas
> > <[email protected]>wrote:
> >
> >> Hi Roger,
> >> did you end up with a final patch for this or the good one is the one
> >> you
> >> already have attached?
> >> If you have any new, can you please post it here or write the revision
> >> number in
> >> which is contained, I'd like to test against latest stable.
> >>
> >>
> >> Please keep me CC'ed,
> >>
> >> thank you
> >>
> >> Marco
> >>
> >> _______________________________________________
> >> Supervisor-users mailing list
> >> [email protected]
> >> http://lists.supervisord.org/mailman/listinfo/supervisor-users
> >>
> >
>
>
>
Index: src/supervisor/options.py
===================================================================
--- src/supervisor/options.py (revision 881)
+++ src/supervisor/options.py (working copy)
@@ -59,8 +59,6 @@
from supervisor.datatypes import profile_options
from supervisor.datatypes import set_here
-from supervisor.socket_manager import SocketManager
-
from supervisor import loggers
from supervisor import states
from supervisor import xmlrpc
@@ -664,7 +662,7 @@
FastCGIProcessConfig)
groups.append(
FastCGIGroupConfig(self, program_name, priority, processes,
- SocketManager(socket_config))
+ socket_config)
)
@@ -681,8 +679,7 @@
path = normalize_path(path)
return UnixStreamSocketConfig(path)
- tcp_re = re.compile(r'^tcp://([^\s:]+):(\d+)$')
- m = tcp_re.match(sock)
+ m = re.match(r'tcp://([^\s:]+):(\d+)$', sock)
if m:
host = m.group(1)
port = int(m.group(2))
@@ -1557,6 +1554,7 @@
return dispatchers, p
class FastCGIProcessConfig(ProcessConfig):
+
def make_process(self, group=None):
if group is None:
raise NotImplementedError('FastCGI programs require a group')
@@ -1627,16 +1625,25 @@
class FastCGIGroupConfig(ProcessGroupConfig):
def __init__(self, options, name, priority, process_configs,
- socket_manager):
+ socket_config):
self.options = options
self.name = name
self.priority = priority
self.process_configs = process_configs
- self.socket_manager = socket_manager
+ self.socket_config = socket_config
- def after_setuid(self):
- ProcessGroupConfig.after_setuid(self)
- self.socket_manager.prepare_socket()
+ def __eq__(self, other):
+ if not isinstance(other, FastCGIGroupConfig):
+ return False
+
+ if self.socket_config != other.socket_config:
+ return False
+
+ return ProcessGroupConfig.__eq__(self, other)
+
+ def make_group(self):
+ from supervisor.process import FastCGIProcessGroup
+ return FastCGIProcessGroup(self)
def readFile(filename, offset, length):
""" Read length bytes from the file named by filename starting at
Index: src/supervisor/datatypes.py
===================================================================
--- src/supervisor/datatypes.py (revision 881)
+++ src/supervisor/datatypes.py (working copy)
@@ -184,7 +184,22 @@
return '<%s at %s for %s>' % (self.__class__,
id(self),
self.url)
+
+ def __str__(self):
+ return str(self.url)
+
+ def __eq__(self, other):
+ if not isinstance(other, SocketConfig):
+ return False
+ if self.url != other.url:
+ return False
+
+ return True
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
def addr(self):
raise NotImplementedError
Index: src/supervisor/rpcinterface.py
===================================================================
--- src/supervisor/rpcinterface.py (revision 881)
+++ src/supervisor/rpcinterface.py (working copy)
@@ -427,6 +427,8 @@
if group is None:
raise RPCError(Faults.BAD_NAME, name)
+ group.stop_requested()
+
processes = group.processes.values()
processes.sort()
processes = [ (group, process) for process in processes ]
Index: src/supervisor/tests/test_options.py
===================================================================
--- src/supervisor/tests/test_options.py (revision 881)
+++ src/supervisor/tests/test_options.py (working copy)
@@ -15,7 +15,7 @@
from supervisor.tests.base import DummyPConfig
from supervisor.tests.base import DummyPGroupConfig
from supervisor.tests.base import DummyProcess
-from supervisor.tests.base import DummySocketManager
+from supervisor.tests.base import DummySocketConfig
from supervisor.tests.base import lstrip
class OptionTests(unittest.TestCase):
@@ -806,7 +806,7 @@
self.assertEqual(gconfig0.__class__, FastCGIGroupConfig)
self.assertEqual(gconfig0.name, 'foo')
self.assertEqual(gconfig0.priority, 1)
- self.assertEqual(gconfig0.socket_manager.config().url,
+ self.assertEqual(gconfig0.socket_config.url,
'unix:///tmp/foo.sock')
self.assertEqual(len(gconfig0.process_configs), 2)
self.assertEqual(gconfig0.process_configs[0].__class__,
@@ -817,7 +817,7 @@
gconfig1 = gconfigs[1]
self.assertEqual(gconfig1.name, 'bar')
self.assertEqual(gconfig1.priority, 999)
- self.assertEqual(gconfig1.socket_manager.config().url,
+ self.assertEqual(gconfig1.socket_config.url,
'tcp://localhost:6000')
self.assertEqual(len(gconfig1.process_configs), 3)
@@ -1300,23 +1300,36 @@
def test_ctor(self):
options = DummyOptions()
- sock_manager = DummySocketManager(6)
- instance = self._makeOne(options, 'whatever', 999, [], sock_manager)
+ sock_config = DummySocketConfig(6)
+ instance = self._makeOne(options, 'whatever', 999, [], sock_config)
self.assertEqual(instance.options, options)
self.assertEqual(instance.name, 'whatever')
self.assertEqual(instance.priority, 999)
self.assertEqual(instance.process_configs, [])
- self.assertEqual(instance.socket_manager, sock_manager)
+ self.assertEqual(instance.socket_config, sock_config)
- def test_after_setuid(self):
+ def test_same_sockets_are_equal(self):
options = DummyOptions()
- sock_manager = DummySocketManager(6)
- pconfigs = [DummyPConfig(options, 'process1', '/bin/process1')]
- instance = self._makeOne(options, 'whatever', 999, pconfigs,
sock_manager)
- instance.after_setuid()
- self.assertTrue(pconfigs[0].autochildlogs_created)
- self.assertTrue(instance.socket_manager.prepare_socket_called)
+ sock_config1 = DummySocketConfig(6)
+ instance1 = self._makeOne(options, 'whatever', 999, [], sock_config1)
+ sock_config2 = DummySocketConfig(6)
+ instance2 = self._makeOne(options, 'whatever', 999, [], sock_config2)
+
+ self.assertTrue(instance1 == instance2)
+ self.assertFalse(instance1 != instance2)
+
+ def test_diff_sockets_are_not_equal(self):
+ options = DummyOptions()
+ sock_config1 = DummySocketConfig(6)
+ instance1 = self._makeOne(options, 'whatever', 999, [], sock_config1)
+
+ sock_config2 = DummySocketConfig(7)
+ instance2 = self._makeOne(options, 'whatever', 999, [], sock_config2)
+
+ self.assertTrue(instance1 != instance2)
+ self.assertFalse(instance1 == instance2)
+
class UtilFunctionsTests(unittest.TestCase):
def test_make_namespec(self):
from supervisor.options import make_namespec
Index: src/supervisor/tests/test_datatypes.py
===================================================================
--- src/supervisor/tests/test_datatypes.py (revision 881)
+++ src/supervisor/tests/test_datatypes.py (working copy)
@@ -183,6 +183,25 @@
self.assertTrue(reuse)
sock.close
+ def test_same_urls_are_equal(self):
+ conf1 = self._makeOne('localhost', '5001')
+ conf2 = self._makeOne('localhost', '5001')
+ self.assertTrue(conf1 == conf2)
+ self.assertFalse(conf1 != conf2)
+
+ def test_diff_urls_are_not_equal(self):
+ conf1 = self._makeOne('localhost', '5001')
+ conf2 = self._makeOne('localhost', '5002')
+ self.assertTrue(conf1 != conf2)
+ self.assertFalse(conf1 == conf2)
+
+ def test_diff_objs_are_not_equal(self):
+ conf1 = self._makeOne('localhost', '5001')
+ conf2 = 'blah'
+ self.assertTrue(conf1 != conf2)
+ self.assertFalse(conf1 == conf2)
+
+
class UnixStreamSocketConfigTests(unittest.TestCase):
def _getTargetClass(self):
return datatypes.UnixStreamSocketConfig
@@ -213,6 +232,24 @@
sock = conf.create()
self.assertFalse(os.path.exists(tf_name))
sock.close
+
+ def test_same_paths_are_equal(self):
+ conf1 = self._makeOne('/tmp/foo.sock')
+ conf2 = self._makeOne('/tmp/foo.sock')
+ self.assertTrue(conf1 == conf2)
+ self.assertFalse(conf1 != conf2)
+
+ def test_diff_paths_are_not_equal(self):
+ conf1 = self._makeOne('/tmp/foo.sock')
+ conf2 = self._makeOne('/tmp/bar.sock')
+ self.assertTrue(conf1 != conf2)
+ self.assertFalse(conf1 == conf2)
+
+ def test_diff_objs_are_not_equal(self):
+ conf1 = self._makeOne('/tmp/foo.sock')
+ conf2 = 'blah'
+ self.assertTrue(conf1 != conf2)
+ self.assertFalse(conf1 == conf2)
def test_suite():
return unittest.findTestCases(sys.modules[__name__])
Index: src/supervisor/tests/base.py
===================================================================
--- src/supervisor/tests/base.py (revision 881)
+++ src/supervisor/tests/base.py (working copy)
@@ -293,6 +293,63 @@
def get_state(self):
return self.options.mood
+class DummySocket:
+ bind_called = False
+ bind_addr = None
+ listen_called = False
+ listen_backlog = None
+ close_called = False
+
+ def __init__(self, fd):
+ self.fd = fd
+
+ def fileno(self):
+ return self.fd
+
+ def bind(self, addr):
+ self.bind_called = True
+ self.bind_addr = addr
+
+ def listen(self, backlog):
+ self.listen_called = True
+ self.listen_backlog = backlog
+
+ def close(self):
+ self.close_called = True
+
+ def __str__(self):
+ return 'dummy socket'
+
+class DummySocketConfig:
+ def __init__(self, fd):
+ self.fd = fd
+
+ def addr(self):
+ return 'dummy addr'
+
+ def __eq__(self, other):
+ return self.fd == other.fd
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def create(self):
+ return DummySocket(self.fd)
+
+class DummySocketManager:
+ def __init__(self, config, **kwargs):
+ self._config = config
+ self.request_close_called = False
+
+ def config(self):
+ return self._config
+
+ def get_socket(self):
+ return DummySocket(self._config.fd)
+
+ def request_close(self):
+ self.request_close_called = True
+
class DummyProcess:
# Initial state; overridden by instance variables
pid = 0 # Subprocess pid; 0 when not running
@@ -861,9 +918,9 @@
self.name)
class DummyFCGIGroupConfig(DummyPGroupConfig):
- def __init__(self, options, name, priority, pconfigs, socket_manager):
+ def __init__(self, options, name='whatever', priority=999, pconfigs=None,
socket_config=DummySocketConfig(1)):
DummyPGroupConfig.__init__(self, options, name, priority, pconfigs)
- self.socket_manager = socket_manager
+ self.socket_config = socket_config
class DummyProcessGroup:
def __init__(self, config):
@@ -872,18 +929,28 @@
self.all_stopped = False
self.dispatchers = {}
self.unstopped_processes = []
+ self.stop_was_requested = False
def transition(self):
self.transitioned = True
def stop_all(self):
self.all_stopped = True
+
+ def stop_requested(self):
+ self.stop_was_requested = True
def get_unstopped_processes(self):
return self.unstopped_processes
def get_dispatchers(self):
return self.dispatchers
+
+class DummyFCGIProcessGroup(DummyProcessGroup):
+
+ def __init__(self, config):
+ DummyProcessGroup.__init__(self, config)
+ self.socket_manager = DummySocketManager(config.socket_config)
class PopulatedDummySupervisor(DummySupervisor):
def __init__(self, options, group_name, *pconfigs):
@@ -977,55 +1044,7 @@
def __str__(self):
return 'dummy event'
-
-class DummySocket:
- bind_called = False
- bind_addr = None
- listen_called = False
- listen_backlog = None
- close_called = False
-
- def __init__(self, fd):
- self.fd = fd
- def fileno(self):
- return self.fd
-
- def bind(self, addr):
- self.bind_called = True
- self.bind_addr = addr
-
- def listen(self, backlog):
- self.listen_called = True
- self.listen_backlog = backlog
-
- def close(self):
- self.close_called = True
-
- def __str__(self):
- return 'dummy socket'
-
-class DummySocketConfig:
- def __init__(self, fd):
- self.fd = fd
-
- def addr(self):
- return 'dummy addr'
-
- def create(self):
- return DummySocket(self.fd)
-
-class DummySocketManager:
- def __init__(self, sock_fd):
- self.sock_fd = sock_fd
- self.prepare_socket_called = False
-
- def prepare_socket(self):
- self.prepare_socket_called = True
-
- def get_socket(self):
- return DummySocket(self.sock_fd)
-
def dummy_handler(event, result):
pass
Index: src/supervisor/tests/test_process.py
===================================================================
--- src/supervisor/tests/test_process.py (revision 881)
+++ src/supervisor/tests/test_process.py (working copy)
@@ -12,8 +12,10 @@
from supervisor.tests.base import DummyDispatcher
from supervisor.tests.base import DummyEvent
from supervisor.tests.base import DummyFCGIGroupConfig
+from supervisor.tests.base import DummySocketConfig
+from supervisor.tests.base import DummyProcessGroup
+from supervisor.tests.base import DummyFCGIProcessGroup
from supervisor.tests.base import DummySocketManager
-from supervisor.tests.base import DummyProcessGroup
class SubprocessTests(unittest.TestCase):
def _getTargetClass(self):
@@ -1185,10 +1187,10 @@
options.forkpid = 0
config = DummyPConfig(options, 'good', '/good/filename', uid=1)
instance = self._makeOne(config)
- sock_manager = DummySocketManager(7)
+ sock_config = DummySocketConfig(7)
gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None,
- sock_manager)
- instance.group = DummyProcessGroup(gconfig)
+ sock_config)
+ instance.group = DummyFCGIProcessGroup(gconfig)
result = instance.spawn()
self.assertEqual(result, None)
self.assertEqual(len(options.duped), 3)
@@ -1203,15 +1205,35 @@
config = DummyPConfig(options, 'good', '/good/filename', uid=1)
config.redirect_stderr = True
instance = self._makeOne(config)
- sock_manager = DummySocketManager(13)
+ sock_config = DummySocketConfig(13)
gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None,
- sock_manager)
- instance.group = DummyProcessGroup(gconfig)
+ sock_config)
+ instance.group = DummyFCGIProcessGroup(gconfig)
result = instance.spawn()
self.assertEqual(result, None)
self.assertEqual(len(options.duped), 2)
self.assertEqual(options.duped[13], 0)
self.assertEqual(len(options.fds_closed), options.minfds - 3)
+
+ def test_before_spawn_gets_socket_ref(self):
+ options = DummyOptions()
+ config = DummyPConfig(options, 'good', '/good/filename', uid=1)
+ instance = self._makeOne(config)
+ sock_config = DummySocketConfig(7)
+ gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None,
+ sock_config)
+ instance.group = DummyFCGIProcessGroup(gconfig)
+ self.assertTrue(instance.fcgi_sock is None)
+ instance.before_spawn()
+ self.assertFalse(instance.fcgi_sock is None)
+
+ def test_after_finish_removes_socket_ref(self):
+ options = DummyOptions()
+ config = DummyPConfig(options, 'good', '/good/filename', uid=1)
+ instance = self._makeOne(config)
+ instance.fcgi_sock = 'hello'
+ instance.after_finish()
+ self.assertTrue(instance.fcgi_sock is None)
class ProcessGroupBaseTests(unittest.TestCase):
def _getTargetClass(self):
@@ -1340,6 +1362,21 @@
group.transition()
self.assertEqual(process1.transitioned, True)
+class FastCGIProcessGroupTests(unittest.TestCase):
+ def _getTargetClass(self):
+ from supervisor.process import FastCGIProcessGroup
+ return FastCGIProcessGroup
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_stop_requested_signals_socket_close(self):
+ options = DummyOptions()
+ gconfig = DummyFCGIGroupConfig(options)
+ group = self._makeOne(gconfig, socketManager=DummySocketManager)
+ group.stop_requested()
+ self.assertTrue(group.socket_manager.request_close_called)
+
class EventListenerPoolTests(ProcessGroupBaseTests):
def setUp(self):
from supervisor.events import clear
Index: src/supervisor/tests/test_socket_manager.py
===================================================================
--- src/supervisor/tests/test_socket_manager.py (revision 881)
+++ src/supervisor/tests/test_socket_manager.py (working copy)
@@ -10,6 +10,80 @@
from supervisor.datatypes import UnixStreamSocketConfig
from supervisor.datatypes import InetStreamSocketConfig
+class TestObject():
+
+ def __init__(self):
+ self.value = 5
+
+ def getValue(self):
+ return self.value
+
+ def setValue(self, val):
+ self.value = val
+
+class ProxyTest(unittest.TestCase):
+
+ def setUp(self):
+ self.on_deleteCalled = False
+
+ def _getTargetClass(self):
+ from supervisor.socket_manager import Proxy
+ return Proxy
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def setOnDeleteCalled(self):
+ self.on_deleteCalled = True
+
+ def test_proxy_getattr(self):
+ proxy = self._makeOne(TestObject())
+ self.assertEquals(5, proxy.getValue())
+
+ def test_on_delete(self):
+ proxy = self._makeOne(TestObject(), on_delete=self.setOnDeleteCalled)
+ self.assertEquals(5, proxy.getValue())
+ proxy = None
+ self.assertTrue(self.on_deleteCalled)
+
+class ReferenceCounterTest(unittest.TestCase):
+
+ def setUp(self):
+ self.running = False
+
+ def start(self):
+ self.running = True
+
+ def stop(self):
+ self.running = False
+
+ def _getTargetClass(self):
+ from supervisor.socket_manager import ReferenceCounter
+ return ReferenceCounter
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_incr_and_decr(self):
+ ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start)
+ self.assertFalse(self.running)
+ ctr.increment()
+ self.assertTrue(self.running)
+ self.assertEquals(1, ctr.get_count())
+ ctr.increment()
+ self.assertTrue(self.running)
+ self.assertEquals(2, ctr.get_count())
+ ctr.decrement()
+ self.assertTrue(self.running)
+ self.assertEquals(1, ctr.get_count())
+ ctr.decrement()
+ self.assertFalse(self.running)
+ self.assertEquals(0, ctr.get_count())
+
+ def test_decr_at_zero_raises_error(self):
+ ctr = self._makeOne(on_zero=self.stop,on_non_zero=self.start)
+ self.assertRaises(Exception, ctr.decrement)
+
class SocketManagerTest(unittest.TestCase):
def _getTargetClass(self):
from supervisor.socket_manager import SocketManager
@@ -29,7 +103,7 @@
self.assertEqual(sock_manager.socket_config, conf)
sock = sock_manager.get_socket()
self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345))
- sock_manager.close()
+ sock_manager.request_close()
def test_tcp_w_ip(self):
conf = InetStreamSocketConfig('127.0.0.1', 12345)
@@ -37,7 +111,7 @@
self.assertEqual(sock_manager.socket_config, conf)
sock = sock_manager.get_socket()
self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345))
- sock_manager.close()
+ sock_manager.request_close()
def test_unix(self):
(tf_fd, tf_name) = tempfile.mkstemp();
@@ -46,54 +120,81 @@
self.assertEqual(sock_manager.socket_config, conf)
sock = sock_manager.get_socket()
self.assertEqual(sock.getsockname(), tf_name)
- sock_manager.close()
+ sock = None
+ sock_manager.request_close()
os.close(tf_fd)
- def test_get_socket(self):
+ def test_socket_lifecycle(self):
conf = DummySocketConfig(2)
sock_manager = self._makeOne(conf)
+ #Assert that sockets are created on demand
+ self.assertFalse(sock_manager.is_prepared())
+ #Get two socket references
sock = sock_manager.get_socket()
+ self.assertTrue(sock_manager.is_prepared()) #socket created on demand
+ sock_id = id(sock._get())
sock2 = sock_manager.get_socket()
- self.assertEqual(sock, sock2)
- sock_manager.close()
+ sock2_id = id(sock2._get())
+ #Assert that they are not the same proxy object
+ self.assertNotEqual(sock, sock2)
+ #Assert that they are the same underlying socket
+ self.assertEqual(sock_id, sock2_id)
+ #Request socket close
+ sock_manager.request_close()
+ #Socket not actually closed yet b/c ref ct is 2
+ self.assertTrue(sock_manager.is_prepared())
+ self.assertFalse(sock_manager.socket.close_called)
+ sock = None
+ #Socket not actually closed yet b/c ref ct is 1
+ self.assertTrue(sock_manager.is_prepared())
+ self.assertFalse(sock_manager.socket.close_called)
+ sock2 = None
+ #Socket closed
+ self.assertFalse(sock_manager.is_prepared())
+ self.assertTrue(sock_manager.socket.close_called)
+
+ #Get a new socket reference
sock3 = sock_manager.get_socket()
- self.assertNotEqual(sock, sock3)
+ self.assertTrue(sock_manager.is_prepared())
+ sock3_id = id(sock3._get())
+ #Assert that it is not the same socket
+ self.assertNotEqual(sock_id, sock3_id)
+ #Drop ref ct to zero
+ del sock3
+ #Assert that socket is still open until close is requested
+ self.assertTrue(sock_manager.is_prepared())
+ self.assertFalse(sock_manager.socket.close_called)
+ sock_manager.request_close()
+ #Now assert that socket is closed
+ self.assertFalse(sock_manager.is_prepared())
+ self.assertTrue(sock_manager.socket.close_called)
def test_prepare_socket(self):
conf = DummySocketConfig(1)
sock_manager = self._makeOne(conf)
sock = sock_manager.get_socket()
- self.assertTrue(sock_manager.prepared)
+ self.assertTrue(sock_manager.is_prepared())
self.assertTrue(sock.bind_called)
self.assertEqual(sock.bind_addr, 'dummy addr')
self.assertTrue(sock.listen_called)
self.assertEqual(sock.listen_backlog, socket.SOMAXCONN)
self.assertFalse(sock.close_called)
-
- def test_close(self):
- conf = DummySocketConfig(6)
- sock_manager = self._makeOne(conf)
- sock = sock_manager.get_socket()
- self.assertFalse(sock.close_called)
- self.assertTrue(sock_manager.prepared)
- sock_manager.close()
- self.assertFalse(sock_manager.prepared)
- self.assertTrue(sock.close_called)
+ sock_manager.request_close()
def test_tcp_socket_already_taken(self):
conf = InetStreamSocketConfig('127.0.0.1', 12345)
sock_manager = self._makeOne(conf)
- sock_manager.get_socket()
+ sock = sock_manager.get_socket()
sock_manager2 = self._makeOne(conf)
- self.assertRaises(socket.error, sock_manager2.prepare_socket)
- sock_manager.close()
+ self.assertRaises(socket.error, sock_manager2.get_socket)
+ sock = None
+ sock_manager.request_close()
def test_unix_bad_sock(self):
conf = UnixStreamSocketConfig('/notthere/foo.sock')
sock_manager = self._makeOne(conf)
- self.assertRaises(socket.error, sock_manager.get_socket)
- sock_manager.close()
-
+ self.assertRaises(socket.error, sock_manager.get_socket)
+
def test_suite():
return unittest.findTestCases(sys.modules[__name__])
Index: src/supervisor/tests/test_rpcinterfaces.py
===================================================================
--- src/supervisor/tests/test_rpcinterfaces.py (revision 881)
+++ src/supervisor/tests/test_rpcinterfaces.py (working copy)
@@ -732,6 +732,7 @@
self.assertEqual(process1.stop_called, True)
process2 = supervisord.process_groups['foo'].processes['process2']
self.assertEqual(process2.stop_called, True)
+ self.assertTrue(supervisord.process_groups['foo'].stop_was_requested)
def test_stopProcessGroup_nowait(self):
options = DummyOptions()
Index: src/supervisor/process.py
===================================================================
--- src/supervisor/process.py (revision 881)
+++ src/supervisor/process.py (working copy)
@@ -38,6 +38,8 @@
from supervisor.datatypes import RestartUnconditionally
+from supervisor.socket_manager import SocketManager
+
class Subprocess:
"""A class to manage a subprocess."""
@@ -233,7 +235,7 @@
self._assertInState(ProcessStates.STARTING)
self.change_state(ProcessStates.BACKOFF)
return
-
+
try:
pid = options.fork()
except OSError, why:
@@ -557,18 +559,49 @@
class FastCGISubprocess(Subprocess):
"""Extends Subprocess class to handle FastCGI subprocesses"""
- def _prepare_child_fds(self):
+ def __init__(self, config):
+ Subprocess.__init__(self, config)
+ self.fcgi_sock = None
+
+ def before_spawn(self):
+ """
+ The FastCGI socket needs to be created by the parent before we fork
+ """
if self.group is None:
raise NotImplementedError('No group set for FastCGISubprocess')
- if not hasattr(self.group, 'config'):
- raise NotImplementedError('No config found for group on '
- 'FastCGISubprocess')
- if not hasattr(self.group.config, 'socket_manager'):
+ if not hasattr(self.group, 'socket_manager'):
raise NotImplementedError('No SocketManager set for '
- 'FastCGISubprocess group')
- sock = self.group.config.socket_manager.get_socket()
- sock_fd = sock.fileno()
+ '%s:%s' % (self.group, dir(self.group)))
+ self.fcgi_sock = self.group.socket_manager.get_socket()
+
+ def spawn(self):
+ """
+ Overrides Subprocess.spawn() so we can hook in before it happens
+ """
+ self.before_spawn()
+ Subprocess.spawn(self)
+ def after_finish(self):
+ """
+ Releases reference to FastCGI socket when process is reaped
+ """
+ #Remove object reference to decrement the reference count
+ self.fcgi_sock = None
+
+ def finish(self, pid, sts):
+ """
+ Overrides Subprocess.finish() so we can hook in after it happens
+ """
+ Subprocess.finish(self, pid, sts)
+ self.after_finish()
+
+ def _prepare_child_fds(self):
+ """
+ Overrides Subprocess._prepare_child_fds()
+ The FastCGI socket needs to be set to file descriptor 0 in the child
+ """
+ sock_fd = self.fcgi_sock.fileno()
+
options = self.config.options
options.dup2(sock_fd, 0)
options.dup2(self.pipes['child_stdout'], 1)
@@ -578,7 +611,7 @@
options.dup2(self.pipes['child_stderr'], 2)
for i in range(3, options.minfds):
options.close_fd(i)
-
+
class ProcessGroupBase:
def __init__(self, config):
self.config = config
@@ -602,6 +635,12 @@
for process in self.processes.values():
process.reopenlogs()
+ def stop_requested(self):
+ """ Hook so that the process group gets notified by
+ it's getting stopped by an RPC interface call
+ """
+ pass
+
def stop_all(self):
processes = self.processes.values()
processes.sort()
@@ -634,7 +673,30 @@
def transition(self):
for proc in self.processes.values():
proc.transition()
+
+class FastCGIProcessGroup(ProcessGroup):
+ def __init__(self, config, **kwargs):
+ ProcessGroup.__init__(self, config)
+ sockManagerKlass = kwargs.get('socketManager', SocketManager)
+ self.socket_manager = sockManagerKlass(config.socket_config,
+ logger=config.options.logger)
+ #It's not required to call get_socket() here but we want
+ #to fail early during start up if there is a config error
+ try:
+ sock = self.socket_manager.get_socket()
+ except Exception, e:
+ raise ValueError('Could not create FastCGI socket %s: %s' %
(self.socket_manager.config(), e))
+
+ def stop_requested(self):
+ """ Overriden from ProcessGroup
+ Request close on FCGI socket (it will actually be closed when all
+ the child processes are reaped)
+ """
+ self.config.options.logger.debug('Stop requested for fcgi group %s'
+ % self.config.name)
+ self.socket_manager.request_close()
+
class EventListenerPool(ProcessGroupBase):
def __init__(self, config):
ProcessGroupBase.__init__(self, config)
Index: src/supervisor/socket_manager.py
===================================================================
--- src/supervisor/socket_manager.py (revision 881)
+++ src/supervisor/socket_manager.py (working copy)
@@ -14,16 +14,63 @@
import socket
+class Proxy():
+ """ Class for wrapping a shared resource object and getting
+ notified when it's deleted
+ """
+
+ def __init__(self, object, **kwargs):
+ self.object = object
+ self.on_delete = kwargs.get('on_delete', None)
+
+ def __del__(self):
+ if self.on_delete:
+ self.on_delete()
+
+ def __getattr__(self, name):
+ return getattr(self.object, name)
+
+ def _get(self):
+ return self.object
+
+class ReferenceCounter():
+ """ Class for tracking references to a shared resource
+ """
+
+ def __init__(self, **kwargs):
+ self.on_non_zero = kwargs['on_non_zero']
+ self.on_zero = kwargs['on_zero']
+ self.ref_count = 0
+
+ def get_count(self):
+ return self.ref_count
+
+ def increment(self):
+ if self.ref_count == 0:
+ self.on_non_zero()
+ self.ref_count = self.ref_count + 1
+
+ def decrement(self):
+ if self.ref_count <= 0:
+ raise Exception('Illegal operation: cannot decrement below zero')
+ self.ref_count -= 1
+ if self.ref_count == 0:
+ self.on_zero()
+
class SocketManager:
""" Class for managing sockets in servers that create/bind/listen
- before forking multiple child processes to accept() """
-
- socket_config = None #SocketConfig object
- socket = None #Socket being managed
- prepared = False
+ before forking multiple child processes to accept()
+ Sockets are managed at the process group level and reference counted
+ at the process level b/c that's really the only place to hook in
+ """
- def __init__(self, socket_config):
+ def __init__(self, socket_config, **kwargs):
+ self.logger = kwargs.get('logger', None)
+ self.socket = None
+ self.prepared = False
self.socket_config = socket_config
+ self.close_requested = False
+ self.ref_ctr = ReferenceCounter(on_zero=self._on_ref_ct_zero,
on_non_zero=self._prepare_socket)
def __repr__(self):
return '<%s at %s for %s>' % (self.__class__,
@@ -33,17 +80,46 @@
def config(self):
return self.socket_config
- def prepare_socket(self):
- self.socket = self.socket_config.create()
- self.socket.bind(self.socket_config.addr())
- self.socket.listen(socket.SOMAXCONN)
- self.prepared = True
+ def is_prepared(self):
+ return self.prepared
+
+ def get_socket(self):
+ self.ref_ctr.increment()
+ self._require_prepared()
+ return Proxy(self.socket, on_delete=self.ref_ctr.decrement)
- def get_socket(self):
+ def get_socket_ref_count(self):
+ self._require_prepared()
+ return self.ref_ctr.get_count()
+
+ def request_close(self):
+ if self.prepared:
+ if self.ref_ctr.get_count() == 0:
+ self._close()
+ else:
+ self.close_requested = True
+
+ def _require_prepared(self):
if not self.prepared:
- self.prepare_socket()
- return self.socket
-
- def close(self):
+ raise Exception('Socket has not been prepared')
+
+ def _prepare_socket(self):
+ if not self.prepared:
+ if self.logger:
+ self.logger.info('Creating socket %s' % self.socket_config)
+ self.socket = self.socket_config.create()
+ self.socket.bind(self.socket_config.addr())
+ self.socket.listen(socket.SOMAXCONN)
+ self.prepared = True
+
+ def _on_ref_ct_zero(self):
+ if self.close_requested:
+ self.close_requested = False
+ self._close()
+
+ def _close(self):
+ self._require_prepared()
+ if self.logger:
+ self.logger.info('Closing socket %s' % self.socket_config)
self.socket.close()
self.prepared = False
_______________________________________________
Supervisor-users mailing list
[email protected]
http://lists.supervisord.org/mailman/listinfo/supervisor-users