Author: breed Date: Fri Nov 5 06:43:56 2010 New Revision: 1031453 URL: http://svn.apache.org/viewvc?rev=1031453&view=rev Log: ZOOKEEPER-916. Problem receiving messages from subscribed channels in c++ client
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1031453&r1=1031452&r2=1031453&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Nov 5 06:43:56 2010 @@ -149,6 +149,8 @@ BUGFIXES: ZOOKEEPER-884. Remove LedgerSequence references from BookKeeper documentation and comments in tests (fpj via breed) + ZOOKEEPER-916. Problem receiving messages from subscribed channels in c++ client (ivan via breed) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) @@ -185,6 +187,8 @@ IMPROVEMENTS: ZOOKEEPER-864. Hedwig C++ client improvements (Ivan Kelly via breed) + ZOOKEEPER-862. Hedwig created ledgers with hardcoded Bookkeeper ensemble and quorum size. Make these a server config parameter instead. (Erwin T via breed) + NEW FEATURES: ZOOKEEPER-729. Java client API to recursively delete a subtree. (Kay Kay via henry) Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp?rev=1031453&r1=1031452&r2=1031453&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp (original) +++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp Fri Nov 5 06:43:56 2010 @@ -218,10 +218,13 @@ void DuplexChannel::connect() { void DuplexChannel::startReceiving() { if (LOG.isDebugEnabled()) { - LOG.debugStream() << "DuplexChannel::startReceiving channel(" << this << ")"; + LOG.debugStream() << "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving; } boost::lock_guard<boost::mutex> lock(receiving_lock); + if (receiving) { + return; + } receiving = true; DuplexChannel::readSize(shared_from_this()); Modified: hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp?rev=1031453&r1=1031452&r2=1031453&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp (original) +++ hadoop/zookeeper/trunk/src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp Fri Nov 5 06:43:56 2010 @@ -47,6 +47,7 @@ private: CPPUNIT_TEST(testPubSubContinuousOverClose); // CPPUNIT_TEST(testPubSubContinuousOverServerDown); CPPUNIT_TEST(testMultiTopic); + CPPUNIT_TEST(testBigMessage); CPPUNIT_TEST(testMultiTopicMultiSubscriber); CPPUNIT_TEST_SUITE_END(); @@ -183,6 +184,7 @@ public: CPPUNIT_ASSERT(pass); } + /* void testPubSubContinuousOverServerDown() { std::string topic = "pubSubTopic"; std::string sid = "MySubscriberid-1"; @@ -328,6 +330,44 @@ public: } CPPUNIT_ASSERT(passA && passB); } + + static const int BIG_MESSAGE_SIZE = 16436*2; // MTU to lo0 is 16436 by default on linux + + void testBigMessage() { + std::string topic = "pubSubTopic"; + std::string sid = "MySubscriberid-6"; + + Hedwig::Configuration* conf = new TestServerConfiguration(hw1); + std::auto_ptr<Hedwig::Configuration> confptr(conf); + + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr<Hedwig::Client> clientptr(client); + + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); + + sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + MyMessageHandlerCallback* cb = new MyMessageHandlerCallback(topic, sid); + Hedwig::MessageHandlerCallbackPtr handler(cb); + + sub.startDelivery(topic, sid, handler); + + char buf[BIG_MESSAGE_SIZE]; + std::string bigmessage(buf, BIG_MESSAGE_SIZE); + pub.publish(topic, bigmessage); + pub.publish(topic, "Test Message 1"); + bool pass = false; + for (int i = 0; i < 10; i++) { + sleep(3); + if (cb->numMessagesReceived() > 0) { + if (cb->getLastMessage() == "Test Message 1") { + pass = true; + break; + } + } + } + CPPUNIT_ASSERT(pass); + } }; CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PubSubTestSuite, "PubSub" );