Author: astitcher
Date: Tue Feb 13 13:52:30 2007
New Revision: 507249

URL: http://svn.apache.org/viewvc?view=rev&rev=507249
Log:
 [EMAIL PROTECTED]:  andrew | 2007-02-09 15:51:10 +0000
 Removed currently unused request tracking logic
 [EMAIL PROTECTED]:  andrew | 2007-02-13 21:51:30 +0000
 Implemented receiveing batched Message.ok in c++ broker
 Implemented batched response frames in python client code

Modified:
    incubator/qpid/branches/qpid.0-9/   (props changed)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
    incubator/qpid/branches/qpid.0-9/python/qpid/message.py
    incubator/qpid/branches/qpid.0-9/python/qpid/peer.py
    incubator/qpid/branches/qpid.0-9/python/tests/message.py

Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Tue Feb 13 13:52:30 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1104
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1125

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp Tue Feb 
13 13:52:30 2007
@@ -24,12 +24,12 @@
 using std::bind2nd;
 using namespace qpid::broker;
 
-void AccumulatedAck::update(u_int64_t tag, bool multiple){
-    if(multiple){
-        if(tag > range) range = tag;
-        //else don't care, it is already counted
-    }else if(tag > range){
-        individual.push_back(tag);
+void AccumulatedAck::update(u_int64_t firstTag, u_int64_t lastTag){
+    if (firstTag-1 == range) {
+       range = lastTag;
+    } else {
+       for (u_int64_t tag = firstTag; tag<=lastTag; tag++)
+               individual.push_back(tag);
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h Tue Feb 13 
13:52:30 2007
@@ -43,7 +43,7 @@
              */
             std::list<u_int64_t> individual;
 
-            void update(u_int64_t tag, bool multiple);
+            void update(u_int64_t firstTag, u_int64_t lastTag);
             void consolidate();
             void clear();
             bool covers(u_int64_t tag) const;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Feb 
13 13:52:30 2007
@@ -326,11 +326,7 @@
 } 
         
 void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t 
deliveryTag, bool multiple){
-    try{
-        channel.ack(deliveryTag, multiple);
-    }catch(InvalidAckException& e){
-        throw ConnectionException(530, "Received ack for unrecognised delivery 
tag");
-    }
+       channel.ack(deliveryTag, multiple);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t 
/*deliveryTag*/, bool /*requeue*/){} 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Feb 
13 13:52:30 2007
@@ -235,24 +235,37 @@
 
 // TODO astitcher 2007-02-08 This only deals correctly with non batched 
responses
 void Channel::ack(){
-       ack(getRequestInProgress(), false);
+       ack(getFirstAckRequest(), getLastAckRequest());
 }
 
-void Channel::ack(u_int64_t deliveryTag, bool multiple)
-{
+// Used by Basic
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+       if (multiple)
+               ack(0, deliveryTag);
+       else
+               ack(deliveryTag, deliveryTag);
+}
+
+void Channel::ack(u_int64_t firstTag, u_int64_t lastTag){
     if(transactional){
-        accumulatedAck.update(deliveryTag, multiple);
+       //FIXME astitcher This only works for Basic style acks
+        accumulatedAck.update(lastTag, lastTag);
+
         //TODO: I think the outstanding prefetch size & count should be 
updated at this point...
         //TODO: ...this may then necessitate dispatching to consumers
     }else{
         Mutex::ScopedLock locker(deliveryLock);//need to synchronize with 
possible concurrent delivery
     
-        ack_iterator i = find_if(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+        ack_iterator i = find_if(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
+               ack_iterator j = (firstTag == 0) ?
+                       unacked.begin() :
+               find_if(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+               
         if(i == unacked.end()){
-            throw InvalidAckException();
-        }else if(multiple){     
+            throw ConnectionException(530, "Received ack for unrecognised 
delivery tag");
+        }else if(i!=j){
             ack_iterator end = ++i;
-            for_each(unacked.begin(), end, 
mem_fun_ref(&DeliveryRecord::discard));
+            for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
             unacked.erase(unacked.begin(), end);
 
             //recalculate the prefetch:

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Feb 13 
13:52:30 2007
@@ -138,6 +138,7 @@
     void rollback();
     void ack();
     void ack(u_int64_t deliveryTag, bool multiple);
+    void ack(u_int64_t deliveryTag, u_int64_t endTag);
     void recover(bool requeue);
     void deliver(Message::shared_ptr& msg, const string& consumerTag, 
u_int64_t deliveryTag);            
     void handlePublish(Message* msg);
@@ -152,8 +153,6 @@
         boost::shared_ptr<framing::AMQMethodBody> method,
         const framing::MethodContext& context);
 };
-
-struct InvalidAckException{};
 
 }} // namespace broker
 

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp 
Tue Feb 13 13:52:30 2007
@@ -61,7 +61,6 @@
     assertMethodOk(*request);
     AMQRequestBody::Data& requestData = request->getData();
     responder.received(requestData);
-    requestInProgress = requestData.requestId;
     handleMethodInContext(request, MethodContext(this, request));
 }
 
@@ -71,8 +70,6 @@
     // Review - any cases where this is not the case?
     AMQResponseBody::Data& responseData = response->getData();
     requester.processed(responseData);
-    // For a response this is taken to be the request being responded to (for 
convenience)
-    requestInProgress = responseData.requestId;
     handleMethod(response);
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h 
Tue Feb 13 13:52:30 2007
@@ -85,7 +85,8 @@
         boost::shared_ptr<qpid::framing::AMQMethodBody> method,
         const MethodContext& context) = 0;
 
-    RequestId getRequestInProgress() { return requestInProgress; }
+    RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+    RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
     RequestId getNextSendRequestId() { return requester.getNextId(); }
 
   private:
@@ -94,7 +95,6 @@
     ProtocolVersion version;
     Requester requester;
     Responder responder;
-    RequestId requestInProgress; 
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp Tue 
Feb 13 13:52:30 2007
@@ -29,23 +29,12 @@
 void Requester::sending(AMQRequestBody::Data& request) {
     request.requestId = ++lastId;
     request.responseMark = responseMark;
-    requests.insert(request.requestId);
 }
 
 void Requester::processed(const AMQResponseBody::Data& response) {
     responseMark = response.responseId;
-    RequestId id = response.requestId;
-    RequestId end = id + response.batchOffset + 1;
-    for ( ; id < end; ++id) {
-        std::set<RequestId>::iterator i = requests.find(id);
-        if (i != requests.end())
-            requests.erase(i);
-        else {
-            THROW_QPID_ERROR(
-                PROTOCOL_ERROR,
-                boost::format("Response with non-existent request id=%d")%id);
-        }
-    }
+    firstAckRequest = response.requestId;
+    lastAckRequest = firstAckRequest + response.batchOffset;
 }
 
 }} // namespace qpid::framing

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h Tue Feb 
13 13:52:30 2007
@@ -46,13 +46,18 @@
     /** Called after processing a response. */
     void processed(const AMQResponseBody::Data&);
 
-       /** Get the next id to be used. */
+       /** Get the next request id to be used. */
        RequestId getNextId() { return lastId + 1; }
+       /** Get the first request acked by this response */
+       RequestId getFirstAckRequest() { return firstAckRequest; }
+       /** Get the last request acked by this response */
+       RequestId getLastAckRequest() { return lastAckRequest; }
 
   private:
-    std::set<RequestId> requests; /** Sent but not responded to */
     RequestId lastId;
     ResponseId responseMark;
+    ResponseId firstAckRequest;
+    ResponseId lastAckRequest;
 };
 
 }} // namespace qpid::framing

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/message.py?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/message.py Tue Feb 13 13:52:30 
2007
@@ -47,9 +47,12 @@
     else:
       for r in self.method.responses:
         if attr == r.name:
