Author: rhs
Date: Wed Aug 8 15:32:03 2007
New Revision: 564037
URL: http://svn.apache.org/viewvc?view=rev&rev=564037
Log:
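fixed executionSync: completion is now sent only once every command received before the sync has been processed (deferred via a recorded sync point otherwise); added an executionFlush handler and RangeSet.includes(Range); ProtocolHeader now writes its instance field instead of a hard-coded 1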
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
Wed Aug 8 15:32:03 2007
@@ -33,6 +33,7 @@
{
private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
+ private static final byte CLASS = 1;
final private byte instance;
final private byte major;
@@ -64,9 +65,9 @@
{
ByteBuffer buf = ByteBuffer.allocate(8);
buf.put(AMQP);
- buf.put((byte) 1);
- buf.put((byte) 1);
- buf.put( major);
+ buf.put(CLASS);
+ buf.put(instance);
+ buf.put(major);
buf.put(minor);
buf.flip();
return buf;
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java
Wed Aug 8 15:32:03 2007
@@ -47,6 +47,19 @@
return ranges.iterator();
}
+ public boolean includes(Range range)
+ {
+ for (Range r : this)
+ {
+ if (r.includes(range))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public void add(Range range)
{
ListIterator<Range> it = ranges.listIterator();
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
Wed Aug 8 15:32:03 2007
@@ -39,6 +39,7 @@
private long commandsIn = 0;
// completed incoming commands
private final RangeSet processed = new RangeSet();
+ private Range syncPoint = null;
// outgoing command count
private long commandsOut = 0;
@@ -70,24 +71,56 @@
return processed;
}
- public void processed(long command)
+ public void processed(Method command)
{
- processed.add(command);
+ processed(command.getId());
}
- public void processed(long lower, long upper)
+ public void processed(long command)
{
- processed.add(lower, upper);
+ processed(new Range(command, command));
}
- public void processed(Range range)
+ public void processed(long lower, long upper)
{
- processed.add(range);
+ processed(new Range(lower, upper));
}
- public void processed(Method command)
+ public void processed(Range range)
{
- processed(command.getId());
+ boolean flush;
+ synchronized (processed)
+ {
+ processed.add(range);
+ flush = syncPoint != null && processed.includes(syncPoint);
+ }
+ if (flush)
+ {
+ flushProcessed();
+ }
+ }
+
+ void flushProcessed()
+ {
+ executionComplete(0, processed);
+ }
+
+ void syncPoint()
+ {
+ Range range = new Range(0, getCommandsIn() - 1);
+ boolean flush;
+ synchronized (processed)
+ {
+ flush = processed.includes(range);
+ if (!flush)
+ {
+ syncPoint = range;
+ }
+ }
+ if (flush)
+ {
+ flushProcessed();
+ }
}
public void attach(Channel channel)
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java?view=diff&rev=564037&r1=564036&r2=564037
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
Wed Aug 8 15:32:03 2007
@@ -48,9 +48,14 @@
System.out.println("outstanding commands: " +
ssn.getOutstandingCommands());
}
+ @Override public void executionFlush(Session ssn, ExecutionFlush flush)
+ {
+ ssn.flushProcessed();
+ }
+
@Override public void executionSync(Session ssn, ExecutionSync sync)
{
- ssn.executionComplete(0, ssn.getProcessed());
+ ssn.syncPoint();
}
}