Author: gsim
Date: Mon Jul 23 05:29:17 2007
New Revision: 558700

URL: http://svn.apache.org/viewvc?view=rev&rev=558700
Log:
Added initial 'execution-layer' to try out methods from the 0-10 execution 
class.


Modified:
    
incubator/qpid/trunk/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
    
incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl
    incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
    incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
    incubator/qpid/trunk/qpid/python/qpid/client.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/tests_0-9/basic.py
    incubator/qpid/trunk/qpid/specs/amqp-dtx-preview.0-9.xml

Modified: 
incubator/qpid/trunk/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- 
incubator/qpid/trunk/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
 (original)
+++ 
incubator/qpid/trunk/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java
 Mon Jul 23 05:29:17 2007
@@ -348,6 +348,8 @@
             return generateConstructor(thisClass, method, version, 4, 4);
         if (token.equals("${mb_server_operation_invoke}"))
             return generateServerOperationsInvoke(thisClass, method, version, 
4, 4);
+        if (token.equals("${mb_server_operation_invoke2}"))
+            return generateServerOperationsInvoke2(thisClass, method, version, 
4, 4);
         if (token.equals("${mb_buffer_param}"))
             return method.fieldMap.size() > 0 ? " buffer" : "/*buffer*/";
         if (token.equals("${hv_latest_major}"))
@@ -719,7 +721,7 @@
                     sb.append(cr);
                 sb.append(indent + "// ==================== class " + 
handlerClassName +
                           " ====================" + cr);
-                sb.append(indent + "class " + handlerClassName);
+                sb.append(indent + "class " + handlerClassName + " : public 
virtual Invocable");
                 if (thisClass.versionSet.size() != globalVersionSet.size())
                     sb.append(" // AMQP Version(s) " + thisClass.versionSet + 
cr);
                 else
@@ -1085,11 +1087,11 @@
         String tab = Utils.createSpaces(tabSize);
         String namespace = version != null ? version.namespace() + "::" : "";
         StringBuffer sb = new StringBuffer();
-        sb.append(indent+tab+(method.isResponse(version) ? "" : "return 
")+"channel.send(new ");
+        sb.append(indent+tab+(method.isResponse(version) ? "" : "return 
")+"channel.send(make_shared_ptr(new ");
         sb.append(namespace + methodBodyClassName + "( channel.getVersion()");
         if (method.isResponse(version)) sb.append(", responseTo");
         sb.append(generateMethodParameterList(fieldMap, indentSize + 
(5*tabSize), true, false, true));
-        sb.append("));\n");
+        sb.append(")));\n");
         return sb.toString();           
     }
     
@@ -1502,6 +1504,50 @@
                             }
                     }
             }
+        return sb.toString();       
+    }
+    
+    protected String generateServerOperationsInvoke2(AmqpClass thisClass, 
AmqpMethod method,
+                                                    AmqpVersion version, int 
indentSize, int tabSize)
+        throws AmqpTypeMappingException
+    {
+        String indent = Utils.createSpaces(indentSize);
+        String tab = Utils.createSpaces(tabSize);
+        StringBuffer sb = new StringBuffer();
+        String ptrType = "AMQP_ServerOperations:: " + thisClass.name + 
"Handler*";
+        if (isSpecialCase(thisClass.name, method.name)) {
+            sb.append(indent + "bool invoke(Invocable*)" + cr);
+            sb.append(indent + "{" + cr);
+            sb.append(indent + tab + "return false;" + cr);
+            sb.append(indent + "}" + cr);
+        } else if (method.serverMethodFlagMap.size() > 0) { // At least one 
AMQP version defines this method as a server method
+            
+            Iterator<Boolean> bItr = 
method.serverMethodFlagMap.keySet().iterator();
+            while (bItr.hasNext())
+                {
+                    if (bItr.next()) // This is a server operation
+                        {
+                            boolean fieldMapNotEmptyFlag = 
method.fieldMap.size() > 0;
+                            sb.append(indent + "bool invoke(Invocable* 
target)" + cr);
+                            sb.append(indent + "{" + cr);
+                            sb.append(indent + tab + ptrType + " ptr = 
dynamic_cast<" + ptrType + ">(target);" + cr);
+                            sb.append(indent + tab + "if (ptr) {" + cr);
+                            sb.append(indent + tab + tab + "ptr->");
+                            
sb.append(parseForReservedWords(Utils.firstLower(method.name),null) + "(" + cr);
+                            if (fieldMapNotEmptyFlag)
+                                {
+                                        
sb.append(generateFieldList(method.fieldMap, version, false, false, indentSize 
+ 4*tabSize));
+                                        sb.append(indent + tab + tab + tab + 
tab);                    
+                                }
+                            sb.append(");" + cr);
+                            sb.append(indent + tab + tab + "return true;" + 
cr);
+                            sb.append(indent + tab + "} else {" + cr);
+                            sb.append(indent + tab + tab + "return false;" + 
cr);
+                            sb.append(indent + tab + "}" + cr);
+                            sb.append(indent + "}" + cr);
+                        }
+                }
+        }
         return sb.toString();       
     }
     

Modified: 
incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- 
incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl 
(original)
+++ 
incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/AMQP_ServerOperations.h.tmpl 
Mon Jul 23 05:29:17 2007
@@ -36,6 +36,13 @@
 
 class MethodContext;
 
+class Invocable 
+{
+protected:
+    Invocable() {}        
+    virtual ~Invocable() {}        
+};
+
 class AMQP_ServerOperations
 {
 protected:

Modified: 
incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl 
(original)
+++ incubator/qpid/trunk/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl Mon 
Jul 23 05:29:17 2007
@@ -100,6 +100,8 @@
 
 ${mb_server_operation_invoke}
 
+${mb_server_operation_invoke2}
+
 }; // class ${CLASS}${METHOD}Body
 
 ${version_namespace_end}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Mon Jul 23 
05:29:17 2007
@@ -56,12 +56,12 @@
 void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
     channel.open();
     // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
-    client.openOk(std::string()/* ID */);//GRS, context.getRequestId());
+    client.openOk(std::string()/* ID */);
 } 
         
 void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
     channel.flow(active);
-    client.flowOk(active);//GRS, context.getRequestId());
+    client.flowOk(active);
 }         
 
 void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){} 
@@ -70,7 +70,7 @@
     const string& /*replyText*/,
     uint16_t /*classId*/, uint16_t /*methodId*/)
 {
-    client.closeOk();//GRS context.getRequestId());
+    client.closeOk();
     // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
     connection.closeChannel(channel.getId()); 
 } 
