Author: astitcher
Date: Thu Feb 8 16:52:46 2007
New Revision: 505108
URL: http://svn.apache.org/viewvc?view=rev&rev=505108
Log:
[EMAIL PROTECTED]: andrew | 2007-02-09 00:52:04 +0000
Got ack working for the non batched case
Small tidy up in broker Channel
Modified:
incubator/qpid/branches/qpid.0-9/ (props changed)
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Thu Feb 8 16:52:46 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1091
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1102
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Thu Feb
8 16:52:46 2007
@@ -280,7 +280,7 @@
BasicMessage* msg = new BasicMessage(
&connection, exchangeName, routingKey, mandatory, immediate,
context.methodBody);
- channel.handlePublish(msg, exchange);
+ channel.handlePublish(msg);
}else{
throw ChannelException(
404, "Exchange not found '" + exchangeName + "'");
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Thu Feb
8 16:52:46 2007
@@ -140,7 +140,9 @@
{
Mutex::ScopedLock locker(deliveryLock);
- u_int64_t deliveryTag = currentDeliveryTag++;
+ // Key the delivered messages to the id of the request in which they're sent
+ u_int64_t deliveryTag = getNextSendRequestId();
+
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, consumerTag,
deliveryTag));
outstanding.size += msg->contentSize();
@@ -188,24 +190,24 @@
if(blocked) queue->dispatch();
}
-void Channel::handleInlineTransfer(
- Message::shared_ptr msg, Exchange::shared_ptr& exch)
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
{
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
- exch->route(*deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
+ exchange->route(*deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
txBuffer.enlist(new DeletingTxOp(deliverable));
}else{
DeliverableMessage deliverable(msg);
- exch->route(deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
+ exchange->route(deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
}
}
// FIXME aconway 2007-02-05: Drop exchange member, calculate from
// message in ::complete().
-void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
+void Channel::handlePublish(Message* _message){
Message::shared_ptr message(_message);
- exchange = _exchange;
messageBuilder.initialise(message);
}
@@ -237,6 +239,11 @@
exchange->route(deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
}
+}
+
+// TODO astitcher 2007-02-08 This only deals correctly with non batched responses
+void Channel::ack(){
+ ack(getRequestInProgress(), false);
}
void Channel::ack(u_int64_t deliveryTag, bool multiple){
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Thu Feb 8
16:52:46 2007
@@ -91,7 +91,6 @@
AccumulatedAck accumulatedAck;
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
- Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
bool opened;
boost::scoped_ptr<BrokerAdapter> adapter;
@@ -131,15 +130,16 @@
void close();
void commit();
void rollback();
+ void ack();
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag,
u_int64_t deliveryTag);
- void handlePublish(Message* msg, Exchange::shared_ptr exchange);
+ void handlePublish(Message* msg);
void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
- void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& exchange);
+ void handleInlineTransfer(Message::shared_ptr msg);
// For ChannelAdapter
void handleMethodInContext(
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Thu
Feb 8 16:52:46 2007
@@ -101,8 +101,9 @@
void
MessageHandlerImpl::empty( const MethodContext& )
{
- // FIXME astitcher 2007-01-11: 0-9 feature
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+ // Shouldn't ever receive this as it is a response to get
+ // TODO astitcher 2007-02-09 What is the correct exception to throw here?
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
}
void
@@ -112,12 +113,9 @@
const string& /*destination*/,
bool noAck )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
Queue::shared_ptr queue =
connection.getQueue(queueName, context.channel->getId());
- // FIXME: get is probably Basic specific
if(channel.get(queue, !noAck))
client.ok(context);
else
@@ -133,9 +131,9 @@
}
void
-MessageHandlerImpl::ok( const MethodContext& )
+MessageHandlerImpl::ok(const MethodContext& /*context*/)
{
- // TODO: Need to ack the transfers acknowledged so far for flow control purposes
+ channel.ack();
}
void
@@ -162,7 +160,6 @@
MessageHandlerImpl::recover(const MethodContext& context,
bool requeue)
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented");
channel.recover(requeue);
client.ok(context);
}
@@ -188,7 +185,7 @@
void
MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& destination,
+ const string& /* destination */,
bool /*redelivered*/,
bool /*immediate*/,
u_int64_t /*ttl*/,
@@ -211,8 +208,6 @@
qpid::framing::Content body,
bool /*mandatory*/)
{
- Exchange::shared_ptr exchange(
- broker.getExchanges().get(destination));
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
@@ -220,7 +215,7 @@
new MessageMessage(&connection, transfer));
if (body.isInline())
- channel.handleInlineTransfer(message, exchange);
+ channel.handleInlineTransfer(message);
else
references.get(body.getValue()).addMessage(message);
client.ok(context);
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
Thu Feb 8 16:52:46 2007
@@ -55,7 +55,9 @@
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
assertMethodOk(*request);
- responder.received(request->getData());
+ AMQRequestBody::Data& requestData = request->getData();
+ responder.received(requestData);
+ requestInProgress = requestData.requestId;
handleMethodInContext(request, MethodContext(this, request));
}
@@ -63,7 +65,10 @@
assertMethodOk(*response);
// TODO aconway 2007-01-30: Consider a response handled on receipt.
// Review - any cases where this is not the case?
- requester.processed(response->getData());
+ AMQResponseBody::Data& responseData = response->getData();
+ requester.processed(responseData);
+ // For a response this is taken to be the request being responded to (for convenience)
+ requestInProgress = responseData.requestId;
handleMethod(response);
}
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
Thu Feb 8 16:52:46 2007
@@ -87,6 +87,7 @@
const MethodContext& context) = 0;
RequestId getRequestInProgress() { return requestInProgress; }
+ RequestId getNextSendRequestId() { return requester.getNextId(); }
private:
ChannelId id;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h Thu Feb
8 16:52:46 2007
@@ -46,6 +46,9 @@
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
+ /** Get the next id to be used. */
+ RequestId getNextId() { return lastId + 1; }
+
private:
std::set<RequestId> requests; /** Sent but not responded to */
RequestId lastId;
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Thu Feb 8
16:52:46 2007
@@ -264,7 +264,7 @@
contentSize += data[i].size();
}
header->setContentSize(contentSize);
- channel.handlePublish(msg, exchange);
+ channel.handlePublish(msg);
channel.handleHeader(header);
for (int i = 0; i < 3; i++) {