Author: vetinari
Date: Tue Jun 10 10:47:42 2008
New Revision: 926
Added:
contrib/vetinari/thread/
contrib/vetinari/thread/lib/
contrib/vetinari/thread/lib/Qpsmtpd/
contrib/vetinari/thread/lib/Qpsmtpd/SMTP/
contrib/vetinari/thread/lib/Qpsmtpd/SMTP/Threaded.pm
contrib/vetinari/thread/lib/Qpsmtpd/Threaded.pm
contrib/vetinari/thread/qpsmtpd-threaded (contents, props changed)
Log:
qpsmtpd-threaded - new flavour with perl threads -- experimental
Added: contrib/vetinari/thread/lib/Qpsmtpd/SMTP/Threaded.pm
==============================================================================
--- (empty file)
+++ contrib/vetinari/thread/lib/Qpsmtpd/SMTP/Threaded.pm Tue Jun 10 10:47:42 2008
@@ -0,0 +1,9 @@
+package Qpsmtpd::SMTP::Threaded;
+use Qpsmtpd::SMTP;
+use Qpsmtpd::Constants;
+
+# SMTP protocol layer for the threaded daemon.  This package inherits from
+# Qpsmtpd::SMTP and only overrides how an input line is obtained.
+# (In the archived copy this assignment had been mangled into
+# "[EMAIL PROTECTED]" by e-mail address obfuscation; it is restored here.)
+our @ISA = qw(Qpsmtpd::SMTP);
+
+# Return the next line read from the client socket.
+# The socket comes from the _socket accessor, which this package does not
+# define itself; the object must provide it (Qpsmtpd::Threaded does).
+sub getline {
+    my $self = shift;
+    return $self->_socket->getline;
+}
+
+1;
Added: contrib/vetinari/thread/lib/Qpsmtpd/Threaded.pm
==============================================================================
--- (empty file)
+++ contrib/vetinari/thread/lib/Qpsmtpd/Threaded.pm Tue Jun 10 10:47:42 2008
@@ -0,0 +1,85 @@
+
+package Qpsmtpd::Threaded;
+
+use Socket;
+use Qpsmtpd::Constants;
+use base qw(Qpsmtpd::SMTP::Threaded Qpsmtpd::TcpServer);
+
+# Bind this object to one client connection.
+#
+# Parameters:
+#   $sock - the client socket handle for this connection
+#
+# Stores the socket (returned later by _socket) and registers it with a
+# fresh IO::Select set (returned later by _select), which read_input uses
+# to wait for input with a timeout.
+sub thread_init {
+    my ($self, $sock) = @_;
+
+    # This package never loads IO::Select itself, so make sure it is
+    # available instead of relying on some other module having pulled it in.
+    require IO::Select;
+
+    $self->{_socket} = $sock;
+    $self->{_select} = IO::Select->new();
+    $self->{_select}->add($sock);
+}
+
+# Accessor: the IO::Select set created by thread_init.
+sub _select {
+    my $self = shift;
+    return $self->{_select};
+}
+
+# Accessor: the client socket stored by thread_init.
+sub _socket {
+    my $self = shift;
+    return $self->{_socket};
+}
+
+# Begin a new SMTP session on the socket stored in $self->{_socket}.
+#
+# Installs a new Qpsmtpd::Connection object, resets the transaction,
+# looks up the peer's address and host name and passes them to the
+# connection's start() method.  Any additional key/value arguments given
+# by the caller are appended after the computed ones.
+sub start_connection {
+    my ($self, @extra) = @_;
+
+    # reset connection
+    $self->{_connection} = Qpsmtpd::Connection->new();
+    $self->reset_transaction;
+
+    my $packed_peer = $self->{_socket}->peername;
+    my (undef, $peer_addr) = sockaddr_in($packed_peer);
+    my $ip   = inet_ntoa($peer_addr);
+    my $info = gethostbyaddr($peer_addr, AF_INET) || "[$ip]";
+
+    # remote_info keeps the name exactly as looked up; remote_host is the
+    # same string with everything except letters, digits, '.' and '-'
+    # removed.
+    (my $host = $info) =~ tr/a-zA-Z\.\-0-9//cd;
+
+    $self->SUPER::connection->start(
+        remote_info => $info,
+        remote_ip   => $ip,
+        remote_host => $host,
+        @extra,
+    );
+}
+
+# Command loop for one client: read a line, dispatch it, repeat.
+#
+# The idle timeout (seconds) is taken from the 'timeoutsmtpd' config, then
+# 'timeout', and falls back to 1200.  The loop never returns normally; it
+# leaves by die()ing with "threaded_disconnect" when the socket is no
+# longer connected or getline yields no more input.  The worker thread in
+# qpsmtpd-threaded calls this inside an eval block.
+#
+# The current line is kept in a lexical rather than the global $_, so the
+# caller's $_ is left alone and code called from dispatch() cannot change
+# the text echoed back in the 502 reply.
+sub read_input {
+    my $self = shift;
+
+    my $timeout =
+         $self->config('timeoutsmtpd')    # qmail smtpd control file
+      || $self->config('timeout')         # qpsmtpd control file
+      || 1200;                            # default value
+
+    while (1) {
+        die "threaded_disconnect" unless $self->_socket->connected;
+
+        # Nothing arrived within the timeout: tell the client and
+        # disconnect.  Control then falls through to getline below, as in
+        # the original flow.
+        unless ($self->_select->can_read($timeout)) {
+            $self->respond(500, "Time's over...");
+            $self->disconnect;
+        }
+
+        my $line = $self->_socket->getline;
+        die "threaded_disconnect" unless defined $line;
+
+        $line =~ s/\r?\n$//s;    # strip the line terminator
+        $self->log(LOGINFO, "dispatching $line");
+        $self->connection->notes('original_string', $line);
+
+        # First word is the command, the remainder its argument string.
+        defined $self->dispatch(split / +/, $line, 2)
+          or $self->respond(502, "command unrecognized: '$line'");
+    }
+}
+
+# Send an SMTP reply to the client.
+#
+# Parameters:
+#   $code     - numeric reply code
+#   @messages - one text line per reply line; all but the last are sent
+#               with the "-" continuation marker, the last with a space
+#
+# Returns 1 on success, 0 if writing to the socket failed (the failure is
+# logged at LOGERROR).
+#
+# The loop tests for definedness, not truth: a message of "" or "0" used
+# to end the loop early, dropping the remaining lines and possibly leaving
+# a "-" continuation line as the last one sent.
+sub respond {
+    my ($self, $code, @messages) = @_;
+    my $buf = '';
+    while (defined(my $msg = shift @messages)) {
+        my $line = $code . (@messages ? "-" : " ") . $msg;
+        $self->log(LOGINFO, $line);
+        $buf .= "$line\r\n";
+    }
+    unless ($self->_socket->print($buf)) {
+        $self->log(LOGERROR, "Could not print [$buf]: $!");
+        return 0;
+    }
+    return 1;
+}
+
+# Log the disconnect, let the parent class do its part, then shut down
+# both directions of the client socket.  Returns whatever shutdown() returns.
+#
+# The post-connection hooks and the connection reset are intentionally not
+# run here; the worker loop in qpsmtpd-threaded performs them after each
+# connection.
+sub disconnect {
+    my ($self, @args) = @_;
+
+    $self->log(LOGINFO,"click, disconnecting");
+    $self->SUPER::disconnect(@args);
+
+    return $self->_socket->shutdown(2);
+}
+
+1;
+
+# vim: ts=4 sw=4 expandtab syn=perl
Added: contrib/vetinari/thread/qpsmtpd-threaded
==============================================================================
--- (empty file)
+++ contrib/vetinari/thread/qpsmtpd-threaded Tue Jun 10 10:47:42 2008
@@ -0,0 +1,303 @@
+#!/usr/bin/perl -wT
+#
+#
+# TODO:
+# * use vars instead of calling $client->peer*, $client->sock* so many times :)
+# WARNING: experimental
+use strict;
+use threads;
+use threads::shared;
+
+use IO::Socket;
+use Socket;
+use POSIX;
+
+use lib 'lib';
+use Qpsmtpd::TcpServer;
+use Qpsmtpd::Threaded;
+use Qpsmtpd::Constants;
+use Getopt::Long;
+
+# The script runs in taint mode (-T on the shebang line), so give PATH a
+# fixed value and remove the environment variables listed below.
+$ENV{'PATH'} = '/bin:/usr/bin';
+delete @ENV{qw(IFS CDPATH ENV BASH_ENV)};
+
+# Per-slot bookkeeping; a "slot" is the index of one worker thread.
+my %workers = (); # slot => peer address of the client currently served
+my @shared = (); # shared[$slot] = fd / 0 / undef (client / idle / shutdown)
+my @sockets = (); # parent-side client handles, kept just for cleaning up
+my @threads = (); # keep thread objects here
+
+# Defaults; most can be overridden on the command line (see usage()).
+my $PORT = 2525; # --port
+my $MAX_CONN = 25; # --threads: number of worker threads / slots
+my $MAX_CONN_IP = 8; # --max-from-ip
+my $BIND_ADDR = '0.0.0.0'; # --interface
+my $THREAD_CONNS = 100; # max times a thread will be reused, same as -prefork
+my $USER; # --user
+my $PID_FILE; # --pid-file
+my $DETACH; # --detach
+
+# Print the command line help to STDOUT and exit with status 0.
+# The current values of $BIND_ADDR, $PORT, $MAX_CONN_IP and $MAX_CONN are
+# interpolated as the advertised defaults.
+# (Two help lines had been split by mail line-wrapping in the archived
+# copy; they are rejoined here.)
+sub usage {
+    print <<"EOT";
+Usage: qpsmtpd-threaded [ options ]
+ --interface addr : Interface daemon should listen on (default: $BIND_ADDR)
+ --port int : TCP port daemon should listen on (default: $PORT)
+ --max-from-ip int : Limit number of connections from single IP (default: $MAX_CONN_IP, 0 to disable)
+ --threads int : Number of threads that will be created (default: $MAX_CONN)
+ --user username : User the daemon should run as
+ --pid-file path : Path to pid file
+ --detach : detach from controlling terminal (daemonize)
+ --help : This message
+EOT
+    exit 0;
+}
+
+# Parse the command line.  'pid-file' needs the "=s" specifier: without it
+# Getopt::Long treats --pid-file as a plain flag, so the variable could
+# never hold a path and the later path check always failed.
+GetOptions(
+    'interface=s'   => \$BIND_ADDR,
+    'port|p=i'      => \$PORT,
+    'threads|t=i'   => \$MAX_CONN,
+    'max-from-ip=i' => \$MAX_CONN_IP,
+    'user|u=s'      => \$USER,
+    'pid-file=s'    => \$PID_FILE,
+    'detach'        => \$DETACH,
+    'help'          => sub { usage() },
+) or usage();
+
+# Validate (and thereby untaint) the user name and the bind address.  On
+# failure a short tag naming the offending option is printed before the help.
+# NOTE(review): a missing --user also ends up in the else branch, although
+# the later privilege drop is guarded by "if ($USER)" -- confirm whether
+# --user is meant to be mandatory.
+if ($USER and $USER =~ /^([\w\-]+)$/) {
+    $USER = $1;
+}
+else {
+    print "USER\n";
+    usage();
+}
+if ($BIND_ADDR =~ /^([\d\.\:a-f]+)$/i) {
+    $BIND_ADDR = $1;
+}
+else {
+    print "ADDR\n";
+    usage();
+}
+
+# 0 disables the per-IP limit: fall back to the total number of threads.
+$MAX_CONN_IP = $MAX_CONN unless $MAX_CONN_IP;
+$MAX_CONN_IP++; # plugins/hosts_allow bug...
+
+# Open the pid file early; the pid itself is written later through the
+# same PID handle, which is why a bareword handle is kept here.
+# The three-argument form of open is used so the file name can never be
+# interpreted as part of the open mode.
+if ($PID_FILE) {
+    # Only absolute paths made of word characters, '/', '-' and '.' are
+    # accepted; the capture also untaints the value.
+    if ($PID_FILE =~ m#^(/[\w\d/\-.]+)$#) { $PID_FILE = $1 } else { usage() }
+
+    if (-e $PID_FILE) {
+        open PID, '+<', $PID_FILE
+          or die "open pid file: $!\n";
+
+        # Refuse to start when the recorded pid still answers signal 0.
+        my $running = <PID> || '';
+        if ($running =~ /^(\d+)$/) {
+            $running = $1;
+            die "Found an already running qpsmtpd with pid $running.\n"
+              if (kill 0, $running);
+        }
+
+        # Stale file: rewind and empty it so the new pid can be written.
+        seek PID, 0, 0
+          or die "Could not seek back to beginning of $PID_FILE: $!\n";
+        truncate PID, 0
+          or die "Could not truncate $PID_FILE at 0: $!";
+    }
+    else {
+        open PID, '>', $PID_FILE
+          or die "open pid_file: $!\n";
+    }
+}
+
+# Create the listening socket first, before detaching and before the
+# uid/gid change further down.
+my $socket = IO::Socket::INET->new(
+ LocalPort => $PORT,
+ LocalAddr => $BIND_ADDR,
+ Proto => 'tcp',
+ Listen => SOMAXCONN,
+ Reuse => 1,
+ );
+die "FATAL: failed to open socket: $!\n" unless $socket;
+# --detach: point the standard handles at /dev/null, fork, let the parent
+# exit and make the child a session leader.
+if ($DETACH) {
+ open STDIN, '/dev/null' or die "/dev/null: $!";
+ open STDOUT, '>/dev/null' or die "/dev/null: $!";
+ open STDERR, '>&STDOUT' or die "open(stderr): $!";
+ defined (my $pid = fork) or die "fork: $!";
+ exit 0 if $pid;
+ POSIX::setsid or die "setsid: $!";
+}
+# Written after the fork above, so $$ is the pid of the process that keeps
+# running.  PID is the handle opened during pid file validation.
+if ($PID_FILE) {
+ print PID $$,"\n";
+ close PID;
+}
+# Switch to the uid/gid of $USER.
+if ($USER) {
+ my ($quid, $qgid, $groups);
+ (undef, undef, $quid, $qgid) = getpwnam $USER
+ or die "unable to determine uid/gid for $USER\n";
+ # Value for $): per perlvar the first number is the effective gid and
+ # the remaining ones are the supplementary groups.
+ $groups = "$qgid $qgid";
+ # Append every group that lists $USER as a member.
+ while (my ($name,$passwd,$gid,$members) = getgrent()) {
+ my @m = split(/ /, $members);
+ if (grep {$_ eq $USER} @m) {
+ $groups .= " $gid";
+ }
+ }
+ endgrent;
+ # change UUID/UGID
+ $) = $groups;
+ POSIX::setgid($qgid) or die "unable to change gid: $!\n";
+ POSIX::setuid($quid) or die "unable to change uid: $!\n";
+ $> = $quid;
+ # Verify the effective uid really changed.
+ # NOTE(review): the "- 2**32" alternative presumably covers uids that
+ # are reported wrapped around as negative 32-bit values -- confirm.
+ die "FATAL: failed to setuid to user: $USER, uid: $quid\n"
+ if ($> != $quid and $> != ($quid - 2**32));
+}
+
+# ... and now for the real stuff ;-)
+# Build the base Qpsmtpd object and call load_plugins, spool_dir and
+# size_threshold on it before any thread is created.
+# NOTE(review): presumably done up front so every thread starts with
+# plugins and settings already loaded -- confirm.
+my $qpsmtpd = Qpsmtpd::Threaded->new();
+$qpsmtpd->load_plugins;
+$qpsmtpd->spool_dir;
+$qpsmtpd->size_threshold;
+
+&share(\%workers); # used by pre-connection hook
+
+# Start one worker thread per slot.  Each slot's state scalar is shared
+# with its worker and starts out as 0 (idle).
+$qpsmtpd->log(LOGINFO, "Please be patient, creating $MAX_CONN threads...");
+for (my $slot = 0; $slot < $MAX_CONN; $slot++) {
+ $shared[$slot] = 0; # 0 = idle thread
+ &share(\$shared[$slot]);
+ $threads[$slot] = threads->create("run_client", $slot, $qpsmtpd);
+}
+$qpsmtpd->log(LOGINFO, "...done creating $MAX_CONN threads, ready for action");
+
+# Accept loop: hand every new client to the first idle worker slot.
+while (1) {
+ my ($client, $cl_info) = $socket->accept();
+ next unless ($client and $cl_info);
+
+ $qpsmtpd->log(LOGINFO, "connect from: "
+ .$client->peerhost.":".$client->peerport);
+
+ # find the first unused slot:
+ my $slot = undef;
+ for (my $t = 0; $t < $MAX_CONN; $t++) {
+ next if $shared[$t]; # thread has work to do, get next
+ if (!defined $shared[$t]) { # undef = max connections per thread done
+ # "reap" this thread by join()ing it, then start a replacement
+ # worker in the same slot
+ my $thr = $threads[$t];
+ $qpsmtpd->log(LOGINFO, "joining thread ".$thr->tid
+ ." and creating new");
+ (undef) = $thr->join();
+ $shared[$t] = 0; # idle again
+ $thr = threads->create("run_client", $t, $qpsmtpd);
+ $threads[$t] = $thr;
+ }
+ $slot = $t; # found idle thread, use it
+ last;
+ }
+ unless (defined $slot) { # all busy..
+ # Turn the client away, then pause for 0.25 seconds before
+ # accepting again.
+ respond_client($client, 451, "Too many connections, try again later");
+ $client->shutdown(2);
+ undef $client;
+ select undef, undef, undef, 0.25;
+ next;
+ }
+
+ # Release the parent's handle of the slot's previous client only now:
+ # it could not be closed right after its fd had been passed to the
+ # worker thread, because the worker would then be unable to open it.
+ undef $sockets[$slot];
+ $sockets[$slot] = $client;
+ # Publish the new client's file descriptor number as the slot state.
+ $shared[$slot] = fileno($client);
+ # add to clients list...
+ {
+ lock(%workers);
+ $workers{$slot} = $client->peeraddr;
+ };
+ # notify the thread in slot $slot that it has work to do...
+ {
+ lock($shared[$slot]);
+ cond_signal($shared[$slot]);
+ };
+}
+
+# Worker thread body: serve up to $THREAD_CONNS connections for one slot.
+#
+# Parameters:
+#   $slot - index of this worker in @shared / %workers
+#   $base - the Qpsmtpd::Threaded object handed over at thread creation
+#
+# For each connection the thread waits until the accept loop stores a
+# non-zero file descriptor number in $shared[$slot], opens a socket object
+# on it, runs the pre-connection hooks and, unless they deny the client,
+# the SMTP session.  Afterwards the slot is set back to 0 (idle).  When the
+# connection budget is used up the slot is set to undef so the accept loop
+# joins this thread and starts a new one.
+sub run_client {
+    my $slot = shift;
+    my $base = shift;
+
+    for (my $conn = 0; $conn < $THREAD_CONNS; $conn++) {
+        # wait for something to do
+        {
+            lock($shared[$slot]);
+            cond_wait($shared[$slot])
+              until $shared[$slot];
+        };
+
+        # too bad we can't share the socket object ...
+        my $client = IO::Socket::INET->new_from_fd($shared[$slot], "w+");
+        unless ($client) {
+            $base->log(LOGERROR, "new_from_fd() Failed: $!");
+            # ... and cleanup
+            {
+                lock(%workers);
+                delete $workers{$slot};
+            };
+            {
+                lock($shared[$slot]);
+                $shared[$slot] = 0;
+            };
+            next;
+        }
+        $client->autoflush();
+
+        # NOTE(review): this is a plain reference assignment, not a clone
+        # of $base; state left on the object by an earlier connection is
+        # only cleared by the resets done below and in start_connection.
+        $qpsmtpd = $base;
+        $qpsmtpd->thread_init($client);
+
+        # maybe this should be moved to parent?
+        my ($rc, @msg) =
+          $qpsmtpd->run_hooks("pre-connection",
+                              remote_ip   => $client->peerhost,
+                              remote_port => $client->peerport,
+                              local_ip    => $client->sockhost,
+                              local_port  => $client->sockport,
+                              max_conn_ip => $MAX_CONN_IP,
+                              child_addrs => [values %workers],
+                             );
+        if (   $rc == DENYSOFT
+            || $rc == DENYSOFT_DISCONNECT
+            || $rc == DENY
+            || $rc == DENY_DISCONNECT)
+        {
+            # The reply code depends only on the kind of denial.  It used
+            # to stay at 451 whenever a plugin supplied its own message,
+            # turning a permanent DENY into a temporary failure.
+            my $soft = ($rc == DENYSOFT || $rc == DENYSOFT_DISCONNECT);
+            my $rc_reply = $soft ? 451 : 550;
+            unless ($msg[0]) {
+                @msg = $soft
+                  ? ("Sorry, try again later")
+                  : ("Sorry, service not available to you");
+            }
+            respond_client($client, $rc_reply, @msg);
+        }
+        # end: move to parent?
+        else {
+            eval {
+                $qpsmtpd->start_connection(
+                    local_port  => $client->sockport,
+                    local_ip    => inet_ntoa($client->sockaddr),
+                    remote_port => $client->peerport,
+                );
+                $qpsmtpd->run();
+            };
+            # read_input ends a session by die()ing with
+            # "threaded_disconnect"; anything else is a real error and is
+            # logged instead of being dropped silently.
+            if (my $err = $@) {
+                $qpsmtpd->log(LOGERROR, "connection aborted: $err")
+                  unless $err =~ /^threaded_disconnect/;
+            }
+        }
+        $qpsmtpd->run_hooks("post-connection");
+        $qpsmtpd->connection->reset;
+        $client->shutdown(2);
+        $client->close();
+        # cleanup.. notify parent we're idle
+        {
+            lock(%workers);
+            delete $workers{$slot};
+        };
+        {
+            lock($shared[$slot]);
+            $shared[$slot] = 0;
+        };
+    }
+    # max number of connections processed in this thread, return to parent
+    $base->log(LOGDEBUG, "thread ". threads->self->tid(). ", slot $slot: "
+                        ."lifetime reached, returning");
+    {
+        lock($shared[$slot]);
+        $shared[$slot] = undef; # we need to be reaped
+    };
+    return;
+}
+
+# Write an SMTP reply straight to a client handle, without going through
+# a Qpsmtpd object.
+#
+# Parameters:
+#   $client  - socket handle to write to (autoflush is switched on)
+#   $code    - numeric reply code
+#   @message - one text line per reply line; all but the last are sent
+#              with the "-" continuation marker, the last with a space
+#
+# Returns 1 when every line was written, 0 as soon as a print fails.
+# Every line sent, and every failure, is also reported with warn().
+#
+# The loop tests for definedness, not truth: a message of "" or "0" used
+# to end the loop early, dropping the remaining lines and possibly leaving
+# a "-" continuation line as the last one sent.
+sub respond_client {
+    my ($client, $code, @message) = @_;
+    $client->autoflush(1);
+    while (defined(my $msg = shift @message)) {
+        my $line = $code . (@message ? "-" : " ") . $msg;
+        warn ("reply to client: <$line>");
+        unless (print {$client} "$line\r\n") {
+            warn("Could not print [$line]: $!");
+            return 0;
+        }
+    }
+    return 1;
+}
+
+# vim: ts=4 sw=4 expandtab syn=perl