@@ -104,7 +104,7 @@
         }
     }
     if(!nowait){
-        client.declareOk();//GRS context.getRequestId());
+        client.declareOk();
     }
 }
                 
@@ -114,16 +114,16 @@
     Exchange::shared_ptr exchange(broker.getExchanges().get(name));
     if (exchange->isDurable()) broker.getStore().destroy(*exchange);
     broker.getExchanges().destroy(name);
-    if(!nowait) client.deleteOk();//GRS context.getRequestId());
+    if(!nowait) client.deleteOk();
 } 
 
 void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const 
string& name)
 {
     try {
         Exchange::shared_ptr exchange(broker.getExchanges().get(name));
-        client.queryOk(exchange->getType(), exchange->isDurable(), false, 
exchange->getArgs());//GRS, context.getRequestId());
+        client.queryOk(exchange->getType(), exchange->isDurable(), false, 
exchange->getArgs());
     } catch (const ChannelException& e) {
-        client.queryOk("", false, true, FieldTable());//GRS, 
context.getRequestId());        
+        client.queryOk("", false, true, FieldTable());        
     }
 }
 
@@ -144,18 +144,18 @@
     }
 
     if (!exchange) {
-        client.queryOk(true, false, false, false, false);//GRS, 
context.getRequestId());
+        client.queryOk(true, false, false, false, false);
     } else if (!queueName.empty() && !queue) {
-        client.queryOk(false, true, false, false, false);//GRS, 
context.getRequestId());
+        client.queryOk(false, true, false, false, false);
     } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 
0 ? &args : &args)) {
-            client.queryOk(false, false, false, false, false);//GRS, 
context.getRequestId());
+            client.queryOk(false, false, false, false, false);
     } else {
         //need to test each specified option individually
         bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 
0);
         bool keyMatched = key.empty() || 
exchange->isBound(Queue::shared_ptr(), &key, 0);
         bool argsMatched = args.count() == 0 || 
exchange->isBound(Queue::shared_ptr(), 0, &args);
 
-        client.queryOk(false, false, !queueMatched, !keyMatched, 
!argsMatched);//GRS, context.getRequestId());
+        client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);
     }
 }
 
@@ -196,7 +196,7 @@
     if (!nowait) {
         string queueName = queue->getName();
         client.declareOk(
-            queueName, queue->getMessageCount(), 
queue->getConsumerCount());//GRS, context.getRequestId());
+            queueName, queue->getMessageCount(), queue->getConsumerCount());
     }
 } 
         
@@ -214,7 +214,7 @@
                 broker.getStore().bind(*exchange, *queue, routingKey, 
arguments);
             }
         }
-        if(!nowait) client.bindOk();//GRS context.getRequestId());    
+        if(!nowait) client.bindOk();    
     }else{
         throw ChannelException(
             404, "Bind failed. No such exchange: " + exchangeName);
@@ -238,14 +238,14 @@
         broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
     }
 
-    client.unbindOk();//GRS context.getRequestId());    
+    client.unbindOk();    
 }
         
 void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& 
queueName, bool nowait){
 
     Queue::shared_ptr queue = getQueue(queueName);
     int count = queue->purge();
-    if(!nowait) client.purgeOk( count);//GRS, context.getRequestId());
+    if(!nowait) client.purgeOk( count);
 } 
         
 void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const 
