Author: msergeant
Date: Fri Dec  8 11:46:18 2006
New Revision: 679

Added:
   branches/0.3x/lib/Qpsmtpd/PollServer.pm
   branches/0.3x/qpsmtpd-async   (contents, props changed)
Modified:
   branches/0.3x/lib/Qpsmtpd.pm
   branches/0.3x/lib/Qpsmtpd/SMTP.pm

Log:
Async qpsmtpd (still entirely compatible with non-async version)


Modified: branches/0.3x/lib/Qpsmtpd.pm
==============================================================================
--- branches/0.3x/lib/Qpsmtpd.pm        (original)
+++ branches/0.3x/lib/Qpsmtpd.pm        Fri Dec  8 11:46:18 2006
@@ -348,7 +348,7 @@
     $self->{_continuation} = [$hook, [EMAIL PROTECTED], @local_hooks];
     return $self->run_continuation();
   }
-  return (0, '');
+  return $self->hook_responder($hook, [0, ''], [EMAIL PROTECTED]);
 }
 
 sub run_continuation {
@@ -423,7 +423,7 @@
   
   my $responder = $hook . '_respond';
   if (my $meth = $self->can($responder)) {
-    return $meth->($self, $code, $msg, @$args);
+    return $meth->($self, $code, $msg, $args);
   }
   return $code, @$msg;
 }

