On 5 Jan 2006, at 15:33, John Peacock wrote:

I've attached a segment of the server log, as well as the complete swaks
session.  As you can see, it looks like cram-md5 code is not able to
read the client input at line lib/Qpsmtpd/Auth.pm:280, so it gets passed
as an unrecognized command.  Is there something different I should be
doing while running under pollserver to read STDIN?

Yeah, you can't read from STDIN because the socket is non-blocking, so you have to wait to be told there's a line available. This will require a continuation - I'll look into it now.
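
A minimal sketch of that idea, assuming a hypothetical LineWaiter class (the class and its get_line/feed methods are invented for illustration and are not part of qpsmtpd or Danga::Socket): rather than blocking on a read, the caller registers a callback, and it runs once a complete line has been buffered.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a connection object; not a real qpsmtpd class.
package LineWaiter;

sub new {
    my $class = shift;
    return bless { buf => '', want_line => undef }, $class;
}

# Register a continuation: code to run once the next full line arrives.
sub get_line {
    my ($self, $cb) = @_;
    $self->{want_line} = $cb;
    $self->_dispatch;
}

# Called by the event loop whenever the socket delivers some bytes.
sub feed {
    my ($self, $data) = @_;
    $self->{buf} .= $data;
    $self->_dispatch;
}

# Hand buffered lines to the pending continuation, one line per callback.
sub _dispatch {
    my $self = shift;
    while ($self->{want_line} && $self->{buf} =~ s/^(.*?\n)//) {
        my $line = $1;
        my $cb   = delete $self->{want_line};
        $cb->($line);
    }
}

package main;

my $conn = LineWaiter->new;
my @seen;
$conn->get_line(sub { push @seen, "got: $_[0]" });
$conn->feed("dXNlcm5h");    # partial line: the continuation stays pending
$conn->feed("bWU=\r\n");    # line complete: the continuation fires
print @seen;
```

The auth code would register its "handle the client's response" step through something like get_line and return straight away, so other clients keep being served in the meantime.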

I'd also appreciate any hints you could give me on how to understand the
Danga::* code as well.  I expect it would be essential to know how to do
the virus scanning under an asynchronous scheme.  It seems like the
Danga::Socket code came from Danga.com's wcmtools, but it isn't
documented (well, not in such a way that I can make heads or tails of it).

ok, from the top then...

Yes, Danga::Socket is code from Brad Fitzpatrick's LiveJournal stuff. It's a module that does essentially the same thing as POE or Event or LibEvent, but is lighter and faster than POE and Event.pm, and doesn't require XS like LibEvent.

The basic idea is you have a socket (file descriptor) that you want to receive notification events on - usually just event_read - i.e. data is available. In order to process these you subclass Danga::Socket to create a class that handles all the relevant events, and construct an instance of your new class passing in the socket: MyDangaSubclass->new($socket). You don't need to store that object anywhere, it is automatically stored for you in the sea of objects processed by Danga::Socket.

Danga::Socket uses "use fields" for field safety and a bit of a performance gain. So subclasses need to follow suit.
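
As a rough sketch of what "following suit" looks like, using only the core fields and base pragmas (MyBase and MyClient are hypothetical stand-ins, with MyBase playing the role of Danga::Socket): the subclass declares its own fields on top of the inherited ones, and any key that was never declared is rejected.

```perl
use strict;
use warnings;

# Hypothetical base class standing in for Danga::Socket.
package MyBase;
use fields qw(sock closed);

sub new {
    my MyBase $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->{sock}   = shift;
    $self->{closed} = 0;
    return $self;
}

# Hypothetical subclass: inherits sock/closed and adds ip/line.
package MyClient;
use base qw(MyBase);
use fields qw(ip line);

sub new {
    my MyClient $self = shift;
    $self = fields::new($self) unless ref $self;
    my ($sock, $ip) = @_;
    $self->SUPER::new($sock);
    $self->{ip}   = $ip;
    $self->{line} = '';
    return $self;
}

package main;

my $c = MyClient->new(undef, '127.0.0.1');

# Storing into a key that was never declared dies, so wrap it in eval.
my $ok = eval { $c->{bogus} = 1; 1 };
print "undeclared field rejected\n" unless $ok;
```

Note that "use base" has to come before "use fields" in the subclass so the parent's fields are inherited first.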

I've also created subclasses of Danga::Socket called Danga::Client and Danga::TimeoutSocket, which basically add in extra capabilities (mostly stacked read suspension, line by line processing and timeouts). Usually you should be subclassing Danga::Client instead of Danga::Socket.

Now in order to get Clamd, smtp-forward and spamd working in a non-blocking manner each will require their own subclass of Danga::Client to handle the communication, and a continuation so that other clients can be processed while those things occur. That's not really as hard as it sounds, but it does take a bit of getting your head around.

The easiest thing to do might be to see a simple example client class. So here's the start of an SMTP client. Note this is subclassing Danga::Socket instead of Danga::Client, so I've re-implemented some of the line by line and timeout stuff that Danga::Client does for you:

package SMTPSock;

use base qw(Danga::Socket);
use fields qw(connected ip line);

use IO::Handle;
use Socket;

# total session time
use constant SESSION_TIMEOUT => 3600;
# connection timeout
use constant CONNECT_TIMEOUT => 60;

BEGIN {
    our $PROTO = getprotobyname('tcp');
    our $PORT = getservbyname('smtp', 'tcp');
}

sub new {
    my SMTPSock $self = shift;

    $self = fields::new($self) unless ref $self;

    $self->{ip} = shift;
    my $sin = sockaddr_in($PORT,inet_aton($self->{ip}));
    socket(my $sock, PF_INET, SOCK_STREAM, $PROTO);
    IO::Handle::blocking($sock, 0);
    $self->{connected} = connect($sock, $sin);

    $self->SUPER::new( $sock );
    $self->watch_read(1);

    # connect timeout
    $self->AddTimer(CONNECT_TIMEOUT, sub { connect_timeout($self) });

    return $self;
}

sub log {
    my SMTPSock $self = shift;
    warn("$self->{ip} : ", @_);
}

sub connect_timeout {
    my SMTPSock $self = shift;
    if (!$self->{connected} && !$self->{closed}) {
        $self->log("Connection timed out\n");
        $self->close("connect timeout");
    }
}

sub session_timeout {
    my SMTPSock $self = shift;
    $self->close("session timeout");
}

sub event_read {
    my SMTPSock $self = shift;
    if (!$self->{connected}) {
        if (eof($self->sock)) {
            $self->close("connection failed");
        }
        else {
            $self->log("connected\n");
            $self->{connected} = 1;
            $self->AddTimer(SESSION_TIMEOUT, sub { session_timeout($self) });
        }
    }
    else {
        my $bref = $self->read(8192);
        return $self->close($!) unless defined $bref;
        $self->process_read_buf($bref);
    }
}

sub close {
    my SMTPSock $self = shift;
    my $code = shift;
    $self->log("Closing: $code\n");
    $self->SUPER::close($code);
}

sub event_err {
    my SMTPSock $self = shift;
    $self->log("error on socket\n");
    $self->close("$self->{ip} : error on socket");
}

sub event_hup {
    my SMTPSock $self = shift;
    $self->log("HUP\n");
    $self->close("$self->{ip} : remote end hangup");
}

sub process_read_buf {
    my SMTPSock $self = shift;
    my $bref = shift;
    $self->{line} .= $$bref;

    while ($self->{line} =~ s/^(.*?\n)//) {
        my $line = $1;
        my $resp = $self->process_line($line);
        $self->log("> $resp") if $resp;
        $self->write($resp) if $resp;
    }
}

############################################################################
# EDIT HERE

sub process_line {
    my SMTPSock $self = shift;
    my $line = shift;

    $self->log("< $line");

    # examine $line for recognised responses. Send data by return()ing what
    # you want to write() to the socket.
    #
    # If you need to maintain state, add fields to the "use fields" line above.

    if ($line =~ /^220 /) {
        # just connected?
        return "EHLO foo\r\n";
    }
    # etc
    $self->log("Unrecognised line: $line");
    return "QUIT\r\n";
}