string& queue, 
@@ -270,7 +270,7 @@
     }
 
     if(!nowait)
-        client.deleteOk(count);//GRS, context.getRequestId());
+        client.deleteOk(count);
 } 
               
         
@@ -280,7 +280,7 @@
     //TODO: handle global
     channel.setPrefetchSize(prefetchSize);
     channel.setPrefetchCount(prefetchCount);
-    client.qosOk();//GRS context.getRequestId());
+    client.qosOk();
 } 
         
 void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, 
@@ -300,7 +300,7 @@
     channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, 
newTag, connection.getFrameMax())),
         newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
 
-    if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId());
+    if(!nowait) client.consumeOk(newTag);
 
     //allow messages to be dispatched if required as there is now a consumer:
     queue->requestDispatch();
@@ -309,7 +309,7 @@
 void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool 
nowait){
     channel.cancel(consumerTag);
 
-    if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId());
+    if(!nowait) client.cancelOk(consumerTag);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, 
@@ -333,7 +333,7 @@
     if(!channel.get(out, queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
 
-        client.getEmpty(clusterId);//GRS, context.getRequestId());
+        client.getEmpty(clusterId);
     }
 } 
         
@@ -351,19 +351,19 @@
 void BrokerAdapter::TxHandlerImpl::select()
 {
     channel.startTx();
-    client.selectOk();//GRS context.getRequestId());
+    client.selectOk();
 }
 
 void BrokerAdapter::TxHandlerImpl::commit()
 {
     channel.commit();
-    client.commitOk();//GRS context.getRequestId());
+    client.commitOk();
 }
 
 void BrokerAdapter::TxHandlerImpl::rollback()
 {    
     channel.rollback();
-    client.rollbackOk();//GRS context.getRequestId());
+    client.rollbackOk();
     channel.recover(false);    
 }
               
@@ -378,7 +378,7 @@
 //
 void BrokerAdapter::ChannelHandlerImpl::ping()
 {
-    client.ok();//GRS context.getRequestId());
+    client.ok();
     client.pong();
 }
 
@@ -386,7 +386,7 @@
 void
 BrokerAdapter::ChannelHandlerImpl::pong()
 {
-    client.ok();//GRS context.getRequestId());
+    client.ok();
 }
 
 void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Mon Jul 23 
05:29:17 2007
@@ -79,6 +79,7 @@
 
     DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
     DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+    ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, 
"Wrong adapter for execution layer method!"); }
 
     ConnectionHandler* getConnectionHandler() { 
         throw ConnectionException(503, "Can't access connection class on 
non-zero channel!");        

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp Mon Jul 23 
05:29:17 2007
@@ -78,10 +78,10 @@
                            const string& consumerTag, uint64_t deliveryTag, 
                            uint32_t framesize)
 {
-    channel.send(
+    channel.send(make_shared_ptr(
        new BasicDeliverBody(
             channel.getVersion(), consumerTag, deliveryTag,
-            getRedelivered(), getExchange(), getRoutingKey()));
+            getRedelivered(), getExchange(), getRoutingKey())));
     sendContent(channel, framesize);
 }
 
@@ -92,12 +92,12 @@
                              uint64_t deliveryTag, 
                              uint32_t framesize)
 {
-    channel.send(
+    channel.send(make_shared_ptr(
         new BasicGetOkBody(
             channel.getVersion(),
             responseTo,
             deliveryTag, getRedelivered(), getExchange(),
-            getRoutingKey(), messageCount)); 
+            getRoutingKey(), messageCount))); 
     sendContent(channel, framesize);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp Mon 
Jul 23 05:29:17 2007
@@ -85,7 +85,7 @@
     if (ref){
 
         // Open
-        channel.send(new MessageOpenBody(channel.getVersion(), ref->getId()));
+        channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), 
ref->getId())));
         // Appends
         for(Reference::Appends::const_iterator a = ref->getAppends().begin();
             a != ref->getAppends().end();
@@ -98,8 +98,8 @@
             string::size_type contentStart = 0;
             while (sizeleft) {
                 string::size_type contentSize = sizeleft <= framesize ? 
sizeleft : framesize-overhead;
-                channel.send(new MessageAppendBody(channel.getVersion(), 
ref->getId(),
-                                                   string(content, 
contentStart, contentSize)));
+                channel.send(make_shared_ptr(new 
MessageAppendBody(channel.getVersion(), ref->getId(),
+                                                                   
string(content, contentStart, contentSize))));
                 sizeleft -= contentSize;
                 contentStart += contentSize;
             }
@@ -108,7 +108,7 @@
        
     // The transfer
     if ( transfer->size()<=framesize ) {
-       channel.send(
+       channel.send(make_shared_ptr(
             new MessageTransferBody(channel.getVersion(), 
                                     transfer->getTicket(),
                                     consumerTag,
@@ -132,7 +132,7 @@
                                     transfer->getSecurityToken(),
                                     transfer->getApplicationHeaders(),
                                     body,
-                                    transfer->getMandatory()));
+                                    transfer->getMandatory())));
     } else {
         // Thing to do here is to construct a simple reference message then 
deliver that instead
         // fragmentation will be taken care of in the delivery if necessary;
@@ -172,7 +172,7 @@
     }
     // Close any reference data
     if (ref)
