Author: gsim
Date: Fri Jul 25 03:16:22 2008
New Revision: 679739
URL: http://svn.apache.org/viewvc?rev=679739&view=rev
Log:
Fixed bug in SubscriptionManager::get() where the flush was issued before
waiting: if a message showed up after the flush completed but before the wait
finished, there was no credit (due to the flush) to deliver it to the waiting
client. Added a test for this case.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=679739&r1=679738&r2=679739&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri
Jul 25 03:16:22 2008
@@ -134,8 +134,11 @@
std::string unique = framing::Uuid(true).str();
subscribe(lq, queue, FlowControl::messageCredit(1), unique);
AutoCancel ac(*this, unique);
+ //first wait for message to be delivered if a timeout has been specified
+ if (timeout && lq.get(result, timeout)) return true;
+ //make sure message is not on queue before final check
sync(session).messageFlush(unique);
- return lq.get(result, timeout);
+ return lq.get(result, 0);
}
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=679739&r1=679738&r2=679739&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Jul 25
03:16:22 2008
@@ -41,6 +41,7 @@
using namespace qpid::framing;
using namespace qpid;
using qpid::sys::Monitor;
+using qpid::sys::Thread;
using qpid::sys::TIME_SEC;
using std::string;
using std::cout;
@@ -238,6 +239,19 @@
BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
}
+struct DelayedTransfer : sys::Runnable // test helper: run() pauses, then transfers the message "foo2"
+{
+ ClientSessionFixture& fixture; // stored by reference, so the fixture must outlive this object
+
+ DelayedTransfer(ClientSessionFixture& f) : fixture(f) {}
+
+ void run()
+ {
+ sleep(1); // pause first so the message only shows up after a short delay
+ fixture.session.messageTransfer(content=Message("foo2", "getq")); // presumably "getq" is the routing key naming the target queue -- confirm against Message's constructor
+ }
+};
+
QPID_AUTO_TEST_CASE(testGet) {
ClientSessionFixture fix;
fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
@@ -249,6 +263,12 @@
BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
BOOST_CHECK_EQUAL("foo1", got.getData());
BOOST_CHECK(!fix.subs.get(got, "getq"));
+ DelayedTransfer sender(fix);
+ Thread t(sender);
+ //test timed get where message shows up after a short delay
+ BOOST_CHECK(fix.subs.get(got, "getq", 5*TIME_SEC));
+ BOOST_CHECK_EQUAL("foo2", got.getData());
+ t.join();
}
QPID_AUTO_TEST_CASE(testOpenFailure) {