Hi,
The attached patchset allows multiple fcgi-programs to share sockets.
After applying, a new boolean option 'reuse_socket' is available
(default: false, for compatibility). Enabling this option and specifying
the same socket= for multiple process groups makes fd 0 of all processes
from these groups point to the exact same socket.
Without this option the behaviour is somewhat unexpected (I'd say
buggy): Unix sockets are created sequentially so all but the last one to
bind to a specific path will _never_ get a connection (their fd 0 is
unreachable from the file system) and duplicate TCP sockets make
supervisord explode at startup (an unhandled exception and crash,
leaving the first process running and forever occupying the socket).
The patchset is lightly tested and appears to work fine.
BTW: supervisord's option handling is very unclear for me (a twisted
maze of almost dict-like classes, all alike), so buyer beware.
Best regards,
Grzegorz Nosek
>From 8a4cfbfb275db7badf747ff0508d2ed4ffc840b3 Mon Sep 17 00:00:00 2001
From: Grzegorz Nosek <r...@localdomain.pl>
Date: Mon, 22 Nov 2010 17:13:19 +0100
Subject: [PATCH 1/3] Split out ManagedSocket class from SocketManager
The purpose is to decouple a SocketManager instance from the
lifetime of the socket's refcount. A socket may live longer than
its original SocketManager, as long as another SocketManager keeps
a reference. This will happen soon.
---
src/supervisor/socket_manager.py | 33 ++++++++++++++++++++++++++-------
1 files changed, 26 insertions(+), 7 deletions(-)
diff --git a/src/supervisor/socket_manager.py b/src/supervisor/socket_manager.py
index 707ec4b..8691a0d 100644
--- a/src/supervisor/socket_manager.py
+++ b/src/supervisor/socket_manager.py
@@ -57,13 +57,7 @@ class ReferenceCounter:
if self.ref_count == 0:
self.on_zero()
-class SocketManager:
- """ Class for managing sockets in servers that create/bind/listen
- before forking multiple child processes to accept()
- Sockets are managed at the process group level and referenced counted
- at the process level b/c that's really the only place to hook in
- """
-
+class ManagedSocket:
def __init__(self, socket_config, **kwargs):
self.logger = kwargs.get('logger', None)
self.socket = None
@@ -109,3 +103,28 @@ class SocketManager:
self.logger.info('Closing socket %s' % self.socket_config)
self.socket.close()
self.prepared = False
+
+class SocketManager:
+ """ Class for managing sockets in servers that create/bind/listen
+ before forking multiple child processes to accept()
+ Sockets are managed at the process group level and referenced counted
+ at the process level b/c that's really the only place to hook in
+ """
+
+ def __init__(self, socket_config, **kwargs):
+ self.logger = kwargs.get('logger', None)
+ self.m_socket = ManagedSocket(socket_config, logger=self.logger)
+ self.socket_config = socket_config
+
+ def config(self):
+ return self.socket_config
+
+ def get_socket(self):
+ return self.m_socket.get_socket()
+
+ def is_prepared(self):
+ return self.m_socket.is_prepared()
+
+ @property
+ def socket(self):
+ return self.m_socket.socket
\ No newline at end of file
--
1.7.0.4
>From 72f931ee782368a7614000b6cd5f8c98ab47855d Mon Sep 17 00:00:00 2001
From: Grzegorz Nosek <r...@localdomain.pl>
Date: Mon, 29 Nov 2010 15:45:11 +0100
Subject: [PATCH 2/3] Enable reuse of sockets between different process groups
socket_config instances may specify reuse=True, which means that
all process groups having the same socket_config.url (not the same
socket_config) will share the socket
---
src/supervisor/datatypes.py | 4 ++-
src/supervisor/socket_manager.py | 35 +++++++++++++++++++++++++-
src/supervisor/tests/base.py | 1 +
src/supervisor/tests/test_socket_manager.py | 9 +++++++
4 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/src/supervisor/datatypes.py b/src/supervisor/datatypes.py
index 26bfe68..a85314e 100644
--- a/src/supervisor/datatypes.py
+++ b/src/supervisor/datatypes.py
@@ -206,10 +206,11 @@ class InetStreamSocketConfig(SocketConfig):
host = None # host name or ip to bind to
port = None # integer port to bind to
- def __init__(self, host, port):
+ def __init__(self, host, port, reuse=False):
self.host = host.lower()
self.port = port_number(port)
self.url = 'tcp://%s:%d' % (self.host, self.port)
+ self.reuse = reuse
def addr(self):
return (self.host, self.port)
@@ -233,6 +234,7 @@ class UnixStreamSocketConfig(SocketConfig):
self.url = 'unix://%s' % (path)
self.mode = kwargs.get('mode', None)
self.owner = kwargs.get('owner', None)
+ self.reuse = kwargs.get('reuse', False)
def addr(self):
return self.path
diff --git a/src/supervisor/socket_manager.py b/src/supervisor/socket_manager.py
index 8691a0d..9a7ffbd 100644
--- a/src/supervisor/socket_manager.py
+++ b/src/supervisor/socket_manager.py
@@ -64,7 +64,9 @@ class ManagedSocket:
self.prepared = False
self.socket_config = socket_config
self.ref_ctr = ReferenceCounter(on_zero=self._close, on_non_zero=self._prepare_socket)
-
+ self.outer_ref_ctr = ReferenceCounter(on_zero=self.unregister, on_non_zero=lambda:None)
+ self.url = socket_config.url
+
def __repr__(self):
return '<%s at %s for %s>' % (self.__class__,
id(self),
@@ -104,6 +106,21 @@ class ManagedSocket:
self.socket.close()
self.prepared = False
+ def outer_incref(self):
+ self.outer_ref_ctr.increment()
+
+ def outer_decref(self):
+ self.outer_ref_ctr.decrement()
+
+ def unregister(self):
+ try:
+ s = SocketManager.sockets[self.url]
+ except KeyError:
+ return
+
+ if s is self:
+ del SocketManager.sockets[self.url]
+
class SocketManager:
""" Class for managing sockets in servers that create/bind/listen
before forking multiple child processes to accept()
@@ -111,10 +128,24 @@ class SocketManager:
at the process level b/c that's really the only place to hook in
"""
+ # if supervisord ever goes multithreaded, this requires locking
+ sockets = dict()
+
def __init__(self, socket_config, **kwargs):
self.logger = kwargs.get('logger', None)
- self.m_socket = ManagedSocket(socket_config, logger=self.logger)
+ if socket_config.reuse:
+ sock = SocketManager.sockets.get(socket_config.url, None)
+ else:
+ sock = None
+
+ if sock is None:
+ sock = ManagedSocket(socket_config, logger=self.logger)
+ sock.outer_incref()
+
+ if socket_config.reuse:
+ SocketManager.sockets[socket_config.url] = sock
self.socket_config = socket_config
+ self.m_socket = Proxy(sock, on_delete=sock.outer_decref)
def config(self):
return self.socket_config
diff --git a/src/supervisor/tests/base.py b/src/supervisor/tests/base.py
index fac586b..f20e872 100644
--- a/src/supervisor/tests/base.py
+++ b/src/supervisor/tests/base.py
@@ -323,6 +323,7 @@ class DummySocket:
class DummySocketConfig:
def __init__(self, fd):
self.fd = fd
+ self.url = "fd://%d" % fd
def addr(self):
return 'dummy addr'
diff --git a/src/supervisor/tests/test_socket_manager.py b/src/supervisor/tests/test_socket_manager.py
index 2ec3874..f59f7d5 100644
--- a/src/supervisor/tests/test_socket_manager.py
+++ b/src/supervisor/tests/test_socket_manager.py
@@ -177,6 +177,15 @@ class SocketManagerTest(unittest.TestCase):
sock_manager2 = self._makeOne(conf)
self.assertRaises(socket.error, sock_manager2.get_socket)
sock = None
+
+ def test_tcp_socket_reuse(self):
+ conf = InetStreamSocketConfig('127.0.0.1', 12345)
+ sock_manager = self._makeOne(conf, reuse=True)
+ sock = sock_manager.get_socket()
+ sock_manager2 = self._makeOne(conf, reuse=True)
+ sock2 = sock_manager2.get_socket()
+ self.assertEqual(id(sock._get()), id(sock2._get()))
+ sock = None
def test_unix_bad_sock(self):
conf = UnixStreamSocketConfig('/notthere/foo.sock')
--
1.7.0.4
>From 2c1a9f3d1b6def98b62e64145ef7334998465dfa Mon Sep 17 00:00:00 2001
From: Grzegorz Nosek <r...@localdomain.pl>
Date: Mon, 29 Nov 2010 15:49:16 +0100
Subject: [PATCH 3/3] Expose reuse_socket setting in config
fcgi-program sections may now include reuse_socket setting
to enable socket sharing
---
src/supervisor/options.py | 17 ++++++++++++-----
src/supervisor/tests/test_options.py | 4 ++--
src/supervisor/tests/test_supervisord.py | 3 ++-
3 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/src/supervisor/options.py b/src/supervisor/options.py
index d342892..fe6b64e 100644
--- a/src/supervisor/options.py
+++ b/src/supervisor/options.py
@@ -674,6 +674,7 @@ class ServerOptions(Options):
priority = integer(get(section, 'priority', 999))
proc_uid = name_to_uid(get(section, 'user', None))
+ reuse_socket = boolean(get(section, 'reuse_socket', False))
socket_owner = get(section, 'socket_owner', None)
if socket_owner is not None:
@@ -701,7 +702,8 @@ class ServerOptions(Options):
socket = expand(socket, expansions, 'socket')
try:
socket_config = self.parse_fcgi_socket(socket, proc_uid,
- socket_owner, socket_mode)
+ socket_owner, socket_mode,
+ reuse_socket)
except ValueError, e:
raise ValueError('%s in [%s] socket' % (str(e), section))
@@ -715,7 +717,8 @@ class ServerOptions(Options):
groups.sort()
return groups
- def parse_fcgi_socket(self, sock, proc_uid, socket_owner, socket_mode):
+ def parse_fcgi_socket(self, sock, proc_uid, socket_owner, socket_mode,
+ reuse_socket):
if sock.startswith('unix://'):
path = sock[7:]
#Check it's an absolute path
@@ -733,7 +736,8 @@ class ServerOptions(Options):
socket_mode = 0700
return UnixStreamSocketConfig(path, owner=socket_owner,
- mode=socket_mode)
+ mode=socket_mode,
+ reuse=reuse_socket)
if socket_owner is not None or socket_mode is not None:
raise ValueError("socket_owner and socket_mode params should"
@@ -743,7 +747,7 @@ class ServerOptions(Options):
if m:
host = m.group(1)
port = int(m.group(2))
- return InetStreamSocketConfig(host, port)
+ return InetStreamSocketConfig(host, port, reuse=reuse_socket)
raise ValueError("Bad socket format %s", sock)
@@ -775,6 +779,7 @@ class ServerOptions(Options):
numprocs_start = integer(get(section, 'numprocs_start', 0))
process_name = get(section, 'process_name', '%(program_name)s')
clear_environment = boolean(get(section, 'clear_environment', 'false'))
+ reuse_socket = boolean(get(section, 'reuse_socket', 'false'))
environment_str = get(section, 'environment', '')
stdout_cmaxbytes = byte_size(get(section,'stdout_capture_maxbytes','0'))
stdout_events = boolean(get(section, 'stdout_events_enabled','false'))
@@ -875,6 +880,7 @@ class ServerOptions(Options):
clear_environment=clear_environment,
pam_service=pam_service,
user=user,
+ reuse_socket=reuse_socket,
serverurl=serverurl)
programs.append(pconfig)
@@ -1578,7 +1584,8 @@ class ProcessConfig(Config):
'stderr_logfile', 'stderr_capture_maxbytes',
'stderr_logfile_backups', 'stderr_logfile_maxbytes',
'stderr_events_enabled',
- 'stopsignal', 'stopwaitsecs', 'exitcodes', 'redirect_stderr' ]
+ 'stopsignal', 'stopwaitsecs', 'exitcodes', 'redirect_stderr',
+ 'reuse_socket']
optional_param_names = [ 'environment', 'serverurl', 'clear_environment',
'pam_service', 'user']
diff --git a/src/supervisor/tests/test_options.py b/src/supervisor/tests/test_options.py
index 55acf6a..f16f9bb 100644
--- a/src/supervisor/tests/test_options.py
+++ b/src/supervisor/tests/test_options.py
@@ -1282,7 +1282,7 @@ class TestProcessConfig(unittest.TestCase):
'stderr_events_enabled',
'stderr_logfile_backups', 'stderr_logfile_maxbytes',
'stopsignal', 'stopwaitsecs', 'exitcodes',
- 'redirect_stderr', 'environment'):
+ 'redirect_stderr', 'environment', 'reuse_socket'):
defaults[name] = name
defaults.update(kw)
return self._getTargetClass()(*arg, **defaults)
@@ -1356,7 +1356,7 @@ class FastCGIProcessConfigTest(unittest.TestCase):
'stderr_events_enabled',
'stderr_logfile_backups', 'stderr_logfile_maxbytes',
'stopsignal', 'stopwaitsecs', 'exitcodes',
- 'redirect_stderr', 'environment'):
+ 'redirect_stderr', 'environment', 'reuse_socket'):
defaults[name] = name
defaults.update(kw)
return self._getTargetClass()(*arg, **defaults)
diff --git a/src/supervisor/tests/test_supervisord.py b/src/supervisor/tests/test_supervisord.py
index 1be744d..9b94f01 100644
--- a/src/supervisor/tests/test_supervisord.py
+++ b/src/supervisor/tests/test_supervisord.py
@@ -259,7 +259,8 @@ class SupervisordTests(unittest.TestCase):
'stderr_logfile_backups': 0, 'stderr_logfile_maxbytes': 0,
'redirect_stderr': False,
'stopsignal': None, 'stopwaitsecs': 10,
- 'exitcodes': (0,2), 'environment': None, 'serverurl': None }
+ 'exitcodes': (0,2), 'environment': None, 'serverurl': None,
+ 'reuse_socket': False }
result.update(params)
return ProcessConfig(options, **result)
--
1.7.0.4
_______________________________________________
Supervisor-users mailing list
Supervisor-users@lists.supervisord.org
http://lists.supervisord.org/mailman/listinfo/supervisor-users