-        channel.send(new MessageCloseBody(channel.getVersion(), ref->getId()));
+        channel.send(make_shared_ptr(new 
MessageCloseBody(channel.getVersion(), ref->getId())));
 }
 
 void MessageMessage::deliver(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h Mon Jul 
23 05:29:17 2007
@@ -71,6 +71,7 @@
     TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class 
can't be accessed over channel 0"); }
     DtxCoordinationHandler* getDtxCoordinationHandler() { throw 
ConnectionException(503, "Class can't be accessed over channel 0"); }
     DtxDemarcationHandler* getDtxDemarcationHandler() { throw 
ConnectionException(503, "Class can't be accessed over channel 0"); }
+    ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, 
"Class can't be accessed over channel 0"); }
     framing::ProtocolVersion getVersion() const;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Mon Jul 23 
05:29:17 2007
@@ -52,7 +52,7 @@
 void DtxHandlerImpl::select()
 {
     channel.selectDtx();
-    dClient.selectOk();//GRS context.getRequestId());
+    dClient.selectOk();
 }
 
 void DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -66,7 +66,7 @@
             if (suspend) {
                 throw ConnectionException(503, "End and suspend cannot both be 
set.");
             } else {
-                dClient.endOk(XA_RBROLLBACK);//GRS, context.getRequestId());
+                dClient.endOk(XA_RBROLLBACK);
             }
         } else {
             if (suspend) {
@@ -74,10 +74,10 @@
             } else {
                 channel.endDtx(xid, false);
             }
-            dClient.endOk(XA_OK);//GRS, context.getRequestId());
+            dClient.endOk(XA_OK);
         }
     } catch (const DtxTimeoutException& e) {
-        dClient.endOk(XA_RBTIMEOUT);//GRS, context.getRequestId());        
+        dClient.endOk(XA_RBTIMEOUT);        
     }
 }
 
@@ -95,9 +95,9 @@
         } else {
             channel.startDtx(xid, broker.getDtxManager(), join);
         }
-        dClient.startOk(XA_OK);//GRS, context.getRequestId());
+        dClient.startOk(XA_OK);
     } catch (const DtxTimeoutException& e) {
-        dClient.startOk(XA_RBTIMEOUT);//GRS, context.getRequestId());        
+        dClient.startOk(XA_RBTIMEOUT);        
     }
 }
 
@@ -108,9 +108,9 @@
 {
     try {
         bool ok = broker.getDtxManager().prepare(xid);
-        cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, 
context.getRequestId());
+        cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);
     } catch (const DtxTimeoutException& e) {
-        cClient.prepareOk(XA_RBTIMEOUT);//GRS, context.getRequestId());        
+        cClient.prepareOk(XA_RBTIMEOUT);        
     }
 }
 
@@ -120,9 +120,9 @@
 {
     try {
         bool ok = broker.getDtxManager().commit(xid, onePhase);
-        cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, 
context.getRequestId());
+        cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);
     } catch (const DtxTimeoutException& e) {
-        cClient.commitOk(XA_RBTIMEOUT);//GRS, context.getRequestId());        
+        cClient.commitOk(XA_RBTIMEOUT);        
     }
 }
 
@@ -132,9 +132,9 @@
 {
     try {
         broker.getDtxManager().rollback(xid);
-        cClient.rollbackOk(XA_OK);//GRS, context.getRequestId());
+        cClient.rollbackOk(XA_OK);
     } catch (const DtxTimeoutException& e) {
-        cClient.rollbackOk(XA_RBTIMEOUT);//GRS, context.getRequestId());       
 
+        cClient.rollbackOk(XA_RBTIMEOUT);        
     }
 }
 
@@ -171,7 +171,7 @@
 
     FieldTable response;
     response.setString("xids", data);
-    cClient.recoverOk(response);//GRS, context.getRequestId());
+    cClient.recoverOk(response);
 }
 
 void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
@@ -184,7 +184,7 @@
 void DtxHandlerImpl::getTimeout(const string& xid)
 {
     uint32_t timeout = broker.getDtxManager().getTimeout(xid);
-    cClient.getTimeoutOk(timeout);//GRS, context.getRequestId());    
+    cClient.getTimeoutOk(timeout);    
 }
 
 
@@ -193,7 +193,7 @@
                                 u_int32_t timeout)
 {
     broker.getDtxManager().setTimeout(xid, timeout);
-    cClient.setTimeoutOk();//GRS context.getRequestId());    
+    cClient.setTimeoutOk();    
 }
 
 void DtxHandlerImpl::setResponseTo(framing::RequestId r)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp Mon Jul 
23 05:29:17 2007
@@ -47,13 +47,13 @@
             uint32_t offset = 0;
             for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
                 string data = (*i)->getData().substr(offset, framesize);