Added: branches/0.3x/lib/Qpsmtpd/PollServer.pm
==============================================================================
--- (empty file)
+++ branches/0.3x/lib/Qpsmtpd/PollServer.pm     Fri Dec  8 11:46:18 2006
@@ -0,0 +1,392 @@
+# $Id: Server.pm,v 1.10 2005/02/14 22:04:48 msergeant Exp $
+
+package Qpsmtpd::PollServer;
+
+use base ('Danga::Client', 'Qpsmtpd::SMTP');
+# use fields required to be a subclass of Danga::Client. Have to include
+# all fields used by Qpsmtpd.pm here too.
+use fields qw(
+    input_sock
+    mode
+    header_lines
+    in_header
+    data_size
+    max_size
+    hooks
+    start_time
+    cmd_timeout
+    _auth_mechanism
+    _auth_state
+    _auth_ticket
+    _auth_user
+    _commands
+    _config_cache
+    _connection
+    _transaction
+    _test_mode
+    _extras
+    _continuation
+);
+use Qpsmtpd::Constants;
+use Qpsmtpd::Address;
+use ParaDNS;
+use Mail::Header;
+use POSIX qw(strftime);
+use Socket qw(inet_aton AF_INET CRLF);
+use Time::HiRes qw(time);
+use strict;
+
+# Constant accessors for connection time limits. The units and the consumer
+# are not visible here -- presumably seconds, read by the Danga::Client /
+# Danga::Socket base class; TODO confirm.
+sub max_idle_time { 60 }
+sub max_connect_time { 1200 }
+
+# Combined getter/setter for the input socket.
+# With an argument, stores it in the input_sock field first. Always returns
+# the stored socket, falling back to the object itself when none is set.
+sub input_sock {
+    my $self = shift;
+    if (@_) {
+        $self->{input_sock} = shift;
+    }
+    return $self->{input_sock} || $self;
+}
+
+# Constructor. Accepts either a class name or an already allocated object;
+# all remaining arguments are forwarded to the parent constructor.
+# Returns the initialised object.
+sub new {
+    my Qpsmtpd::PollServer $self = shift;
+    
+    # Called as a class method: allocate the fields-based object ourselves.
+    $self = fields::new($self) unless ref $self;
+    $self->SUPER::new( @_ );
+    $self->{cmd_timeout} = 5;
+    # time() is Time::HiRes::time here, so this is a fractional timestamp.
+    $self->{start_time} = time;
+    # The first line processed in 'connect' mode triggers start_conversation.
+    $self->{mode} = 'connect';
+    $self->load_plugins;
+    $self->load_logging;
+    return $self;
+}
+
+# Returns how long this connection object has existed: the difference
+# between the current time and the start_time recorded in new().
+sub uptime {
+    my Qpsmtpd::PollServer $self = shift;
+
+    my $elapsed = time() - $self->{start_time};
+    return $elapsed;
+}
+
+# Resets per-message state: runs the parent's reset, then restores the
+# command table, switches back to command mode and clears _extras.
+sub reset_for_next_message {
+    my Qpsmtpd::PollServer $self = shift;
+    $self->SUPER::reset_for_next_message(@_);
+
+    # Every command listed here is enabled; auth stays disabled by default.
+    my @enabled = qw(ehlo helo rset mail rcpt data help vrfy noop quit);
+    my %commands = map { $_ => 1 } @enabled;
+    $commands{auth} = 0;
+
+    $self->{_commands} = \%commands;
+    $self->{mode} = 'cmd';
+    $self->{_extras} = {};
+}
+
+# Writes an SMTP reply to the client.
+#
+# Arguments: the numeric reply code followed by one or more message lines.
+# Every line but the last is sent as "code-text", the last as "code text",
+# each terminated with CRLF. Always returns 1.
+#
+# Undefined messages are skipped. Defined-but-false messages ("0", "") are
+# sent: the previous truthiness test stopped the loop on them, so the final
+# "code<space>text" line was never written and later lines were dropped.
+sub respond {
+    my Qpsmtpd::PollServer $self = shift;
+    my ($code, @messages) = @_;
+    @messages = grep { defined } @messages;
+    while (@messages) {
+        my $msg = shift @messages;
+        # A hyphen marks a continuation line; a space marks the final line.
+        my $line = $code . (@messages ? "-" : " ") . $msg;
+        $self->write("$line\r\n");
+    }
+    return 1;
+}
+
+# Delegates to the parent class's fault() with the caller's arguments and
+# deliberately returns nothing, discarding whatever the parent returned.
+sub fault {
+    my Qpsmtpd::PollServer $self = shift;
+    $self->SUPER::fault(@_);
+    return;
+}
+
+# Entry point for one line of client input.
+#
+# Ignores empty/false lines, dispatches to _process_line() inside an eval,
+# and converts any exception into a fault() reply whose text depends on the
+# current mode. Returns nothing on success, or fault()'s result on error.
+sub process_line {
+    my Qpsmtpd::PollServer $self = shift;
+    my $line = shift || return;
+    if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; }
+    eval { $self->_process_line($line) };
+    # Copy $@ immediately: it is a global that any later eval can clobber.
+    if (my $error = $@) {
+        chomp($error);
+        print STDERR "Error: $error\n";
+        return $self->fault("command failed unexpectedly") if $self->{mode} eq 'cmd';
+        return $self->fault("error processing data lines") if $self->{mode} eq 'data';
+        return $self->fault("unknown error");
+    }
+    return;
+}
+
+# Routes one input line according to the connection mode:
+#   connect - switch to command mode and start the conversation
+#   cmd     - strip the line ending and run the SMTP command
+#   data    - hand the raw line to data_line()
+# Dies with "Unknown mode" for anything else.
+sub _process_line {
+    my Qpsmtpd::PollServer $self = shift;
+    my $line = shift;
+
+    my $mode = $self->{mode};
+
+    if ($mode eq 'connect') {
+        $self->{mode} = 'cmd';
+        my $rc = $self->start_conversation;
+        return;
+    }
+
+    if ($mode eq 'cmd') {
+        $line =~ s/\r?\n//;
+        return $self->process_cmd($line);
+    }
+
+    return $self->data_line($line) if $mode eq 'data';
+
+    die "Unknown mode";
+}
+
+# Parses and executes one SMTP command line.
+#
+# The line is split on runs of spaces; the first word, lower-cased, is the
+# command. A command runs only if it is enabled in _commands and the object
+# has a method of that name. Anything else fires the "unrecognized_command"
+# hooks and returns 1. Exceptions from a handler are logged and reported
+# through fault(); otherwise the handler's return value is returned.
+sub process_cmd {
+    my Qpsmtpd::PollServer $self = shift;
+    my $line = shift;
+    my ($cmd, @params) = split(/ +/, $line);
+    my $meth = lc($cmd);
+
+    my $handler = $self->{_commands}->{$meth} && $self->can($meth);
+    unless ($handler) {
+        # No such method - i.e. unrecognized command
+        my ($rc, $msg) = $self->run_hooks("unrecognized_command", $meth, @params);
+        return 1;
+    }
+
+    my $resp = eval { $handler->($self, @params) };
+    if ($@) {
+        my $error = $@;
+        chomp($error);
+        $self->log(LOGERROR, "Command Error: $error");
+        return $self->fault("command '$cmd' failed unexpectedly");
+    }
+    return $resp;
+}
+
+# Runs the parent class's disconnect() with the caller's arguments, then
+# closes this client via close(). Returns whatever close() returns.
+sub disconnect {
+    my Qpsmtpd::PollServer $self = shift;
+    $self->SUPER::disconnect(@_);
+    $self->close;
+}
+
+# Records the peer's address on the connection object and starts a ParaDNS
+# lookup for it; the "connect" hooks are run from the lookup's 'finished'
+# callback rather than directly. Returns nothing.
+sub start_conversation {
+    my Qpsmtpd::PollServer $self = shift;
+    
+    my $conn = $self->connection;
+    # set remote_host, remote_ip and remote_port
+    # NOTE(review): assumes peer_addr_string is "ip:port"; an address that
+    # itself contains ':' (e.g. IPv6) would be split wrongly -- confirm.
+    my ($ip, $port) = split(':', $self->peer_addr_string);
+    $conn->remote_ip($ip);
+    $conn->remote_port($port);
+    # Placeholder until the lookup callback supplies a host name.
+    $conn->remote_info("[$ip]");
+    ParaDNS->new(
+        finished   => sub { $self->run_hooks("connect") },
+        # NB: Setting remote_info to the same as remote_host
+        callback   => sub { $conn->remote_info($conn->remote_host($_[0])) },
+        host       => $ip,
+    );
+    
+    return;
+}
+
+# Handler for the DATA command: fires the "data" hooks and returns 1.
+# The hook result captured here is not used in this sub; presumably the
+# reply is produced by data_respond() via the hook responder mechanism --
+# TODO confirm against Qpsmtpd::hook_responder.
+sub data {
+    my Qpsmtpd::PollServer $self = shift;
+    
+    my ($rc, $msg) = $self->run_hooks("data");
+    return 1;
+}
+
+# Responder for the "data" hook: turns the hook result into an SMTP reply
+# and, when the message is accepted, switches the connection into data mode.
+#
+# Arguments:
+#   $rc  - hook return code (DONE, DENY, DENYSOFT, DENY_DISCONNECT, ...)
+#   $msg - hook messages. The hook responder passes an array reference (the
+#          responders in Qpsmtpd::SMTP dereference it with @$msg); a plain
+#          string is accepted as well.
+#
+# Returns nothing for DONE and the deny codes, respond()'s result when MAIL
+# or RCPT is missing, and 1 once chunked reading has been set up.
+sub data_respond {
+    my Qpsmtpd::PollServer $self = shift;
+    my ($rc, $msg) = @_;
+
+    # Normalise to a list. Interpolating the array reference itself would
+    # send "ARRAY(0x...)" to the client and the defaults would never apply.
+    my @msg = ref($msg) eq 'ARRAY' ? @$msg
+            : defined($msg)        ? ($msg)
+            :                        ();
+
+    if ($rc == DONE) {
+        return;
+    }
+    elsif ($rc == DENY) {
+        $msg[0] ||= "Message denied";
+        $self->respond(554, @msg);
+        $self->reset_transaction();
+        return;
+    }
+    elsif ($rc == DENYSOFT) {
+        $msg[0] ||= "Message denied temporarily";
+        $self->respond(451, @msg);
+        $self->reset_transaction();
+        return;
+    }
+    elsif ($rc == DENY_DISCONNECT) {
+        $msg[0] ||= "Message denied";
+        $self->respond(554, @msg);
+        $self->disconnect;
+        return;
+    }
+    elsif ($rc == DENYSOFT_DISCONNECT) {
+        $msg[0] ||= "Message denied temporarily";
+        $self->respond(451, @msg);
+        $self->disconnect;
+        return;
+    }
+    return $self->respond(503, "MAIL first") unless $self->transaction->sender;
+    return $self->respond(503, "RCPT first") unless $self->transaction->recipients;
+
+    $self->{mode} = 'data';
+
+    # Per-message receive state used by got_data() and end_of_data().
+    $self->{header_lines} = '';
+    $self->{data_size} = 0;
+    $self->{in_header} = 1;
+    $self->{max_size} = ($self->config('databytes'))[0] || 0;
+
+    $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");
+
+    $self->respond(354, "go ahead");
+
+    # With no configured limit, fall back to a chunk size of 1048576.
+    my $max_get = $self->{max_size} || 1048576;
+    $self->get_chunks($max_get, sub { $self->got_data($_[0]) });
+    return 1;
+}
+
+# Chunk callback registered by data_respond() through get_chunks(): consumes
+# one chunk of raw message data.
+#
+# Detects the terminating ".\r\n" line, converts CRLF to LF, undoes
+# dot-stuffing, collects the header until the first blank line and parses it
+# with Mail::Header, and writes everything after that to the transaction
+# body. Data is ignored once data_size exceeds max_size (when a limit is
+# set). On end of data it returns to command mode, calls end_of_data() and
+# hands any bytes that followed the terminator to end_get_chunks().
+sub got_data {
+    my Qpsmtpd::PollServer $self = shift;
+    my $data = shift;
+
+    my $done = 0;
+    my $remainder;
+    # /m anchors ^ at any line start; /s lets (.*) span newlines. Without
+    # /s, a terminator followed by further lines in the same chunk (e.g.
+    # pipelined commands) could never match up to \z and was missed.
+    if ($data =~ s/^\.\r\n(.*)\z//ms) {
+        $remainder = $1;
+        $done = 1;
+    }
+
+    # add a transaction->blocked check back here when we have line by line plugin access...
+    unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) {
+        $data =~ s/\r\n/\n/mg;
+        $data =~ s/^\.\./\./mg;
+
+        if ($self->{in_header} and $data =~ s/\A(.*?)\n[ \t]*\n//ms) {
+            $self->{header_lines} .= $1;
+            # end of headers
+            $self->{in_header} = 0;
+
+            # ... need to check that we don't reformat any of the received lines.
+            #
+            # 3.8.2 Received Lines in Gatewaying
+            #   When forwarding a message into or out of the Internet environment, a
+            #   gateway MUST prepend a Received: line, but it MUST NOT alter in any
+            #   way a Received: line that is already in the header.
+            my @header_lines = split(/\n/, $self->{header_lines});
+
+            my $header = Mail::Header->new(\@header_lines,
+                                            Modify => 0, MailFrom => "COERCE");
+            $self->transaction->header($header);
+            $self->{header_lines} = '';
+
+            #$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/");
+
+            # FIXME - call plugins to work on just the header here; can
+            # save us buffering the mail content.
+        }
+
+        if ($self->{in_header}) {
+            # Header not complete yet: keep buffering as a string.
+            $self->{header_lines} .= $data;
+        }
+        else {
+            $self->transaction->body_write(\$data);
+        }
+
+        $self->{data_size} += length $data;
+    }
+
+
+    if ($done) {
+        $self->{mode} = 'cmd';
+        $self->end_of_data;
+        $self->end_get_chunks($remainder);
+    }
+
+}
+
+# Line-at-a-time handler for message data, reached from _process_line() when
+# the mode is 'data'. data_respond() normally reads the message in chunks via
+# got_data(), so the "YIKES" print flags that this path was taken.
+#
+# Handles the terminating ".\r\n", rejects a terminator with a bare LF or CR,
+# converts CRLF to LF, undoes dot-stuffing, collects header lines until the
+# first blank line and writes the rest to the transaction body. Returns
+# end_of_data()'s result on the terminator, otherwise nothing.
+#
+# header_lines is kept as a string, matching data_respond() (which
+# initialises it to '') and got_data(). Pushing onto it as an array
+# reference would die under "strict refs".
+sub data_line {
+    my Qpsmtpd::PollServer $self = shift;
+
+    print "YIKES\n";
+
+    my $line = shift;
+
+    if ($line eq ".\r\n") {
+        # add received etc.
+        $self->{mode} = 'cmd';
+        return $self->end_of_data;
+    }
+
+    # Reject messages that have either bare LF or CR. rjkaes noticed a
+    # lot of spam that is malformed in the header.
+    if ($line eq ".\n" or $line eq ".\r") {
+        $self->respond(421, "See http://smtpd.develooper.com/barelf.html");
+        $self->disconnect;
+        return;
+    }
+
+    # add a transaction->blocked check back here when we have line by line plugin access...
+    unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) {
+        $line =~ s/\r\n$/\n/;
+        $line =~ s/^\.\./\./;
+
+        if ($self->{in_header} and $line =~ m/^\s*$/) {
+            # end of headers
+            $self->{in_header} = 0;
+
+            # ... need to check that we don't reformat any of the received lines.
+            #
+            # 3.8.2 Received Lines in Gatewaying
+            #   When forwarding a message into or out of the Internet environment, a
+            #   gateway MUST prepend a Received: line, but it MUST NOT alter in any
+            #   way a Received: line that is already in the header.
+
+            # Mail::Header->new takes an array reference of header lines.
+            my @header_lines = split(/\n/, $self->{header_lines});
+            my $header = Mail::Header->new(\@header_lines,
+                                            Modify => 0, MailFrom => "COERCE");
+            $self->transaction->header($header);
+            $self->{header_lines} = '';
+
+            #$header->add("X-SMTPD", "qpsmtpd/".$self->version.", http://smtpd.develooper.com/");
+
+            # FIXME - call plugins to work on just the header here; can
+            # save us buffering the mail content.
+        }
+
+        if ($self->{in_header}) {
+            $self->{header_lines} .= $line;
+        }
+        else {
+            $self->transaction->body_write(\$line);
+        }
+
+        $self->{data_size} += length $line;
+    }
+
+    return;
+}
+
+# Finishes message reception: makes sure the transaction has a header
+# object, marks authenticated mail, prepends a Received: line, rejects
+# oversized messages with 552, and otherwise fires the "data_post" hooks.
+#
+# Returns respond()'s result when the message is too big, else 1.
+#
+# The string literals below are single-line on purpose: hard-wrapping them
+# would put stray newlines into the Received: header and the log message.
+sub end_of_data {
+    my Qpsmtpd::PollServer $self = shift;
+
+    #$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300);
+
+    $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");
+
+    my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP";
+
+    # A message without a parsed header still needs a header object to
+    # carry the lines added below.
+    my $header = $self->transaction->header;
+    if (!$header) {
+        $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
+        $self->transaction->header($header);
+    }
+
+    # only true if client authenticated
+    if ( $self->authenticated == OK ) {
+        $header->add("X-Qpsmtpd-Auth","True");
+    }
+
+    # Index 0 places the new Received: line before the existing ones.
+    $header->add("Received", "from ".$self->connection->remote_info
+                 ." (HELO ".$self->connection->hello_host . ") (".$self->connection->remote_ip
+                 . ")\n  by ".$self->config('me')." (qpsmtpd/".$self->version
+                 .") with $smtp; ". (strftime('%a, %d %b %Y %H:%M:%S %z', localtime)),
+                  0);
+
+    return $self->respond(552, "Message too big!")
+        if $self->{max_size} and $self->{data_size} > $self->{max_size};
+
+    my ($rc, $msg) = $self->run_hooks("data_post");
+    return 1;
+}
+
+1;
+

