Author: gsim
Date: Wed May 9 10:00:32 2007
New Revision: 536584
URL: http://svn.apache.org/viewvc?view=rev&rev=536584
Log:
* Added support for channel.flow:
cpp/tests/ChannelTest.cpp
cpp/lib/broker/SessionHandlerImpl.cpp
cpp/lib/broker/BrokerChannel.h
cpp/lib/broker/BrokerChannel.cpp
* Fixed client connection closing process:
cpp/lib/common/sys/apr/Socket.cpp
cpp/lib/client/Connector.h
cpp/lib/client/Connector.cpp
cpp/lib/client/Connection.h
cpp/lib/client/Connection.cpp
* Use amq.direct rather than default exchange in P2P test
(to interop with java)
cpp/tests/BasicP2PTest.h
Modified:
incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp
incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp
incubator/qpid/branches/M2/cpp/lib/client/Connection.h
incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp
incubator/qpid/branches/M2/cpp/lib/client/Connector.h
incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp
incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h
incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp
Modified: incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.cpp Wed May 9
10:00:32 2007
@@ -43,7 +43,8 @@
accumulatedAck(0),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
- version(_version){
+ version(_version),
+ flowActive(true){
outstanding.reset();
}
@@ -142,7 +143,7 @@
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
- if(ackExpected && !parent->checkPrefetch(msg)){
+ if(!parent->flowActive || (ackExpected &&
!parent->checkPrefetch(msg))){
blocked = true;
}else{
blocked = false;
@@ -256,4 +257,16 @@
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
u_int64_t deliveryTag){
msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
+}
+
+void Channel::flow(bool active){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool requestDelivery(!flowActive && active);
+ flowActive = active;
+ if (requestDelivery) {
+ //there may be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end();
j++){
+ j->second->requestDispatch();
+ }
+ }
}
Modified: incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/M2/cpp/lib/broker/BrokerChannel.h Wed May 9
10:00:32 2007
@@ -89,6 +89,7 @@
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress
message was published to
qpid::framing::ProtocolVersion version; // version used for this
channel
+ bool flowActive;
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, const string& tag,
Queue::shared_ptr& queue, bool ackExpected);
@@ -118,6 +119,7 @@
void handlePublish(Message* msg, Exchange::shared_ptr exchange);
void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void handleContent(qpid::framing::AMQContentBody::shared_ptr
content);
+ void flow(bool active);
};
struct InvalidAckException{};
Modified: incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/broker/SessionHandlerImpl.cpp Wed May 9
10:00:32 2007
@@ -233,12 +233,11 @@
}
}
-void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool
/*active*/){
-
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool
active){
+ parent->getChannel(channel)->flow(active);
+ parent->client->getChannel().flowOk(channel, active);
}
-void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/,
bool /*active*/){
-
-}
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/,
bool /*active*/){}
void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel,
u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/,
u_int16_t /*methodId*/){
Modified: incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connection.cpp Wed May 9
10:00:32 2007
@@ -30,9 +30,11 @@
using namespace qpid::sys;
using namespace qpid::sys;
-u_int16_t Connection::channelIdCounter;
-
-Connection::Connection( bool debug, u_int32_t _max_frame_size,
qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size),
closed(true),
+Connection::Connection( bool _debug, u_int32_t _max_frame_size,
qpid::framing::ProtocolVersion* _version) :
+ debug(_debug),
+ channelIdCounter(0),
+ max_frame_size(_max_frame_size),
+ closed(true),
version(_version->getMajor(),_version->getMinor())
{
connector = new Connector(version, debug, _max_frame_size);
@@ -96,7 +98,7 @@
}else{
THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
}
-
+ closed = false;
}
void Connection::close(){
@@ -108,6 +110,7 @@
sendAndReceive(new AMQFrame(version, 0, new
ConnectionCloseBody(version, code, text, classId, methodId)),
method_bodies.connection_close_ok);
connector->close();
+ closed = true;
}
}
Modified: incubator/qpid/branches/M2/cpp/lib/client/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connection.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connection.h Wed May 9 10:00:32
2007
@@ -68,7 +68,8 @@
typedef std::map<int, Channel*>::iterator iterator;
- static u_int16_t channelIdCounter;
+ const bool debug;
+ u_int16_t channelIdCounter;
std::string host;
int port;
Modified: incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connector.cpp Wed May 9 10:00:32
2007
@@ -57,9 +57,10 @@
}
void Connector::close(){
- closed = true;
- socket.close();
- receiver.join();
+ if (markClosed()) {
+ socket.close();
+ receiver.join();
+ }
}
void Connector::setInputHandler(InputHandler* handler){
@@ -101,14 +102,24 @@
}
void Connector::handleClosed(){
- closed = true;
- socket.close();
- if(shutdownHandler) shutdownHandler->shutdown();
+ if (markClosed()) {
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+ }
+}
+
+bool Connector::markClosed(){
+ if (closed) {
+ return false;
+ } else {
+ closed = true;
+ return true;
+ }
}
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
- Time t = now() * TIME_MSEC;
+ Time t = now() * TIME_MSEC;
if(status == Socket::SOCKET_TIMEOUT) {
if(idleIn && (t - lastIn > idleIn)){
timeoutHandler->idleIn();
Modified: incubator/qpid/branches/M2/cpp/lib/client/Connector.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/client/Connector.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/client/Connector.h (original)
+++ incubator/qpid/branches/M2/cpp/lib/client/Connector.h Wed May 9 10:00:32
2007
@@ -44,7 +44,7 @@
const int send_buffer_size;
qpid::framing::ProtocolVersion version;
- bool closed;
+ volatile bool closed;
int64_t lastIn;
int64_t lastOut;
@@ -73,6 +73,7 @@
void run();
void handleClosed();
+ bool markClosed();
public:
Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug =
false, u_int32_t buffer_size = 1024);
Modified: incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp (original)
+++ incubator/qpid/branches/M2/cpp/lib/common/sys/apr/Socket.cpp Wed May 9
10:00:32 2007
@@ -24,6 +24,7 @@
#include <apr/APRBase.h>
#include <apr/APRPool.h>
+#include <iostream>
using namespace qpid::sys;
@@ -55,6 +56,7 @@
void Socket::close() {
if (socket == 0) return;
+ CHECK_APR_SUCCESS(apr_socket_shutdown(socket, APR_SHUTDOWN_READWRITE));
CHECK_APR_SUCCESS(apr_socket_close(socket));
socket = 0;
}
@@ -76,8 +78,9 @@
apr_status_t status =
apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
CHECK_APR_SUCCESS(status);
- return received;
+ return received;
}
Modified: incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h (original)
+++ incubator/qpid/branches/M2/cpp/tests/BasicP2PTest.h Wed May 9 10:00:32 2007
@@ -71,7 +71,7 @@
std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME");
int messages = params.getInt("P2P_NUM_MESSAGES");
if (role == "SENDER") {
- worker = std::auto_ptr<Worker>(new Sender(options,
Exchange::DEFAULT_EXCHANGE, queue, messages));
+ worker = std::auto_ptr<Worker>(new Sender(options,
Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages));
} else if(role == "RECEIVER"){
worker = std::auto_ptr<Worker>(new Receiver(options, queue,
messages));
} else {
Modified: incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp?view=diff&rev=536584&r1=536583&r2=536584
==============================================================================
--- incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp (original)
+++ incubator/qpid/branches/M2/cpp/tests/ChannelTest.cpp Wed May 9 10:00:32
2007
@@ -53,6 +53,7 @@
CPPUNIT_TEST(testDeliveryAndRecovery);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
+ CPPUNIT_TEST(testFlow);
CPPUNIT_TEST_SUITE_END();
class MockMessageStore : public NullMessageStore
@@ -301,6 +302,37 @@
}
store.check();
+ }
+
+
+ void testFlow(){
+ DummyHandler handler;
+ Channel channel(qpid::framing::highestProtocolVersion, &handler, 7,
10000);
+
+ const string data("abcdefghijklmn");
+
+ Message::shared_ptr msg(createMessage("test", "my_routing_key",
"my_message_id", 14));
+ addContent(msg, data);
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner(0);
+ string tag("no_ack");
+ channel.consume(tag, queue, false, false, owner);
+ channel.flow(false);
+ queue->deliver(msg);
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ channel.flow(true);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+ BasicDeliverBody::shared_ptr
deliver(dynamic_pointer_cast<BasicDeliverBody,
AMQBody>(handler.frames[0]->getBody()));
+ AMQHeaderBody::shared_ptr
contentHeader(dynamic_pointer_cast<AMQHeaderBody,
AMQBody>(handler.frames[1]->getBody()));
+ AMQContentBody::shared_ptr
contentBody(dynamic_pointer_cast<AMQContentBody,
AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
}
Message* createMessage(const string& exchange, const string& routingKey,
const string& messageId, u_int64_t contentSize)