-                channel.send(new AMQContentBody(data)); 
+                channel.send(make_shared_ptr(new AMQContentBody(data))); 
                 offset += framesize;
             }
             uint32_t remainder = (*i)->size() % framesize;
             if (remainder) {
                 string data = (*i)->getData().substr(offset, remainder);
-                channel.send(new AMQContentBody(data)); 
+                channel.send(make_shared_ptr(new AMQContentBody(data))); 
             }
         } else {
             AMQBody::shared_ptr contentBody =

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp Mon Jul 
23 05:29:17 2007
@@ -52,12 +52,12 @@
             string data;
             store->loadContent(*msg, data, offset,
                                remaining > framesize ? framesize : remaining);
-            channel.send(new AMQContentBody(data));
+            channel.send(make_shared_ptr(new AMQContentBody(data)));
         }
     } else {
         string data;
         store->loadContent(*msg, data, 0, expectedSize);  
-        channel.send(new AMQContentBody(data));
+        channel.send(make_shared_ptr(new AMQContentBody(data)));
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Mon 
Jul 23 05:29:17 2007
@@ -45,14 +45,14 @@
 MessageHandlerImpl::cancel(const string& destination )
 {
     channel.cancel(destination);
-    client.ok();//GRS context.getRequestId());
+    client.ok();
 }
 
 void
 MessageHandlerImpl::open(const string& reference)
 {
     references.open(reference);
-    client.ok();//GRS context.getRequestId());
+    client.ok();
 }
 
 void
@@ -60,14 +60,14 @@
 {
     MessageAppendBody::shared_ptr 
body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
     references.get(body->getReference())->append(body);
-    client.ok();//GRS context.getRequestId());
+    client.ok();
 }
 
 void
 MessageHandlerImpl::close(const string& reference)
 {
        Reference::shared_ptr ref = references.get(reference);
-    client.ok();//GRS context.getRequestId());
+    client.ok();
     
     // Send any transfer messages to their correct exchanges and okay them
     const Reference::Messages& msgs = ref->getMessages();
@@ -85,7 +85,7 @@
 {
     // Initial implementation (which is conforming) is to do nothing here
     // and return offset zero for the resume
-    client.ok();//GRS context.getRequestId());
+    client.ok();
 }
 
 void
@@ -95,7 +95,7 @@
     // Initial (null) implementation
     // open reference and return 0 offset
     references.open(reference);
-    client.offset(0);//GRS, );//GRS, context.getRequestId());
+    client.offset(0);
 }
 
 void
@@ -123,7 +123,7 @@
     channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, 
destination, connection.getFrameMax())),
         tag, queue, !noAck, exclusive,
         noLocal ? &connection : 0, &filter);
-    client.ok();//GRS context.getRequestId());
+    client.ok();
     // Dispatch messages as there is now a consumer.
     queue->requestDispatch();
 }
@@ -138,9 +138,9 @@
     
     GetAdapter out(adapter, queue, destination, connection.getFrameMax());
     if(channel.get(out, queue, !noAck))
-        client.ok();//GRS context.getRequestId());
+        client.ok();
     else 
-        client.empty();//GRS context.getRequestId());
+        client.empty();
 }
 
 void
@@ -166,22 +166,19 @@
     //TODO: handle global
     channel.setPrefetchSize(prefetchSize);
     channel.setPrefetchCount(prefetchCount);
-    client.ok();//GRS context.getRequestId());
+    client.ok();
 }
 
 void
 MessageHandlerImpl::recover(bool requeue)
 {
     channel.recover(requeue);
-    client.ok();//GRS context.getRequestId());
+    client.ok();
 }
 
 void
-MessageHandlerImpl::reject(uint16_t /*code*/,
-                           const string& /*text*/ )
+MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
 {
-    //channel.ack();
-    // channel.requeue();
 }
 
 void

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Mon Jul 
23 05:29:17 2007
@@ -36,7 +36,7 @@
 
 
 void SemanticHandler::handle(framing::AMQFrame& frame) 
