Author: gsim
Date: Wed May  9 10:00:32 2007
New Revision: 536584

URL: http://svn.apache.org/viewvc?view=rev&rev=536584
Log:
* Added support for channel.flow:
    cpp/tests/ChannelTest.cpp
    cpp/lib/broker/SessionHandlerImpl.cpp
    cpp/lib/broker/BrokerChannel.h
    cpp/lib/broker/BrokerChannel.cpp

* Fixed client connection closing process:
    cpp/lib/common/sys/apr/Socket.cpp
    cpp/lib/client/Connector.h
    cpp/lib/client/Connector.cpp
    cpp/lib/client/Connection.h
    cpp/lib/client/Connection.cpp

* Use amq.direct rather than default exchange in P2P test 
  (to interop with java)
    cpp/tests/BasicP2Ptest.h


Modified:
    incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp
    incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp
    incubator/qpid/branches/M2/cpp/lib/client/Connection.h
    incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp
    incubator/qpid/branches/M2/cpp/lib/client/Connector.h
    incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp
    incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h
    incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp

Modified: incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp Wed May  9 
10:00:32 2007
@@ -43,7 +43,8 @@
     accumulatedAck(0),
     store(_store),
     messageBuilder(this, _store, _stagingThreshold),
-    version(_version){
+    version(_version),
+    flowActive(true){
 
     outstanding.reset();
 }
@@ -142,7 +143,7 @@
 
 bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
     if(!connection || connection != msg->getPublisher()){//check for no_local
-        if(ackExpected && !parent->checkPrefetch(msg)){
+        if(!parent->flowActive || (ackExpected && 
!parent->checkPrefetch(msg))){
             blocked = true;
         }else{
             blocked = false;
@@ -256,4 +257,16 @@
 
 void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, 
u_int64_t deliveryTag){
     msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+}
+
+void Channel::flow(bool active){
+    Mutex::ScopedLock locker(deliveryLock);
+    bool requestDelivery(!flowActive && active);
+    flowActive = active;
+    if (requestDelivery) {
+        //there may be messages that can be now be delivered
+        for(consumer_iterator j = consumers.begin(); j != consumers.end(); 
j++){
+            j->second->requestDispatch();
+        }
+    }
 }

Modified: incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h Wed May  9 
10:00:32 2007
@@ -89,6 +89,7 @@
             MessageBuilder messageBuilder;//builder for in-progress message
             Exchange::shared_ptr exchange;//exchange to which any in-progress 
message was published to
            qpid::framing::ProtocolVersion version; // version used for this 
channel
+            bool flowActive;
 
             virtual void complete(Message::shared_ptr& msg);
             void deliver(Message::shared_ptr& msg, const string& tag, 
Queue::shared_ptr& queue, bool ackExpected);            
@@ -118,6 +119,7 @@
             void handlePublish(Message* msg, Exchange::shared_ptr exchange);
             void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
             void handleContent(qpid::framing::AMQContentBody::shared_ptr 
content);
+            void flow(bool active);
         };
 
         struct InvalidAckException{};

Modified: incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp Wed May  9 
10:00:32 2007
@@ -233,12 +233,11 @@
     }
 } 
         
-void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool 
/*active*/){
-
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool 
active){
+    parent->getChannel(channel)->flow(active);
+    parent->client->getChannel().flowOk(channel, active);    
 }         
-void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, 
bool /*active*/){
-
-} 
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, 
bool /*active*/){} 
         
 void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, 
u_int16_t /*replyCode*/, const string& /*replyText*/, 
                                                    u_int16_t /*classId*/, 
u_int16_t /*methodId*/){

Modified: incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp Wed May  9 
10:00:32 2007
@@ -30,9 +30,11 @@
 using namespace qpid::sys;
 using namespace qpid::sys;
 
-u_int16_t Connection::channelIdCounter;
-
-Connection::Connection(  bool debug, u_int32_t _max_frame_size, 
qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), 
closed(true),
+Connection::Connection(  bool _debug, u_int32_t _max_frame_size, 
qpid::framing::ProtocolVersion* _version) : 
+    debug(_debug),
+    channelIdCounter(0),
+    max_frame_size(_max_frame_size), 
+    closed(true),
     version(_version->getMajor(),_version->getMinor())
 {
     connector = new Connector(version, debug, _max_frame_size);
@@ -96,7 +98,7 @@
     }else{
         THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
     }
-    
+    closed = false;
 }
 
 void Connection::close(){
@@ -108,6 +110,7 @@
         
         sendAndReceive(new AMQFrame(version, 0, new 
ConnectionCloseBody(version, code, text, classId, methodId)), 
method_bodies.connection_close_ok);
         connector->close();
+        closed = true;
     }
 }
 

Modified: incubator/qpid/branches/M2/cpp/lib/client/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connection.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connection.h Wed May  9 10:00:32 
2007
@@ -68,7 +68,8 @@
 
         typedef std::map<int, Channel*>::iterator iterator;
 
-       static u_int16_t channelIdCounter;
+        const bool debug;
+       u_int16_t channelIdCounter;
 
        std::string host;
        int port;

