Author: msergeant
Date: Tue Mar 18 08:20:45 2008
New Revision: 864

Added:
   trunk/plugins/async/queue/
   trunk/plugins/async/queue/smtp-forward

Log:
Async smtp-forward plugin


Added: trunk/plugins/async/queue/smtp-forward
==============================================================================
--- (empty file)
+++ trunk/plugins/async/queue/smtp-forward      Tue Mar 18 08:20:45 2008
@@ -0,0 +1,390 @@
+#!/usr/bin/perl -w
+
+=head1 NAME
+
+smtp-forward
+
+=head1 DESCRIPTION
+
+This plugin forwards the mail via SMTP to a specified server, rather than
+delivering the email locally.
+
+=head1 CONFIG
+
+It takes one required parameter, the IP address or hostname to forward to. 
+
+  async/queue/smtp-forward 10.2.2.2
+
+Optionally you can also add a port:
+
+  async/queue/smtp-forward 10.2.2.2 9025
+
+=cut
+
+use Qpsmtpd::Constants;
+
+sub register {
+    my ($self, $qp) = @_;
+    
+    $self->register_hook(queue => "start_queue");
+    $self->register_hook(queue => "finish_queue");
+}
+
+sub init {
+    my ($self, $qp, @args) = @_;
+
+    if (@args > 0) {
+        if ($args[0] =~ /^([\.\w_-]+)$/) {
+            $self->{_smtp_server} = $1;
+        }
+        else {
+            die "Bad data in smtp server: $args[0]";
+        }
+        $self->{_smtp_port} = 25;
+        if (@args > 1 and $args[1] =~ /^(\d+)$/) {
+            $self->{_smtp_port} = $1;
+        }
+    
+        $self->log(LOGWARN, "WARNING: Ignoring additional arguments.") if 
(@args > 2);
+    }
+    else {
+        die("No SMTP server specified in smtp-forward config");
+    }
+
+}
+
+sub start_queue {
+    my ($self, $transaction) = @_;
+    
+    my $qp = $self->qp;
+    my $SERVER = $self->{_smtp_server};
+    my $PORT   = $self->{_smtp_port};
+    $self->log(LOGINFO, "forwarding to $SERVER:$PORT");
+    
+    $transaction->notes('async_sender', 
+        AsyncSMTPSender->new($SERVER, $PORT, $qp, $self, $transaction)
+    );
+    
+    return YIELD;
+}
+
+sub finish_queue {
+    my ($self, $transaction) = @_;
+    
+    my $sender = $transaction->notes('async_sender');
+    
+    my ($rc, $msg) = $sender->results;
+    
+    return $rc, $msg;
+}
+
+package AsyncSMTPSender;
+
+use IO::Socket;
+
+use base qw(Danga::Socket);
+use fields qw(
+    qp
+    pkg
+    tran
+    state
+    rcode
+    rmsg
+    buf
+    command
+    resp
+    to
+    );
+
+use constant ST_CONNECTING => 0;
+use constant ST_CONNECTED  => 1;
+use constant ST_COMMANDS   => 2;
+use constant ST_DATA       => 3;
+
+use Qpsmtpd::Constants;
+
+sub new {
+    my ($self, $server, $port, $qp, $pkg, $transaction) = @_;
+    $self = fields::new($self) unless ref $self;
+    
+    my $sock = IO::Socket::INET->new(
+        PeerAddr => $server,
+        PeerPort => $port,
+        Blocking  => 0,
+    ) or die "Error connecting to server $server:$port : $!\n";
+
+    IO::Handle::blocking($sock, 0);
+    binmode($sock, ':raw');
+    
+    $self->{qp} = $qp;
+    $self->{pkg} = $pkg;
+    $self->{tran} = $transaction;
+    $self->{state} = ST_CONNECTING;
+    $self->{rcode} = DECLINED;
+    $self->{command} = 'connect';
+    $self->{buf} = '';
+    $self->{resp} = [];
+    # copy the recipients so we can pop them off one by one
+    $self->{to} = [ $transaction->recipients ];
+    
+    $self->SUPER::new($sock);
+    # Watch for write first, this is when the TCP session is established.
+    $self->watch_write(1);
+
+    return $self;
+}
+
+sub results {
+    my AsyncSMTPSender $self = shift;
+    return ( $self->{rcode}, $self->{rmsg} );
+}
+
+sub log {
+    my AsyncSMTPSender $self = shift;
+    $self->{qp}->log(@_);
+}
+
+sub cont {
+    my AsyncSMTPSender $self = shift;
+    $self->{qp}->run_continuation;
+}
+
+sub command {
+    my AsyncSMTPSender $self = shift;
+    my ($command, $params) = @_;
+    $params ||= '';
+    
+    $self->log(LOGDEBUG, ">> $command $params");
+    
+    $self->write(($command =~ m/ / ? "$command:" : $command)
+      . ($params ? " $params" : "") . "\r\n");
+    $self->watch_read(1);
+    $self->{command} = ($command =~ /(\S+)/)[0];
+}
+
+sub handle_response {
+    my AsyncSMTPSender $self = shift;
+    
+    my $method = "cmd_" . lc($self->{command});
+    
+    $self->$method(@_);
+}
+
+sub cmd_connect {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    if ($code != 220) {
+        $self->{rmsg} = "Error on connect: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        my $host = $self->{qp}->config('me');
+        print "HELOing with $host\n";
+        $self->command((join '', @$response) =~ m/ ESMTP/ ? "EHLO" : "HELO", 
$host);
+    }
+}
+
+sub cmd_helo {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    if ($code != 250) {
+        $self->{rmsg} = "Error on HELO: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        $self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
+    }
+}
+
+sub cmd_ehlo {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    if ($code != 250) {
+        $self->{rmsg} = "Error on EHLO: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        $self->command("MAIL", "FROM:" . $self->{tran}->sender->format);
+    }
+}
+
+sub cmd_mail {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    if ($code != 250) {
+        $self->{rmsg} = "Error on MAIL FROM: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
+    }
+}
+
+sub cmd_rcpt {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    if ($code != 250) {
+        $self->{rmsg} = "Error on RCPT TO: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        if (@{$self->{to}}) {
+            $self->command("RCPT", "TO:" . shift(@{$self->{to}})->format);
+        }
+        else {
+            $self->command("DATA");
+        }
+    }
+}
+
+sub cmd_data {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    if ($code != 354) {
+        $self->{rmsg} = "Error on DATA: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        # $self->{state} = ST_DATA;
+        $self->datasend($self->{tran}->header->as_string);
+        $self->{tran}->body_resetpos;
+        while (my $line = $self->{tran}->body_getline) {
+            $self->log(LOGDEBUG, ">> $line");
+            $line =~ s/\r?\n/\r\n/;
+            $self->datasend($line);
+        }
+        $self->write(".\r\n");
+        $self->{command} = "DATAEND";
+    }
+}
+
+sub cmd_dataend {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    if ($code != 250) {
+        $self->{rmsg} = "Error after DATA: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        $self->command("QUIT");
+    }
+}
+
+sub cmd_quit {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+    
+    $self->{rcode} = OK;
+    $self->{rmsg} = "Queued!";
+    $self->close;
+    $self->cont;
+}
+
+sub datasend {
+    my AsyncSMTPSender $self = shift;
+    my ($data) = @_;
+    $data =~ s/^\./../mg;
+    $self->write(\$data);
+}
+
+sub event_read {
+    my AsyncSMTPSender $self = shift;
+    
+    if ($self->{state} == ST_CONNECTED) {
+        $self->{state} = ST_COMMANDS;
+    }
+
+    if ($self->{state} == ST_COMMANDS) {
+        my $in = $self->read(1024);
+        if (!$in) {
+            # XXX: connection closed
+            $self->close("lost connection");
+            return;
+        }
+    
+        my @lines = split /\r?\n/, $self->{buf} . $$in, -1;
+        $self->{buf} = delete $lines[-1];
+    
+        for(@lines) {
+            if (my ($code, $cont, $rest) = /^([0-9]{3})([ -])(.*)/) {
+                $self->log(LOGDEBUG, "<< $code$cont$rest");
+                push @{$self->{resp}}, $rest;
+
+                if($cont eq ' ') {
+                    $self->handle_response($code, $self->{resp});
+                    $self->{resp} = [];
+                }
+            }
+            else {
+                $self->log(LOGERROR, "Unrecognised SMTP response line: $_");
+                $self->{rmsg} = "Error from upstream SMTP server";
+                $self->close;
+                $self->cont;
+            }
+        }
+    }
+    else {
+        $self->log(LOGERROR, "SMTP Session occurred out of order");
+        $self->close;
+        $self->cont;
+    }
+}
+
+sub event_write {
+    my AsyncSMTPSender $self = shift;
+
+    if ($self->{state} == ST_CONNECTING) {
+        $self->watch_write(0);
+        $self->{state} = ST_CONNECTED;
+        $self->watch_read(1);
+    }
+    elsif (0 && $self->{state} == ST_DATA) {
+        # send more data
+        if (my $line = $self->{tran}->body_getline) {
+            $self->log(LOGDEBUG, ">> $line");
+            $line =~ s/\r?\n/\r\n/;
+            $self->datasend($line);
+        }
+        else {
+            # no more data.
+            $self->log(LOGINFO, "No more data");
+            $self->watch_write(0);
+            $self->{state} = ST_COMMANDS;
+        }
+    }
+    else {
+        $self->write(undef);
+    }
+}
+
+sub event_err {
+    my ($self) = @_;
+    eval { $self->read(1); }; # gives us the correct error in errno
+    $self->{rmsg} = "Read error from remote server: $!";
+    #print "lost connection: $!\n";
+    $self->close;
+    $self->cont;
+}
+
+sub event_hup {
+    my ($self) = @_;
+    eval { $self->read(1); }; # gives us the correct error in errno
+    $self->{rmsg} = "HUP error from remote server: $!";
+    #print "lost connection: $!\n";
+    $self->close;
+    $self->cont;
+}

Reply via email to