Author: vetinari
Date: Tue Aug 21 07:20:41 2007
New Revision: 774

Modified:
   contrib/vetinari/experimental/chunking

Log:
chunking plugin update: fixes, additions
 - now supports Apache::Qpsmtpd, qpsmtpd-forkserver, qpsmtpd-prefork
 - fix missing empty line between header and body
 - add received_line hook


Modified: contrib/vetinari/experimental/chunking
==============================================================================
--- contrib/vetinari/experimental/chunking      (original)
+++ contrib/vetinari/experimental/chunking      Tue Aug 21 07:20:41 2007
@@ -33,23 +33,65 @@
 
 DON'T USE :P
 
+=head1 TODO
+
+=over 4
+
+=item headers
+
+get headers earlier, so we can run a (currently non existing) C<data_headers> hook
+
+=item CRLF
+
+s/\r?\n$/\n/: only in headers ... and in body when BODY=BINARYMIME was not given
+
+=back
+
 =cut
 
 use Qpsmtpd::DSN;
 use POSIX qw(strftime);
+use Fcntl qw(:seek);
+
+our $reader;
 
 sub register {
     my ($self, $qp, @args) = @_;
     if (@args > 2) {
         $self->log(LOGERROR, "Bad parameters for the chunking plugin")
     }
+
     $self->{_binarymime} = 0;
     if (lc $args[0] eq 'binarymime') {
         $self->{_binarymime} = 1;
     }
+
+    $reader = 'read_block';
+    $self->{_bdat_block} = 4096;
+    if ($qp->{conn} && $qp->{conn}->isa('Apache2::Connection')) {
+        $reader = 'ap_read_block';
+        $self->{_bdat_block} = 8000;
+
+        require APR::Const;
+        APR::Const->import(qw(BLOCK_READ EOF SUCCESS TIMEUP));
+
+        # require APR::Socket;
+        # APR::Socket->import(qw());
+
+        require Apache2::Const;
+        Apache2::Const->import(qw(MODE_READBYTES));
+
+        require Apache2::Connection;
+        Apache2::Connection->import(qw());
+
+        require APR::Error;
+        APR::Error->import(qw());
+    }
 }
 
