Author: rhs
Date: Tue Aug  7 21:36:31 2007
New Revision: 563738

URL: http://svn.apache.org/viewvc?view=rev&rev=563738
Log:
implemented Session.sync()

Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
    incubator/qpid/trunk/qpid/java/common/generate
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Decoder.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Encoder.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
    incubator/qpid/trunk/qpid/specs/amqp.0-10-preview.xml

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
 Tue Aug  7 21:36:31 2007
@@ -24,7 +24,7 @@
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.Header;
 import org.apache.qpidity.Option;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
 
 /**
  * <p>A session is associated with a connection.
@@ -290,7 +290,7 @@
      * @param range Range of acknowledged messages.
      * @throws QpidException If the acknowledgement of the messages fails due 
to some error.
      */
-    public void messageAcknowledge(Range<Long>... range) throws QpidException;
+    public void messageAcknowledge(RangeSet ranges) throws QpidException;
 
     /**
      * Reject ranges of acquired messages.
@@ -300,7 +300,7 @@
      * @param range Range of rejected messages.
      * @throws QpidException If those messages cannot be rejected dus to some 
error
      */
-    public void messageReject(Range<Long>... range) throws QpidException;
+    public void messageReject(RangeSet ranges) throws QpidException;
 
     /**
      * Try to acquire ranges of messages hence releasing them form the queue.
@@ -314,7 +314,7 @@
      * @return Ranges of explicitly acquired messages.
      * @throws QpidException If this message cannot be acquired dus to some 
error
      */
-    public Range<Long>[] messageAcquire(Range<Long>... range) throws 
QpidException;
+    public RangeSet messageAcquire(RangeSet range) throws QpidException;
 
     /**
      * Give up responsibility for processing ranges of messages.
@@ -323,7 +323,7 @@
      * @param range Ranges of messages to be released.
      * @throws QpidException If this message cannot be released dus to some 
error.
      */
-    public void messageRelease(Range<Long>... range) throws QpidException;
+    public void messageRelease(RangeSet range) throws QpidException;
 
     // -----------------------------------------------
     //            Local transaction methods

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
 Tue Aug  7 21:36:31 2007
@@ -113,25 +113,25 @@
 
     }
 
-    public void messageAcknowledge(Range<Long>... range) throws QpidException
+    public void messageAcknowledge(RangeSet ranges) throws QpidException
     {
         // TODO
 
     }
 
-    public void messageReject(Range<Long>... range) throws QpidException
+    public void messageReject(RangeSet ranges) throws QpidException
     {
         // TODO
 
     }
 
-    public Range<Long>[] messageAcquire(Range<Long>... range) throws 
QpidException
+    public RangeSet messageAcquire(RangeSet ranges) throws QpidException
     {
         // TODO
         return null;
     }
 
-    public void messageRelease(Range<Long>... range) throws QpidException
+    public void messageRelease(RangeSet ranges) throws QpidException
     {
         // TODO
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 Tue Aug  7 21:36:31 2007
@@ -19,7 +19,7 @@
 
 import org.apache.qpidity.jms.message.QpidMessage;
 import org.apache.qpidity.impl.MessagePartListenerAdapter;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.Option;
 import org.apache.qpidity.filter.MessageFilter;
@@ -568,8 +568,9 @@
     {
         if (_preAcquire)
         {
-            Range<Long> range = new Range<Long>(message.getMessageID(), 
message.getMessageID());
-            getSession().getQpidSession().messageRelease(range);
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageID());
+            getSession().getQpidSession().messageRelease(ranges);
         }
     }
 
@@ -585,12 +586,13 @@
         boolean result = false;
         if (!_preAcquire)
         {
-            Range<Long> range = new Range<Long>(message.getMessageID(), 
message.getMessageID());
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageID());
 
-            Range<Long>[] rangeResult = 
getSession().getQpidSession().messageAcquire(range);
-            if (rangeResult.length > 0)
+            RangeSet acquired = 
getSession().getQpidSession().messageAcquire(ranges);
+            if (acquired.size() > 0)
             {
-                result = 
rangeResult[0].getLower().compareTo(message.getMessageID()) == 0;
+                result = acquired.iterator().next().getLower() == 
message.getMessageID();
             }
         }
         return result;
@@ -606,8 +608,9 @@
     {
         if (!_preAcquire)
         {
-            Range<Long> range = new Range<Long>(message.getMessageID(), 
message.getMessageID());
-            getSession().getQpidSession().messageAcknowledge(range);
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageID());
+            getSession().getQpidSession().messageAcknowledge(ranges);
         }
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
 Tue Aug  7 21:36:31 2007
