Author: rhs
Date: Thu May 22 14:43:13 2008
New Revision: 659271

URL: http://svn.apache.org/viewvc?rev=659271&view=rev
Log:
Made Range, RangeSet, and Session all use proper RFC1982 comparisons per 
QPID-861. Also switched command ids from long -> int, and added a mutex to 
channel to prevent multi-frame commands from interleaving when invoked from 
separate threads.

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
    
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/util/SerialTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Thu May 22 14:43:13 2008
@@ -173,7 +173,7 @@
             {
                 if( messageTag <= deliveryTag )
                 {
-                    ranges.add(messageTag);
+                    ranges.add((int) (long) messageTag);
                     _unacknowledgedMessageTags.remove(messageTag);
                 }
             }
@@ -182,7 +182,7 @@
         }
         else
         {
-            ranges.add(deliveryTag);
+            ranges.add((int) deliveryTag);
             _unacknowledgedMessageTags.remove(deliveryTag);
         }
         getQpidSession().messageAcknowledge(ranges);
@@ -287,7 +287,7 @@
             {
                 break;
             }
-            ranges.add(tag);
+            ranges.add((int) (long) tag);
         }
         getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
         // We need to sync so that we get notify of an error.
@@ -311,7 +311,7 @@
                 break;
             }
 
-            ranges.add(tag);
+            ranges.add((int) (long) tag);
         }
         getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
     }
