Author: aconway
Date: Wed Jun 25 13:51:30 2008
New Revision: 671655

URL: http://svn.apache.org/viewvc?rev=671655&view=rev
Log:
Additions to the client API:
 - SubscriptionManager::get(queue) to get a single message from a queue.
 - Set FlowControl per-subscription.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_run
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Jun 25 13:51:30 2008
@@ -444,6 +444,7 @@
   qpid/client/Demux.h \
   qpid/client/Dispatcher.h \
   qpid/client/Execution.h \
+  qpid/client/FlowControl.h \
   qpid/client/Future.h \
   qpid/client/FutureCompletion.h \
   qpid/client/FutureResult.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h?rev=671655&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h Wed Jun 25 
13:51:30 2008
@@ -0,0 +1,73 @@
+#ifndef QPID_CLIENT_FLOWCONTROL_H
+#define QPID_CLIENT_FLOWCONTROL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * Flow control works by associating a finite amount of "credit"
+ * with a subscription.
+ *
+ * Credit includes a message count and a byte count. Each message
+ * received decreases the message count by one, and the byte count by
+ * the size of the message. Either count can have the special value
+ * UNLIMITED which is never decreased.
+ *
+ * A subscription's credit is exhausted when the message count is 0 or
+ * the byte count is too small for the next available message. The
+ * subscription will not receive any further messages until its credit
+ * is renewed.
+ *
+ * In "window mode" credit is automatically renewed when a message is
+ * acknowledged (@see AckPolicy). In non-window mode credit is not
+ * automatically renewed; it must be explicitly re-set (@see
+ * SubscriptionManager).
+ */
+struct FlowControl {
+    static const uint32_t UNLIMITED=0xFFFFFFFF;
+    FlowControl(uint32_t messages_=0, uint32_t bytes_=0, bool window_=false)
+        : messages(messages_), bytes(bytes_), window(window_) {}
+    
+    static FlowControl messageCredit(uint32_t messages_) { return 
FlowControl(messages_,UNLIMITED,false); }
+    static FlowControl messageWindow(uint32_t messages_) { return 
FlowControl(messages_,UNLIMITED,true); }
+    static FlowControl byteCredit(uint32_t bytes_) { return 
FlowControl(UNLIMITED,bytes_,false); }
+    static FlowControl byteWindow(uint32_t bytes_) { return 
FlowControl(UNLIMITED,bytes_,true); }
+    static FlowControl unlimited() { return FlowControl(UNLIMITED, UNLIMITED, 
false); }
+    static FlowControl zero() { return FlowControl(0, 0, false); }
+
+    /** Message credit: subscription can accept up to this many messages. */
+    uint32_t messages;
+    /** Byte credit: subscription can accept up to this many bytes of message 
content. */
+    uint32_t bytes;
+    /** Window mode. If true credit is automatically renewed as messages are 
acknowledged. */
+    bool window;
+
+    bool operator==(const FlowControl& x) {
+        return messages == x.messages && bytes == x.bytes && window == 
x.window;
+    };
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_FLOWCONTROL_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Wed Jun 25 
13:51:30 2008
@@ -56,7 +56,7 @@
      */
     Message pop();
 
-    /** Synonym for get(). */
+    /** Synonym for pop(). */
     Message get() { return pop(); }
     
     /** Return true if local queue is empty. */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed 
Jun 25 13:51:30 2008
@@ -25,6 +25,7 @@
 #include <qpid/client/Dispatcher.h>
 #include <qpid/client/Session.h>
 #include <qpid/client/MessageListener.h>
+#include <qpid/framing/Uuid.h>
 #include <set>
 #include <sstream>
 
@@ -34,35 +35,48 @@
 
 SubscriptionManager::SubscriptionManager(const Session& s)
     : dispatcher(s), session(s),
-      messages(UNLIMITED), bytes(UNLIMITED), window(true),
+      flowControl(UNLIMITED, UNLIMITED, false),
       acceptMode(0), acquireMode(0),
       autoStop(true)
 {}
 
 void SubscriptionManager::subscribeInternal(
-    const std::string& q, const std::string& dest)
+    const std::string& q, const std::string& dest, const FlowControl& fc)
 {
     session.messageSubscribe( 
         arg::queue=q, arg::destination=dest,
         arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
-    setFlowControl(dest, messages, bytes, window); 
+    if (fc.messages || fc.bytes) // No need to set if all 0.
+        setFlowControl(dest, fc);
 }
 
 void SubscriptionManager::subscribe(
     MessageListener& listener, const std::string& q, const std::string& d)
 {
+    subscribe(listener, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+    MessageListener& listener, const std::string& q, const FlowControl& fc, 
const std::string& d)
+{
     std::string dest=d.empty() ? q:d;
     dispatcher.listen(dest, &listener, autoAck);
-    return subscribeInternal(q, dest);
+    return subscribeInternal(q, dest, fc);
 }
 
 void SubscriptionManager::subscribe(
     LocalQueue& lq, const std::string& q, const std::string& d)
 {
+    subscribe(lq, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+    LocalQueue& lq, const std::string& q, const FlowControl& fc, const 
std::string& d)
+{
     std::string dest=d.empty() ? q:d;
     lq.session=session;
     lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
-    return subscribeInternal(q, dest);
+    return subscribeInternal(q, dest, fc);
 }
 
 void SubscriptionManager::setFlowControl(
@@ -74,14 +88,20 @@
     session.sync();
 }
 
+void SubscriptionManager::setFlowControl(const std::string& dest, const 
FlowControl& fc) {
+    setFlowControl(dest, fc.messages, fc.bytes, fc.window);
+}
+
+void SubscriptionManager::setFlowControl(const FlowControl& fc) { 
flowControl=fc; }
+
 void SubscriptionManager::setFlowControl(
     uint32_t messages_,  uint32_t bytes_, bool window_)
 {
-    messages=messages_;
-    bytes=bytes_;
-    window=window_;
+    setFlowControl(FlowControl(messages_, bytes_, window_));
 }
 
+const FlowControl& SubscriptionManager::getFlowControl() const { return 
flowControl; }
+
 void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
 
 void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
@@ -109,6 +129,12 @@
     dispatcher.stop();
 }
 
+Message SubscriptionManager::get(const std::string& queue) {
+    LocalQueue lq;
+    subscribe(lq, queue, FlowControl::messageCredit(1), 
framing::Uuid(true).str());
+    return lq.get();
+}
+
 }} // namespace qpid::client
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Wed Jun 
25 13:51:30 2008
@@ -27,8 +27,8 @@
 #include <qpid/client/Session.h>
 #include <qpid/client/MessageListener.h>
 #include <qpid/client/LocalQueue.h>
