I attach a revision of the class SMTPDataInputStream which I submit for your consideration. This replaces the version which I mailed to this list on July 2.

The external behavior of the code has not changed. It still passes the JUnit tests.

This code should run faster. I had arranged the code for logical clarity as much as for execution speed. But Noel's comments made me realize just how rarely this class would enter its BUFFERING_STATE; it may never enter that state at all. Yet the code was testing if(receivedState == BUFFERING_STATE) for every byte read. Now that contingency is handled as one case in a switch, so reading ordinary byte sequences, the most common case, goes faster.

In addition to the file containing SMTPDataInputStream, I attach once again TestSIS.java (a file of JUnit tests). I also attach two little stub classes, for WatchDog and MessageSizeException, which you may find useful if you want to compile and test SMTPDataInputStream separately from a James installation.

Rich
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

/** 
 * <p>An InputStream for SMTP message body data. Performs four of the functions
 *  needed while receiving the message body in the SMTP DATA command:
 * <ol>
 *   <li>watches for the end of mail data indicator, and signals this by
 *   returning EOF, that is <code>-1</code>.
 *   </li>
 *   <li>removes dot stuffing as described in RFC 2821 section 4.5.2.
 *   </li>
 *   <li>works with james.util.watchdog.Watchdog to police the minimum rate of 
 *   data transfer.  Calls Watchdog.reset() every time a certain number of
 *   bytes have been read, thus forestalling intervention of the watchdog for
 *   another time increment.
 *   </li>
 *   <li>optionally polices the total size of the data.  Throws 
 *   MessageSizeException if this exceeds a limit.
 *  </li>
 * </ol>
 * </p>
 * <p>The end of mail data indicator which this class recognizes is a period
 * alone in a line.  This indicator is often described as "CRLF.CRLF", but that
 * description leads to errors, though possibly minor ones.  The better 
 * description which this class uses, "a period alone in a line", leads
 * to better behavior in two ways:
 * <ol>
 *   <li>When the end of mail data indicator is recognized in the input stream,
 *    the CRLF which immediately preceded the period in the indicator is 
 *    returned as part of the mail data as the CRLF which concludes the final
 *    line of mail data, rather than being discarded as part of the end of mail
 *    data indicator.
 *   </li>
 *   <li>The end of mail data indicator can occur in the very first line of 
 *   mail data, with the period being the first character read.
 *   </li>
 * </ol>
 * RFC 2821 discusses this in sections 2.3.7, 3.3, 4.1.1.4, 4.5.2.
 * </p>
 * <p> This class resets the WatchDog each time it has read a quota of bytes
 * as specified in the constructor.  But it does not reset or stop the 
 * WatchDog when it recognizes the end of mail data indicator and returns EOF.
 * </p>
 * <p>This class returns EOF in two circumstances: when it recognizes the end 
 * of mail data indicator in the stream (a normal occurrence), and when the 
 * underlying stream signals EOF (probably an error of some sort).  This 
 * behavior may be okay, in that it mimics the behavior of the earlier James 
 * class CharTerminatedInputStream, but it may need further examination at some
 * point.
 * </p>
 * <p>An instance of this class cannot be reset.  A new instance must be
 * constructed for each message's data.
 * </p>
 */
public class SMTPDataInputStream extends InputStream{
    BufferedInputStream in;

    /* For a discussion of some decisions made in designing this class,
     * see the comment at the end.
     */

    // The kinds of bytes we care about
    static final int
        EOF    = -1,
        CR     = 13,
        LF     = 10,
        PERIOD = 46;

    //the states in which this SMTPDataInputStream may be
    static final int
        LINE_STARTING_STATE  = 0, //at the start of a line
        MID_LINE_STATE       = 1, //the most common state
        CR_STATE             = 2, //a CR has been received        
        INIT_PERIOD_STATE    = 3, //a period at start of line
        INIT_PERIOD_CR_STATE = 4, //initial period then CR
        BUFFERING_STATE      = 5, //see comments further down
        EOF_STATE            = 6; //either EOF or end of data

    //the variable in which we keep the present state
    private int receivedState = LINE_STARTING_STATE;
        
    /* This comment describes the strategy for monitoring message size and
     * data transfer rate.  The five variables below serve these purposes.
     * 
     * The use of maxMessageSize should be obvious, but note that if it is
     * set to zero then it signals that there is no limit on message size.
     * 
     * Both of the limits (message size and data rate) are checked with one
     * operation in the most frequently used code by using a quota, kept in
     * currentQuota.  When the quota is reached (by decrementing 
     * bytesRemainingInQuota) then the code detects which limit has been 
     * reached and responds accordingly.
     * 
     * Here is an example showing what may be a typical case.  Assume we have 
     * an instance of this class constructed so that maxMessageSize is 
     * 1,000,000 and minBytesPerUnitTime is 300,000.  The constructor will set
     * currentQuota to 300,000, the first quota toward which we count, and 
     * remnantQuotaThreshold to 700,000, the number above which we need to
     * start looking out for exceeding maxMessageSize.
     * 
     * In operation bytesSummedSoFar will be increased by 300,000 each time
     * the quota is read: to 300,000, then 600,000, then 900,000.  When 
     * it reaches 900,000 it exceeds remnantQuotaThreshold (700,000) for the 
     * first time, so now the quota for the remainder of the message is set to
     * 100,001, the number needed to trigger a MessageSizeException.
     */    
    long maxMessageSize,
         bytesSummedSoFar,
         remnantQuotaThreshold;
    int currentQuota,
        bytesRemainingInQuota;
        
    WatchDog watchDog;
    
    /**
     * Constructor with the option to set maximum message size.
     * 
     * @param in the BufferedInputStream from which this will read
     * @param maxMessageSize the limit on the number of bytes which this will
     * return.  But, if set to zero, signifies that there is no limit.
     * @param watchDog a WatchDog object which will intervene (elsewhere, not
     * in this class) if too much time passes without it being reset. 
     * @param minBytesPerUnitTime the number of bytes to be read before each
     * call to <code>WatchDog.reset()</code>
     */
    public SMTPDataInputStream(BufferedInputStream in, long maxMessageSize, 
                WatchDog watchDog, int minBytesPerUnitTime) {
        if(in == null || maxMessageSize < 0 || 
            watchDog == null || minBytesPerUnitTime < 1)
        {   
            throw new IllegalArgumentException(
                        "Illegal argument to SMTPDataInputStream constructor");
        }
        this.in = in;
        this.maxMessageSize = maxMessageSize;
        this.watchDog = watchDog;
        if (maxMessageSize == 0){
            //there is no limit on message size, so ...
            currentQuota = minBytesPerUnitTime;
        }
        else{
            //currentQuota might be determined by maxMessageSize
            currentQuota = 
               (int)Math.min(maxMessageSize + 1, minBytesPerUnitTime);
            remnantQuotaThreshold = maxMessageSize - minBytesPerUnitTime;
        }
        bytesRemainingInQuota = currentQuota;
    }

    /**
     * Constructor for a message stream of unlimited size.
     * 
     * @param in the BufferedInputStream from which this will read
     * @param watchDog a WatchDog object which will intervene (elsewhere, not
     * in this class) if too much time passes without it being reset. 
     * @param minBytesPerUnitTime the number of bytes to be read before each
     * call to <code>WatchDog.reset()</code>
     */
    public SMTPDataInputStream(BufferedInputStream in,
                WatchDog watchDog, int minBytesPerUnitTime) {
        this(in, 0, watchDog, minBytesPerUnitTime);
    }
            
    /* This declares a buffer for one character.  We need to use this in
       one rare circumstance.  For more information look in the
       switch-case below for INIT_PERIOD_CR_STATE.
     */
    private int characterBuffer;

    /**
     * Reads the next byte of data from the input stream.  Removes dot stuffing
     * and watches for the SMTP end of mail data indicator, which is a period
     * alone in a line.  This method blocks until input data is available, the 
     * end of mail data indicator is recognized, the end of the underlying 
     * stream is reached, or an exception is thrown.
     * 
     * Monitors the number of bytes that have been read, and causes the 
     * Watchdog to be reset each time minBytesPerUnitTime bytes have been read.
     * This will also throw a MessageSizeException if an attempt to read more 
     * than the maximum number of bytes would return anything other than
     * <code>-1</code>.
     * 
     * @return the next byte of data, or <code>-1</code> (EOF) if the end of 
     * mail data indicator is recognized or the end of the underlying stream is
     * reached.
     * @throws IOException if an I/O error occurs in reading from the 
     * underlying stream.
     * @throws MessageSizeException upon attempt to read one byte more than
     * the set maximum.
     */
    public int read() throws IOException{         
        /* Strategy: This stream is always in one of the seven states
         * enumerated above.  This state is kept in receivedState.
         * The state starts in LINE_STARTING_STATE.  On each
         * call we act based upon the current receivedState and we
         * update receivedState to reflect the new state. 
         */
         
        int b; //the byte we will get and return

        /* We have a while loop since we may need to read as many as three
         * characters from the underlying stream before we know what to return.
         * This while loop is exited from one of the branches with either 
         * "return EOF;" or "break getByte;".
         */
        getByte: while(true){
            switch(receivedState){
                case MID_LINE_STATE:
                    switch(b = in.read()){  
                        case CR:
                            receivedState = CR_STATE;
                            break getByte;
                        case EOF:
                            receivedState = EOF_STATE;
                            return EOF;
                        default:
                            //receivedState remains MID_LINE_STATE
                            break getByte;
                    }
                case CR_STATE:
                    switch(b = in.read()){    
                        case LF:
                            //We have received CRLF, the normal end of a line
                            receivedState = LINE_STARTING_STATE;
                            break getByte;
                        case CR: 
                            //receivedState remains CR_STATE
                            break getByte;
                        case EOF:
                            receivedState = EOF_STATE;
                            return EOF;
                        default:
                            receivedState = MID_LINE_STATE;
                            break getByte;
                    }                    
                case LINE_STARTING_STATE:
                    switch(b = in.read()){   
                        case PERIOD:
                            receivedState = INIT_PERIOD_STATE;
                            /* We need to read more from the underlying
                             stream before we know what, if anything,
                             to return.
                             */
                            continue getByte;
                        case CR:
                            receivedState = CR_STATE;
                            break getByte;
                        case EOF:
                            receivedState = EOF_STATE;
                            return EOF;
                        default:
                            //This is the normal start of a new line.
                            receivedState = MID_LINE_STATE;
                            break getByte;
                    }
                case INIT_PERIOD_STATE:
                    switch(b = in.read()){  
                        case CR:
                            /* We may be receiving the end of data indicator.
                               We need to read one more character before we know.
                             */
                            receivedState = INIT_PERIOD_CR_STATE;
                            continue getByte;
                        case EOF:
                            receivedState = EOF_STATE;
                            return EOF;
                        default:
                            /* This is a case of dot stuffing.  We return
                               this character but not the dot which preceded it.
                             */
                            receivedState = MID_LINE_STATE;
                            break getByte;
                    }            
                case INIT_PERIOD_CR_STATE:
                    switch(b = in.read()){  
                        case LF:
                             //we have reached the end of data indicator
                            receivedState = EOF_STATE;
                            return EOF;
                        case EOF:
                            receivedState = EOF_STATE;
                            return EOF;
                        default:
                            /* We have received the sequence
                               PERIOD CR
                               at the beginning of a line, but it is followed
                               by something other than LF .
                               So this is an unusual case of dot stuffing.
                               We return CR, and buffer b to return on the next
                               call.
                             */
                            receivedState = BUFFERING_STATE;
                            characterBuffer = b;
                            b = CR;
                            break getByte;
                    }
                case EOF_STATE: 
                    return EOF;
                case BUFFERING_STATE: 
                    if (characterBuffer == CR){
                        receivedState = CR_STATE;
                    }
                    else{
                        receivedState = MID_LINE_STATE;
                    }
                    b = characterBuffer;
                    break getByte;   
            }//switch (receivedState)
        }//getByte: while(true)
        
        /* If we arrive here we have a byte in b ready to return.  Before
         * returning it we check the status of the quota.
         */
        if (--bytesRemainingInQuota <= 0){
            if (maxMessageSize != 0){//if this is a size-limited stream
                bytesSummedSoFar += currentQuota;
                if (bytesSummedSoFar > remnantQuotaThreshold){
                    if (bytesSummedSoFar > maxMessageSize){
                        throw new MessageSizeException("Maximum message size of "
                             + maxMessageSize + " exceeded.");
                    }else{
                        //next quota determined by maxMessageSize
                        currentQuota =
                            (int)((maxMessageSize - bytesSummedSoFar) + 1);
                    }
                }
            }
            watchDog.reset();
            bytesRemainingInQuota = currentQuota;
        }
        return b;
    }//method read()

    
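    /**
     * Closes this stream.  This simply calls <code>InputStream.close()</code>,
     * which does nothing; in particular it does not close the underlying
     * BufferedInputStream, which is read again after the end of mail data
     * (see the comment at the end of this class).
     */
    public void close() throws IOException{
        super.close();
    }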

    /* This class was designed to fit in James as it existed in June 2003.
     * This comment describes my (Richard Hammer's) design decisions 
     * in writing this class.
     * 
     * In James, in SMTPHandler, the InputStream from a Socket is read 
     * alternately by two other classes.  First CRLFTerminatedReader reads 
     * SMTP envelope command lines.  Then, when it comes time to read message
     * data, this class picks up reading from the same InputStream, reading 
     * until the end of mail data is indicated, at which point this class
     * signals EOF.  But then more envelope command lines are read from the
     * underlying InputStream by CRLFTerminatedReader again.
     *  
     * In this environment each of the classes which read from the underlying
     * InputStream must leave it in the right place.  And it makes sense to 
     * have buffering done in the underlying InputStream.  Redundant 
     * buffering does not need to be done in either CRLFTerminatedReader or 
     * this class.  To help establish this practice, the constructors of this
     * class require a BufferedInputStream.
     * 
     * This class overrides InputStream method read(), but not read(byte[]) 
     * or read(byte[], int, int).  Those two InputStream methods work by making
     * repeated calls to read() in this class.  Thereby the bytes returned by
     * those methods deliver the modified functionality promised by this class.
     * 
     * Wanting to be a good boy scout, I undertook to write a 
     * read(byte[], int, int) method for this class.  What I got was large and
     * complex, and in the end I was not convinced that I had improved upon 
     * this class in its present form (in which it overrides only the one
     * read() method).
     * 
     * A difficulty arose in satisfying a part of the InputStream contract for
     * read(byte[], int, int).  That contract promises that no bytes in the
     * array will be changed except the ones counted by the return value.  In
     * order to fulfill this promise it looked like I would have to
     * buffer in a byte array in this class, because this class needs to read
     * more bytes than it returns in the cases where it gets a period at the
     * start of a line.  Thus it looked like I would need to duplicate 
     * buffering already done in the supporting BufferedInputStream.  This may
     * not have improved performance, which was the only motive for writing
     * the method.
     *  
     * Early on I considered extending BufferedInputStream instead of 
     * InputStream.  But, in addition to the above-mentioned disadvantage of
     * buffering here rather than in the underlying stream, I discovered that 
     * extending BufferedInputStream would have required that I override
     * read(byte[], int, int).  This is because the BufferedInputStream
     * method read(byte[], int, int), unlike the same method in InputStream,
     * copies bytes from its own internal buffer or from the stream it wraps.
     * The BufferedInputStream method does not work by making repeated calls
     * to read() in this class, so it
     * would not provide the additional functionality of this class.  This may
     * be obvious to more seasoned programmers, but it was not obvious to me at
     * the outset.
     */
}
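A compact, array-based sketch may help in checking the rules stated in the class Javadoc: a period alone in a line ends the data, the CRLF before it is kept as mail data, the indicator may be the very first line, and otherwise a period at the start of a line is dropped. DotUnstuffSketch and decode are hypothetical names used only for illustration; this is not the attached class. It works on a whole String rather than a stream, and it folds BUFFERING_STATE into the after-CR state. As far as I can tell from read() above, that should yield the same bytes, because the byte following PERIOD CR is treated exactly as CR_STATE would treat it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch (not the submitted class): applies the end-of-data and
 * dot-unstuffing rules to a whole string instead of a stream.
 */
public class DotUnstuffSketch {
    static final int CR = 13, LF = 10, PERIOD = 46;

    enum State { LINE_START, MID_LINE, AFTER_CR, INIT_PERIOD, INIT_PERIOD_CR }

    /** Returns the mail data that precedes a period alone in a line. */
    static String decode(String wire) {
        byte[] in = wire.getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        State s = State.LINE_START;
        for (byte raw : in) {
            int b = raw & 0xFF;
            switch (s) {
                case LINE_START:
                    if (b == PERIOD) {          // hold the dot until we see what follows
                        s = State.INIT_PERIOD;
                        continue;
                    }
                    s = (b == CR) ? State.AFTER_CR : State.MID_LINE;
                    break;
                case MID_LINE:
                    if (b == CR) s = State.AFTER_CR;
                    break;
                case AFTER_CR:
                    if (b == LF) s = State.LINE_START;     // CRLF ends a line
                    else if (b != CR) s = State.MID_LINE;  // CR CR stays AFTER_CR
                    break;
                case INIT_PERIOD:
                    if (b == CR) {              // maybe ".CRLF": hold the CR too
                        s = State.INIT_PERIOD_CR;
                        continue;
                    }
                    s = State.MID_LINE;         // dot stuffing: drop the dot, keep b
                    break;
                case INIT_PERIOD_CR:
                    if (b == LF) {              // period alone in a line: end of data
                        return new String(out.toByteArray(), StandardCharsets.US_ASCII);
                    }
                    out.write(CR);              // ".CR" + other: drop the dot, emit held CR
                    s = (b == CR) ? State.AFTER_CR : State.MID_LINE;
                    break;
            }
            out.write(b);
        }
        // Input ran out first: like the class, return what was decoded and
        // drop any held dot or CR.
        return new String(out.toByteArray(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        String food = ".dogs\r. sp\n" + ".\r\n" + ".\r8\r\n" + ".\r\r\no*"
                    + " \r\n.\r\n" + "I guess we shouldn't get here.\r\n";
        // prints: dogs<CR>. sp<LF>.<CR><LF><CR>8<CR><LF><CR><CR><LF>o* <CR><LF>
        System.out.println(decode(food).replace("\r", "<CR>").replace("\n", "<LF>"));
    }
}
```

Run on the same food string that TestSIS uses, the sketch should reproduce the bytes that testFood0 expects.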
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;


import junit.framework.TestCase;

public class TestSIS extends TestCase {
    SMTPDataInputStream sis;
    
    public TestSIS(String whooo){
        super(whooo);
    }

    static String food = 
          ".dogs\r. sp\n"
        + ".\r\n"
        + ".\r8\r\n"
        + ".\r\r\no*"
        + " \r\n.\r\n"
        + "I guess we shouldn't get here.\r\n";

    //test response to a challenging sequence of characters
    public void testFood0() throws Exception {
        sis=constructSIS(food, 25, 7);
        assertEquals((byte)sis.read(),(byte)'d');
        assertEquals((byte)sis.read(),(byte)'o');
        assertEquals((byte)sis.read(),(byte)'g');
        assertEquals((byte)sis.read(),(byte)'s');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)'.');
        assertEquals((byte)sis.read(),(byte)' ');
        assertEquals((byte)sis.read(),(byte)'s');
        assertEquals((byte)sis.read(),(byte)'p');
        assertEquals((byte)sis.read(),(byte)10);
        assertEquals((byte)sis.read(),(byte)'.');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)10);
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)'8');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)10);
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)10);
        assertEquals((byte)sis.read(),(byte)'o');
        assertEquals((byte)sis.read(),(byte)'*');
        assertEquals((byte)sis.read(),(byte)' ');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)10);
        assertEquals((byte)sis.read(),(byte)-1);
        assertEquals((byte)sis.read(),(byte)-1);
        assertEquals((byte)sis.read(),(byte)-1);
        sis.close();        
    }

    //test attempt to read beyond maxMessageSize
    public void testFood1() throws Exception {
        sis=constructSIS(food, 6, 7);
        assertEquals((byte)sis.read(),(byte)'d');
        assertEquals((byte)sis.read(),(byte)'o');
        assertEquals((byte)sis.read(),(byte)'g');
        assertEquals((byte)sis.read(),(byte)'s');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)'.');
        Exception ex = null;
        try{
            sis.read();
        }catch (Exception e){
            ex = e;
        }        
        assertTrue(ex instanceof MessageSizeException);
        sis.close();        
    }
    
    //test same with different reset interval
    public void testFood2() throws Exception {
        sis=constructSIS(food, 6, 1);
        assertEquals((byte)sis.read(),(byte)'d');
        assertEquals((byte)sis.read(),(byte)'o');
        assertEquals((byte)sis.read(),(byte)'g');
        assertEquals((byte)sis.read(),(byte)'s');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)'.');
        Exception ex = null;
        try{
            sis.read();
        }catch (Exception e){
            ex = e;
        }        
        assertTrue(ex instanceof MessageSizeException);
        sis.close();        
    }
    
    //test same with different reset interval
    public void testFood3() throws Exception {
        sis=constructSIS(food, 6, 6);
        assertEquals((byte)sis.read(),(byte)'d');
        assertEquals((byte)sis.read(),(byte)'o');
        assertEquals((byte)sis.read(),(byte)'g');
        assertEquals((byte)sis.read(),(byte)'s');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)'.');
        Exception ex = null;
        try{
            sis.read();
        }catch (Exception e){
            ex = e;
        }        
        assertTrue(ex instanceof MessageSizeException);
        sis.close();        
    }

    //test same with different reset interval
    public void testFood4() throws Exception {
        sis=constructSIS(food, 6, 3);
        assertEquals((byte)sis.read(),(byte)'d');
        assertEquals((byte)sis.read(),(byte)'o');
        assertEquals((byte)sis.read(),(byte)'g');
        assertEquals((byte)sis.read(),(byte)'s');
        assertEquals((byte)sis.read(),(byte)13);
        assertEquals((byte)sis.read(),(byte)'.');
        Exception ex = null;
        try{
            sis.read();
        }catch (Exception e){
            ex = e;
        }        
        assertTrue(ex instanceof MessageSizeException);
        sis.close();        
    }

    
    //test a set of illegal arguments to the constructor
    public void testFood5() throws Exception {
        Throwable t0 = null;
        try{
            sis=new SMTPDataInputStream(null, 6, new WatchDog(), 3);
        }catch (Throwable t){
            t0=t;
        }
        assertTrue(t0 instanceof IllegalArgumentException);

        t0 = null;
        try{
            sis=constructSIS(food, -1, 8);
        }catch (Throwable t){
            t0=t;
        }
        assertTrue(t0 instanceof IllegalArgumentException);


        t0 = null;
        try{
            sis=constructSIS(food, 4, 0);
        }catch (Throwable t){
            t0=t;
        }
        assertTrue(t0 instanceof IllegalArgumentException);

        if (sis != null) sis.close();        
    }
    
    //test read(byte[])
    public void testFood6() throws Exception {
        sis=constructSIS(food, 40, 600);
        byte[] b = new byte[5];
        assertEquals(5, sis.read(b));
        assertEquals(b[0],(byte)'d');
        assertEquals(b[4],(byte)13);
        assertEquals((byte)sis.read(),(byte)'.');
        sis.close();        
    }

    //test read(byte[], int, int)
    public void testFood7() throws Exception {
        sis=constructSIS(food, 6880, 32);
        byte[] b = {
            (byte) 71,
            (byte) 72,
            (byte) 73,
            (byte) 74,
            (byte) 75,
            (byte) 76,
            (byte) 77,
            (byte) 78,
            (byte) 79,
            (byte) 80 };
            
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        sis.read();
        assertEquals(4, sis.read(b, 3, 4));
        
        assertEquals(b[0],(byte) 71);
        assertEquals(b[1],(byte) 72);
        assertEquals(b[2],(byte) 73);
        assertEquals(b[3],(byte)10);
        assertEquals(b[4],(byte)13);
        assertEquals(b[5],(byte)'8');
        assertEquals(b[6],(byte)13);
        assertEquals(b[7],(byte)78);
        assertEquals(b[8],(byte)79);
        assertEquals(b[9],(byte)80);
        assertEquals((byte)sis.read(),(byte)10);
        sis.close();
    }
    
    //test the exact-length end
    public void testExactEnd() throws Exception{
        sis=constructSIS("abcdefghij\r\n.\r\n and more too", 12, 5);
        byte[] b = new byte[3992];
        assertEquals(sis.read(b), 12);
        assertEquals(sis.read(b, 498, 500), -1);
    }

    //test a zero length inputStream
    public void testSmall() throws Exception {
        sis=constructSIS("", 6, 7);
        assertEquals((byte)sis.read(),(byte)-1);
        assertEquals((byte)sis.read(),(byte)-1);
        sis.close();        
    }
    

    
    private SMTPDataInputStream constructSIS (String food, int maxSize, 
                    int resetSize)throws Exception {
        return new SMTPDataInputStream(
            new BufferedInputStream(
            new ByteArrayInputStream
            (
                food.getBytes("ASCII"))),
                maxSize,
                new WatchDog(){
                    void reset(){
                        System.out.println("Hey, I been reset");
                    }
                  },
                resetSize
            );
    }    
}
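testFood1 through testFood4 all expect the seventh read() to throw MessageSizeException when maxMessageSize is 6, whatever the reset interval (7, 1, 6 or 3). The quota bookkeeping described in the comment above the size fields of SMTPDataInputStream can be replayed without any I/O to see why. QuotaSketch, account and firstRefusedByte are hypothetical names; the arithmetic is copied from the constructor and from the end of read(), with watchDog.reset() replaced by recording the byte count and the exception replaced by a false return.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical replay of the quota arithmetic in SMTPDataInputStream, with
 * no I/O: account() stands in for the bookkeeping at the end of read().
 */
public class QuotaSketch {
    final long maxMessageSize;
    long bytesSummedSoFar, remnantQuotaThreshold, bytesReturned;
    int currentQuota, bytesRemainingInQuota;
    // Byte counts at which read() would call watchDog.reset()
    final List<Long> resetsAt = new ArrayList<>();

    QuotaSketch(long maxMessageSize, int minBytesPerUnitTime) {
        this.maxMessageSize = maxMessageSize;
        if (maxMessageSize == 0) {
            currentQuota = minBytesPerUnitTime;           // no size limit
        } else {
            currentQuota = (int) Math.min(maxMessageSize + 1, minBytesPerUnitTime);
            remnantQuotaThreshold = maxMessageSize - minBytesPerUnitTime;
        }
        bytesRemainingInQuota = currentQuota;
    }

    /** Accounts for one more byte; false where read() would throw MessageSizeException. */
    boolean account() {
        if (--bytesRemainingInQuota <= 0) {
            if (maxMessageSize != 0) {
                bytesSummedSoFar += currentQuota;
                if (bytesSummedSoFar > remnantQuotaThreshold) {
                    if (bytesSummedSoFar > maxMessageSize) {
                        return false;
                    }
                    // final quota: ends one byte past the size limit
                    currentQuota = (int) ((maxMessageSize - bytesSummedSoFar) + 1);
                }
            }
            resetsAt.add(bytesReturned + 1);              // watchDog.reset()
            bytesRemainingInQuota = currentQuota;
        }
        bytesReturned++;
        return true;
    }

    /** 1-based index of the first refused byte, or -1 if none within limit bytes. */
    static long firstRefusedByte(long maxMessageSize, int minBytesPerUnitTime, long limit) {
        QuotaSketch q = new QuotaSketch(maxMessageSize, minBytesPerUnitTime);
        for (long i = 1; i <= limit; i++) {
            if (!q.account()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        for (int interval : new int[] {7, 1, 6, 3}) {
            System.out.println("reset interval " + interval + ": byte "
                    + firstRefusedByte(6, interval, 100) + " refused");
        }
        QuotaSketch big = new QuotaSketch(1_000_000L, 300_000);
        while (big.account()) { }
        System.out.println(big.bytesReturned + " bytes accepted, resets at " + big.resetsAt);
    }
}
```

For the 1,000,000 / 300,000 example in that comment, the replay should show resets after 300,000, 600,000 and 900,000 bytes, a final quota of 100,001, and byte 1,000,001 refused, as the comment describes.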
class WatchDog{
    void reset(){}
}
import java.io.IOException;
    
class MessageSizeException extends IOException{
    MessageSizeException(String s){
        super(s);
    }
}
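Because MessageSizeException extends IOException, it interacts with the read(byte[], int, int) that SMTPDataInputStream inherits from InputStream. The JDK documentation for that method says it calls read() repeatedly, and that an IOException from any call after the first is caught and treated as end of file, so a bulk read can return a short count instead of throwing. The sketch below shows this with a hypothetical stream whose read() fails after a fixed number of bytes; LimitedStream and describe are illustrative names, not part of the attached code.

```java
import java.io.IOException;
import java.io.InputStream;

/**
 * Hypothetical demonstration: how the inherited InputStream.read(byte[])
 * treats an IOException thrown by read() after the first byte.
 */
public class SwallowDemo {
    /** read() returns 'x' for limit bytes, then always throws, like a size check. */
    static class LimitedStream extends InputStream {
        private final int limit;
        private int count;

        LimitedStream(int limit) {
            this.limit = limit;
        }

        @Override
        public int read() throws IOException {
            if (count >= limit) {
                throw new IOException("limit of " + limit + " bytes exceeded");
            }
            count++;
            return 'x';
        }
    }

    /** Performs two bulk reads and reports each count, or "threw". */
    static String describe(int limit, int bufferSize) {
        InputStream s = new LimitedStream(limit);
        byte[] buf = new byte[bufferSize];
        StringBuilder result = new StringBuilder();
        try {
            result.append(s.read(buf));   // inherited read(byte[], int, int)
            result.append(',');
            result.append(s.read(buf));   // first read() of this call throws
        } catch (IOException e) {
            result.append("threw");
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(6, 10));  // prints 6,threw
    }
}
```

In SMTPDataInputStream the byte whose read() threw has already been consumed from the underlying stream. It appears that if the end of mail data indicator comes next, the following call returns EOF before the quota is checked again, so a caller using bulk reads might never see the MessageSizeException; whether that matters depends on how the caller reads the data.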