Author: astitcher
Date: Thu Feb  8 16:52:46 2007
New Revision: 505108

URL: http://svn.apache.org/viewvc?view=rev&rev=505108
Log:
 [EMAIL PROTECTED]:  andrew | 2007-02-09 00:52:04 +0000
 Got ack working for the non batched case
 Small tidy up in broker Channel

Modified:
    incubator/qpid/branches/qpid.0-9/   (props changed)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp

Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Thu Feb  8 16:52:46 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1091
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1102

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Thu Feb  
8 16:52:46 2007
@@ -280,7 +280,7 @@
         BasicMessage* msg = new BasicMessage(
             &connection, exchangeName, routingKey, mandatory, immediate,
             context.methodBody);
-        channel.handlePublish(msg, exchange);
+        channel.handlePublish(msg);
     }else{
         throw ChannelException(
             404, "Exchange not found '" + exchangeName + "'");

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Thu Feb  
8 16:52:46 2007
@@ -140,7 +140,9 @@
 {
     Mutex::ScopedLock locker(deliveryLock);
 
-    u_int64_t deliveryTag = currentDeliveryTag++;
+       // Key the delivered messages to the id of the request in which they're 
sent 
+    u_int64_t deliveryTag = getNextSendRequestId();
+    
     if(ackExpected){
         unacked.push_back(DeliveryRecord(msg, queue, consumerTag, 
deliveryTag));
         outstanding.size += msg->contentSize();
@@ -188,24 +190,24 @@
     if(blocked) queue->dispatch();
 }
 
-void Channel::handleInlineTransfer(
-    Message::shared_ptr msg, Exchange::shared_ptr& exch)
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
 {
+    Exchange::shared_ptr exchange =
+        connection.broker.getExchanges().get(msg->getExchange());
     if(transactional){
         TxPublish* deliverable = new TxPublish(msg);
-        exch->route(*deliverable, msg->getRoutingKey(), 
&(msg->getApplicationHeaders()));
+        exchange->route(*deliverable, msg->getRoutingKey(), 
&(msg->getApplicationHeaders()));
         txBuffer.enlist(new DeletingTxOp(deliverable));
     }else{
         DeliverableMessage deliverable(msg);
-        exch->route(deliverable, msg->getRoutingKey(), 
&(msg->getApplicationHeaders()));
+        exchange->route(deliverable, msg->getRoutingKey(), 
&(msg->getApplicationHeaders()));
     }
 }
 
 // FIXME aconway 2007-02-05: Drop exchange member, calculate from
 // message in ::complete().
-void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
+void Channel::handlePublish(Message* _message){
     Message::shared_ptr message(_message);
-    exchange = _exchange;
     messageBuilder.initialise(message);
 }
 
@@ -237,6 +239,11 @@
         exchange->route(deliverable, msg->getRoutingKey(),
                         &(msg->getApplicationHeaders()));
     }
+}
+
+// TODO astitcher 2007-02-08 This only deals correctly with non batched 
responses
+void Channel::ack(){
+       ack(getRequestInProgress(), false);
 }
 
 void Channel::ack(u_int64_t deliveryTag, bool multiple){

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Thu Feb  8 
16:52:46 2007
@@ -91,7 +91,6 @@
     AccumulatedAck accumulatedAck;
     MessageStore* const store;
     MessageBuilder messageBuilder;//builder for in-progress message
-    Exchange::shared_ptr exchange;//exchange to which any in-progress message 
was published to
     bool opened;
 
     boost::scoped_ptr<BrokerAdapter> adapter;
@@ -131,15 +130,16 @@
     void close();
     void commit();
     void rollback();
+    void ack();
     void ack(u_int64_t deliveryTag, bool multiple);
     void recover(bool requeue);
     void deliver(Message::shared_ptr& msg, const string& consumerTag, 
u_int64_t deliveryTag);            
-    void handlePublish(Message* msg, Exchange::shared_ptr exchange);
+    void handlePublish(Message* msg);
     void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
     void handleContent(boost::shared_ptr<framing::AMQContentBody>);
     void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
     
-    void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& 
exchange);
+    void handleInlineTransfer(Message::shared_ptr msg);
     
     // For ChannelAdapter
     void handleMethodInContext(

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Thu 
Feb  8 16:52:46 2007
@@ -101,8 +101,9 @@
 void
 MessageHandlerImpl::empty( const MethodContext& )
 {
-    // FIXME astitcher 2007-01-11: 0-9 feature
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+    // Shouldn't ever receive this as it is a response to get
+    // TODO astitcher 2007-02-09 What is the correct exception to throw here?
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
 }
 
 void
@@ -112,12 +113,9 @@
                          const string& /*destination*/,
                          bool noAck )
 {
-    //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
     Queue::shared_ptr queue =
         connection.getQueue(queueName, context.channel->getId());
     
-    // FIXME: get is probably Basic specific
     if(channel.get(queue, !noAck))
         client.ok(context);
     else 
@@ -133,9 +131,9 @@
 }
 
 void