@@ -22,6 +22,7 @@
 import org.apache.qpidity.jms.message.*;
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
@@ -443,10 +444,11 @@
         for (QpidMessage message : _unacknowledgedMessages)
         {
             // release this message
-            Range<Long> range = new Range<Long>(message.getMessageID(), 
message.getMessageID());
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageID());
             try
             {
-                getQpidSession().messageRelease(range);
+                getQpidSession().messageRelease(ranges);
             }
             catch (QpidException e)
             {
@@ -982,10 +984,11 @@
         else
         {
             // acknowledge this message
-            Range<Long> range = new Range<Long>(message.getMessageID(), 
message.getMessageID());
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageID());
             try
             {
-                getQpidSession().messageAcknowledge(range);
+                getQpidSession().messageAcknowledge(ranges);
             }
             catch (QpidException e)
             {
@@ -1016,10 +1019,11 @@
                 for (QpidMessage message : _unacknowledgedMessages)
                 {
                     // acknowledge this message
-                    Range<Long> range = new 
Range<Long>(message.getMessageID(), message.getMessageID());
+                    RangeSet ranges = new RangeSet();
+                    ranges.add(message.getMessageID());
                     try
                     {
-                        getQpidSession().messageAcknowledge(range);
+                        getQpidSession().messageAcknowledge(ranges);
                     }
                     catch (QpidException e)
                     {

Modified: incubator/qpid/trunk/qpid/java/common/generate
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/generate?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/generate (original)
+++ incubator/qpid/trunk/qpid/java/common/generate Tue Aug  7 21:36:31 2007
@@ -51,7 +51,7 @@
   "timestamp": "long",
   "content": "String",
   "uuid": "UUID",
-  "rfc1982-long-set": "Range<Long>[]",
+  "rfc1982-long-set": "RangeSet",
   "long-struct": "Struct"
   }
 

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
 Tue Aug  7 21:36:31 2007
@@ -143,7 +143,7 @@
         return null;
     }
 
-    public Range<Long>[] readRfc1982LongSet()
+    public RangeSet readRfc1982LongSet()
     {
         int count = readShort()/8;
         if (count == 0)
@@ -152,10 +152,10 @@
         }
         else
         {
-            Range<Long>[] ranges = new Range[count];
+            RangeSet ranges = new RangeSet();
             for (int i = 0; i < count; i++)
             {
-                ranges[i] = new Range<Long>(readLong(), readLong());
+                ranges.add(readLong(), readLong());
             }
             return ranges;
         }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
 Tue Aug  7 21:36:31 2007
@@ -150,7 +150,7 @@
         //throw new Error("TODO");
     }
 
-    public void writeRfc1982LongSet(Range<Long>[] ranges)
+    public void writeRfc1982LongSet(RangeSet ranges)
     {
         if (ranges == null)
         {
@@ -158,8 +158,8 @@
         }
         else
         {
-            writeShort(ranges.length * 8);
-            for (Range<Long> range : ranges)
+            writeShort(ranges.size() * 8);
+            for (Range range : ranges)
             {
                 writeLong(range.getLower());
                 writeLong(range.getUpper());

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Decoder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Decoder.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Decoder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Decoder.java
 Tue Aug  7 21:36:31 2007
@@ -44,7 +44,7 @@
     String readLongstr();
 
     Map<String,?> readTable();
-    Range<Long>[] readRfc1982LongSet();
+    RangeSet readRfc1982LongSet();
     UUID readUuid();
 
     String readContent();

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Encoder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Encoder.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Encoder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Encoder.java
 Tue Aug  7 21:36:31 2007
@@ -44,7 +44,7 @@
     void writeLongstr(String s);
 
     void writeTable(Map<String,?> table);
-    void writeRfc1982LongSet(Range<Long>[] ranges);
+    void writeRfc1982LongSet(RangeSet ranges);
     void writeUuid(UUID uuid);
 
     void writeContent(String c);

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java
 Tue Aug  7 21:36:31 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpidity;
 
+import static java.lang.Math.*;
+
 
 /**
  * Range
@@ -27,25 +29,57 @@
  * @author Rafael H. Schloming
  */
 
-public class Range<C extends Comparable>
+public class Range
 {
-    private final C lower;
-    private final C upper;
+    private final long lower;
+    private final long upper;
 
-    public Range(C lower, C upper)
+    public Range(long lower, long upper)
     {
         this.lower = lower;
         this.upper = upper;
     }
 
-    public C getLower()
+    public long getLower()
     {
         return lower;
     }
 
-    public C getUpper()
+    public long getUpper()
     {
         return upper;
+    }
+
+    public boolean includes(long value)
+    {
+        return lower <= value && value <= upper;
+    }
+
+    public boolean includes(Range range)
+    {
+        return includes(range.lower) && includes(range.upper);
+    }
+
+    public boolean intersects(Range range)
+    {
+        return (includes(range.lower) || includes(range.upper) ||
+                range.includes(lower) || range.includes(upper));
+    }
+
+    public boolean touches(Range range)
+    {
+        return (includes(range.upper + 1) || includes(range.lower - 1) ||
+                range.includes(upper + 1) || range.includes(lower - 1));
+    }
+
+    public Range span(Range range)
+    {
+        return new Range(min(lower, range.lower), max(upper, range.upper));
+    }
+
+    public String toString()
+    {
+        return "[" + lower + ", " + upper + "]";
     }
 
 }

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java?view=auto&rev=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
 Tue Aug  7 21:36:31 2007
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.LinkedList;
+
+
+/**
+ * RangeSet
+ *
+ * @author Rafael H. Schloming
+ */
+
+public class RangeSet implements Iterable<Range>
+{
+
+    private LinkedList<Range> ranges = new LinkedList<Range>();
+
+    public int size()
+    {
+        return ranges.size();
+    }
+
+    public Iterator<Range> iterator()
+    {
+        return ranges.iterator();
+    }
+
+    public void add(Range range)
+    {
+        ListIterator<Range> it = ranges.listIterator();
+
+        while (it.hasNext())
+        {
+            Range next = it.next();
+            if (range.touches(next))
+            {
+                it.remove();
+                range = range.span(next);
+            }
+            else if (range.getUpper() < next.getLower())
+            {
+                it.previous();
+                it.add(range);
+                return;
+            }
+        }
+
+        it.add(range);
+    }
+
+    public void add(long lower, long upper)
+    {
+        add(new Range(lower, upper));
+    }
+
+    public void add(long value)
+    {
+        add(value, value);
+    }
+
+    public void clear()
+    {
+        ranges.clear();
+    }
+
+    public String toString()
+    {
+        return ranges.toString();
+    }
+
+    public static final void main(String[] args)
+    {
+        RangeSet ranges = new RangeSet();
+        ranges.add(5, 10);
+        System.out.println(ranges);
+        ranges.add(15, 20);
+        System.out.println(ranges);
+        ranges.add(23, 25);
+        System.out.println(ranges);
+        ranges.add(12, 14);
+        System.out.println(ranges);
+        ranges.add(0, 1);
+        System.out.println(ranges);
+        ranges.add(3, 11);
+        System.out.println(ranges);
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
 Tue Aug  7 21:36:31 2007
@@ -37,11 +37,15 @@
 
     // channel may be null
     Channel channel;
-    // outgoing command count
-    private long commandsOut = 0;
+
     // XXX: incoming command count not used
     // incoming command count
     private long commandsIn = 0;
+    // completed incoming commands
+    private final RangeSet processed = new RangeSet();
+
+    // outgoing command count
+    private long commandsOut = 0;
     private Map<Long,Method> commands = new HashMap<Long,Method>();
     private long mark = 0;
 
@@ -55,6 +59,31 @@
         return commandsIn;
     }
 
+    public RangeSet getProcessed()
+    {
+        return processed;
+    }
+
+    public void processed(long command)
+    {
+        processed.add(command);
+    }
+
+    public void processed(long lower, long upper)
+    {
+        processed.add(lower, upper);
+    }
+
+    public void processed(Range range)
+    {
+        processed.add(range);
+    }
+
+    public void processed(Struct command)
+    {
+        processed(command.getId());
+    }
+
     public void attach(Channel channel)
     {
         this.channel = channel;
@@ -63,15 +92,24 @@
 
     public Method getCommand(long id)
     {
-        System.out.println(id + " " + commands);
-        return commands.get(id);
+        synchronized (commands)
+        {
+            return commands.get(id);
+        }
     }
 
     void complete(long lower, long upper)
     {
-        for (long id = lower; id <= upper; id++)
+        synchronized (commands)
         {
-            commands.put(id, null);
+            for (long id = lower; id <= upper; id++)
+            {
+                commands.remove(id);
+            }
+            if (commands.isEmpty())
+            {
+                commands.notifyAll();
+            }
         }
     }
 
@@ -85,8 +123,10 @@
     {
         if (m.getEncodedTrack() == Frame.L4)
         {
-            long cmd = commandsOut++;
-            commands.put(cmd, m);
+            synchronized (commands)
+            {
+                commands.put(commandsOut++, m);
+            }
         }
         channel.method(m);
     }
@@ -114,6 +154,28 @@
     public void end()
     {
         channel.end();
+    }
+
+    public void sync()
+    {
+        synchronized (commands)
+        {
+            if (!commands.isEmpty())
+            {
+                executionSync();
+            }
+
+            while (!commands.isEmpty())
+            {
+                try {
+                    commands.wait();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
     }
 
     protected void invoke(Method m, Handler<Struct> handler)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
 Tue Aug  7 21:36:31 2007
@@ -36,15 +36,20 @@
 
     @Override public void executionComplete(Session ssn, ExecutionComplete 
excmp)
     {
-        Range<Long>[] ranges = excmp.getRangedExecutionSet();
+        RangeSet ranges = excmp.getRangedExecutionSet();
         if (ranges != null)
         {
-            for (Range<Long> range : ranges)
+            for (Range range : ranges)
             {
                 ssn.complete(range.getLower(), range.getUpper());
             }
         }
         ssn.complete(excmp.getCumulativeExecutionMark());
+    }
+
+    @Override public void executionSync(Session ssn, ExecutionSync sync)
+    {
+        ssn.executionComplete(0, ssn.getProcessed());
     }
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
 Tue Aug  7 21:36:31 2007
@@ -58,6 +58,7 @@
     {
         queues.put(qd.getQueue(), new LinkedList());
         System.out.println("declared queue: " + qd.getQueue());
+        ssn.processed(qd);
     }
 
     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
@@ -120,6 +121,7 @@
                 queue.offer(m);
                 System.out.println("queued " + m);
             }
+            ssn.processed(xfr);
             xfr = null;
             frames = null;
         }
@@ -133,8 +135,8 @@
         }
         else
         {
-            long id = xfr.getId();
-            Range[] ranges = {new Range<Long>(id, id)};
+            RangeSet ranges = new RangeSet();
+            ranges.add(xfr.getId());
             ssn.messageReject(ranges, 0, "no such destination");
         }
     }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
 Tue Aug  7 21:36:31 2007
@@ -32,7 +32,7 @@
 
     @Override public void messageReject(Session ssn, MessageReject reject)
     {
-        for (Range<Long> range : reject.getTransfers())
+        for (Range range : reject.getTransfers())
         {
             for (long l = range.getLower(); l <= range.getUpper(); l++)
             {
@@ -40,6 +40,7 @@
                                    ssn.getCommand((int) l));
             }
         }
+        ssn.processed(reject);
     }
 
     public void headers(Session ssn, Struct ... headers)
@@ -73,6 +74,7 @@
         ssn.sessionOpen(1234);
 
         ssn.queueDeclare("asdf", null, null);
+        ssn.sync();
 
         ssn.messageTransfer("asdf", (short) 0, (short) 1);
         ssn.headers(new DeliveryProperties(),

Modified: incubator/qpid/trunk/qpid/specs/amqp.0-10-preview.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/amqp.0-10-preview.xml?view=diff&rev=563738&r1=563737&r2=563738
==============================================================================
--- incubator/qpid/trunk/qpid/specs/amqp.0-10-preview.xml (original)
+++ incubator/qpid/trunk/qpid/specs/amqp.0-10-preview.xml Tue Aug  7 21:36:31 
2007
@@ -1272,6 +1272,22 @@
     </doc>
   </domain>
 
+  <domain name="execution-header">
+    <doc>
+      The execution header appears on commands after the class and method id, 
but prior to method
+      arguments.
+    </doc>
+    <struct size="octet" pack="octet">
+      <field name="sync" domain="bit"
+             label="request notification of completion for a specific command">
+        <doc>
+          Indicates that an execution.complete should be sent immediately 
after processing the
+          command.
+        </doc>
+      </field>
+    </struct>
+  </domain>
+
   <!-- Elementary domains -->
   <domain name="bit" type="bit" label="single bit" />
   <domain name="octet" type="octet" label="single octet" />
@@ -7061,6 +7077,19 @@
 
       <field name="command-id" domain="command-id"/>
       <field name="data" domain="long-struct"/>
+    </method>
+
+    <!-- - Method: execution.sync  - - - - - - - - - - - - - - - - - - - - - - 
- - - - - - - - - -->
+
+    <method name="sync" index="50" label="request notification of completion 
for issued commands">
+      <doc>
+        Requests notification (via execution.complete) when all commands 
issued prior to the sync
+        control have been processed. If the recipient of this control has 
already notified the
+        sender that said commands are complete, it may safely ignore the 
control.
+      </doc>
+
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
     </method>
 
   </class>


Reply via email to