+#include <qpid/client/FlowControl.h>
 #include <qpid/sys/Runnable.h>
-
 #include <set>
 #include <sstream>
 
@@ -48,13 +48,11 @@
     typedef sys::Mutex::ScopedLock Lock;
     typedef sys::Mutex::ScopedUnlock Unlock;
 
-    void subscribeInternal(const std::string& q, const std::string& dest);
+    void subscribeInternal(const std::string& q, const std::string& dest, 
const FlowControl&);
     
     qpid::client::Dispatcher dispatcher;
     qpid::client::AsyncSession session;
-    uint32_t messages;
-    uint32_t bytes;
-    bool window;
+    FlowControl flowControl;
     AckPolicy autoAck;
     bool acceptMode;
     bool acquireMode;
@@ -72,6 +70,38 @@
      * 
      *@param listener Listener object to receive messages.
      *@param queue Name of the queue to subscribe to.
+     *@param flow initial FlowControl for the subscription.
+     *@param tag Unique destination tag for the listener.
+     * If not specified, the queue name is used.
+     */
+    void subscribe(MessageListener& listener,
+                   const std::string& queue,
+                   const FlowControl& flow,
+                   const std::string& tag=std::string());
+
+    /**
+     * Subscribe a LocalQueue to receive messages from queue.
+     * 
+     * Incoming messages are stored in the queue for you to retrieve.
+     * 
+     *@param queue Name of the queue to subscribe to.
+     *@param flow initial FlowControl for the subscription.
+     *@param tag Unique destination tag for the listener.
+     * If not specified, the queue name is used.
+     */
+    void subscribe(LocalQueue& localQueue,
+                   const std::string& queue,
+                   const FlowControl& flow,
+                   const std::string& tag=std::string());
+
+    /**
+     * Subscribe a MessageListener to receive messages from queue.
+     *
+     * Provide your own subclass of MessageListener to process
+     * incoming messages. It will be called for each message received.
+     * 
+     *@param listener Listener object to receive messages.
+     *@param queue Name of the queue to subscribe to.
      *@param tag Unique destination tag for the listener.
      * If not specified, the queue name is used.
      */
@@ -92,6 +122,11 @@
                    const std::string& queue,
                    const std::string& tag=std::string());
 
+    /**
+     * Get a single message from a queue.
+     */
+    Message get(const std::string& queue);
+
     /** Cancel a subscription. */
     void cancel(const std::string tag);
 
@@ -107,9 +142,17 @@
     /** Cause run() to return */
     void stop();
 
-
     static const uint32_t UNLIMITED=0xFFFFFFFF;
 