-sub hook_ehlo {
+sub hook_ehlo { # announce that we're able to do CHUNKING (and BINARYMIME)
     my ($self, $transaction) = @_;
     my $cap = $transaction->notes('capabilities');
     $cap ||= [];
@@ -78,9 +120,10 @@
 
 sub hook_unrecognized_command {
     my ($self, $transaction, $cmd, $size) = @_;
-    return (DECLINED) unless $cmd eq 'bdat';
-    my ($err, $last);
+    return (DECLINED) 
+      unless lc($cmd) eq 'bdat';
 
+    my ($err, $last);
     my $msg_size = $transaction->notes('bdat_size') || 0;
     
     # DATA and BDAT commands cannot be used in the same transaction.  If a
@@ -107,52 +150,52 @@
 
     ($err) = 
         (Qpsmtpd::DSN->proto_syntax_error("Syntax error in BDAT parameter"))[1];
-    
-    unless (defined $size || $size =~ /^\d+$/) {
-        $self->qp->respond(552, $err);
-        return (DONE);
-    }
 
-    if ($size =~ /^(\d+)\s*(LAST)?\s*$/) {
+    if ($size =~ /^(\d+)\s*(\S+)?\s*$/) {
         $size = $1;
         $last = $2;
     }
+    unless (defined $size && $size =~ /^\d+$/) {
+        $self->qp->respond(552, $err);
+        return (DONE);
+    }
 
-    if (defined $last) {
-        if ($last =~ /^LAST$/i) { # RFC says LAST all upper, we don't care
-            $last = 1;
-        }
-        else {
+    if (!defined($last) or $last =~ /^$/) {
+        $last = $size ? 0 : 1;
+    }
+    else {
+        unless (uc($last) eq 'LAST') { # RFC says LAST all upper, we don't care
             $self->qp->respond(552, $err);
             return (DONE);
         }
-    }
-    else {
-        $last = 0;
-        ($last = 1) unless $size;
+        $last = 1;
     }
 
     $transaction->notes('bdat_bdat', 1); # remember we've seen BDAT
 
-    # get a file to write the data chunks
-    my $file = $transaction->body_filename;
-    # ouch :o), don't do this abuse of internals at home kids :P 
-    my $fh   = $transaction->body_fh; 
-    seek($fh, 0, 2)
+    ## get a file to write the data chunks:
+    # ... open a new temporary file if it does not exist
+    my $file = $transaction->body_filename; 
+    my $fh   = $transaction->body_fh; # and get the fh for the open file
+    seek($fh, 0, SEEK_END)
       or $self->log(LOGERROR, "failed to seek: $!"),
          $self->qp->respond(452, "Temporary storage allocation error"), 
          return (DONE);
 
-    # we're at the end of the file, now read the chunk
-    my $rest = $size % 4096;
-    my $num  = ($size-$rest)/4096;
-    my $i    = 0;
-    my $buffer;
-    my $bytes;
-
-    while($i < $num) {
-        $bytes = read(STDIN, $buffer, 4096);
-        if ($bytes != 4096) {
+    # we're at the end of the file, now read the chunk (and write it to $fh)
+    my $sum  = 0;
+    my ($buffer, $bytes, $left) = ("", 0, 0);
+
+    my $block = $self->{_bdat_block};
+    my ($rc, $msg);
+    while ($sum < $size) {
+        ($buffer, $bytes, $rc, $msg) = $self->$reader($block);
+        if ($rc) {
+            $self->log(LOGERROR, "Failed to read: $msg");
+            $self->qp->respond($rc, $msg);
+            return (DONE);
+        }
+        if (!defined $buffer or !$bytes) {
             $self->log(LOGERROR, "Failed to read: $!");
             $self->qp->respond(452, "Error reading your data");
             return (DONE);
@@ -161,74 +204,80 @@
             or $self->log(LOGERROR, "Failed to write: $!"),
                $self->qp->respond(452, "Temporary storage allocation error"), 
                return (DONE);
-        ++$i;
-    }
-    $bytes = read(STDIN, $buffer, $rest);
-    if ($bytes != $rest) {
-        $self->log(LOGERROR, "Failed to read: $!");
-        $self->qp->respond(452, "Error reading your data");
-        return (DONE);
+
+        $sum  += $bytes;
+        $left  = $size - $sum;
+        $block = ($left < $block) ? $left : $block;
     }
-    print $fh $buffer
-        or $self->log(LOGERROR, "Failed to write: $!"),
-           $self->qp->respond(452, "Temporary storage allocation error"), 
-           return (DONE);
     # ok, got the chunk on disk
+    $self->log(LOGDEBUG, "OK, got the chunk of $size bytes, LAST=$last");
 
     # let's see if the mail is too big... 
     # ...we can't do this before reading the chunk, because the BDAT command
     # requires us to read the chunk before responding
     my $max = $self->qp->config('databytes');
-    if ($max) {
-        if (($msg_size + $size) > $max) {
-            $self->qp->respond(552, "Message too big!");
-            return(DONE);
-        }
+    if ($max && (($msg_size + $size) > $max)) {
+        $self->qp->respond(552, "Message too big!");
+        # $self->qp->reset_transaction; ### FIXME: reset here?
+        return(DONE);
     }
     $transaction->notes('bdat_size', $msg_size + $size);
 
-    if (!$last) { # get the next chunk
+    unless ($last) { # get the next chunk
         $self->qp->respond(250, "Ok, got $size octets");
-        return (DONE);
+        return(DONE);
     } 
     # else 
-
+    
     # ... get the headers, run data_post & queue hooks
     $transaction->notes('bdat_last', 1);
-    seek($fh, 0, 0)
+    seek($fh, 0, SEEK_SET)
       or $self->log(LOGERROR, "Failed to seek: $!"),
          $self->qp->respond(452, "Temporary storage allocation error"), 
          return (DONE);
 
     $buffer = "";
     while (<$fh>) {
-        last if /^\r?\n$/;
+        if (/^\r?\n$/) {
+            seek($fh, -length($_), SEEK_CUR);
+            # the body starts here...
+            $self->transaction->set_body_start();
+            last;
+        }
         s/\r\n$/\n/;
         $buffer .= $_;
+        # if (length($buffer) > 50_000) ;
+        #     $self->qp->respond(500, "Header size too large")
+        #     return (DONE);
+        # }
     }
-    # the body starts here...
-    $self->transaction->set_body_start();
 
     my $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
     my @header = split /^/m, $buffer;
+    # undef $buffer;
     $header->extract(\@header);
     $self->transaction->header($header);
 
-    my $authheader = (defined $self->{_auth} and $self->{_auth} == OK) 
-        ?  "(smtp-auth username $self->{_auth_user}, "
-          ."mechanism $self->{_auth_mechanism})\n" 
-        : "";
-
-    # no need for SMPT/ESMTP diff, we know we've just received via ESMTP (EHLO)
-    $header->add("Received", 
-        "from ".$self->connection->remote_info
-         # can/should/must this be EHLO instead of HELO?
-        ." (HELO ".$self->connection->hello_host.")"
-        ." (". $self->connection->remote_ip. ")\n "
-        .$authheader
-        ." by ".$self->qp->config('me')." (qpsmtpd/".$self->qp->version.") "
-        ."with ESMTP". ($authheader ? "A" : "")."; " # ESMPTA: RFC 3848
-        .(strftime('%a, %d %b %Y %H:%M:%S %z', localtime)), 0);
+    my $rcvd_line;
+    ($rc, $rcvd_line) = $self->qp->run_hooks("received_line");
+    if ($rc != OK or not $rcvd_line) {
+        my $authheader = (defined $self->{_auth} and $self->{_auth} == OK) 
+            ?  "\t(smtp-auth username $self->{_auth_user}, "
+              ."mechanism $self->{_auth_mechanism})\n" 
+            : "";
+
+        $rcvd_line = "from ".$self->connection->remote_info
+             # can/should/must this be EHLO instead of HELO?
+            ." (HELO ".$self->connection->hello_host.")"
+            ." (". $self->connection->remote_ip. ")\n "
+            .$authheader
+            ."\tby ".$self->qp->config('me')." (qpsmtpd/".$self->qp->version.") "
+            # no need for SMPT/ESMTP diff, we know we've just received 
+            # via ESMTP (EHLO)
+            ."with ESMTP". ($authheader ? "A" : "")."; " # ESMPTA: RFC 3848
+            .(strftime('%a, %d %b %Y %H:%M:%S %z', localtime));
+    }
+    $header->add("Received", $rcvd_line, 0);
     
     # everything done for running data_post... 
     # this will call the spamassassin, virus scanner and queue plugins 
@@ -238,11 +287,52 @@
     $self->qp->run_hooks("data_post");
 
     # BDAT (0( LAST)?|$num LAST) is always the end of a "transaction"
-    # ... doesn't matter if it had done before
-    $self->qp->reset_transaction;
+    $self->qp->reset_transaction; # ... doesn't matter if it had done before
     return (DONE);
 }
 
+sub ap_read_block {
+    my ($self, $block_size) = @_; 
+    my $conn = $self->qp->{conn};
+    return (undef, 0, 452, "You don't see this, your connection is dead")
+      if $conn->aborted;
+
+    my $buffer;
+
+### This does not work if the client does not fetch the response after
+### every BDAT command... why should he fetch it, we're offering PIPELINING 
+#    my $sock    = $conn->client_socket;
+#    my $bytes   = eval { $sock->recv($buffer, $block_size) }; 
+#    if ($@ && ref $@ && $@ == APR::Const::TIMEUP()) {
+#        return (undef, 0, 452, "Timeout reading your data");
+#    }
+#    return ($buffer, $bytes);    
+
+    my $bb = $self->qp->{bb_in};
+    my $rc = $conn->input_filters->get_brigade($bb, 
+                        Apache2::Const::MODE_READBYTES(), 
+                        APR::Const::BLOCK_READ(), 
+                        $block_size);
+    return (undef, 0, 452, "You don't see this, got EOF") 
+      if $rc == APR::Const::EOF();
+    die APR::Error::strerror($rc)
+      unless $rc == APR::Const::SUCCESS();
+
+    $bb->flatten($buffer);
+    $bb->cleanup;
+    return ($buffer, length($buffer));
+}
+
+sub read_block {
+    my ($self, $block_size) = @_; 
+    my ($bytes, $buffer);
+    $bytes = read(STDIN, $buffer, $block_size);
+
+    return (undef, 0, 452, "Failed to read your data")
+      unless $bytes;
+    return ($buffer, $bytes);
+}
+
 sub hook_data {
     my ($self, $transaction) = @_;
     if ($transaction->notes('bdat_body_binarymime') 

Reply via email to