Author: vetinari
Date: Tue Jun 10 10:47:42 2008
New Revision: 926

Added:
   contrib/vetinari/thread/
   contrib/vetinari/thread/lib/
   contrib/vetinari/thread/lib/Qpsmtpd/
   contrib/vetinari/thread/lib/Qpsmtpd/SMTP/
   contrib/vetinari/thread/lib/Qpsmtpd/SMTP/Threaded.pm
   contrib/vetinari/thread/lib/Qpsmtpd/Threaded.pm
   contrib/vetinari/thread/qpsmtpd-threaded   (contents, props changed)

Log:
qpsmtpd-threaded - new flavour with perl threads -- experimental

Added: contrib/vetinari/thread/lib/Qpsmtpd/SMTP/Threaded.pm
==============================================================================
--- (empty file)
+++ contrib/vetinari/thread/lib/Qpsmtpd/SMTP/Threaded.pm        Tue Jun 10 
10:47:42 2008
@@ -0,0 +1,9 @@
+package Qpsmtpd::SMTP::Threaded;
+use Qpsmtpd::SMTP;
+use Qpsmtpd::Constants;
+
[EMAIL PROTECTED] = qw(Qpsmtpd::SMTP);
+
+sub getline { shift->_socket->getline; }
+
+1;

Added: contrib/vetinari/thread/lib/Qpsmtpd/Threaded.pm
==============================================================================
--- (empty file)
+++ contrib/vetinari/thread/lib/Qpsmtpd/Threaded.pm     Tue Jun 10 10:47:42 2008
@@ -0,0 +1,85 @@
+
+package Qpsmtpd::Threaded;
+
+use Socket;
+use Qpsmtpd::Constants;
+use base qw(Qpsmtpd::SMTP::Threaded Qpsmtpd::TcpServer);
+
+sub thread_init {
+   my ($self, $sock) = @_;
+   $self->{_socket} = $sock;
+   $self->{_select} = IO::Select->new();
+   $self->{_select}->add($sock);
+}
+
+sub _select { shift->{_select}; }
+sub _socket { shift->{_socket}; }
+
+sub start_connection {
+    my $self = shift;
+
+    $self->{_connection} = Qpsmtpd::Connection->new(); #reset connection
+    $self->reset_transaction;
+
+    my $cl_sockaddr    = $self->{_socket}->peername;
+    my ($port, $iaddr) = sockaddr_in($cl_sockaddr);
+    my $remote_ip      = inet_ntoa($iaddr);
+    my $remote_host    = gethostbyaddr($iaddr, AF_INET) || "[$remote_ip]";
+    my $remote_info    = $remote_host;
+    $remote_host       =~ tr/a-zA-Z\.\-0-9//cd;
+    $self->SUPER::connection->start(
+            remote_info => $remote_info,
+            remote_ip   => $remote_ip,
+            remote_host => $remote_host,
+            @_,
+        );
+}
+
+sub read_input {
+    my $self = shift;
+
+    my $timeout =
+        $self->config('timeoutsmtpd')   # qmail smtpd control file
+          || $self->config('timeout')   # qpsmtpd control file
+          || 1200;                      # default value
+    
+    while (1) {
+        die "threaded_disconnect" unless $self->_socket->connected;
+        $self->_select->can_read($timeout)
+          or $self->respond(500, "Time's over..."),
+             $self->disconnect;
+        $_ = $self->_socket->getline;
+        die "threaded_disconnect" unless defined $_;
+        $_ =~ s/\r?\n$//s;
+        $self->log(LOGINFO, "dispatching $_");
+        $self->connection->notes('original_string', $_);
+        defined $self->dispatch(split / +/, $_, 2)
+           or $self->respond(502, "command unrecognized: '$_'");
+    }
+}
+
+sub respond {
+    my ($self, $code, @messages) = @_;
+    my $buf = '';
+    while (my $msg = shift @messages) {
+        my $line = $code . (@messages?"-":" ").$msg;
+        $self->log(LOGINFO, $line);
+        $buf .= "$line\r\n";
+    }
+    $self->_socket->print($buf)
+      or ($self->log(LOGERROR, "Could not print [$buf]: $!"), return 0);
+    return 1;
+}
+
+sub disconnect {
+    my $self = shift;
+    $self->log(LOGINFO,"click, disconnecting");
+    $self->SUPER::disconnect(@_);
+    # $self->run_hooks("post-connection");
+    # $self->connection->reset;
+    $self->_socket->shutdown(2);
+}
+
+1;
+
+# vim: ts=4 sw=4 expandtab syn=perl