@@ -326,7 +326,7 @@
     {
         // The value of requeue is always true
         RangeSet ranges = new RangeSet();
-        ranges.add(deliveryTag);
+        ranges.add((int) deliveryTag);
         getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
         //I don't think we need to sync
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Thu May 22 14:43:13 2008
@@ -358,7 +358,7 @@
         if (!_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            ranges.add(message.getDeliveryTag());
+            ranges.add((int) message.getDeliveryTag());
             _0_10session.getQpidSession().messageAcknowledge(ranges);
             _0_10session.getCurrentException();
         }
@@ -375,7 +375,7 @@
         if (_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            ranges.add(message.getDeliveryTag());
+            ranges.add((int) message.getDeliveryTag());
             _0_10session.getQpidSession().messageRelease(ranges);
             _0_10session.getCurrentException();
         }
@@ -394,7 +394,7 @@
         if (!_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            ranges.add(message.getDeliveryTag());
+            ranges.add((int) message.getDeliveryTag());
 
             Acquired acq = 
_0_10session.getQpidSession().messageAcquire(ranges).get();
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
 Thu May 22 14:43:13 2008
@@ -39,7 +39,7 @@
      * 
      * @param transferId
      */
-    public void messageTransfer(long transferId);
+    public void messageTransfer(int transferId);
     
     /**
      * Add the following headers ( [EMAIL PROTECTED] 
org.apache.qpidity.DeliveryProperties}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
 Thu May 22 14:43:13 2008
@@ -27,7 +27,7 @@
     private int _dataSize;
     private DeliveryProperties _currentDeliveryProps;
     private MessageProperties _currentMessageProps;
-    private long _transferId;
+    private int _transferId;
     private Header _header;
 
     public void setHeader(Header header) {
@@ -44,12 +44,12 @@
         _currentMessageProps = new MessageProperties();
     }
 
-    public ByteBufferMessage(long transferId)
+    public ByteBufferMessage(int transferId)
     {
         _transferId = transferId;
     }
 
-    public long getMessageTransferId()
+    public int getMessageTransferId()
     {
         return _transferId;
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
 Thu May 22 14:43:13 2008
@@ -89,7 +89,7 @@
      * does not have a transfer id. Hence this method is not
      * applicable to this implementation.    
      */
-    public long getMessageTransferId()
+    public int getMessageTransferId()
     {
         throw new UnsupportedOperationException();
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
 Thu May 22 14:43:13 2008
@@ -26,7 +26,7 @@
                _adaptee = listener;
     }
 
-    public void messageTransfer(long transferId)
+    public void messageTransfer(int transferId)
     {
         _currentMsg = new ByteBufferMessage(transferId);
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
 Thu May 22 14:43:13 2008
@@ -61,7 +61,7 @@
      * does not have a transfer id. Hence this method is not
      * applicable to this implementation.    
      */
-    public long getMessageTransferId()
+    public int getMessageTransferId()
     {
         throw new UnsupportedOperationException();
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
 Thu May 22 14:43:13 2008
@@ -199,7 +199,7 @@
 
     private void waitFor(List<Message> received, int count) throws 
InterruptedException
     {
-        long timeout = 6000;
+        long timeout = 20000;
 
         synchronized (received)
         {

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java
 Thu May 22 14:43:13 2008
@@ -3,119 +3,75 @@
 import org.apache.qpid.SerialException;
 
 /**
- * This class provides basic
- * serial number arithmetic as defined in RFC 1982.
+ * This class provides basic serial number comparisons as defined in
+ * RFC 1982.
  */
 
 public class Serial
 {
-    private long _maxIncrement;
-    private long _max;
-    private long _maxComparison;
-
-    public Serial(long serialbits)
-    {
-        if( serialbits < 2)
-        {
-            throw new IllegalArgumentException("Meaningful serial number space 
has SERIAL_BITS >= 2, wrong value "
-                    + serialbits);
-        }
-        _max = (long) Math.pow(2.0 , serialbits) - 1;
-        _maxIncrement = (long) Math.pow(2.0, serialbits - 1) - 1;
-        _maxComparison = (long) Math.pow(2.0, serialbits -1);
-    }
 
     /**
      * Compares two numbers using serial arithmetic.
      *
-     * @param serial1 The first serial number
-     * @param serial2 The second serial number
-     * @return 0 if the 2 serials numbers are equal, a positive number if 
serial1 is greater
-     *         than serial2, and a negative number if serial2 is greater than 
serial1.
-     * @throws IllegalArgumentException serial1 or serial2 is out of range
-     * @throws SerialException serial1 and serial2 are not comparable.
+     * @param s1 the first serial number
+     * @param s2 the second serial number
+     *
+     * @return a negative integer, zero, or a positive integer as the
+     * first argument is less than, equal to, or greater than the
+     * second
      */
-    public int compare(long serial1, long serial2) throws 
IllegalArgumentException, SerialException
+    public static final int compare(int s1, int s2)
     {
-        int result;
-        if (serial1 < 0 || serial1 > _max)
-        {
-            throw new IllegalArgumentException(serial1 + " out of range");
-        }
-        if (serial2 < 0 || serial2 > _max)
-        {
-            throw new IllegalArgumentException(serial2 + " out of range");
-        }        
-        double diff;
-        if( serial1 < serial2 )
-        {
-           diff = serial2 - serial1; 
-           if( diff < _maxComparison )
-           {
-             result = -1;
-           }
-           else if ( diff > _maxComparison )
-           {
-               result = 1;
-           }
-           else
-           {
-               throw new SerialException("Cannot compare " + serial1 + " and " 
+ serial2);
-           }
-        }
-        else if( serial1 > serial2 )
+        return s1 - s2;
+    }
+
+    public static final boolean lt(int s1, int s2)
+    {
+        return compare(s1, s2) < 0;
+    }
+
+    public static final boolean le(int s1, int s2)
+    {
+        return compare(s1, s2) <= 0;
+    }
+
+    public static final boolean gt(int s1, int s2)
+    {
+        return compare(s1, s2) > 0;
+    }
+
+    public static final boolean ge(int s1, int s2)
+    {
+        return compare(s1, s2) >= 0;
+    }
+
+    public static final boolean eq(int s1, int s2)
+    {
+        return s1 == s2;
+    }
+
+    public static final int min(int s1, int s2)
+    {
+        if (lt(s1, s2))
         {
-           diff = serial1 - serial2;
-           if( diff > _maxComparison )
-           {
-             result = -1;
-           }
-           else if( diff < _maxComparison )
-           {
-               result = 1;
-           }           
-           else
-           {
-               throw new SerialException("Cannot compare " + serial1 + " and " 
+ serial2);
-           }
+            return s1;
         }
         else
         {
-            result = 0;
+            return s2;
         }
-        return result;
     }
 
- 
-    /**
-     * Increments a serial numbers by the addition of a positive integer n,
-     * Serial numbers may be incremented by the addition of a positive
-     * integer n, where n is taken from the range of integers
-     * [0 .. (2^(SERIAL_BITS - 1) - 1)].  For a sequence number s, the
-     * result of such an addition, s', is defined as
-     *              s' = (s + n) modulo (2 ^ SERIAL_BITS)
-     * @param serial The serila number to be incremented
-     * @param n      The integer to be added to the serial number
-     * @return The incremented serial number
-     * @throws IllegalArgumentException serial number or n is out of range
-     */
-    public long increment(long serial, long n) throws IllegalArgumentException
+    public static final int max(int s1, int s2)
     {
-        if (serial < 0 || serial > _max)
-        {
-            throw new IllegalArgumentException("Serial number: " + serial + " 
is out of range");
-        }
-        if( n < 0 || n > _maxIncrement )
+        if (gt(s1, s2))
         {
-            throw new IllegalArgumentException("Increment: " + n + " is out of 
range");
+            return s1;
         }
-        long result = serial + n;
-        // apply modulo (2 ^ SERIAL_BITS)
-        if(result > _max)
+        else
         {
-            result = result - _max - 1;
+            return s2;
         }
-        return result;
     }
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
 Thu May 22 14:43:13 2008
@@ -121,6 +121,6 @@
      *
      * @return the message transfer id.
      */
-    public long getMessageTransferId();
+    public int getMessageTransferId();
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
 Thu May 22 14:43:13 2008
@@ -20,13 +20,15 @@
  */
 package org.apache.qpidity.transport;
 
+import org.apache.qpidity.transport.network.Frame;
 import org.apache.qpidity.transport.util.Logger;
 
 import java.nio.ByteBuffer;
 
-import java.util.List;
 import java.util.ArrayList;
-
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.qpidity.transport.network.Frame.*;
 import static org.apache.qpidity.transport.util.Functions.*;
@@ -51,6 +53,7 @@
     // session may be null
     private Session session;
 
+    private Lock commandLock = new ReentrantLock();
     private boolean first = true;
     private ByteBuffer data = null;
 
@@ -156,7 +159,17 @@
 
     public void method(Method m)
     {
+        if (m.getEncodedTrack() == Frame.L4)
+        {
+            commandLock.lock();
+        }
+
         emit(m);
+
+        if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
+        {
+            commandLock.unlock();
+        }
     }
 
     public void header(Header header)
@@ -190,6 +203,7 @@
         emit(new Data(data, first, true));
         first = true;
         data = null;
+        commandLock.unlock();
     }
 
     protected void invoke(Method m)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
 Thu May 22 14:43:13 2008
@@ -77,14 +77,14 @@
 
     public void received(ConnectionEvent event)
     {
-        log.debug("RECV: %s", event);
+        log.debug("RECV: [%s] %s", this, event);
         Channel channel = getChannel(event.getChannel());
         channel.received(event.getProtocolEvent());
     }
 
     public void send(ConnectionEvent event)
     {
-        log.debug("SEND: %s", event);
+        log.debug("SEND: [%s] %s", this, event);
         sender.send(event);
     }
 
@@ -135,4 +135,9 @@
         sender.close();
     }
 
+    public String toString()
+    {
+        return String.format("conn:%x", System.identityHashCode(this));
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
 Thu May 22 14:43:13 2008
@@ -39,15 +39,15 @@
     }
 
     // XXX: command subclass?
-    private long id;
+    private int id;
     private boolean sync = false;
 
-    public final long getId()
+    public final int getId()
     {
         return id;
     }
 
-    void setId(long id)
+    void setId(int id)
     {
         this.id = id;
     }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
 Thu May 22 14:43:13 2008
@@ -64,4 +64,9 @@
         delegate.error(this);
     }
 
+    public String toString()
+    {
+        return String.format("protocol error: %s", getMessage());
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
 Thu May 22 14:43:13 2008
@@ -20,7 +20,7 @@
  */
 package org.apache.qpidity.transport;
 
-import static java.lang.Math.*;
+import static org.apache.qpid.util.Serial.*;
 
 
 /**
@@ -31,28 +31,28 @@
 
 public class Range
 {
-    private final long lower;
-    private final long upper;
+    private final int lower;
+    private final int upper;
 
-    public Range(long lower, long upper)
+    public Range(int lower, int upper)
     {
         this.lower = lower;
         this.upper = upper;
     }
 
-    public long getLower()
+    public int getLower()
     {
         return lower;
     }
 
-    public long getUpper()
+    public int getUpper()
     {
         return upper;
     }
 
-    public boolean includes(long value)
+    public boolean includes(int value)
     {
-        return lower <= value && value <= upper;
+        return le(lower, value) && le(value, upper);
     }
 
     public boolean includes(Range range)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
 Thu May 22 14:43:13 2008
@@ -25,6 +25,7 @@
 import java.util.ListIterator;
 import java.util.LinkedList;
 
+import static org.apache.qpid.util.Serial.*;
 
 /**
  * RangeSet
@@ -72,7 +73,7 @@
                 it.remove();
                 range = range.span(next);
             }
-            else if (range.getUpper() < next.getLower())
+            else if (lt(range.getUpper(), next.getLower()))
             {
                 it.previous();
                 it.add(range);
@@ -83,12 +84,12 @@
         it.add(range);
     }
 
-    public void add(long lower, long upper)
+    public void add(int lower, int upper)
     {
         add(new Range(lower, upper));
     }
 
-    public void add(long value)
+    public void add(int value)
     {
         add(value, value);
     }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 Thu May 22 14:43:13 2008
@@ -33,6 +33,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.qpidity.transport.Option.*;
+import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.util.Serial.*;
 
 /**
  * Session
@@ -42,6 +44,11 @@
 
 public class Session extends Invoker
 {
+
+    private static final Logger log = Logger.get(Session.class);
+
+    private static boolean ENABLE_REPLAY = false;
+
     static
     {
         String enableReplay = "enable_command_replay";
@@ -54,8 +61,6 @@
             ENABLE_REPLAY = false;
         }
     }
-    private static boolean ENABLE_REPLAY = false;
-    private static final Logger log = Logger.get(Session.class);
 
     private byte[] name;
     private long timeout = 60000;
@@ -64,15 +69,15 @@
     Channel channel;
 
     // incoming command count
-    long commandsIn = 0;
+    int commandsIn = 0;
     // completed incoming commands
     private final RangeSet processed = new RangeSet();
     private Range syncPoint = null;
 
     // outgoing command count
-    private long commandsOut = 0;
-    private Map<Long,Method> commands = new HashMap<Long,Method>();
-    private long maxComplete = -1;
+    private int commandsOut = 0;
+    private Map<Integer,Method> commands = new HashMap<Integer,Method>();
+    private int maxComplete = commandsOut - 1;
 
     private AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -86,22 +91,22 @@
         return name;
     }
 
-    public Map<Long,Method> getOutstandingCommands()
+    public Map<Integer,Method> getOutstandingCommands()
     {
         return commands;
     }
 
-    public long getCommandsOut()
+    public int getCommandsOut()
     {
         return commandsOut;
     }
 
-    public long getCommandsIn()
+    public int getCommandsIn()
     {
         return commandsIn;
     }
 
-    public long nextCommandId()
+    public int nextCommandId()
     {
         return commandsIn++;
     }
@@ -116,12 +121,12 @@
         processed(command.getId());
     }
 
-    public void processed(long command)
+    public void processed(int command)
     {
         processed(new Range(command, command));
     }
 
-    public void processed(long lower, long upper)
+    public void processed(int lower, int upper)
     {
 
         processed(new Range(lower, upper));
@@ -155,7 +160,7 @@
 
     void syncPoint()
     {
-        long id = getCommandsIn() - 1;
+        int id = getCommandsIn() - 1;
         log.debug("%s synced to %d", this, id);
         Range range = new Range(0, id - 1);
         boolean flush;
@@ -179,7 +184,7 @@
         channel.setSession(this);
     }
 
-    public Method getCommand(long id)
+    public Method getCommand(int id)
     {
         synchronized (commands)
         {
@@ -187,18 +192,18 @@
         }
     }
 
-    void complete(long lower, long upper)
+    void complete(int lower, int upper)
     {
         log.debug("%s complete(%d, %d)", this, lower, upper);
         synchronized (commands)
         {
-            for (long id = maxComplete; id <= upper; id++)
+            for (int id = max(maxComplete, lower); le(id, upper); id++)
             {
                 commands.remove(id);
             }
-            if (lower <= maxComplete + 1)
+            if (le(lower, maxComplete + 1))
             {
-                maxComplete = Math.max(maxComplete, upper);
+                maxComplete = max(maxComplete, upper);
             }
             commands.notifyAll();
             log.debug("%s   commands remaining: %s", this, commands);
@@ -211,7 +216,7 @@
         {
             synchronized (commands)
             {
-                long next = commandsOut++;
+                int next = commandsOut++;
                 if (next == 0)
                 {
                     sessionCommandPoint(0, 0);
@@ -276,9 +281,9 @@
         log.debug("%s sync()", this);
         synchronized (commands)
         {
-            long point = commandsOut - 1;
+            int point = commandsOut - 1;
 
-            if (maxComplete < point)
+            if (lt(maxComplete, point))
             {
                 ExecutionSync sync = new ExecutionSync();
                 sync.setSync(true);
@@ -287,7 +292,7 @@
 
             long start = System.currentTimeMillis();
             long elapsed = 0;
-            while (!closed.get() && elapsed < timeout && maxComplete < point)
+            while (!closed.get() && elapsed < timeout && lt(maxComplete, 
point))
             {
                 try {
                     log.debug("%s   waiting for[%d]: %d, %s", this, point,
@@ -301,7 +306,7 @@
                 }
             }
 
-            if (maxComplete < point)
+            if (lt(maxComplete, point))
             {
                 if (closed.get())
                 {
@@ -315,10 +320,10 @@
         }
     }
 
-    private Map<Long,ResultFuture<?>> results =
-        new HashMap<Long,ResultFuture<?>>();
+    private Map<Integer,ResultFuture<?>> results =
+        new HashMap<Integer,ResultFuture<?>>();
 
-    void result(long command, Struct result)
+    void result(int command, Struct result)
     {
         ResultFuture<?> future;
         synchronized (results)
@@ -332,7 +337,7 @@
     {
         synchronized (commands)
         {
-            long command = commandsOut;
+            int command = commandsOut;
             ResultFuture<T> future = new ResultFuture<T>(klass);
             synchronized (results)
             {
@@ -446,4 +451,9 @@
         }
     }
 
+    public String toString()
+    {
+        return String.format("ssn:%s", str(name));
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
 Thu May 22 14:43:13 2008
@@ -208,7 +208,7 @@
             RangeSet ranges = new RangeSet();
             for (int i = 0; i < count; i++)
             {
-                ranges.add(readUint32(), readUint32());
+                ranges.add(readSequenceNo(), readSequenceNo());
             }
             return ranges;
         }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
 Thu May 22 14:43:13 2008
@@ -246,8 +246,8 @@
             writeUint16(ranges.size() * 8);
             for (Range range : ranges)
             {
-                writeUint32(range.getLower());
-                writeUint32(range.getUpper());
+                writeSequenceNo(range.getLower());
+                writeSequenceNo(range.getUpper());
             }
         }
     }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
 Thu May 22 14:43:13 2008
@@ -150,7 +150,7 @@
     {
         StringBuilder str = new StringBuilder();
         str.append(String.format
-                   ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(),
+                   ("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(),
                     getTrack(), getType(),
                     isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
                     isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
 Thu May 22 14:43:13 2008
@@ -73,4 +73,14 @@
         return str.toString();
     }
 
+    public static final String str(byte[] bytes)
+    {
+        return str(ByteBuffer.wrap(bytes));
+    }
+
+    public static final String str(byte[] bytes, int limit)
+    {
+        return str(ByteBuffer.wrap(bytes), limit);
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/util/SerialTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/util/SerialTest.java?rev=659271&r1=659270&r2=659271&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/util/SerialTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/util/SerialTest.java
 Thu May 22 14:43:13 2008
@@ -13,84 +13,18 @@
 {
 
     /**
-     * The simplest meaningful serial number space has SERIAL_BITS == 2.  In
-     * this space, the integers that make up the serial number space are 0,
-     * 1, 2, and 3.  That is, 3 == 2^SERIAL_BITS - 1.
-     *
-     * In this space, the largest integer that it is meaningful to add to a
-     * sequence number is 2^(SERIAL_BITS - 1) - 1, or 1.
-     *
-     * Then, as defined 0+1 == 1, 1+1 == 2, 2+1 == 3, and 3+1 == 0.
-     * Further, 1 > 0, 2 > 1, 3 > 2, and 0 > 3.  It is undefined whether
-     * 2 > 0 or 0 > 2, and whether 1 > 3 or 3 > 1.
+     * Test the key boundaries where wraparound occurs.
      */
-    public void testTrivialSample()
+    public void testBoundaries()
     {
-        Serial serial = new Serial(2);
-        assertEquals( serial.increment(0, 1), 1);
-        assertEquals( serial.increment(1, 1), 2);
-        assertEquals( serial.increment(2, 1), 3);
-        assertEquals( serial.increment(3, 1), 0);
-        try
-        {
-            serial.increment(4, 1);
-            fail("IllegalArgumentException was not trhown");
-        }
-        catch (IllegalArgumentException e)
-        {
-           // expected
-        }
-        try
-        {
-            assertTrue( serial.compare(1, 0) > 0);
-            assertTrue( serial.compare(2, 1) > 0);
-            assertTrue( serial.compare(3, 2) > 0);
-            assertTrue( serial.compare(0, 3) > 0);
-            assertTrue( serial.compare(0, 1) < 0);
-            assertTrue( serial.compare(1, 2) < 0);
-            assertTrue( serial.compare(2, 3) < 0);
-            assertTrue( serial.compare(3, 0) < 0);
-        }
-        catch (SerialException e)
-        {
-            fail("Unexpected exception " + e);
-        }
-        try
-        {
-            serial.compare(2, 0);
-            fail("AMQSerialException not thrown as expected");
-        }
-        catch (SerialException e)
-        {
-           // expected
-        }
-        try
-        {
-            serial.compare(0, 2);
-            fail("AMQSerialException not thrown as expected");
-        }
-        catch (SerialException e)
-        {
-           // expected
-        }
-        try
-        {
-            serial.compare(3, 1);
-            fail("AMQSerialException not thrown as expected");
-        }
-        catch (SerialException e)
-        {
-           // expected
-        }
-        try
-        {
-            serial.compare(3, 1);
-            fail("AMQSerialException not thrown as expected");
-        }
-        catch (SerialException e)
-        {
-           // expected
-        }
+        assertTrue(Serial.gt(1, 0));
+        assertTrue(Serial.lt(0, 1));
+
+        assertTrue(Serial.gt(Integer.MAX_VALUE+1, Integer.MAX_VALUE));
+        assertTrue(Serial.lt(Integer.MAX_VALUE, Integer.MAX_VALUE+1));
+
+        assertTrue(Serial.gt(0xFFFFFFFF + 1, 0xFFFFFFFF));
+        assertTrue(Serial.lt(0xFFFFFFFF, 0xFFFFFFFF + 1));
     }
 
     /**
@@ -98,38 +32,30 @@
      * For any sequence number s and any integer n such that addition of n
      * to s is well defined, (s + n) >= s.  Further (s + n) == s only when
      * n == 0, in all other defined cases, (s + n) > s.
-     * strategy:
-     * Create a serial number with 32 bits and check in a loop that adding 
integers
-     * respect the corollary
      */
     public void testCorollary1()
     {
-        Serial serial = new Serial(32);
-        Random random = new Random();
-        long number = random.nextInt((int) Math.pow(2.0 , 32.0) - 1);
-        for(int i = 1; i<= 10000; i++ )
+        int wrapcount = 0;
+
+        int s = 0;
+
+        for (int i = 0; i < 67108664; i++)
         {
-           long nextInt = random.nextInt((int) Math.pow(2.0 , 32.0) - 1);
-           long inc = serial.increment(number, nextInt);
-            int res =0;
-            try
-            {
-                res=serial.compare(inc, number);
-            }
-            catch (SerialException e)
-            {
-                fail("un-expected exception " + e);
-            }
-            if( res < 1 )
+            for (int n = 1; n < 4096; n += 512)
             {
-               fail("Corollary 1 violated " + number + " + " + nextInt + " < " 
+ number);
+                assertTrue(Serial.gt(s+n, s));
+                assertTrue(Serial.lt(s, s+n));
             }
-            else if( res == 0 && nextInt > 0)
+
+            s += 1024;
+
+            if (s == 0)
             {
-               fail("Corollary 1 violated " + number + " + " + nextInt + " = " 
+ number);
+                wrapcount += 1;
             }
         }
+
+        assertTrue(wrapcount > 0);
     }
 
-    
 }


Reply via email to