On 5 Jan 2006, at 15:33, John Peacock wrote:
> I've attached a segment of the server log, as well as the complete swaks
> session. As you can see, it looks like cram-md5 code is not able to
> read the client input at line lib/Qpsmtpd/Auth.pm:280, so it gets passed
> as an unrecognized command. Is there something different I should be
> doing while running under pollserver to read STDIN?
Yeah you can't read from STDIN because the socket is non-blocking, and
so you have to just wait to be told there's a line available. This will
require a continuation - I'll look into it now.
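To give a feel for what "continuation" means here, this is a minimal sketch in plain Perl. Sketch::Conn, on_next_line and line_available are made-up names for illustration only; they are not qpsmtpd or Danga::Socket API. The idea is that the auth code stores a callback and returns, and the event loop runs that callback when the next line turns up:

```perl
use strict;
use warnings;

# Hypothetical connection object, for illustration only.
package Sketch::Conn;

sub new { return bless { want_line => undef, log => [] }, shift }

# Instead of blocking on <STDIN>, remember what to do with the next line.
sub on_next_line {
    my ($self, $cb) = @_;
    $self->{want_line} = $cb;
}

# The event loop would call this whenever a complete line has arrived.
sub line_available {
    my ($self, $line) = @_;
    if (my $cb = delete $self->{want_line}) {
        return $cb->($self, $line);    # resume the suspended step
    }
    push @{ $self->{log} }, "command: $line";
}

package main;

my $conn = Sketch::Conn->new;

# The AUTH handler has sent its challenge; register the continuation and return.
$conn->on_next_line(sub {
    my ($c, $line) = @_;
    push @{ $c->{log} }, "auth response: $line";
});

$conn->line_available("dGVzdA==");    # consumed by the continuation
$conn->line_available("NOOP");        # back to normal command handling
print "$_\n" for @{ $conn->{log} };
```

The callback is deleted before it runs, so it only ever consumes one line; anything after that goes back through normal command dispatch.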
> I'd also appreciate any hints you could give me on how to understand the
> Danga::* code as well. I expect it would be essential to know how to do
> the virus scanning under an asynchronous scheme. It seems like the
> Danga::Socket code came from Danga.com's wcmtools, but it isn't
> documented (well, not in such a way that I can make heads or tails of
> it).
ok, from the top then...
Yes, Danga::Socket is code from Brad Fitzpatrick's livejournal stuff.
It's basically a module that does essentially the same thing as POE or
Event or LibEvent, but is lighter and faster than POE and Event.pm, and
doesn't require XS like LibEvent.
The basic idea is you have a socket (file descriptor) that you want to
receive notification events on - usually just event_read - i.e. data is
available. In order to process these you subclass Danga::Socket to
create a class that handles all the relevant events, and construct an
instance of your new class passing in the socket:
MyDangaSubclass->new($socket). You don't need to store that object
anywhere, it is automatically stored for you in the sea of objects
processed by Danga::Socket.
Danga::Socket uses "use fields" for field safety and a bit of a
performance gain. So subclasses need to follow suit.
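If you haven't used "use fields" before, here is a rough sketch of the pattern. My::BaseSock is a made-up stand-in for Danga::Socket so that this runs without it installed, and My::LineSock is a hypothetical subclass; only the base/fields mechanics are the point:

```perl
use strict;
use warnings;

# Hypothetical stand-in for Danga::Socket (not the real class).
package My::BaseSock;
use fields qw(sock closed);

sub new {
    my My::BaseSock $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->{sock}   = shift;
    $self->{closed} = 0;
    return $self;
}

# The subclass declares only its own extra fields; "use base" pulls in
# the parent's fields as well.
package My::LineSock;
use base qw(My::BaseSock);
use fields qw(line);

sub new {
    my My::LineSock $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new(@_);
    $self->{line} = '';
    return $self;
}

package main;

my $obj = My::LineSock->new('fake-socket');

# A field that was never declared is refused rather than silently created.
my $ok = eval { $obj->{typo} = 1; 1 };
print $ok ? "typo accepted\n" : "typo rejected\n";
```

With a typed lexical (my My::LineSock $self) the same mistake is caught at compile time instead, which is why the subs in the example class all declare $self that way.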
I've also created subclasses of Danga::Socket called Danga::Client and
Danga::TimeoutSocket, which basically add in extra capabilities (mostly
stacked read suspension, line by line processing and timeouts). Usually
you should be subclassing Danga::Client instead of Danga::Socket.
Now, to get Clamd, smtp-forward and spamd working in a non-blocking
manner, each will require its own subclass of Danga::Client to handle
the communication, and a continuation so that other clients can be
processed while those things occur. That's not really as hard as it
sounds, but it does take a bit of getting your head around.
The easiest thing to do might be to see a simple example client class.
So here's the start of an SMTP client. Note this is subclassing
Danga::Socket instead of Danga::Client, so I've re-implemented some of
the line by line and timeout stuff that Danga::Client does for you:
package SMTPSock;

use base qw(Danga::Socket);
use fields qw(connected ip line);
use IO::Handle;
use Socket;

# total session time, in seconds
use constant SESSION_TIMEOUT => 3600;
# connection timeout, in seconds
use constant CONNECT_TIMEOUT => 60;

BEGIN {
    our $PROTO = getprotobyname('tcp');
    our $PORT = getservbyname('smtp', 'tcp');
}

sub new {
    my SMTPSock $self = shift;
    $self = fields::new($self) unless ref $self;

    $self->{ip} = shift;

    my $sin = sockaddr_in($PORT, inet_aton($self->{ip}));
    socket(my $sock, PF_INET, SOCK_STREAM, $PROTO);
    IO::Handle::blocking($sock, 0);

    # on a non-blocking socket connect() usually returns false here,
    # with the connection still in progress
    $self->{connected} = connect($sock, $sin);

    $self->SUPER::new( $sock );
    $self->watch_read(1);

    # connect timeout
    $self->AddTimer(CONNECT_TIMEOUT, sub { connect_timeout($self) });

    return $self;
}

sub log {
    my SMTPSock $self = shift;
    warn("$self->{ip} : ", @_);
}

sub connect_timeout {
    my SMTPSock $self = shift;
    if (!$self->{connected} && !$self->{closed}) {
        $self->log("Connection timed out\n");
        $self->close("connect timeout");
    }
}

sub session_timeout {
    my SMTPSock $self = shift;
    $self->close("session timeout");
}

sub event_read {
    my SMTPSock $self = shift;
    if (!$self->{connected}) {
        # first readable event: either the connect failed or we're in
        if (eof($self->sock)) {
            $self->close("connection failed");
        }
        else {
            $self->log("connected\n");
            $self->{connected} = 1;
            $self->AddTimer(SESSION_TIMEOUT, sub { session_timeout($self) });
        }
    }
    else {
        my $bref = $self->read(8192);
        return $self->close($!) unless defined $bref;
        $self->process_read_buf($bref);
    }
}

sub close {
    my SMTPSock $self = shift;
    my $code = shift;
    $self->log("Closing: $code\n");
    $self->SUPER::close($code);
}

sub event_err {
    my SMTPSock $self = shift;
    $self->log("error on socket\n");
    $self->close("$self->{ip} : error on socket");
}

sub event_hup {
    my SMTPSock $self = shift;
    $self->log("HUP\n");
    $self->close("$self->{ip} : remote end hangup");
}
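
sub process_read_buf {
    my SMTPSock $self = shift;
    my $bref = shift;

    # append the new data, then peel off complete lines one at a time;
    # any partial line stays in the buffer until the next read
    $self->{line} .= $$bref;
    while ($self->{line} =~ s/^(.*?\n)//) {
        my $line = $1;
        my $resp = $self->process_line($line);
        $self->log("> $resp") if $resp;
        $self->write($resp) if $resp;
    }
}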

############################################################################
# EDIT HERE

sub process_line {
    my SMTPSock $self = shift;
    my $line = shift;
    $self->log("< $line");

    # examine $line for recognised responses. Send data by return()ing what
    # you want to write() to the socket.
    #
    # If you need to maintain state, add fields to the "use fields" line above.

    if ($line =~ /^220 /) {
        # just connected?
        return "EHLO foo\r\n";
    }

    # etc

    $self->log("Unrecognised line: $line");
    return "QUIT\r\n";
}
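
If the buffering in process_read_buf looks odd, it can be tried on its own without any sockets. This is only a sketch: take_lines is a made-up helper name, and the input strings are invented to show a line arriving split across two reads.

```perl
use strict;
use warnings;

# Append a chunk to the buffer and return every complete line now available.
# Any trailing partial line stays in $$buf_ref for the next call.
sub take_lines {
    my ($buf_ref, $chunk) = @_;
    $$buf_ref .= $chunk;
    my @lines;
    while ($$buf_ref =~ s/^(.*?\n)//) {
        push @lines, $1;
    }
    return @lines;
}

my $buf = '';

# The greeting arrives in two pieces, followed by the start of another line.
my @first  = take_lines(\$buf, "220 mail.exa");
my @second = take_lines(\$buf, "mple.test ESMTP\r\n250-PIPE");

printf "%d line(s), then %d line(s), %d byte(s) still buffered\n",
    scalar @first, scalar @second, length $buf;
```

The first call returns nothing because no newline has been seen yet, which is why process_line is only ever handed whole lines.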