Modified: branches/0.3x/lib/Qpsmtpd/SMTP.pm
==============================================================================
--- branches/0.3x/lib/Qpsmtpd/SMTP.pm   (original)
+++ branches/0.3x/lib/Qpsmtpd/SMTP.pm   Fri Dec  8 11:46:18 2006
@@ -53,21 +53,7 @@
   $self->{_counter}++; 
 
   if ($cmd !~ /^(\w{1,12})$/ or !exists $self->{_commands}->{$1}) {
-    my ($rc, @msg) = $self->run_hooks("unrecognized_command", $cmd, @_);
-    @msg = map { split /\n/ } @msg;
-    if ($rc == DENY_DISCONNECT) {
-      $self->respond(521, @msg);
-      $self->disconnect;
-    }
-    elsif ($rc == DENY) {
-      $self->respond(500, @msg);
-    }
-    elsif ($rc == DONE) {
-      1;
-    }
-    else {
-      $self->respond(500, "Unrecognized command");
-    }
+    $self->run_hooks("unrecognized_command", $cmd, @_);
     return 1
   }
   $cmd = $1;
@@ -82,6 +68,20 @@
   return;
 }
 
+# Responder for the "unrecognized_command" hook.
+#   $rc  - hook return code
+#   $msg - array reference of message lines from the hook
+# DENY_DISCONNECT replies 521 and disconnects, DENY replies 500 with the
+# hook's messages, DONE sends nothing, and any other code replies with the
+# stock 500 "Unrecognized command".
+sub unrecognized_command_respond {
+    my ($self, $rc, $msg) = @_;
+    if ($rc == DENY_DISCONNECT) {
+      $self->respond(521, @$msg);
+      $self->disconnect;
+    }
+    elsif ($rc == DENY) {
+      $self->respond(500, @$msg);
+    }
+    elsif ($rc != DONE) {
+      $self->respond(500, "Unrecognized command");
+    }
+}
+
 sub fault {
   my $self = shift;
   my ($msg) = shift || "program fault - command not performed";
@@ -94,19 +94,21 @@
     my $self = shift;
     # this should maybe be called something else than "connect", see
     # lib/Qpsmtpd/TcpServer.pm for more confusion.
-    my ($rc, @msg) = $self->run_hooks("connect");
+    $self->run_hooks("connect");
+    return DONE;
+}
+
+sub connect_respond {
+    my ($self, $rc, $msg) = @_;
     if ($rc == DENY) {
-      $msg[0] ||= 'Connection from you denied, bye bye.';
-      $self->respond(550, @msg);
-      return $rc;
+      $msg->[0] ||= 'Connection from you denied, bye bye.';
+      $self->respond(550, @$msg);
+      $self->disconnect;
     }
     elsif ($rc == DENYSOFT) {
-      $msg[0] ||= 'Connection from you temporarily denied, bye bye.';
-      $self->respond(450, @msg);
-      return $rc;
-    }
-    elsif ($rc == DONE) {
-      return $rc;
+      $msg->[0] ||= 'Connection from you temporarily denied, bye bye.';
+      $self->respond(450, @$msg);
+      $self->disconnect;
     }
     elsif ($rc != DONE) {
       my $greets = $self->config('smtpgreeting');
@@ -121,7 +123,6 @@
       }
 
       $self->respond(220, $greets);
-      return DONE;
     }
 }
 
