Author: msergeant
Date: Mon Dec 3 09:45:31 2007
New Revision: 822
Modified:
trunk/qpsmtpd-async
Log:
Support for HUPing the server to clear the cache
Wake-one child support
Modified: trunk/qpsmtpd-async
==============================================================================
--- trunk/qpsmtpd-async (original)
+++ trunk/qpsmtpd-async Mon Dec 3 09:45:31 2007
@@ -21,10 +21,11 @@
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
+use List::Util qw(shuffle);
$|++;
-use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
+use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET
AF_UNIX SOCK_STREAM PF_UNSPEC);
$SIG{'PIPE'} = "IGNORE"; # handled manually
@@ -79,6 +80,7 @@
'pid-file=s' => \$PID_FILE,
'd|detach' => \$DETACH,
'h|help' => \&help,
+ 'config-port' => \$CONFIG_PORT,
) || help();
# detaint the commandline
@@ -98,7 +100,11 @@
my $SERVER;
my $CONFIG_SERVER;
+use constant ACCEPTING => 1;
+use constant RESTARTING => 2;
my %childstatus = ();
+my %childhandle = ();
+
if ($PID_FILE && -r $PID_FILE) {
open PID, "<$PID_FILE"
or die "open_pidfile $PID_FILE: $!\n";
@@ -142,14 +148,23 @@
sub spawn_child {
my $plugin_loader = shift || Qpsmtpd::SMTP->new;
+
+ socketpair(my $reader, my $writer, AF_UNIX, SOCK_STREAM, PF_UNSPEC) || die
"Unable to create a pipe";
+ $reader->autoflush(1);
+ $writer->autoflush(1);
+
if (my $pid = _fork) {
+ $childstatus{$pid} = ACCEPTING;
+ $childhandle{$pid} = $writer;
return $pid;
}
- $SIG{HUP} = $SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT';
+ $SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT';
$SIG{PIPE} = 'IGNORE';
-
- Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler);
+ $SIG{HUP} = sub { print "Child got SIGHUP\n" };
+ # sub { cmd_hup(); Qpsmtpd::PollServer->EventLoop(); }; # so we can HUP
just one child
+
+ Qpsmtpd::PollServer->OtherFds(fileno($reader) => sub {
command_handler($reader) });
$plugin_loader->run_hooks('post-fork');
@@ -157,6 +172,15 @@
exit;
}
+sub sig_hup {
+ for my $writer (values %childhandle) {
+ print $writer "hup\n";
+ my $result = <$writer>;
+ }
+ $SIG{HUP} = \&sig_hup;
+ Qpsmtpd::PollServer->EventLoop();
+}
+
sub sig_chld {
my $spawn_count = 0;
while ( (my $child = waitpid(-1,WNOHANG)) > 0) {
@@ -167,13 +191,13 @@
last unless $child > 0;
print "SIGCHLD: child $child died\n";
delete $childstatus{$child};
+ delete $childhandle{$child};
$spawn_count++;
}
if ($spawn_count) {
for (1..$spawn_count) {
# restart a new child if in poll server mode
my $pid = spawn_child();
- $childstatus{$pid} = 1;
}
}
$SIG{CHLD} = \&sig_chld;
@@ -250,27 +274,21 @@
$SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
- if ($PROCS > 1) {
- for (1..$PROCS) {
- my $pid = spawn_child($plugin_loader);
- $childstatus{$pid} = 1;
- }
- $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children
$POLL");
- $SIG{'CHLD'} = \&sig_chld;
- sleep while (1);
- }
- else {
- $plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process
$POLL");
- Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
- fileno($CONFIG_SERVER) =>
\&config_handler,
- );
- $plugin_loader->run_hooks('post-fork');
- while (1) {
- Qpsmtpd::PollServer->EventLoop();
- }
- exit;
+ for (1..$PROCS) {
+ my $pid = spawn_child($plugin_loader);
}
-
+ $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children
$POLL");
+ $SIG{CHLD} = \&sig_chld;
+ $SIG{HUP} = \&sig_hup;
+
+ Qpsmtpd::PollServer->OtherFds(
+ fileno($SERVER) => \&accept_handler,
+ fileno($CONFIG_SERVER) => \&config_handler
+ );
+ Qpsmtpd::PollServer->EventLoop;
+
+ exit;
+
}
sub config_handler {
@@ -291,8 +309,44 @@
return;
}
-# Accept all new connections
+# server is ready to accept - tell a child to accept().
sub accept_handler {
+ # pick a random child to tell to accept()
+ my $child = (shuffle keys %childstatus)[0];
+ if ($childstatus{$child} != ACCEPTING) {
+ # recurse...
+ return accept_handler() if %childstatus;
+ die "No children available";
+ }
+ my $writer = $childhandle{$child};
+ print $writer "accept\n";
+ my $result = <$writer>;
+}
+
+sub command_handler {
+ my $reader = shift;
+
+ chomp(my $command = <$reader>);
+
+ #print "Got command: $command\n";
+
+ my $real_command = "cmd_$command";
+
+ no strict 'refs';
+ my $result = $real_command->();
+ print $reader "$result\n";
+}
+
+sub cmd_hup {
+ # clear cache
+ #print "Clearing cache\n";
+ Qpsmtpd::clear_config_cache();
+ # should also reload modules... but can't do that yet.
+ return "ok";
+}
+
+# Accept all new connections
+sub cmd_accept {
for (1 .. $NUMACCEPT) {
return unless _accept_handler();
}
@@ -303,6 +357,7 @@
$NUMACCEPT = ACCEPT_MAX if $NUMACCEPT > ACCEPT_MAX;
$ACCEPT_RSET->cancel;
$ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
+ return "ok";
}
use Errno qw(EAGAIN EWOULDBLOCK);
@@ -321,6 +376,7 @@
IO::Handle::blocking($csock, 0);
#setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
+ #print "Got connection\n";
my $client = Qpsmtpd::PollServer->new($csock);
if ($PAUSED) {