-{
+{    
     //TODO: assembly etc when move to 0-10 framing
     //
     //have potentially three separate tracks at this point:
@@ -56,14 +56,39 @@
     //if ready to execute (i.e. if segment is complete or frame is
     //message content):
     handleBody(frame.getBody());
-    //if the frameset is complete, we can move the execution-mark
-    //forward (not for execution controls)
-    //note: need to be more sophisticated than this if we execute
-    //commands that arrive within an active message frameset
 }
 
 //ChannelAdapter virtual methods:
 void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, 
+                                            const qpid::framing::MethodContext& context)
+{
+    if (!method->invoke(this)) {
+        //else do the usual:
+        handleL4(method, context);
+        //(if the frameset is complete) we can move the execution-mark
+        //forward 
+        ++(incoming.hwm);
+
+        //note: need to be more sophisticated than this if we execute
+        //commands that arrive within an active message frameset (that
+        //can't happen until 0-10 framing is implemented)
+    }
+}
+
+void SemanticHandler::complete(u_int32_t mark) 
+{
+    //just record it for now (will eventually need to use it to ack messages):
+    outgoing.lwm = SequenceNumber(mark);
+}
+
+void SemanticHandler::flush()
+{
+    //flush doubles as a sync to begin with - send an execution.complete
+    incoming.lwm = incoming.hwm;
+    send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue())));
+}
+
+void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, 
                                             const qpid::framing::MethodContext& context)
 {
     try{

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Mon Jul 23 
05:29:17 2007
@@ -25,6 +25,7 @@
 #include "BrokerChannel.h"
 #include "Connection.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceNumber.h"
 
@@ -34,11 +35,18 @@
 class BrokerAdapter;
 class framing::ChannelAdapter;
 
-class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler {
+class SemanticHandler : private framing::ChannelAdapter, 
+    public framing::FrameHandler, 
+    public framing::AMQP_ServerOperations::ExecutionHandler
+{
     Connection& connection;
     Channel channel;
     std::auto_ptr<BrokerAdapter> adapter;
-    framing::SequenceNumber executionMark;
+    framing::Window incoming;
+    framing::Window outgoing;
+
+    void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, 
+                               const qpid::framing::MethodContext& context);
 
     //ChannelAdapter virtual methods:
     void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> 
method, 
@@ -50,6 +58,10 @@
 public:
     SemanticHandler(framing::ChannelId id, Connection& c);
     void handle(framing::AMQFrame& frame);
+
+    //execution class method handlers:
+    void complete(u_int32_t cumulativeExecutionMark);    
+    void flush();
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp Mon 
Jul 23 05:29:17 2007
@@ -101,7 +101,7 @@
         consumers.erase(i);
     }
     if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
-        channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, 
true));
+        channel.send(make_shared_ptr(new BasicAckBody(channel.version, 
c.lastDeliveryTag, true)));
     }
     channel.sendAndReceiveSync<BasicCancelOkBody>(
         synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, 
!synch)));
@@ -119,9 +119,9 @@
         Consumer& c = i->second;
         if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
         {
-            channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, 
true));
+            channel.send(make_shared_ptr(new BasicAckBody(channel.version, 
c.lastDeliveryTag, true)));
         }
-        channel.send(new BasicCancelBody(channel.version, i->first, true));
+        channel.send(make_shared_ptr(new BasicCancelBody(channel.version, 
i->first, true)));
     }
     consumers.clear();
 }
@@ -131,8 +131,7 @@
 {
     // Prepare for incoming response
     incoming.addDestination(BASIC_GET, destGet);
-    channel.send(
-        new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+    channel.send(make_shared_ptr(new BasicGetBody(channel.version, 0, 
queue.getName(), ackMode)));
     bool got = destGet.wait(msg);
     return got;
 }
@@ -150,9 +149,7 @@
         *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
     header->setContentSize(msg.getData().size());
 
-    channel.send(
-        new BasicPublishBody(
-            channel.version, 0, e, key, mandatory, immediate));
+    channel.send(make_shared_ptr(new BasicPublishBody(channel.version, 0, e, 
key, mandatory, immediate)));
     channel.send(header);
     string data = msg.getData();
     u_int64_t data_length = data.length();
@@ -160,14 +157,14 @@
         //frame itself uses 8 bytes
         u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
         if(data_length < frag_size){
-            channel.send(new AMQContentBody(data));
+            channel.send(make_shared_ptr(new AMQContentBody(data)));
         }else{
             u_int32_t offset = 0;
             u_int32_t remaining = data_length - offset;
             while (remaining > 0) {
                 u_int32_t length = remaining > frag_size ? frag_size : 
remaining;
                 string frag(data.substr(offset, length));
-                channel.send(new AMQContentBody(frag));                        
  
+                channel.send(make_shared_ptr(new AMQContentBody(frag)));       
                   
                 
                 offset += length;
                 remaining = data_length - offset;
@@ -268,11 +265,11 @@
             //else drop-through
           case AUTO_ACK:
             consumer.lastDeliveryTag = 0;
-            channel.send(
+            channel.send(make_shared_ptr(
                 new BasicAckBody(
                     channel.version,
                     msg.getDeliveryTag(),
-                    multiple));
+                    multiple)));
           case NO_ACK:          // Nothing to do
           case CLIENT_ACK:      // User code must ack.
             break;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Mon Jul 23 
05:29:17 2007
@@ -92,10 +92,10 @@
      connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
     **/
 
-    send(new ConnectionTuneOkBody(
+    sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
              version, proposal->getRequestId(),
              proposal->getChannelMax(), connection->getMaxFrameSize(),
-             proposal->getHeartbeat()));
+             proposal->getHeartbeat())));
     
     uint16_t heartbeat = proposal->getHeartbeat();
     connection->connector->setReadTimeout(heartbeat * 2);
@@ -104,7 +104,7 @@
     // Send connection open.
     std::string capabilities;
     responses.expect();