Added: contrib/vetinari/thread/qpsmtpd-threaded
==============================================================================
--- (empty file)
+++ contrib/vetinari/thread/qpsmtpd-threaded    Tue Jun 10 10:47:42 2008
@@ -0,0 +1,303 @@
+#!/usr/bin/perl -wT
+#
+#
+# TODO: 
+# * use vars instead of calling $client->peer*, $client->sock* so many times :)
+# WARNING: experimental 
+use strict;
+use threads;
+use threads::shared;
+
+use IO::Socket;
+use Socket;
+use POSIX;
+
+use lib 'lib';
+use Qpsmtpd::TcpServer;
+use Qpsmtpd::Threaded;
+use Qpsmtpd::Constants;
+use Getopt::Long;
+
+# secure shell
+$ENV{'PATH'} = '/bin:/usr/bin';
+delete @ENV{qw(IFS CDPATH ENV BASH_ENV)};
+
+my %workers = (); # client's list 
+my @shared  = (); # shared[$slot] = fd / 0 / undef (client / idle / shutdown)
+my @sockets = (); # just for cleaning up... 
+my @threads = (); # keep thread objects here
+
+my $PORT         = 2525;
+my $MAX_CONN     = 25;
+my $MAX_CONN_IP  = 8;
+my $BIND_ADDR    = '0.0.0.0';
+my $THREAD_CONNS = 100; # max times a thread will be reused, same as -prefork
+my $USER;
+my $PID_FILE;
+my $DETACH;
+
+sub usage {
+    print <<"EOT";
+Usage: qpsmtpd-threaded [ options ]
+    --interface addr  : Interface daemon should listen on (default: $BIND_ADDR)
+    --port int        : TCP port daemon should listen on (default: $PORT)
+    --max-from-ip int : Limit number of connections from single IP (default: 
$MAX_CONN_IP, 0 to disable)
+    --threads int     : Number of threads that will be created (default: 
$MAX_CONN)
+    --user username   : User the daemon should run as
+    --pid-file path   : Path to pid file
+    --detach          : detach from controlling terminal (daemonize)
+    --help            : This message
+EOT
+    exit 0;
+}
+
+GetOptions(
+    'interface=s'   => \$BIND_ADDR,
+    'port|p=i'      => \$PORT,
+    'threads|t=i'   => \$MAX_CONN,
+    'max-from-ip=i' => \$MAX_CONN_IP,
+    'user|u=s'      => \$USER,
+    'pid-file'      => \$PID_FILE,
+    'detach'        => \$DETACH,
+    'help'          => sub { &usage },
+) or &usage;
+
+if ($USER and $USER =~ /^([\w\-]+)$/) { $USER = $1 } else { print "USER\n"; 
&usage }
+if ($BIND_ADDR =~ /^([\d\.\:a-f]+)$/i) { $BIND_ADDR = $1 } else { print 
"ADDR\n"; &usage }
+$MAX_CONN_IP = $MAX_CONN unless $MAX_CONN_IP;
+++$MAX_CONN_IP; # plugins/hosts_allow bug...
+
+if ($PID_FILE) {
+    if ($PID_FILE =~ m#^(/[\w\d/\-.]+)$#) { $PID_FILE = $1 } else { &usage }
+    if (-e $PID_FILE) {
+        open PID, "+<$PID_FILE"
+          or die "open pid file: $!\n";
+        my $running = <PID> || '';
+        if ($running =~ /^(\d+)$/) {
+            $running = $1;
+            die "Found an already running qpsmtpd with pid $running.\n"
+              if (kill 0, $running);
+        }
+        seek PID, 0, 0
+          or die "Could not seek back to beginning of $PID_FILE: $!\n";
+        truncate PID, 0
+          or die "Could not truncate $PID_FILE at 0: $!";
+    }
+    else {
+        open PID, ">$PID_FILE"
+          or die "open pid_file: $!\n";
+    }
+}
+
+my $socket =  IO::Socket::INET->new(
+                                    LocalPort => $PORT,
+                                    LocalAddr => $BIND_ADDR,
+                                    Proto     => 'tcp',
+                                    Listen    => SOMAXCONN,
+                                    Reuse     => 1,
+                                   );
+die "FATAL: failed to open socket: $!\n" unless $socket;
+if ($DETACH) {
+    open STDIN, '/dev/null' or die "/dev/null: $!";
+    open STDOUT, '>/dev/null' or die "/dev/null: $!";
+    open STDERR, '>&STDOUT' or die "open(stderr): $!";
+    defined (my $pid = fork) or die "fork: $!";
+    exit 0 if $pid;
+    POSIX::setsid or die "setsid: $!";
+}
+if ($PID_FILE) {
+    print PID $$,"\n";
+    close PID;
+}
+if ($USER) {
+    my ($quid, $qgid, $groups);
+    (undef, undef, $quid, $qgid) = getpwnam $USER
+      or die "unable to determine uid/gid for $USER\n";
+    $groups = "$qgid $qgid";
+    while (my ($name,$passwd,$gid,$members) = getgrent()) {
+        my @m = split(/ /, $members);
+        if (grep {$_ eq $USER} @m) {
+            $groups .= " $gid";
+        }
+    }
+    endgrent;
+    # change UUID/UGID
+    $) = $groups;
+    POSIX::setgid($qgid) or die "unable to change gid: $!\n";
+    POSIX::setuid($quid) or die "unable to change uid: $!\n";
+    $> = $quid;
+    die "FATAL: failed to setuid to user: $USER, uid: $quid\n"
+      if ($> != $quid and $> != ($quid - 2**32));
+}
+
+# ... and now for the real stuff ;-)
+my $qpsmtpd = Qpsmtpd::Threaded->new();
+$qpsmtpd->load_plugins;
+$qpsmtpd->spool_dir;
+$qpsmtpd->size_threshold;
+
+&share(\%workers); # used by pre-connection hook
+
+$qpsmtpd->log(LOGINFO, "Please be patient, creating $MAX_CONN threads...");
+for (my $slot = 0; $slot < $MAX_CONN; $slot++) {
+    $shared[$slot] = 0; # 0 = idle thread
+    &share(\$shared[$slot]);
+    $threads[$slot] = threads->create("run_client", $slot, $qpsmtpd);
+}
+$qpsmtpd->log(LOGINFO, "...done creating $MAX_CONN threads, ready for action");
+
+while (1) {
+    my ($client, $cl_info) = $socket->accept();
+    next unless ($client and $cl_info);
+
+    $qpsmtpd->log(LOGINFO, "connect from: "
+                          .$client->peerhost.":".$client->peerport);
+    
+    # find first unsed slot:
+    my $slot = undef;
+    for (my $t = 0; $t < $MAX_CONN; $t++) {
+        next if $shared[$t]; # thread has work to do, get next
+        if (!defined $shared[$t]) { # undef = max connections per thread done
+            # "reap" this thread by join()ing it
+            my $thr = $threads[$t];
+            $qpsmtpd->log(LOGINFO, "joining thread ".$thr->tid
+                                  ." and creating new");
+            (undef) = $thr->join();
+            $shared[$t] = 0; # idle again
+            $thr = threads->create("run_client", $t, $qpsmtpd);
+            $threads[$t] = $thr;
+        }
+        $slot = $t; # found idle thread, use it
+        last;
+    }
+    unless (defined $slot) { # all busy..
+        respond_client($client, 451, "Too many connections, try again later");
+        $client->shutdown(2);
+        undef $client;
+        select undef, undef, undef, 0.25;
+        next;
+    }
+
+    # cleanup old fd ..., we can't undef (close) it after we passed the FD 
+    # it to the worker thread: the worker can't open it 
+    undef $sockets[$slot]; 
+    $sockets[$slot] = $client;
+    $shared[$slot]  = fileno($client);
+    # add to clients list...
+    { 
+      lock(%workers);
+      $workers{$slot} = $client->peeraddr;
+    };
+    # notify thread in slot $slot, he's got work to do...
+    { 
+      lock($shared[$slot]); 
+      cond_signal($shared[$slot]); 
+    };
+}
+
+sub run_client {
+    my $slot  = shift;
+    my $base  = shift;
+
+    for (my $conn = 0; $conn < $THREAD_CONNS; $conn++) {
+        # wait for something todo
+        { 
+          lock($shared[$slot]); 
+          cond_wait($shared[$slot])
+            until $shared[$slot]; 
+        };
+        
+        # too bad we can't share the socket object ...
+        my $client = IO::Socket::INET->new_from_fd($shared[$slot], "w+")
+          or do { 
+                  $base->log(LOGERROR, "new_from_fd() Failed: $!"); 
+                  # ... and cleanup
+                  { 
+                    lock(%workers);
+                    delete $workers{$slot};
+                  };
+                  { 
+                    lock($shared[$slot]); 
+                    $shared[$slot] = 0; 
+                  };
+                  next; 
+                };
+        $client->autoflush();
+        $qpsmtpd = $base; # clone base to get a fresh copy
+        $qpsmtpd->thread_init($client);
+
+        # maybe this should be moved to parent?
+        my ($rc, @msg) = 
+            $qpsmtpd->run_hooks("pre-connection",
+                                remote_ip   => $client->peerhost,
+                                remote_port => $client->peerport,
+                                local_ip    => $client->sockhost,
+                                local_port  => $client->sockport,
+                                max_conn_ip => $MAX_CONN_IP,
+                                child_addrs => [values %workers],
+                               );
+        if (   $rc == DENYSOFT
+            || $rc == DENYSOFT_DISCONNECT
+            || $rc == DENY
+            || $rc == DENY_DISCONNECT)
+        {
+            my $rc_reply = 451;
+            unless ($msg[0]) {
+                if ($rc == DENYSOFT || $rc == DENYSOFT_DISCONNECT) {
+                    @msg = ("Sorry, try again later");
+                }
+                else {
+                    @msg = ("Sorry, service not available to you");
+                    $rc_reply = 550;
+                }
+            }
+            respond_client($client, $rc_reply, @msg);
+        }
+        # end: move to parent?
+        else {
+            eval {
+                $qpsmtpd->start_connection(
+                                        local_port => $client->sockport,
+                                        local_ip   => 
inet_ntoa($client->sockaddr),
+                                        remote_port=> $client->peerport,
+                                      );
+                $qpsmtpd->run();
+            };    
+        }
+        $qpsmtpd->run_hooks("post-connection");
+        $qpsmtpd->connection->reset;
+        $client->shutdown(2);
+        $client->close();
+        # cleanup.. notify parent we're idle
+        { 
+          lock(%workers);
+          delete $workers{$slot};
+        };
+        { 
+          lock($shared[$slot]); 
+          $shared[$slot] = 0;
+        };
+    }
+    # max number of connections processed in this thread, return to parent
+    $base->log(LOGDEBUG, "thread ". threads->self->tid(). ", slot $slot: "
+                        ."lifetime reached, returning");
+    { 
+        lock($shared[$slot]);
+        $shared[$slot] = undef; # we need to be reaped
+    };
+    return;
+}
+
+sub respond_client {
+    my ($client, $code, @message) = @_;
+    $client->autoflush(1);
+    while (my $msg = shift @message) {
+        my $line = $code . (@message ? "-" : " ") . $msg;
+        warn ("reply to client: <$line>");
+        print $client "$line\r\n"
+          or (warn("Could not print [$line]: $!"), return 0);
+    }
+    return 1;
+}
+
+# vim: ts=4 sw=4 expandtab syn=perl

Reply via email to