-MessageHandlerImpl::ok( const MethodContext& )
+MessageHandlerImpl::ok(const MethodContext& /*context*/)
 {
-    // TODO: Need to ack the transfers acknowledged so far for flow control 
purp oses
+    channel.ack();
 }
 
 void
@@ -162,7 +160,6 @@
 MessageHandlerImpl::recover(const MethodContext& context,
                             bool requeue)
 {
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented");
     channel.recover(requeue);
     client.ok(context);
 }
@@ -188,7 +185,7 @@
 void
 MessageHandlerImpl::transfer(const MethodContext& context,
                              u_int16_t /*ticket*/,
-                             const string& destination,
+                             const string& /* destination */,
                              bool /*redelivered*/,
                              bool /*immediate*/,
                              u_int64_t /*ttl*/,
@@ -211,8 +208,6 @@
                              qpid::framing::Content body,
                              bool /*mandatory*/)
 {
-    Exchange::shared_ptr exchange(
-        broker.getExchanges().get(destination)); 
     MessageTransferBody::shared_ptr transfer(
         boost::shared_polymorphic_downcast<MessageTransferBody>(
             context.methodBody));
@@ -220,7 +215,7 @@
         new MessageMessage(&connection, transfer));
     
     if (body.isInline()) 
-        channel.handleInlineTransfer(message, exchange);
+        channel.handleInlineTransfer(message);
     else 
         references.get(body.getValue()).addMessage(message);
     client.ok(context);

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.cpp 
Thu Feb  8 16:52:46 2007
@@ -55,7 +55,9 @@
 
 void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
     assertMethodOk(*request);
-    responder.received(request->getData());
+    AMQRequestBody::Data& requestData = request->getData();
+    responder.received(requestData);
+    requestInProgress = requestData.requestId;
     handleMethodInContext(request, MethodContext(this, request));
 }
 
@@ -63,7 +65,10 @@
     assertMethodOk(*response);
     // TODO aconway 2007-01-30: Consider a response handled on receipt.
     // Review - any cases where this is not the case?
-    requester.processed(response->getData());
+    AMQResponseBody::Data& responseData = response->getData();
+    requester.processed(responseData);
+    // For a response this is taken to be the request being responded to (for 
convenience)
+    requestInProgress = responseData.requestId;
     handleMethod(response);
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h 
Thu Feb  8 16:52:46 2007
@@ -87,6 +87,7 @@
         const MethodContext& context) = 0;
 
     RequestId getRequestInProgress() { return requestInProgress; }
+    RequestId getNextSendRequestId() { return requester.getNextId(); }
 
   private:
     ChannelId id;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h Thu Feb 
 8 16:52:46 2007
@@ -46,6 +46,9 @@
     /** Called after processing a response. */
     void processed(const AMQResponseBody::Data&);
 
+       /** Get the next id to be used. */
+       RequestId getNextId() { return lastId + 1; }
+
   private:
     std::set<RequestId> requests; /** Sent but not responded to */
     RequestId lastId;

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp?view=diff&rev=505108&r1=505107&r2=505108
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ChannelTest.cpp Thu Feb  8 
16:52:46 2007
@@ -264,7 +264,7 @@
             contentSize += data[i].size();
         }
         header->setContentSize(contentSize);
-        channel.handlePublish(msg, exchange);
+        channel.handlePublish(msg);
         channel.handleHeader(header);
 
         for (int i = 0; i < 3; i++) {


Reply via email to