@@ -154,20 +155,26 @@
   my $conn = $self->connection;
   return $self->respond (503, "but you already said HELO ...") if $conn->hello;
 
-  ($rc, @msg) = $self->run_hooks("helo", $hello_host, @stuff);
+  $self->run_hooks("helo", $hello_host, @stuff);
+}
+
+sub helo_respond {
+  my ($self, $rc, $msg, $args) = @_;
+  my ($hello_host) = @$args;
   if ($rc == DONE) {
     # do nothing
   } elsif ($rc == DENY) {
-    $self->respond(550, @msg);
+    $self->respond(550, @$msg);
   } elsif ($rc == DENYSOFT) {
-    $self->respond(450, @msg);
+    $self->respond(450, @$msg);
   } elsif ($rc == DENY_DISCONNECT) {
-      $self->respond(550, @msg);
+      $self->respond(550, @$msg);
       $self->disconnect;
   } elsif ($rc == DENYSOFT_DISCONNECT) {
-      $self->respond(450, @msg);
+      $self->respond(450, @$msg);
       $self->disconnect;
   } else {
+    my $conn = $self->connection;
     $conn->hello("helo");
     $conn->hello_host($hello_host);
     $self->transaction;
@@ -184,20 +191,26 @@
   my $conn = $self->connection;
   return $self->respond (503, "but you already said HELO ...") if $conn->hello;
 
-  ($rc, @msg) = $self->run_hooks("ehlo", $hello_host, @stuff);
+  $self->run_hooks("ehlo", $hello_host, @stuff);
+}
+
+sub ehlo_respond {
+  my ($self, $rc, $msg, $args) = @_;
+  my ($hello_host) = @$args;
   if ($rc == DONE) {
     # do nothing
   } elsif ($rc == DENY) {
-    $self->respond(550, @msg);
+    $self->respond(550, @$msg);
   } elsif ($rc == DENYSOFT) {
-    $self->respond(450, @msg);
+    $self->respond(450, @$msg);
   } elsif ($rc == DENY_DISCONNECT) {
-      $self->respond(550, @msg);
+      $self->respond(550, @$msg);
       $self->disconnect;
   } elsif ($rc == DENYSOFT_DISCONNECT) {
-      $self->respond(450, @msg);
+      $self->respond(450, @$msg);
       $self->disconnect;
   } else {
+    my $conn = $self->connection;
     $conn->hello("ehlo");
     $conn->hello_host($hello_host);
     $self->transaction;
@@ -238,8 +251,14 @@
 
 sub auth {
     my ($self, $line) = @_;
-    my ($rc, $sub)    = $self->run_hooks('auth_parse');
-    my ($ok, $mechanism, @stuff) = Qpsmtpd::Command->parse('auth', $line, 
$sub);
+    $self->run_hooks('auth_parse', $line);
+}
+
+sub auth_parse_respond {
+    my ($self, $rc, $msg, $args) = @_;
+    my ($line) = @$args;
+
+    my ($ok, $mechanism, @stuff) = Qpsmtpd::Command->parse('auth', $line, 
$msg->[0]);
     return $self->respond(501, $mechanism || "Syntax error in command") 
       unless ($ok == OK);
 
@@ -293,8 +312,14 @@
   }
   else {
     $self->log(LOGINFO, "full from_parameter: $line");
-    my ($rc, @msg) = $self->run_hooks("mail_parse");
-    my ($ok, $from, @params) = Qpsmtpd::Command->parse('mail', $line, $msg[0]);
+    $self->run_hooks("mail_parse", $line);
+  }
+}
+
+sub mail_parse_respond {
+    my ($self, $rc, $msg, $args) = @_;
+    my ($line) = @$args;
+    my ($ok, $from, @params) = Qpsmtpd::Command->parse('mail', $line, 
$msg->[0]);
     return $self->respond(501, $from || "Syntax error in command") 
       unless ($ok == OK); 
     my %param;
@@ -307,9 +332,14 @@
     #   return (OK, "<$from>"); 
     # (...or anything else parseable by Qpsmtpd::Address ;-))
     # see also comment in sub rcpt()
-    ($rc, @msg) = $self->run_hooks("mail_pre", $from);
+    $self->run_hooks("mail_pre", $from, \%param);
+}
+
+sub mail_pre_respond {
+    my ($self, $rc, $msg, $args) = @_;
+    my ($from, $param) = @$args;
     if ($rc == OK) {
-      $from = shift @msg;
+      $from = shift @$msg;
     }
 
     $self->log(LOGALERT, "from email address : [$from]");
@@ -324,30 +354,35 @@
     }
     return $self->respond(501, "could not parse your mail from command") 
unless $from;
 
-    ($rc, @msg) = $self->run_hooks("mail", $from, %param);
+    $self->run_hooks("mail", $from, %$param);
+}
+
+sub mail_respond {
+    my ($self, $rc, $msg, $args) = @_;
+    my ($from, $param) = @$args;
     if ($rc == DONE) {
       return 1;
     }
     elsif ($rc == DENY) {
-      $msg[0] ||= $from->format . ', denied';
-      $self->log(LOGINFO, "deny mail from " . $from->format . " (@msg)");
-      $self->respond(550, @msg);
+      $msg->[0] ||= $from->format . ', denied';
+      $self->log(LOGINFO, "deny mail from " . $from->format . " (@$msg)");
+      $self->respond(550, @$msg);
     }
     elsif ($rc == DENYSOFT) {
-      $msg[0] ||= $from->format . ', temporarily denied';
-      $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@msg)");
-      $self->respond(450, @msg);
+      $msg->[0] ||= $from->format . ', temporarily denied';
+      $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@$msg)");
+      $self->respond(450, @$msg);
     }
     elsif ($rc == DENY_DISCONNECT) {
-      $msg[0] ||= $from->format . ', denied';
-      $self->log(LOGINFO, "deny mail from " . $from->format . " (@msg)");
-      $self->respond(550, @msg);
+      $msg->[0] ||= $from->format . ', denied';
+      $self->log(LOGINFO, "deny mail from " . $from->format . " (@$msg)");
+      $self->respond(550, @$msg);
       $self->disconnect;
     }
     elsif ($rc == DENYSOFT_DISCONNECT) {
-      $msg[0] ||= $from->format . ', temporarily denied';
-      $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@msg)");
-      $self->respond(421, @msg);
+      $msg->[0] ||= $from->format . ', temporarily denied';
+      $self->log(LOGINFO, "denysoft mail from " . $from->format . " (@$msg)");
+      $self->respond(421, @$msg);
       $self->disconnect;
     }
     else { # includes OK
@@ -355,13 +390,17 @@
       $self->respond(250, $from->format . ", sender OK - how exciting to get 
mail from you!");
       $self->transaction->sender($from);
     }
-  }
 }
 
 sub rcpt {
   my ($self, $line) = @_;
-  my ($rc, @msg)    = $self->run_hooks("rcpt_parse");
-  my ($ok, $rcpt, @param) = Qpsmtpd::Command->parse("rcpt", $line, $msg[0]);
+  $self->run_hooks("rcpt_parse", $line);
+}
+
+sub rcpt_parse_respond {
+  my ($self, $rc, $msg, $args) = @_;
+  my ($line) = @$args;
+  my ($ok, $rcpt, @param) = Qpsmtpd::Command->parse("rcpt", $line, $msg->[0]);
   return $self->respond(501, $rcpt || "Syntax error in command")
     unless ($ok == OK);
   return $self->respond(503, "Use MAIL before RCPT") unless 
$self->transaction->sender;
@@ -378,9 +417,14 @@
   # this means, a plugin can decide to (pre-)accept
   # addresses like <[EMAIL PROTECTED]> or <[EMAIL PROTECTED] >
   # by removing the trailing "."/" " from this example...
-  ($rc, @msg) = $self->run_hooks("rcpt_pre", $rcpt);
+  $self->run_hooks("rcpt_pre", $rcpt, \%param);
+}
+
+sub rcpt_pre_respond {
+  my ($self, $rc, $msg, $args) = @_;
+  my ($rcpt, $param) = @$args;
   if ($rc == OK) {
-    $rcpt = shift @msg;
+    $rcpt = shift @$msg;
   }
   $self->log(LOGALERT, "to email address : [$rcpt]");
   return $self->respond(501, "could not parse recipient") 
@@ -391,28 +435,33 @@
   return $self->respond(501, "could not parse recipient") 
     if (!$rcpt or ($rcpt->format eq '<>'));
 
-  ($rc, @msg) = $self->run_hooks("rcpt", $rcpt, %param);
+  $self->run_hooks("rcpt", $rcpt, %$param);
+}
+
+sub rcpt_respond {
+  my ($self, $rc, $msg, $args) = @_;
+  my ($rcpt, $param) = @$args;
   if ($rc == DONE) {
     return 1;
   }
   elsif ($rc == DENY) {
-    $msg[0] ||= 'relaying denied';
-    $self->respond(550, @msg);
+    $msg->[0] ||= 'relaying denied';
+    $self->respond(550, @$msg);
   }
   elsif ($rc == DENYSOFT) {
-    $msg[0] ||= 'relaying denied';
-    return $self->respond(450, @msg);
+    $msg->[0] ||= 'relaying denied';
+    return $self->respond(450, @$msg);
   }
   elsif ($rc == DENY_DISCONNECT) {
-      $msg[0] ||= 'delivery denied';
-      $self->log(LOGINFO, "delivery denied (@msg)");
-      $self->respond(550, @msg);
+      $msg->[0] ||= 'delivery denied';
+      $self->log(LOGINFO, "delivery denied (@$msg)");
+      $self->respond(550, @$msg);
       $self->disconnect;
   }
   elsif ($rc == DENYSOFT_DISCONNECT) {
-    $msg[0] ||= 'relaying denied';
-    $self->log(LOGINFO, "delivery denied (@msg)");
-    $self->respond(421, @msg);
+    $msg->[0] ||= 'relaying denied';
+    $self->log(LOGINFO, "delivery denied (@$msg)");
+    $self->respond(421, @$msg);
     $self->disconnect;
   }
   elsif ($rc == OK) {
@@ -425,8 +474,6 @@
   return 0;
 }
 
-
-
 sub help {
   my $self = shift;
   $self->respond(214, 
@@ -448,19 +495,23 @@
   # documented in RFC2821#3.5.1
   # I also don't think it provides all the proper result codes.
 
-  my ($rc, @msg) = $self->run_hooks("vrfy");
+  $self->run_hooks("vrfy");
+}
+
+sub vrfy_respond {
+  my ($self, $rc, $msg, $args) = @_;
   if ($rc == DONE) {
     return 1;
   }
   elsif ($rc == DENY) {
-    $msg[0] ||= "Access Denied";
-    $self->respond(554, @msg);
+    $msg->[0] ||= "Access Denied";
+    $self->respond(554, @$msg);
     $self->reset_transaction();
     return 1;
   }
   elsif ($rc == OK) {
-    $msg[0] ||= "User OK";
-    $self->respond(250, @msg);
+    $msg->[0] ||= "User OK";
+    $self->respond(250, @$msg);
     return 1;
   }
   else { # $rc == DECLINED or anything else
@@ -477,10 +528,14 @@
 
 sub quit {
   my $self = shift;
-  my ($rc, @msg) = $self->run_hooks("quit");
+  $self->run_hooks("quit");
+}
+
+sub quit_respond {
+  my ($self, $rc, $msg, $args) = @_;
   if ($rc != DONE) {
-    $msg[0] ||= $self->config('me') . " closing connection. Have a wonderful 
day.";
-    $self->respond(221, @msg);
+    $msg->[0] ||= $self->config('me') . " closing connection. Have a wonderful 
day.";
+    $self->respond(221, @$msg);
   }
   $self->disconnect();
 }
@@ -493,31 +548,35 @@
 
 sub data {
   my $self = shift;
-  my ($rc, @msg) = $self->run_hooks("data");
+  $self->run_hooks("data");
+}
+
+sub data_respond {
+  my ($self, $rc, $msg, $args) = @_;
   if ($rc == DONE) {
     return 1;
   }
   elsif ($rc == DENY) {
-    $msg[0] ||= "Message denied";
-    $self->respond(554, @msg);
+    $msg->[0] ||= "Message denied";
+    $self->respond(554, @$msg);
     $self->reset_transaction();
     return 1;
   }
   elsif ($rc == DENYSOFT) {
-    $msg[0] ||= "Message denied temporarily";
-    $self->respond(451, @msg);
+    $msg->[0] ||= "Message denied temporarily";
+    $self->respond(451, @$msg);
     $self->reset_transaction();
     return 1;
   } 
   elsif ($rc == DENY_DISCONNECT) {
-    $msg[0] ||= "Message denied";
-    $self->respond(554, @msg);
+    $msg->[0] ||= "Message denied";
+    $self->respond(554, @$msg);
     $self->disconnect;
     return 1;
   }
   elsif ($rc == DENYSOFT_DISCONNECT) {
-    $msg[0] ||= "Message denied temporarily";
-    $self->respond(421, @msg);
+    $msg->[0] ||= "Message denied temporarily";
+    $self->respond(421, @$msg);
     $self->disconnect;
     return 1;
   }
@@ -624,17 +683,21 @@
       return 1;
   }
 
-  ($rc, @msg) = $self->run_hooks("data_post");
+  $self->run_hooks("data_post");
+}
+
+sub data_post_respond {
+  my ($self, $rc, $msg, $args) = @_;
   if ($rc == DONE) {
     return 1;
   }
   elsif ($rc == DENY) {
-    $msg[0] ||= "Message denied";
-    $self->respond(552, @msg);
+    $msg->[0] ||= "Message denied";
+    $self->respond(552, @$msg);
   }
   elsif ($rc == DENYSOFT) {
-    $msg[0] ||= "Message denied temporarily";
-    $self->respond(452, @msg);
+    $msg->[0] ||= "Message denied temporarily";
+    $self->respond(452, @$msg);
   } 
   else {
     $self->queue($self->transaction);
@@ -658,7 +721,11 @@
   my ($self, $transaction) = @_;
 
   # First fire any queue_pre hooks
-  my ($rc, @msg) = $self->run_hooks("queue_pre");
+  $self->run_hooks("queue_pre");
+}
+
+sub queue_pre_respond {
+  my ($self, $rc, $msg, $args) = @_;
   if ($rc == DONE) {
     return 1;
   }
@@ -668,30 +735,38 @@
   }
 
   # If we got this far, run the queue hooks
-  ($rc, @msg) = $self->run_hooks("queue");
+  $self->run_hooks("queue");
+}
+
+sub queue_respond {
+  my ($self, $rc, $msg, $args) = @_;
   if ($rc == DONE) {
     return 1;
   }
   elsif ($rc == OK) {
-    $msg[0] ||= 'Queued';
-    $self->respond(250, @msg);
+    $msg->[0] ||= 'Queued';
+    $self->respond(250, @$msg);
   }
   elsif ($rc == DENY) {
-    $msg[0] ||= 'Message denied';
-    $self->respond(552, @msg);
+    $msg->[0] ||= 'Message denied';
+    $self->respond(552, @$msg);
   }
   elsif ($rc == DENYSOFT) {
-    $msg[0] ||= 'Message denied temporarily';
-    $self->respond(452, @msg);
+    $msg->[0] ||= 'Message denied temporarily';
+    $self->respond(452, @$msg);
   } 
   else {
-    $msg[0] ||= 'Queuing declined or disabled; try again later';
-    $self->respond(451, @msg);
+    $msg->[0] ||= 'Queuing declined or disabled; try again later';
+    $self->respond(451, @$msg);
   }
   
   # And finally run any queue_post hooks
-  ($rc, @msg) = $self->run_hooks("queue_post");
-  $self->log(LOGERROR, @msg) unless ($rc == OK or $rc == 0);
+  $self->run_hooks("queue_post");
+}
+
+# Responder for the "queue_post" hook: logs the hook's messages at LOGERROR
+# unless the return code is OK or 0. $args is accepted but unused here.
+sub queue_post_respond {
+  my ($self, $rc, $msg, $args) = @_;
+  $self->log(LOGERROR, @$msg) unless ($rc == OK or $rc == 0);
+}
 
 

Added: branches/0.3x/qpsmtpd-async
==============================================================================
--- (empty file)
+++ branches/0.3x/qpsmtpd-async Fri Dec  8 11:46:18 2006
@@ -0,0 +1,311 @@
+#!/usr/bin/perl
+
+use lib "./lib";
+# Runs at compile time, before the modules below are loaded: removes the
+# ENV and BASH_ENV environment variables and pins PATH to a fixed list of
+# directories (presumably groundwork for running under taint mode -- confirm).
+BEGIN {
+    delete $ENV{ENV};
+    delete $ENV{BASH_ENV};
+    $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
+}
+
+use strict;
+use vars qw($DEBUG);
+use FindBin qw();
+# TODO: need to make this taint friendly
+use lib "$FindBin::Bin/lib";
+use Danga::Socket;
+use Danga::Client;
+use Qpsmtpd::PollServer;
+use Qpsmtpd::ConfigServer;
+use Qpsmtpd::Constants;
+use IO::Socket;
+use Carp;
+use POSIX qw(WNOHANG);
+use Getopt::Long;
+
+$|++;
+
+use Socket qw(SOMAXCONN IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);
+
+$SIG{'PIPE'} = "IGNORE";  # handled manually
+
+# Debug verbosity; each -d/--debug on the command line increments it.
+$DEBUG          = 0;
+
+# Address/port of the config-server listening socket (not configurable from
+# the command line in this version).
+my $CONFIG_PORT      = 20025;
+my $CONFIG_LOCALADDR = '127.0.0.1';
+
+# Defaults for the command-line options (see GetOptions below).
+my $PORT        = 2525;
+my $LOCALADDR   = '0.0.0.0';
+my $PROCS       = 1;
+my $USER        = 'smtpd';      # user to suid to
+# When true, new SMTP connections are answered with a 451 and closed.
+my $PAUSED      = 0;
+# How many accept() calls accept_handler attempts per invocation; doubled
+# (up to ACCEPT_MAX) when a full batch succeeds, reset to 20 by the timer.
+my $NUMACCEPT   = 20;
+# Handle of the pending reset timer (30 is presumably seconds -- confirm
+# against Danga::Socket->AddTimer).
+my $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
+
+# make sure we don't spend forever doing accept()
+use constant ACCEPT_MAX => 1000;
+
+# Timer callback: puts the per-call accept budget back to its initial value
+# of 20 after accept_handler has grown it.
+sub reset_num_accept {
+    $NUMACCEPT = 20;
+}
+
+# Print the usage text to STDOUT and exit with status 0. Also used as the
+# failure path for option parsing and for the -h/--help callback, so it
+# never returns.
+sub help {
+    print <<EOT;
+Usage:
+    qpsmtpd [OPTIONS]
+
+Options:
+ -l, --listen-address addr : listen on a specific address; default 0.0.0.0
+ -p, --port P              : listen on a specific port; default 2525
+ -u, --user U              : run as a particular user; default 'smtpd'
+ -j, --procs J             : spawn J processes; default 1
+ -d, --debug               : increase debug output; may be repeated
+ -h, --help                : this page
+     --use-poll            : force use of poll() instead of epoll()/kqueue()
+EOT
+    exit(0);
+}
+
+# Parse the command line into the settings above. Any parse error (including
+# an unknown option) shows the usage text and exits. --use-poll is advertised
+# by help() and is wired to force_poll() here; it must be handled before
+# $POLL is computed further down so the log line reports the right mechanism.
+GetOptions(
+    'p|port=i'              => \$PORT,
+    'l|listen-address=s'    => \$LOCALADDR,
+    'j|procs=i'             => \$PROCS,
+    'd|debug+'              => \$DEBUG,
+    'u|user=s'              => \$USER,
+    'use-poll'              => \&force_poll,
+    'h|help'                => \&help,
+) || help();
+
+# detaint the commandline: every value must match its whitelist pattern and
+# is then re-assigned from the capture group; on a mismatch help() prints
+# the usage text and exits, so the assignment after it is never reached.
+help() unless $PORT =~ /^(\d+)$/;
+$PORT = $1;
+help() unless $LOCALADDR =~ /^([\d\w\-.]+)$/;
+$LOCALADDR = $1;
+help() unless $USER =~ /^([\w\-]+)$/;
+$USER = $1;
+help() unless $PROCS =~ /^(\d+)$/;
+$PROCS = $1;
+
+# Clear both Danga::Socket capability flags so that neither epoll nor kqueue
+# is reported as available, leaving poll() as the remaining choice.
+sub force_poll {
+    $Danga::Socket::HaveEpoll = $Danga::Socket::HaveKQueue = 0;
+}
+
+# Human-readable name of the event mechanism, derived from Danga::Socket's
+# capability flags; only used in the "Listening on ..." log lines.
+my $POLL = "with " . ($Danga::Socket::HaveEpoll ? "epoll()" : 
+                    $Danga::Socket::HaveKQueue ? "kqueue()" : "poll()");
+
+# Listening sockets, created in run_as_server().
+my $SERVER;
+my $CONFIG_SERVER;
+
+# pid => 1 for every worker the parent has spawned and not yet reaped.
+my %childstatus = ();
+
+# run_as_server() loops forever in both of its branches, so the exit below
+# is only a safety net.
+run_as_server();
+exit(0);
+
+# fork() wrapper. Dies if the fork fails. In the parent it returns the
+# child's pid; in the child it re-seeds the RNG, reloads Net::DNS::Header,
+# picks a fresh Net::DNS query id and resets Danga::Socket's kqueue flag,
+# then returns a false value.
+sub _fork {
+    my $pid = fork;
+    die "Cannot fork: $!" unless defined $pid;
+    return $pid if $pid;    # parent: nothing else to do
+
+    # --- child only from here on ---
+
+    # Fixup Net::DNS randomness after fork
+    srand($$ ^ time);
+
+    # Warnings are switched off for the rest of this sub while the module
+    # is forced to load a second time.
+    local $^W;
+    delete $INC{'Net/DNS/Header.pm'};
+    require Net::DNS::Header;
+
+    # cope with different versions of Net::DNS: failures here are ignored
+    eval {
+        $Net::DNS::Resolver::global{id} = 1;
+        $Net::DNS::Resolver::global{id} = int(rand(Net::DNS::Resolver::MAX_ID()));
+    };
+
+    # Fixup lost kqueue after fork (must stay last: its value is what the
+    # child returns)
+    $Danga::Socket::HaveKQueue = undef;
+}
+
+# Fork one worker. The parent gets the worker's pid back. The worker resets
+# its signal handlers, registers the SMTP listening socket with the event
+# loop, fires the 'post-fork' hooks on the given plugin loader (a fresh
+# Qpsmtpd::SMTP object when none is passed) and runs the event loop until
+# it exits.
+sub spawn_child {
+    my ($loader) = @_;
+    $loader ||= Qpsmtpd::SMTP->new;
+
+    my $pid = _fork();
+    return $pid if $pid;
+
+    # worker process
+    $SIG{$_} = 'DEFAULT' for qw(TERM INT CHLD HUP);
+    $SIG{PIPE} = 'IGNORE';
+
+    Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler);
+
+    $loader->run_hooks('post-fork');
+
+    Qpsmtpd::PollServer->EventLoop();
+    exit;
+}
+
+# SIGCHLD handler for the parent: reaps every exited child without blocking,
+# forgets the ones recorded in %childstatus and starts one replacement
+# worker for each of them. Children that were never recorded are reaped but
+# not replaced. Re-installs itself as the handler before returning.
+sub sig_chld {
+    my $dead = 0;
+    while ( (my $child = waitpid(-1, WNOHANG)) > 0 ) {
+        next unless defined $childstatus{$child};
+
+        print "SIGCHLD: child $child died\n";
+        delete $childstatus{$child};
+        $dead++;
+    }
+
+    # restart a new child for each one lost (no-op when $dead is 0)
+    for (1 .. $dead) {
+        my $pid = spawn_child();
+        $childstatus{$pid} = 1;
+    }
+
+    $SIG{CHLD} = \&sig_chld;
+}
+
+# INT/TERM handler for the parent: stop reacting to child exits, send INT
+# to every recorded worker, then exit with status 0.
+sub HUNTSMAN {
+    $SIG{CHLD} = 'DEFAULT';
+    my @workers = keys %childstatus;
+    kill 'INT', @workers;
+    exit(0);
+}
+
+# Parent entry point. Binds the SMTP and config listening sockets, drops
+# privileges to $USER, loads the plugins and then either forks $PROCS
+# workers (when $PROCS > 1) or runs the event loop in this process.
+# Dies if a socket cannot be created or the uid/gid cannot be changed.
+# Loops forever in both branches, so it does not return.
+sub run_as_server {
+    # establish SERVER socket, bind and listen.
+    # IO::Socket::INET reports constructor failures in $@.
+    $SERVER = IO::Socket::INET->new(LocalPort => $PORT,
+                                    LocalAddr => $LOCALADDR,
+                                    Type      => SOCK_STREAM,
+                                    Proto     => IPPROTO_TCP,
+                                    Blocking  => 0,
+                                    Reuse     => 1,
+                                    Listen    => SOMAXCONN )
+               or die "Error creating server $LOCALADDR:$PORT : $@\n";
+
+    IO::Handle::blocking($SERVER, 0);
+    binmode($SERVER, ':raw');
+    
+    $CONFIG_SERVER = IO::Socket::INET->new(LocalPort => $CONFIG_PORT,
+                                            LocalAddr => $CONFIG_LOCALADDR,
+                                            Type      => SOCK_STREAM,
+                                            Proto     => IPPROTO_TCP,
+                                            Blocking  => 0,
+                                            Reuse     => 1,
+                                            Listen    => 1 )
+               or die "Error creating server $CONFIG_LOCALADDR:$CONFIG_PORT : $@\n";
+    
+    IO::Handle::blocking($CONFIG_SERVER, 0);
+    binmode($CONFIG_SERVER, ':raw');
+
+    # Drop privileges (done after binding, so the sockets are created with
+    # the privileges the process started with)
+    my (undef, undef, $quid, $qgid) = getpwnam $USER or
+          die "unable to determine uid/gid for $USER\n";
+    # NOTE(review): presumably meant to clear the supplementary group list
+    # before switching gid -- confirm.
+    $) = "";
+    POSIX::setgid($qgid) or
+          die "unable to change gid: $!\n";
+    POSIX::setuid($quid) or
+          die "unable to change uid: $!\n";
+    $> = $quid;
+    
+    # Load plugins here
+    my $plugin_loader = Qpsmtpd::SMTP->new();
+    $plugin_loader->load_plugins;
+    
+    $plugin_loader->log(LOGINFO, 'Running as user '.
+        (getpwuid($>) || $>) .
+        ', group '.
+        (getgrgid($)) || $)));
+
+    $SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
+
+    if ($PROCS > 1) {
+        # Multi-process mode: the workers run the event loops, the parent
+        # only supervises them via SIGCHLD.
+        # NOTE(review): nothing in this branch registers $CONFIG_SERVER
+        # with an event loop -- confirm whether that is intended.
+        for (1..$PROCS) {
+            my $pid = spawn_child($plugin_loader);
+            $childstatus{$pid} = 1;
+        }
+        $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children $POLL");
+        $SIG{'CHLD'} = \&sig_chld;
+        sleep while (1);
+    }
+    else {
+        # Single-process mode: serve SMTP and config connections here.
+        $plugin_loader->log(LOGDEBUG, "Listening on $PORT with single process $POLL");
+        Qpsmtpd::PollServer->OtherFds(fileno($SERVER) => \&accept_handler,
+                                      fileno($CONFIG_SERVER) => \&config_handler,
+                                      );
+        $plugin_loader->run_hooks('post-fork');
+        while (1) {
+            Qpsmtpd::PollServer->EventLoop();
+        }
+        exit;
+    }
+
+}
+
+# Event-loop callback for the config listening socket: accepts one pending
+# connection, makes it raw and non-blocking, sets TCP_NODELAY (dying if
+# that fails) and hands it to a new Qpsmtpd::ConfigServer client that is
+# watched for readability. Returns nothing.
+sub config_handler {
+    # warn("accept failed on config server: $!");
+    my $conn = $CONFIG_SERVER->accept()
+        or return;
+    binmode($conn, ':raw');
+
+    if ($DEBUG) {
+        printf("Config server connection\n");
+    }
+
+    IO::Handle::blocking($conn, 0);
+    setsockopt($conn, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
+
+    Qpsmtpd::ConfigServer->new($conn)->watch_read(1);
+    return;
+}
+
+# Accept all new connections
+# Event-loop callback for the SMTP listening socket: tries up to $NUMACCEPT
+# accepts and stops at the first one that yields no connection. When the
+# whole batch succeeded, the budget for the next call is doubled (capped at
+# ACCEPT_MAX) and the reset timer is restarted.
+sub accept_handler {
+    my $budget = $NUMACCEPT;
+    while ($budget-- > 0) {
+        _accept_handler() or return;
+    }
+
+    # got here because we have accept's left.
+    # So double the number we accept next time.
+    my $doubled = $NUMACCEPT * 2;
+    $NUMACCEPT = $doubled > ACCEPT_MAX ? ACCEPT_MAX : $doubled;
+
+    $ACCEPT_RSET->cancel;
+    $ACCEPT_RSET = Danga::Socket->AddTimer(30, \&reset_num_accept);
+}
+
+use Errno qw(EAGAIN EWOULDBLOCK);
+
+# Accept a single SMTP connection. Returns nothing when accept() yields no
+# socket, 1 otherwise. The new socket is made raw and non-blocking and
+# wrapped in a Qpsmtpd::PollServer client; while the server is paused the
+# client is told so with a 451 and closed, otherwise a synthetic "Connect"
+# line is queued for it and it is watched for readability.
+sub _accept_handler {
+    # warn("accept() failed: $!");
+    my $conn = $SERVER->accept()
+        or return;
+    binmode($conn, ':raw');
+
+    if ($DEBUG) {
+        printf("Listen child making a Qpsmtpd::PollServer for %d.\n", fileno($conn));
+    }
+
+    IO::Handle::blocking($conn, 0);
+    #setsockopt($conn, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
+
+    my $client = Qpsmtpd::PollServer->new($conn);
+
+    unless ($PAUSED) {
+        $client->push_back_read("Connect\n");
+        $client->watch_read(1);
+        return 1;
+    }
+
+    $client->write("451 Sorry, this server is currently paused\r\n");
+    $client->close;
+    return 1;
+}
+
+########################################################################
+
+# Minimal logger: takes ($level, $message) and writes the message to STDERR
+# prefixed with the process id. The level is accepted but ignored.
+sub log {
+  # $level ($_[0]) not used yet.  this is reimplemented from elsewhere anyway
+  my $message = $_[1];
+  warn("$$ fd:? $message\n");
+}
+
+# Set the paused flag to the given value; while it is true, newly accepted
+# SMTP connections are refused with a 451.
+sub pause {
+  $PAUSED = $_[0];
+}

Reply via email to