Author: astitcher
Date: Tue Feb 13 13:52:30 2007
New Revision: 507249
URL: http://svn.apache.org/viewvc?view=rev&rev=507249
Log:
[EMAIL PROTECTED]: andrew | 2007-02-09 15:51:10 +0000
Removed currently unused request tracking logic
[EMAIL PROTECTED]: andrew | 2007-02-13 21:51:30 +0000
Implemented receiving batched Message.ok in C++ broker
Implemented batched response frames in python client code
Modified:
incubator/qpid/branches/qpid.0-9/ (props changed)
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
incubator/qpid/branches/qpid.0-9/python/qpid/message.py
incubator/qpid/branches/qpid.0-9/python/qpid/peer.py
incubator/qpid/branches/qpid.0-9/python/tests/message.py
Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Tue Feb 13 13:52:30 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1104
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1125
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.cpp Tue Feb
13 13:52:30 2007
@@ -24,12 +24,12 @@
using std::bind2nd;
using namespace qpid::broker;
-void AccumulatedAck::update(u_int64_t tag, bool multiple){
- if(multiple){
- if(tag > range) range = tag;
- //else don't care, it is already counted
- }else if(tag > range){
- individual.push_back(tag);
+void AccumulatedAck::update(u_int64_t firstTag, u_int64_t lastTag){
+ if (firstTag-1 == range) {
+ range = lastTag;
+ } else {
+ for (u_int64_t tag = firstTag; tag<=lastTag; tag++)
+ individual.push_back(tag);
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/AccumulatedAck.h Tue Feb 13
13:52:30 2007
@@ -43,7 +43,7 @@
*/
std::list<u_int64_t> individual;
- void update(u_int64_t tag, bool multiple);
+ void update(u_int64_t firstTag, u_int64_t lastTag);
void consolidate();
void clear();
bool covers(u_int64_t tag) const;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Feb
13 13:52:30 2007
@@ -326,11 +326,7 @@
}
void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t
deliveryTag, bool multiple){
- try{
- channel.ack(deliveryTag, multiple);
- }catch(InvalidAckException& e){
- throw ConnectionException(530, "Received ack for unrecognised delivery
tag");
- }
+ channel.ack(deliveryTag, multiple);
}
void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t
/*deliveryTag*/, bool /*requeue*/){}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Feb
13 13:52:30 2007
@@ -235,24 +235,37 @@
// TODO astitcher 2007-02-08 This only deals correctly with non batched
responses
void Channel::ack(){
- ack(getRequestInProgress(), false);
+ ack(getFirstAckRequest(), getLastAckRequest());
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple)
-{
+// Used by Basic
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
+}
+
+void Channel::ack(u_int64_t firstTag, u_int64_t lastTag){
if(transactional){
- accumulatedAck.update(deliveryTag, multiple);
+ //FIXME astitcher This only works for Basic style acks
+ accumulatedAck.update(lastTag, lastTag);
+
//TODO: I think the outstanding prefetch size & count should be
updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
}else{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with
possible concurrent delivery
- ack_iterator i = find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ ack_iterator i = find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+
if(i == unacked.end()){
- throw InvalidAckException();
- }else if(multiple){
+ throw ConnectionException(530, "Received ack for unrecognised
delivery tag");
+ }else if(i!=j){
ack_iterator end = ++i;
- for_each(unacked.begin(), end,
mem_fun_ref(&DeliveryRecord::discard));
+ for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
unacked.erase(unacked.begin(), end);
//recalculate the prefetch:
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Feb 13
13:52:30 2007
@@ -138,6 +138,7 @@
void rollback();
void ack();
void ack(u_int64_t deliveryTag, bool multiple);
+ void ack(u_int64_t deliveryTag, u_int64_t endTag);
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag,
u_int64_t deliveryTag);
void handlePublish(Message* msg);
@@ -152,8 +153,6 @@
boost::shared_ptr<framing::AMQMethodBody> method,
const framing::MethodContext& context);
};
-
-struct InvalidAckException{};
}} // namespace broker
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
Tue Feb 13 13:52:30 2007
@@ -61,7 +61,6 @@
assertMethodOk(*request);
AMQRequestBody::Data& requestData = request->getData();
responder.received(requestData);
- requestInProgress = requestData.requestId;
handleMethodInContext(request, MethodContext(this, request));
}
@@ -71,8 +70,6 @@
// Review - any cases where this is not the case?
AMQResponseBody::Data& responseData = response->getData();
requester.processed(responseData);
- // For a response this is taken to be the request being responded to (for
convenience)
- requestInProgress = responseData.requestId;
handleMethod(response);
}
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
Tue Feb 13 13:52:30 2007
@@ -85,7 +85,8 @@
boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const MethodContext& context) = 0;
- RequestId getRequestInProgress() { return requestInProgress; }
+ RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+ RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
RequestId getNextSendRequestId() { return requester.getNextId(); }
private:
@@ -94,7 +95,6 @@
ProtocolVersion version;
Requester requester;
Responder responder;
- RequestId requestInProgress;
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp Tue
Feb 13 13:52:30 2007
@@ -29,23 +29,12 @@
void Requester::sending(AMQRequestBody::Data& request) {
request.requestId = ++lastId;
request.responseMark = responseMark;
- requests.insert(request.requestId);
}
void Requester::processed(const AMQResponseBody::Data& response) {
responseMark = response.responseId;
- RequestId id = response.requestId;
- RequestId end = id + response.batchOffset + 1;
- for ( ; id < end; ++id) {
- std::set<RequestId>::iterator i = requests.find(id);
- if (i != requests.end())
- requests.erase(i);
- else {
- THROW_QPID_ERROR(
- PROTOCOL_ERROR,
- boost::format("Response with non-existent request id=%d")%id);
- }
- }
+ firstAckRequest = response.requestId;
+ lastAckRequest = firstAckRequest + response.batchOffset;
}
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h Tue Feb
13 13:52:30 2007
@@ -46,13 +46,18 @@
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
- /** Get the next id to be used. */
+ /** Get the next request id to be used. */
RequestId getNextId() { return lastId + 1; }
+ /** Get the first request acked by this response */
+ RequestId getFirstAckRequest() { return firstAckRequest; }
+ /** Get the last request acked by this response */
+ RequestId getLastAckRequest() { return lastAckRequest; }
private:
- std::set<RequestId> requests; /** Sent but not responded to */
RequestId lastId;
ResponseId responseMark;
+ ResponseId firstAckRequest;
+ ResponseId lastAckRequest;
};
}} // namespace qpid::framing
Modified: incubator/qpid/branches/qpid.0-9/python/qpid/message.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/message.py?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/message.py Tue Feb 13 13:52:30
2007
@@ -47,9 +47,12 @@
else:
for r in self.method.responses:
if attr == r.name:
- result = lambda *args, **kwargs: \
- self.channel.respond(Method(r, r.arguments(*args,
**kwargs)),
- self.frame)
+ def respond(*args, **kwargs):
+ batch=0
+ if kwargs.has_key("batchoffset"):
+ batch=kwargs.pop("batchoffset")
+ self.channel.respond(Method(r, r.arguments(*args, **kwargs)),
batch, self.frame)
+ result = respond
break
else:
raise AttributeError(attr)
Modified: incubator/qpid/branches/qpid.0-9/python/qpid/peer.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/peer.py?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/peer.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/peer.py Tue Feb 13 13:52:30
2007
@@ -155,12 +155,15 @@
self.write = writer
self.sequence = Sequence(1)
- def respond(self, method, request):
+ def respond(self, method, batch, request):
if isinstance(request, Method):
self.write(method)
else:
- # XXX: batching
- frame = Response(self.sequence.next(), request.id, 0, method)
+ # allow batching from frame at either end
+ if batch<0:
+ frame = Response(self.sequence.next(), request.id+batch, -batch,
method)
+ else:
+ frame = Response(self.sequence.next(), request.id, batch, method)
self.write(frame)
class Closed(Exception): pass
@@ -237,8 +240,8 @@
def request(self, method, listener, content = None):
self.requester.request(method, listener, content)
- def respond(self, method, request):
- self.responder.respond(method, request)
+ def respond(self, method, batch, request):
+ self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
content = kwargs.pop("content", None)
Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=507249&r1=507248&r2=507249
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Tue Feb 13
13:52:30 2007
@@ -384,15 +384,11 @@
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- # TODO: replace with below when we have batching
- if(i in [11, 12, 13, 15, 17, 19]):
+
+ if (i==13):
+ msg.ok(batchoffset=-2)
+ if(i in [15, 17, 19]):
msg.ok()
-
- #todo: when batching is available, test ack multiple
- #if(i == 13):
- # channel.message_ack(delivery_tag=reply.delivery_tag,
multiple=True)
- #if(i in [15, 17, 19]):
- # channel.message_ack(delivery_tag=reply.delivery_tag)
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")