-    send(new ConnectionOpenBody(version, vhost, capabilities, true));
+    sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, 
capabilities, true)));
     //receive connection.open-ok (or redirect, but ignore that for now
     //esp. as using force=true).
     AMQMethodBody::shared_ptr openResponse = responses.receive();
@@ -210,6 +210,7 @@
           case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
           case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
           case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+          case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
           default: throw UnknownMethod();
         }
     }
@@ -223,7 +224,7 @@
 void Channel::handleChannel(AMQMethodBody::shared_ptr method, const 
MethodContext& ctxt) {
     switch (method->amqpMethodId()) {
       case ChannelCloseBody::METHOD_ID:
-          send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
+          sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, 
ctxt.getRequestId())));
         peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
         return;
       case ChannelFlowBody::METHOD_ID:
@@ -241,6 +242,18 @@
     throw UnknownMethod();
 }
 
+void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
+    if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
+        Monitor::ScopedLock l(outgoingMonitor);
+        //record the completion mark:
+        outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
+        //TODO: notify anyone waiting for completion notification:
+        outgoingMonitor.notifyAll();
+    } else{
+        throw UnknownMethod();
+    }
+}
+
 void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
     messaging->handle(body);
 }
@@ -315,7 +328,7 @@
     AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
 {
     responses.expect();
-    send(toSend);
+    sendCommand(toSend);
     return responses.receive(c, m);
 }
 
@@ -325,7 +338,7 @@
     if(sync)
         return sendAndReceive(body, c, m);
     else {
-        send(body);
+        sendCommand(body);
         return AMQMethodBody::shared_ptr();
     }
 }
@@ -360,5 +373,33 @@
 
 void Channel::run() {
     messaging->run();
+}
+
+void Channel::sendCommand(AMQBody::shared_ptr body)
+{
+    ++(outgoing.hwm);
+    send(body);
+}
+
+bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
+{
+    AbsTime end;
+    if (timeout == 0) {
+        end = AbsTime::FarFuture();
+    } else {
+        end = AbsTime(AbsTime::now(), timeout);
+    }
+
+    Monitor::ScopedLock l(outgoingMonitor);
+    while (end > AbsTime::now() && outgoing.lwm < poi) {
+        outgoingMonitor.wait(end);
+    }
+    return !(outgoing.lwm < poi);
+}
+
+bool Channel::synchWithServer(Duration timeout) 
+{
+    send(make_shared_ptr(new ExecutionFlushBody(version)));
+    return waitForCompletion(outgoing.hwm, timeout);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Mon Jul 23 
05:29:17 2007
@@ -29,6 +29,7 @@
 #include "ResponseHandler.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/Thread.h"
 #include "AckMode.h"
 
@@ -64,6 +65,8 @@
     Connection* connection;
     sys::Thread dispatcher;
     ResponseHandler responses;
+    sys::Monitor outgoingMonitor;
+    framing::Window outgoing;
 
     uint16_t prefetch;
     const bool transactional;
@@ -84,6 +87,7 @@
         framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
     void handleChannel(framing::AMQMethodBody::shared_ptr method, const 
framing::MethodContext& ctxt);
     void handleConnection(framing::AMQMethodBody::shared_ptr method);
+    void handleExecution(framing::AMQMethodBody::shared_ptr method);
 
     void setQos();
 
@@ -114,9 +118,12 @@
                 sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
     }
 
+    void sendCommand(framing::AMQBody::shared_ptr body);
+
     void open(framing::ChannelId, Connection&);
     void closeInternal();
     void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+    bool waitForCompletion(framing::SequenceNumber, sys::Duration);
     
     // FIXME aconway 2007-02-23: Get rid of friendships.
   friend class Connection;
@@ -358,7 +365,10 @@
      */
     void run();
 
-
+    /**
+     * TESTING ONLY FOR NOW!
+     */
+    bool synchWithServer(sys::Duration timeout = 0);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp Mon Jul 23 
05:29:17 2007
@@ -36,6 +36,10 @@
     THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server.");
 }
 
+bool AMQMethodBody::invoke(Invocable*) {
+    return false;
+}
+
 AMQMethodBody::shared_ptr AMQMethodBody::create(
     AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
     Buffer& buffer)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h Mon Jul 23 
05:29:17 2007
@@ -52,6 +52,7 @@
     virtual ClassId  amqpClassId() const = 0;
     
     virtual void invoke(AMQP_ServerOperations&, const MethodContext&);
+    virtual bool invoke(Invocable* target);
 
     template <class T> bool isA() {
         return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h Mon Jul 23 
05:29:17 2007
@@ -78,10 +78,6 @@
     RequestId send(shared_ptr<AMQBody> body,
                    Correlator::Action action=Correlator::Action());
 
-    // TODO aconway 2007-04-05:  remove and use make_shared_ptr at call sites.
-    /**@deprecated Use make_shared_ptr with the other send() override */
-    RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
-
     virtual bool isOpen() const = 0;
     
     RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h Mon Jul 23 
05:29:17 2007
@@ -48,6 +48,12 @@
     friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
 };    
 