Modified: incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp Wed May  9 10:00:32 
2007
@@ -57,9 +57,10 @@
 }
 
 void Connector::close(){
-    closed = true;
-    socket.close();
-    receiver.join();
+    if (markClosed()) {
+        socket.close();
+        receiver.join();
+    }
 }
 
 void Connector::setInputHandler(InputHandler* handler){
@@ -101,14 +102,24 @@
 }
 
 void Connector::handleClosed(){
-    closed = true;
-    socket.close();
-    if(shutdownHandler) shutdownHandler->shutdown();
+    if (markClosed()) {
+        socket.close();
+        if(shutdownHandler) shutdownHandler->shutdown();
+    }
+}
+
+bool Connector::markClosed(){
+    if (closed) {
+        return false;
+    } else {
+        closed = true;
+        return true;
+    }
 }
 
 void Connector::checkIdle(ssize_t status){
     if(timeoutHandler){
-         Time t = now() * TIME_MSEC;
+        Time t = now() * TIME_MSEC;
         if(status == Socket::SOCKET_TIMEOUT) {
             if(idleIn && (t - lastIn > idleIn)){
                 timeoutHandler->idleIn();

Modified: incubator/qpid/branches/M2/cpp/lib/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connector.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connector.h (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connector.h Wed May  9 10:00:32 
2007
@@ -44,7 +44,7 @@
        const int send_buffer_size;
        qpid::framing::ProtocolVersion version;
 
-       bool closed;
+       volatile bool closed;
 
         int64_t lastIn;
         int64_t lastOut;
@@ -73,6 +73,7 @@
 
        void run();
        void handleClosed();
+        bool markClosed();
 
     public:
        Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = 
false, u_int32_t buffer_size = 1024);

Modified: incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp Wed May  9 
10:00:32 2007
@@ -24,6 +24,7 @@
 #include <apr/APRBase.h>
 #include <apr/APRPool.h>
 
+#include <iostream>
 
 using namespace qpid::sys;
 
@@ -55,6 +56,7 @@
 
 void Socket::close() {
     if (socket == 0) return;
+    CHECK_APR_SUCCESS(apr_socket_shutdown(socket, APR_SHUTDOWN_READWRITE));
     CHECK_APR_SUCCESS(apr_socket_close(socket));
     socket = 0;
 }
@@ -76,8 +78,9 @@
     apr_status_t status =
         apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
     if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+    if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
     CHECK_APR_SUCCESS(status);
-     return received;
+    return received;
 }
 
 

Modified: incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h (original)
+++ incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h Wed May  9 10:00:32 2007
@@ -71,7 +71,7 @@
         std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME");
         int messages = params.getInt("P2P_NUM_MESSAGES");
         if (role == "SENDER") {
-            worker = std::auto_ptr<Worker>(new Sender(options, 
Exchange::DEFAULT_EXCHANGE, queue, messages));
+            worker = std::auto_ptr<Worker>(new Sender(options, 
Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages));
         } else if(role == "RECEIVER"){
             worker = std::auto_ptr<Worker>(new Receiver(options, queue, 
messages));
         } else {

Modified: incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp Wed May  9 10:00:32 
2007
@@ -53,6 +53,7 @@
     CPPUNIT_TEST(testDeliveryAndRecovery);
     CPPUNIT_TEST(testStaging);
     CPPUNIT_TEST(testQueuePolicy);
+    CPPUNIT_TEST(testFlow);
     CPPUNIT_TEST_SUITE_END();
 
     class MockMessageStore : public NullMessageStore
@@ -301,6 +302,37 @@
 
         }
         store.check();
+    }
+
+
+    void testFlow(){
+        DummyHandler handler;
+        Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 
10000);
+
+        const string data("abcdefghijklmn");
+
+        Message::shared_ptr msg(createMessage("test", "my_routing_key", 
"my_message_id", 14));
+        addContent(msg, data);
+        Queue::shared_ptr queue(new Queue("my_queue"));
+        ConnectionToken* owner(0);
+        string tag("no_ack");
+        channel.consume(tag, queue, false, false, owner);
+        channel.flow(false);
+        queue->deliver(msg);
+        CPPUNIT_ASSERT_EQUAL((size_t) 0, handler.frames.size());
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());        
+        channel.flow(true);
+        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());  
      
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());  
      
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+        BasicDeliverBody::shared_ptr 
deliver(dynamic_pointer_cast<BasicDeliverBody, 
AMQBody>(handler.frames[0]->getBody()));
+        AMQHeaderBody::shared_ptr 
contentHeader(dynamic_pointer_cast<AMQHeaderBody, 
AMQBody>(handler.frames[1]->getBody()));
+        AMQContentBody::shared_ptr 
contentBody(dynamic_pointer_cast<AMQContentBody, 
AMQBody>(handler.frames[2]->getBody()));
+        CPPUNIT_ASSERT(deliver);
+        CPPUNIT_ASSERT(contentHeader);
+        CPPUNIT_ASSERT(contentBody);
+        CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
     }
 
     Message* createMessage(const string& exchange, const string& routingKey, 
const string& messageId, u_int64_t contentSize)


Reply via email to