-          result = lambda *args, **kwargs: \
-                   self.channel.respond(Method(r, r.arguments(*args, 
**kwargs)),
-                                        self.frame)
+          def respond(*args, **kwargs):
+            batch=0
+            if kwargs.has_key("batchoffset"):
+              batch=kwargs.pop("batchoffset")
+            self.channel.respond(Method(r, r.arguments(*args, **kwargs)), 
batch, self.frame)
+          result = respond
           break
       else:
         raise AttributeError(attr)

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/peer.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/peer.py?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/peer.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/peer.py Tue Feb 13 13:52:30 
2007
@@ -155,12 +155,15 @@
     self.write = writer
     self.sequence = Sequence(1)
 
-  def respond(self, method, request):
+  def respond(self, method, batch, request):
     if isinstance(request, Method):
       self.write(method)
     else:
-      # XXX: batching
-      frame = Response(self.sequence.next(), request.id, 0, method)
+      # allow batching from frame at either end
+      if batch<0:
+        frame = Response(self.sequence.next(), request.id+batch, -batch, 
method)
+      else:
+        frame = Response(self.sequence.next(), request.id, batch, method)
       self.write(frame)
 
 class Closed(Exception): pass
@@ -237,8 +240,8 @@
   def request(self, method, listener, content = None):
     self.requester.request(method, listener, content)
 
-  def respond(self, method, request):
-    self.responder.respond(method, request)
+  def respond(self, method, batch, request):
+    self.responder.respond(method, batch, request)
 
   def invoke(self, type, args, kwargs):
     content = kwargs.pop("content", None)

Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Tue Feb 13 
13:52:30 2007
@@ -384,15 +384,11 @@
             self.assertEqual(reply.method.name, "ok")
             msg = self.client.queue(tag).get(timeout=1)
             self.assertEqual("Message %d" % i, msg.body)
-            # TODO: replace with below when we have batching
-            if(i in [11, 12, 13, 15, 17, 19]):
+            
+            if (i==13):
+              msg.ok(batchoffset=-2)
+            if(i in [15, 17, 19]):
               msg.ok()
-
-            #todo: when batching is available, test ack multiple
-            #if(i == 13):
-            #    channel.message_ack(delivery_tag=reply.delivery_tag, 
multiple=True)
-            #if(i in [15, 17, 19]):
-            #    channel.message_ack(delivery_tag=reply.delivery_tag)
 
         reply = channel.message_get(no_ack=True, queue="test-get")
         self.assertEqual(reply.method.klass.name, "message")


Reply via email to