+struct Window 
+{
+    SequenceNumber hwm;
+    SequenceNumber lwm;
+};
+
 }} // namespace qpid::framing
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp Mon Jul 23 05:29:17 
2007
@@ -35,6 +35,7 @@
 #include "qpid/client/ClientMessage.h"
 #include "qpid/client/MessageListener.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
 
 using namespace qpid::client;
 using namespace qpid::sys;
@@ -117,6 +118,11 @@
        msg.setData(data);
        channel.publish(msg, exchange, "MyTopic");
        if (opts.trace) std::cout << "Published message: " << data << std::endl;
+        if (opts.trace) {
+            std::cout << "Publication " 
+                      << (channel.synchWithServer(qpid::sys::TIME_SEC * 1) ? " DID " : " did NOT ") 
+                      << "complete"  << std::endl;
+        }
 
        {
             Monitor::ScopedLock l(monitor);

Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Mon Jul 23 05:29:17 2007
@@ -140,6 +140,9 @@
   def connection_close(self, ch, msg):
     self.client.peer.close(msg)
 
+  def execution_complete(self, ch, msg):
+    ch.completion.complete(msg.cumulative_execution_mark)
+
   def close(self, reason):
     self.client.closed = True
     self.client.reason = reason

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Mon Jul 23 05:29:17 2007
@@ -186,6 +186,8 @@
     self.requester = Requester(self.write)
     self.responder = Responder(self.write)
 
+    self.completion = ExecutionCompletion()
+
     # Use reliable framing if version == 0-9.
     self.reliable = (spec.major == 0 and spec.minor == 9)
     self.synchronous = True
@@ -247,6 +249,7 @@
     self.responder.respond(method, batch, request)
 
   def invoke(self, type, args, kwargs):
+    self.completion.next_command(type)
     content = kwargs.pop("content", None)
     frame = Method(type, type.arguments(*args, **kwargs))
     if self.reliable:
@@ -337,3 +340,30 @@
 
   def is_complete(self):
     return self.completed.isSet()
+
+class ExecutionCompletion:
+  def __init__(self):
+    self.completed = threading.Event()
+    self.sequence = Sequence(0)
+    self.command_id = 0
+    self.mark = 0
+
+  def next_command(self, method):
+    #the following test is a hack until the track/sub-channel is available
+    if method.klass.name != "execution":
+      self.command_id = self.sequence.next()
+
+  def complete(self, mark):
+    self.mark = mark
+    self.completed.set()    
+    self.completed.clear()    
+
+  def wait(self, point_of_interest=-1, timeout=None):
+    """
+    todo: really want to allow different threads to call this with
+    different points of interest on the same channel, this is a quick
+    hack for now
+    """
+    if point_of_interest == -1: point_of_interest = self.command_id
+    self.completed.wait(timeout)
+    return point_of_interest <= self.mark

Modified: incubator/qpid/trunk/qpid/python/tests_0-9/basic.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/basic.py?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-9/basic.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-9/basic.py Mon Jul 23 05:29:17 2007
@@ -127,7 +127,7 @@
         channel.basic_publish(routing_key="test-queue-4", 
content=Content("One"))
 
         myqueue = self.client.queue("my-consumer")
-        msg = myqueue.get(timeout=5)
+        msg = myqueue.get(timeout=1)
         self.assertEqual("One", msg.content.body)
        
         #cancel should stop messages being delivered

Modified: incubator/qpid/trunk/qpid/specs/amqp-dtx-preview.0-9.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/amqp-dtx-preview.0-9.xml?view=diff&rev=558700&r1=558699&r2=558700
==============================================================================
--- incubator/qpid/trunk/qpid/specs/amqp-dtx-preview.0-9.xml (original)
+++ incubator/qpid/trunk/qpid/specs/amqp-dtx-preview.0-9.xml Mon Jul 23 
05:29:17 2007
@@ -1040,4 +1040,38 @@
     </method>
   </class>
 
+  <class name="execution" handler="execution" index="140">
+    <doc>
+      This class allows for efficiently communicating information
+      about completion of processing.  
+    </doc>
+
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MUST"/>
+
+    <method name="flush" index="10" label="request an execution.complete return method">
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
+    </method>
+
+    <method name="complete" index="20">
+      <chassis name="server" implement="MUST"/>
+      <chassis name="client" implement="MUST"/>
+
+      
+      <field name="cumulative-execution-mark" domain="long" label="Low-water mark for command ids">
+        <doc>
+          The low-water mark for executed command-ids. All ids below this mark have been executed;
+          above this mark, there are gaps containing unexecuted command ids (i.e. discontinuous). By
+          definition, the first id above this mark (if it exists) is an unexecuted command-id.
+        </doc>
+      </field>
+
+
+      <!-- The ranged mark on the complete method has been temporarily removed -->
+    </method>
+
+  </class>
+
+
 </amqp>


Reply via email to