Author: msergeant
Date: Mon Dec  3 09:45:31 2007
New Revision: 822

Modified:
   trunk/qpsmtpd-async

Log:
Support for HUPing the server to clear the cache
Wake-one child support


Modified: trunk/qpsmtpd-async
==============================================================================
--- trunk/qpsmtpd-async (original)
+++ trunk/qpsmtpd-async Mon Dec  3 09:45:31 2007
@@ -21,10 +21,11 @@
 use Carp;
 use POSIX qw(WNOHANG);
 use Getopt::Long;
+use List::Util qw(shuffle);
 
 $|++;
 
-use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
+use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET 
AF_UNIX SOCK_STREAM PF_UNSPEC);
 
 $SIG{'PIPE'} = "IGNORE";  # handled manually
 
@@ -79,6 +80,7 @@
     'pid-file=s'            => \$PID_FILE,
     'd|detach'              => \$DETACH,
     'h|help'                => \&help,
+    'config-port'           => \$CONFIG_PORT,
 ) || help();
 
 # detaint the commandline
@@ -98,7 +100,11 @@
 my $SERVER;
 my $CONFIG_SERVER;
 
+use constant ACCEPTING  => 1;
+use constant RESTARTING => 2;
 my %childstatus = ();
+my %childhandle = ();
+
 if ($PID_FILE && -r $PID_FILE) {
     open PID, "<$PID_FILE"
        or die "open_pidfile $PID_FILE: $!\n";
@@ -142,14 +148,23 @@
 
 sub spawn_child {
     my $plugin_loader = shift || Qpsmtpd::SMTP->new;
+    
+    socketpair(my $reader, my $writer, AF_UNIX, SOCK_STREAM, PF_UNSPEC) || die 
"Unable to create a pipe";
+    $reader->autoflush(1);
+    $writer->autoflush(1);
+    
     if (my $pid = _fork) {
+        $childstatus{$pid} = ACCEPTING;
+        $childhandle{$pid} = $writer;
         return $pid;
     }
 
-    $SIG{HUP} = $SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT';
+    $SIG{CHLD} = $SIG{INT} = $SIG{TERM} = 'DEFAULT';
     $SIG{PIPE} = 'IGNORE';
-
-    Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler);
+    $SIG{HUP}  = sub { print "Child got SIGHUP\n" };
+    # sub { cmd_hup(); Qpsmtpd::PollServer->EventLoop(); }; # so we can HUP 
just one child
+    
+    Qpsmtpd::PollServer->OtherFds(fileno($reader) => sub { 
command_handler($reader) });
 
     $plugin_loader->run_hooks('post-fork');
 
@@ -157,6 +172,15 @@
     exit;
 }
 
+sub sig_hup {
+    for my $writer (values %childhandle) {
+        print $writer "hup\n";
+        my $result = <$writer>;
+    }
+    $SIG{HUP} = \&sig_hup;
+    Qpsmtpd::PollServer->EventLoop();
+}
+
 sub sig_chld {
     my $spawn_count = 0;
     while ( (my $child = waitpid(-1,WNOHANG)) > 0) {
@@ -167,13 +191,13 @@
         last unless $child > 0;
         print "SIGCHLD: child $child died\n";
         delete $childstatus{$child};
+        delete $childhandle{$child};
         $spawn_count++;
     }
     if ($spawn_count) {
         for (1..$spawn_count) {
             # restart a new child if in poll server mode
             my $pid = spawn_child();
-            $childstatus{$pid} = 1;
         }
     }
     $SIG{CHLD} = \&sig_chld;
@@ -250,27 +274,21 @@
 
     $SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
 
-    if ($PROCS > 1) {
-        for (1..$PROCS) {
-            my $pid = spawn_child($plugin_loader);
-            $childstatus{$pid} = 1;
-        }
-        $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children 
$POLL");
-        $SIG{'CHLD'} = \&sig_chld;
-        sleep while (1);
-    }
-    else {
-        $plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process 
$POLL");
-        Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
-                                      fileno($CONFIG_SERVER) => 
\&config_handler,
-                                      );
-        $plugin_loader->run_hooks('post-fork');
-        while (1) {
-            Qpsmtpd::PollServer->EventLoop();
-        }
-        exit;
+    for (1..$PROCS) {
+        my $pid = spawn_child($plugin_loader);
     }
-
+    $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children 
$POLL");
+    $SIG{CHLD} = \&sig_chld;
+    $SIG{HUP}  = \&sig_hup;
+    
+    Qpsmtpd::PollServer->OtherFds(
+        fileno($SERVER) => \&accept_handler,
+        fileno($CONFIG_SERVER) => \&config_handler
+        );
+    Qpsmtpd::PollServer->EventLoop;
+    
+    exit;
+    
 }
 
 sub config_handler {
@@ -291,8 +309,44 @@
     return;
 }
 
-# Accept all new connections
+# server is ready to accept - tell a child to accept().
 sub accept_handler {
+    # pick a random child to tell to accept()
+    my $child = (shuffle keys %childstatus)[0];
+    if ($childstatus{$child} != ACCEPTING) {
+        # recurse...
+        return accept_handler() if %childstatus;
+        die "No children available";
+    }
+    my $writer = $childhandle{$child};
+    print $writer "accept\n";
+    my $result = <$writer>;
+}
+
+sub command_handler {
+    my $reader = shift;
+    
+    chomp(my $command = <$reader>);
+    
+    #print "Got command: $command\n";
+    
+    my $real_command = "cmd_$command";
+    
+    no strict 'refs';
+    my $result = $real_command->();
+    print $reader "$result\n";
+}
+
+sub cmd_hup {
+    # clear cache
+    #print "Clearing cache\n";
+    Qpsmtpd::clear_config_cache();
+    # should also reload modules... but can't do that yet.
+    return "ok";
+}
+
+# Accept all new connections
+sub cmd_accept {
     for (1 .. $NUMACCEPT) {
         return unless _accept_handler();
     }
@@ -303,6 +357,7 @@
     $NUMACCEPT = ACCEPT_MAX if $NUMACCEPT > ACCEPT_MAX;
     $ACCEPT_RSET->cancel;
     $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
+    return "ok";
 }
 
 use Errno qw(EAGAIN EWOULDBLOCK);
@@ -321,6 +376,7 @@
     IO::Handle::blocking($csock, 0);
     #setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
 
+    #print "Got connection\n";
     my $client = Qpsmtpd::PollServer->new($csock);
     
     if ($PAUSED) {

Reply via email to