Author: msergeant
Date: Tue Mar 18 08:20:45 2008
New Revision: 864
Added:
trunk/plugins/async/queue/
trunk/plugins/async/queue/smtp-forward
Log:
Async smtp-forward plugin
Added: trunk/plugins/async/queue/smtp-forward
==============================================================================
--- (empty file)
+++ trunk/plugins/async/queue/smtp-forward Tue Mar 18 08:20:45 2008
@@ -0,0 +1,390 @@
+#!/usr/bin/perl -w
+
+=head1 NAME
+
+smtp-forward
+
+=head1 DESCRIPTION
+
+This plugin forwards the mail via SMTP to a specified server, rather than
+delivering the email locally.
+
+=head1 CONFIG
+
+It takes one required parameter, the IP address or hostname to forward to.
+
+ async/queue/smtp-forward 10.2.2.2
+
+Optionally you can also add a port:
+
+ async/queue/smtp-forward 10.2.2.2 9025
+
+=cut
+
+use Qpsmtpd::Constants;
+
+# Attach both halves of this plugin to the "queue" hook: start_queue kicks
+# off the asynchronous delivery and finish_queue reports its outcome.
+sub register {
+    my ($self, $qp) = @_;
+
+    $self->register_hook('queue', 'start_queue');
+    return $self->register_hook('queue', 'finish_queue');
+}
+
+# Parse the plugin's configuration arguments.
+#
+#   $args[0]  (required) host name or IP address of the server to forward to
+#   $args[1]  (optional) TCP port; 25 is used when absent or not numeric
+#
+# Dies when no server is given or when the server contains characters other
+# than word characters, dots and dashes.  Values are stored from the regex
+# captures rather than from the raw arguments.
+sub init {
+    my ($self, $qp, @args) = @_;
+
+    die("No SMTP server specified in smtp-forward config") unless @args;
+
+    if ($args[0] =~ /^([\.\w_-]+)$/) {
+        $self->{_smtp_server} = $1;
+    }
+    else {
+        die "Bad data in smtp server: $args[0]";
+    }
+
+    $self->{_smtp_port} = 25;
+    if (@args > 1) {
+        if ($args[1] =~ /^(\d+)$/) {
+            $self->{_smtp_port} = $1;
+        }
+        else {
+            # Previously a malformed port was dropped without any trace,
+            # leaving the admin to wonder why port 25 was being used.
+            $self->log(LOGWARN,
+                "WARNING: Ignoring invalid port '$args[1]', using "
+                  . $self->{_smtp_port});
+        }
+    }
+
+    $self->log(LOGWARN, "WARNING: Ignoring additional arguments.")
+      if (@args > 2);
+}
+
+# First "queue" handler: create the asynchronous SMTP sender for this
+# transaction, stash it in the transaction notes under 'async_sender', and
+# return YIELD.
+sub start_queue {
+    my ($self, $transaction) = @_;
+
+    my $qp = $self->qp;
+    my ($host, $port) = @{$self}{qw(_smtp_server _smtp_port)};
+    $self->log(LOGINFO, "forwarding to $host:$port");
+
+    my $sender = AsyncSMTPSender->new($host, $port, $qp, $self, $transaction);
+    $transaction->notes('async_sender', $sender);
+
+    return YIELD;
+}
+
+# Second "queue" handler: report the outcome recorded by the asynchronous
+# sender that start_queue stored in the transaction notes.
+#
+# Returns the sender's (code, message) pair.  When no sender was stored
+# (for example because creating it died in start_queue) DECLINED is
+# returned with an explanatory message instead of calling a method on undef.
+sub finish_queue {
+    my ($self, $transaction) = @_;
+
+    my $sender = $transaction->notes('async_sender');
+    unless ($sender) {
+        $self->log(LOGERROR, "no async SMTP sender found for this transaction");
+        return (DECLINED, "Forwarding to upstream SMTP server was not started");
+    }
+
+    my ($rc, $msg) = $sender->results;
+
+    return ($rc, $msg);
+}
+
+package AsyncSMTPSender;
+
+use IO::Socket;
+
+use base qw(Danga::Socket);
+# Per-connection fields, as used by the methods of this package:
+#   qp      - object used for logging, config('me') and run_continuation
+#   pkg     - the plugin object passed to new() (stored; not read in this file)
+#   tran    - the transaction whose sender, recipients, header and body are sent
+#   state   - one of the ST_* constants below
+#   rcode   - result code handed back by results()
+#   rmsg    - result message handed back by results()
+#   buf     - trailing incomplete response line carried over between reads
+#   command - name of the last command sent; selects the cmd_* response handler
+#   resp    - text of the response lines collected so far for the current reply
+#   to      - recipients that have not yet been sent in a RCPT command
+use fields qw(
+ qp
+ pkg
+ tran
+ state
+ rcode
+ rmsg
+ buf
+ command
+ resp
+ to
+ );
+
+# Connection states.
+# ST_CONNECTING: initial state set by new(), until the first writable event.
+use constant ST_CONNECTING => 0;
+# ST_CONNECTED: set by event_write once the socket becomes writable.
+use constant ST_CONNECTED => 1;
+# ST_COMMANDS: set by event_read; responses are parsed in this state.
+use constant ST_COMMANDS => 2;
+# ST_DATA: only referenced by the disabled branch in event_write.
+use constant ST_DATA => 3;
+
+use Qpsmtpd::Constants;
+
+# Constructor: open a non-blocking TCP connection to $server:$port and set
+# up the per-connection state for delivering $transaction.  Dies when the
+# socket cannot be created.  Returns the new object.
+sub new {
+    my ($self, $server, $port, $qp, $pkg, $transaction) = @_;
+    $self = fields::new($self) unless ref $self;
+
+    my %peer = (
+        PeerAddr => $server,
+        PeerPort => $port,
+        Blocking => 0,
+    );
+    my $socket = IO::Socket::INET->new(%peer)
+      or die "Error connecting to server $server:$port : $!\n";
+
+    IO::Handle::blocking($socket, 0);
+    binmode($socket, ':raw');
+
+    @{$self}{qw(qp pkg tran)} = ($qp, $pkg, $transaction);
+    @{$self}{qw(state rcode command buf)} =
+      (ST_CONNECTING, DECLINED, 'connect', '');
+    $self->{resp} = [];
+
+    # Private copy of the recipient list; entries are shifted off as each
+    # RCPT command is issued.
+    $self->{to} = [ $transaction->recipients ];
+
+    $self->SUPER::new($socket);
+
+    # Wait for writability first: that is when the TCP session is established.
+    $self->watch_write(1);
+
+    return $self;
+}
+
+# Return the (code, message) pair recorded for this delivery attempt.
+sub results {
+    my AsyncSMTPSender $self = shift;
+    return @{$self}{qw(rcode rmsg)};
+}
+
+# Forward a log call, arguments unchanged, to the stored qp object.
+sub log {
+    my AsyncSMTPSender $self = shift;
+    my $qp = $self->{qp};
+    return $qp->log(@_);
+}
+
+# Resume hook processing by running the continuation on the stored qp object.
+sub cont {
+    my AsyncSMTPSender $self = shift;
+    my $qp = $self->{qp};
+    return $qp->run_continuation;
+}
+
+# Send one SMTP command line and start watching for the reply.
+#
+#   $command - the verb, e.g. "MAIL"
+#   $params  - optional parameter text appended after a space
+#
+# The first whitespace-free word of $command is remembered so that
+# handle_response can pick the matching cmd_* method.
+sub command {
+    my AsyncSMTPSender $self = shift;
+    my ($command, $params) = @_;
+    $params ||= '';
+
+    $self->log(LOGDEBUG, ">> $command $params");
+
+    my $verb   = $command =~ m/ / ? "$command:" : $command;
+    my $suffix = $params ? " $params" : "";
+    $self->write($verb . $suffix . "\r\n");
+
+    $self->watch_read(1);
+    $self->{command} = ($command =~ /(\S+)/)[0];
+}
+
+# Dispatch a complete server reply to the cmd_<name> method matching the
+# most recently sent command, passing the remaining arguments through.
+sub handle_response {
+    my AsyncSMTPSender $self = shift;
+
+    my $handler = join '', 'cmd_', lc $self->{command};
+
+    return $self->$handler(@_);
+}
+
+# Handle the server's greeting.
+#
+#   $code     - numeric reply code
+#   $response - array ref holding the text of each reply line
+#
+# Anything other than 220 aborts the delivery.  Otherwise EHLO is sent when
+# the greeting contains " ESMTP", HELO when it does not, using the 'me'
+# config value as the host name.
+sub cmd_connect {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+
+    if ($code != 220) {
+        $self->{rmsg} = "Error on connect: @$response";
+        $self->close;
+        $self->cont;
+    }
+    else {
+        my $host = $self->{qp}->config('me');
+
+        # Goes through the logger rather than print: writing to STDOUT from
+        # inside the daemon is leftover debugging output.
+        $self->log(LOGDEBUG, "HELOing with $host");
+
+        my $greeting = join '', @$response;
+        $self->command($greeting =~ m/ ESMTP/ ? "EHLO" : "HELO", $host);
+    }
+}
+
+# Handle the reply to HELO: on 250 send MAIL FROM for the transaction's
+# sender; on any other code record the error, close and continue.
+sub cmd_helo {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+
+    if ($code == 250) {
+        my $from = $self->{tran}->sender->format;
+        return $self->command("MAIL", "FROM:" . $from);
+    }
+
+    $self->{rmsg} = "Error on HELO: @$response";
+    $self->close;
+    return $self->cont;
+}
+
+# Handle the reply to EHLO: on 250 send MAIL FROM for the transaction's
+# sender; on any other code record the error, close and continue.
+sub cmd_ehlo {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+
+    if ($code == 250) {
+        my $from = $self->{tran}->sender->format;
+        return $self->command("MAIL", "FROM:" . $from);
+    }
+
+    $self->{rmsg} = "Error on EHLO: @$response";
+    $self->close;
+    return $self->cont;
+}
+
+# Handle the reply to MAIL FROM: on 250 send RCPT TO for the first pending
+# recipient; on any other code record the error, close and continue.
+sub cmd_mail {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+
+    if ($code == 250) {
+        my $rcpt = shift @{ $self->{to} };
+        return $self->command("RCPT", "TO:" . $rcpt->format);
+    }
+
+    $self->{rmsg} = "Error on MAIL FROM: @$response";
+    $self->close;
+    return $self->cont;
+}
+
+# Handle the reply to RCPT TO: on 250 send the next pending recipient, or
+# DATA once none are left; on any other code record the error, close and
+# continue.
+sub cmd_rcpt {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+
+    if ($code != 250) {
+        $self->{rmsg} = "Error on RCPT TO: @$response";
+        $self->close;
+        return $self->cont;
+    }
+
+    my $pending = $self->{to};
+    return $self->command("DATA") unless @$pending;
+
+    my $next = shift @$pending;
+    return $self->command("RCPT", "TO:" . $next->format);
+}
+
+# Handle the reply to DATA.
+#
+#   $code     - numeric reply code
+#   $response - array ref holding the text of each reply line
+#
+# On 354 the header and every body line are written, followed by the
+# terminating "." line, and the pending command becomes DATAEND so the next
+# reply is routed to cmd_dataend.  Any other code aborts the delivery.
+sub cmd_data {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+
+    if ($code != 354) {
+        $self->{rmsg} = "Error on DATA: @$response";
+        $self->close;
+        $self->cont;
+        return;
+    }
+
+    # $self->{state} = ST_DATA;
+
+    # Apply the same LF -> CRLF conversion to the header that body lines
+    # receive below; previously the header was written unconverted.
+    # NOTE(review): assumes header->as_string yields "\n" line endings; the
+    # substitution is a no-op on lines that already end in CRLF.
+    my $header = $self->{tran}->header->as_string;
+    $header =~ s/\r?\n/\r\n/g;
+    $self->datasend($header);
+
+    $self->{tran}->body_resetpos;
+
+    # Test definedness rather than truth so a final line of "0" without a
+    # newline is still sent; an empty string is still treated as the end.
+    while (defined(my $line = $self->{tran}->body_getline)) {
+        last unless length $line;
+        $self->log(LOGDEBUG, ">> $line");
+        $line =~ s/\r?\n/\r\n/;
+        $self->datasend($line);
+    }
+
+    $self->write(".\r\n");
+    $self->{command} = "DATAEND";
+}
+
+# Handle the reply to the end-of-data marker: on 250 send QUIT; on any
+# other code record the error, close and continue.
+sub cmd_dataend {
+    my AsyncSMTPSender $self = shift;
+    my ($code, $response) = @_;
+
+    return $self->command("QUIT") if $code == 250;
+
+    $self->{rmsg} = "Error after DATA: @$response";
+    $self->close;
+    return $self->cont;
+}
+
+# Handle the reply to QUIT: whatever the reply code, record success, close
+# the connection and continue.
+sub cmd_quit {
+    my AsyncSMTPSender $self = shift;
+
+    @{$self}{qw(rcode rmsg)} = (OK, "Queued!");
+    $self->close;
+    return $self->cont;
+}
+
+# Write a chunk of message data, doubling the leading dot of every line
+# that starts with one.
+sub datasend {
+    my AsyncSMTPSender $self = shift;
+    my ($chunk) = @_;
+
+    $chunk =~ s/^\./../mg;
+    return $self->write(\$chunk);
+}
+
+# Readable-event callback: read available bytes, split them into response
+# lines and hand each complete reply (the line whose code is followed by a
+# space) to handle_response.  A trailing partial line is kept in {buf}
+# until the next read.
+#
+# Every failure path records what it can, closes the socket, runs the
+# continuation and returns, so the waiting transaction is always resumed
+# exactly once from here.
+sub event_read {
+    my AsyncSMTPSender $self = shift;
+
+    if ($self->{state} == ST_CONNECTED) {
+        $self->{state} = ST_COMMANDS;
+    }
+
+    if ($self->{state} != ST_COMMANDS) {
+        $self->log(LOGERROR, "SMTP Session occurred out of order");
+        $self->close;
+        $self->cont;
+        return;
+    }
+
+    my $in = $self->read(1024);
+    if (!$in) {
+        # Connection closed by the peer.  The continuation must run here,
+        # otherwise the yielded transaction is never resumed.
+        $self->{rmsg} = "Lost connection to upstream SMTP server";
+        $self->close("lost connection");
+        $self->cont;
+        return;
+    }
+
+    # NOTE(review): read() may hand back a reference to an undefined or
+    # empty buffer when nothing was available - confirm against
+    # Danga::Socket; treated as an empty chunk here.
+    my $chunk = defined $$in ? $$in : '';
+
+    my @lines = split /\r?\n/, $self->{buf} . $chunk, -1;
+
+    # split returns an empty list for an empty string, so guard the pop;
+    # the last element is always the (possibly empty) unfinished line.
+    $self->{buf} = @lines ? pop @lines : '';
+
+    for my $line (@lines) {
+        if (my ($code, $cont, $rest) = $line =~ /^([0-9]{3})([ -])(.*)/) {
+            $self->log(LOGDEBUG, "<< $code$cont$rest");
+            push @{$self->{resp}}, $rest;
+
+            if ($cont eq ' ') {
+                $self->handle_response($code, $self->{resp});
+                $self->{resp} = [];
+            }
+        }
+        else {
+            $self->log(LOGERROR, "Unrecognised SMTP response line: $line");
+            $self->{rmsg} = "Error from upstream SMTP server";
+            $self->close;
+            $self->cont;
+
+            # Stop here: the connection is closed and the continuation has
+            # already run, so later lines must not trigger it again.
+            return;
+        }
+    }
+}
+
+# Writable-event callback.  The first writable event while connecting marks
+# the connection as established and switches to watching for reads; any
+# later event calls write(undef).  The ST_DATA branch is deliberately
+# disabled by its constant-false guard.
+sub event_write {
+    my AsyncSMTPSender $self = shift;
+    my $state = $self->{state};
+
+    if ($state == ST_CONNECTING) {
+        $self->watch_write(0);
+        $self->{state} = ST_CONNECTED;
+        return $self->watch_read(1);
+    }
+
+    if (0 && $state == ST_DATA) {
+        # send more data
+        my $line = $self->{tran}->body_getline;
+        if ($line) {
+            $self->log(LOGDEBUG, ">> $line");
+            $line =~ s/\r?\n/\r\n/;
+            return $self->datasend($line);
+        }
+
+        # no more data.
+        $self->log(LOGINFO, "No more data");
+        $self->watch_write(0);
+        return $self->{state} = ST_COMMANDS;
+    }
+
+    return $self->write(undef);
+}
+
+# Error-event callback: record the error text, close and continue.
+sub event_err {
+    my ($self) = @_;
+
+    # A one-byte read attempt leaves the actual error in $!.
+    eval { $self->read(1); };
+    my $reason = "$!";
+
+    $self->{rmsg} = "Read error from remote server: $reason";
+    $self->close;
+    return $self->cont;
+}
+
+# Hangup-event callback: record the error text, close and continue.
+sub event_hup {
+    my ($self) = @_;
+
+    # A one-byte read attempt leaves the actual error in $!.
+    eval { $self->read(1); };
+    my $reason = "$!";
+
+    $self->{rmsg} = "HUP error from remote server: $reason";
+    $self->close;
+    return $self->cont;
+}