Author: rhs
Date: Wed Aug  8 15:32:03 2007
New Revision: 564037

URL: http://svn.apache.org/viewvc?view=rev&rev=564037
Log:
fixed executionSync

Modified:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
 Wed Aug  8 15:32:03 2007
@@ -33,6 +33,7 @@
 {
 
     private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
+    private static final byte CLASS = 1;
 
     final private byte instance;
     final private byte major;
@@ -64,9 +65,9 @@
     {
         ByteBuffer buf = ByteBuffer.allocate(8);
         buf.put(AMQP);
-        buf.put((byte) 1);
-        buf.put((byte) 1);
-        buf.put( major);
+        buf.put(CLASS);
+        buf.put(instance);
+        buf.put(major);
         buf.put(minor);
         buf.flip();
         return buf;

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
 Wed Aug  8 15:32:03 2007
@@ -47,6 +47,19 @@
         return ranges.iterator();
     }
 
+    public boolean includes(Range range)
+    {
+        for (Range r : this)
+        {
+            if (r.includes(range))
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     public void add(Range range)
     {
         ListIterator<Range> it = ranges.listIterator();

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
 Wed Aug  8 15:32:03 2007
@@ -39,6 +39,7 @@
     private long commandsIn = 0;
     // completed incoming commands
     private final RangeSet processed = new RangeSet();
+    private Range syncPoint = null;
 
     // outgoing command count
     private long commandsOut = 0;
@@ -70,24 +71,56 @@
         return processed;
     }
 
-    public void processed(long command)
+    public void processed(Method command)
     {
-        processed.add(command);
+        processed(command.getId());
     }
 
-    public void processed(long lower, long upper)
+    public void processed(long command)
     {
-        processed.add(lower, upper);
+        processed(new Range(command, command));
     }
 
-    public void processed(Range range)
+    public void processed(long lower, long upper)
     {
-        processed.add(range);
+        processed(new Range(lower, upper));
     }
 
-    public void processed(Method command)
+    public void processed(Range range)
     {
-        processed(command.getId());
+        boolean flush;
+        synchronized (processed)
+        {
+            processed.add(range);
+            flush = syncPoint != null && processed.includes(syncPoint);
+        }
+        if (flush)
+        {
+            flushProcessed();
+        }
+    }
+
+    void flushProcessed()
+    {
+        executionComplete(0, processed);
+    }
+
+    void syncPoint()
+    {
+        Range range = new Range(0, getCommandsIn() - 1);
+        boolean flush;
+        synchronized (processed)
+        {
+            flush = processed.includes(range);
+            if (!flush)
+            {
+                syncPoint = range;
+            }
+        }
+        if (flush)
+        {
+            flushProcessed();
+        }
     }
 
     public void attach(Channel channel)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
 Wed Aug  8 15:32:03 2007
@@ -48,9 +48,14 @@
         System.out.println("outstanding commands: " + 
ssn.getOutstandingCommands());
     }
 
+    @Override public void executionFlush(Session ssn, ExecutionFlush flush)
+    {
+        ssn.flushProcessed();
+    }
+
     @Override public void executionSync(Session ssn, ExecutionSync sync)
     {
-        ssn.executionComplete(0, ssn.getProcessed());
+        ssn.syncPoint();
     }
 
 }


Reply via email to