+    /** Set the flow control for destination. */
+    void setFlowControl(const std::string& destintion, const FlowControl& 
flow);
+
+    /** Set the default initial flow control for subscriptions that do not 
specify it. */
+    void setFlowControl(const FlowControl& flow);
+
+    /** Get the default flow control for new subscriptions that do not specify 
it. */
+    const FlowControl& getFlowControl() const;
+
     /** Set the flow control for destination tag.
      *@param tag: name of the destination.
      *@param messages: message credit.

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Jun 25 
13:51:30 2008
@@ -203,7 +203,7 @@
     ClientSessionFixture fix;
     SimpleListener mylistener;
     fix.session.queueDeclare(queue="myq", exclusive=true, autoDelete=true);
-    fix.subs.subscribe(mylistener, "myq", "myq");
+    fix.subs.subscribe(mylistener, "myq");
     sys::Thread runner(fix.subs);//start dispatcher thread
     string data("msg");
     Message msg(data, "myq");
@@ -222,5 +222,30 @@
     }
 }
 
+QPID_AUTO_TEST_CASE(testLocalQueue) {
+    ClientSessionFixture fix;
+    fix.session.queueDeclare(queue="lq", exclusive=true, autoDelete=true);
+    LocalQueue lq;
+    fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, 
false));
+    fix.session.messageTransfer(content=Message("foo0", "lq"));
+    fix.session.messageTransfer(content=Message("foo1", "lq"));
+    fix.session.messageTransfer(content=Message("foo2", "lq"));
+    BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
+    BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
+    BOOST_CHECK(lq.empty());    // Credit exhausted.
+    fix.subs.setFlowControl("lq", FlowControl::unlimited());
+    BOOST_CHECK_EQUAL("foo2", lq.pop().getData());    
+}
+
+QPID_AUTO_TEST_CASE(testGet) {
+    ClientSessionFixture fix;
+    fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
+    fix.session.messageTransfer(content=Message("foo0", "getq"));
+    fix.session.messageTransfer(content=Message("foo1", "getq"));
+    BOOST_CHECK_EQUAL("foo0", fix.subs.get("getq").getData());
+    BOOST_CHECK_EQUAL("foo1", fix.subs.get("getq").getData());
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
+

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Wed Jun 25 13:51:30 2008
@@ -1,7 +1,6 @@
 #!/bin/sh
-# Check for requirements, run AIS tests if found.
-#
 
+# Check AIS requirements; run the tests only if they are met.
 id -nG | grep '\<ais\>' >/dev/null || \
     NOGROUP="You are not a member of the ais group."
 ps -u root | grep aisexec >/dev/null || \
@@ -24,4 +23,11 @@
     exit 0;                    # A warning, not a failure.
 fi
 
-echo `dirname $0`/ais_run | newgrp ais
+# Run the tests
+srcdir=`dirname $0`
+$srcdir/start_cluster 4
+./ais_test
+ret=$?
+$srcdir/stop_cluster 
+exit $ret
+

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Wed Jun 25 13:51:30 2008
@@ -11,7 +11,7 @@
 
 # ais_check checks conditions for AIS tests and runs if ok.
 TESTS+=ais_check
-EXTRA_DIST+=ais_check ais_run start_cluster stop_cluster
+EXTRA_DIST+=ais_check start_cluster stop_cluster
 
 check_PROGRAMS+=ais_test
 ais_test_SOURCES=ais_test.cpp Cpg.cpp 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp Wed Jun 25 
13:51:30 2008
@@ -62,14 +62,14 @@
     ClusterConnections cluster;
     BOOST_REQUIRE(cluster.size() > 1);
 
-    Session broker0 = cluster[0]->newSession(ASYNC);
+    Session broker0 = cluster[0]->newSession();
     broker0.exchangeDeclare(exchange="ex");
     broker0.queueDeclare(queue="q");
     broker0.queueBind(exchange="ex", queue="q", routingKey="key");
     broker0.close();
     
     for (size_t i = 1; i < cluster.size(); ++i) {
-        Session s = cluster[i]->newSession(ASYNC);
+        Session s = cluster[i]->newSession();
         s.messageTransfer(content=TransferContent("data", "key", "ex"));
         s.messageSubscribe(queue="q", destination="q");
         s.messageFlow(destination="q", unit=0, value=1);//messages
@@ -81,4 +81,28 @@
     }    
 }
 
+QPID_AUTO_TEST_CASE(testMessageReplication) {
+    // Enqueue on one broker, dequeue on another.
+    ClusterConnections cluster;
+    BOOST_REQUIRE(cluster.size() > 1);
+
+    Session broker0 = cluster[0]->newSession();
+    broker0.queueDeclare(queue="q");
+    broker0.messageTransfer(content=TransferContent("data", "q"));
+    broker0.close();
+    
+    Session broker1 = cluster[1]->newSession();
+    broker1.
+        s.messageSubscribe(queue="q", destination="q");
+        s.messageFlow(destination="q", unit=0, value=1);//messages
+        FrameSet::shared_ptr msg = s.get();
+        BOOST_CHECK(msg->isA<MessageTransferBody>());
+        BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+        s.getExecution().completed(msg->getId(), true, true);
+        cluster[i]->close();
+    }    
+}
+
+// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, 
failover.
+
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=671655&r1=671654&r2=671655&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Wed Jun 25 13:51:30 
2008
@@ -3,6 +3,12 @@
 # Print the cluster's URL.
 #
 
+# Execute command with the ais group set.
+with_ais_group() {
+    id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the 
ais group."; exit 1; }
+    echo $* | newgrp ais
+}
+
 test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
 test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; }
 
@@ -13,7 +19,7 @@
 OPTS="--load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log 
--cluster-name=$CLUSTER --no-data-dir --auth=no $*"
 
 for (( i=0; i<SIZE; ++i )); do
-    PORT=`../qpidd $OPTS`  || exit 1
+    PORT=`with_ais_group ../qpidd $OPTS`  || exit 1
     echo $PORT >> cluster.ports
 done
     


Reply via email to