On 04/19/2010 07:53 PM, Clive Lilley wrote:
Hello,

I have been using the QPID C++ broker and C++ client API for the last
several weeks and I have come across an issue today that I thought I had
better raise.

I have been using MessageListeners to receive messages in my C++ client
applications, something like

    Connection conn = Connection(host,port);
    Session session = conn.newSession();
    SubscriptionManager subMgmt = SubscriptionManager(session);
    subMgmt.subscribe(MyMsgListener1, myQueue1Name);
    subMgmt.subscribe(MyMsgListener2, myQueue2Name);
    subMgmt.run();

And everything works as expected. But if I try to use LocalQueues I
don't get the behaviour I would have expected. If I change the above
code to

    Connection conn = Connection(host,port);
    Session session = conn.newSession();
    SubscriptionManager subMgmt = SubscriptionManager(session);
    subMgmt.subscribe(MyLocalQueue1, myQueue1Name);
    subMgmt.subscribe(MyLocalQueue2, myQueue2Name);
    subMgmt.run();

My application only reads messages off MyLocalQueue2, even though the
queue, myQueue1Name, in the broker is filling up with messages. If I
comment out the MyLocalQueue2 subscribe line then I can read messages
from MyLocalQueue1.

So it seems that a SubscriptionManager can only be configured with a
single LocalQueue. If I change my code to the following

    Connection conn = Connection(host,port);
    Session session = conn.newSession();
    SubscriptionManager subMgmt1 = SubscriptionManager(session);
    SubscriptionManager subMgmt2 = SubscriptionManager(session);
    subMgmt1.subscribe(MyLocalQueue1, myQueue1Name);
    subMgmt2.subscribe(MyLocalQueue2, myQueue2Name);
    subMgmt1.run();
    subMgmt2.run();

then everything works fine. Is this the expected behaviour?

No, that is not expected. I tried a little example of my own that *does* receive messages from two separate LocalQueues (see attached).

What version of the code are you using, and does the attached test work for you (create queue-1, queue-2, run the test then send some messages to those queues)?

If this does work for you, what are the key differences between your case and this test app? Are you using separate threads to process each LocalQueue?

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/LocalQueue.h>
#include <qpid/sys/Time.h>

#include <cstdlib>
#include <iostream>

using namespace qpid::client;
using namespace qpid::sys;

using std::string;

int main(int argc, char** argv) {
    ConnectionSettings settings;
    if (argc>1) settings.host = argv[1];
    if (argc>2) settings.port = atoi(argv[2]);
    Connection connection;
    try {
        connection.open(settings);
        Session session =  connection.newSession();
        SubscriptionManager subscriptions(session);
        LocalQueue q1;
        LocalQueue q2;
        subscriptions.subscribe(q1, "queue-1");
        subscriptions.subscribe(q2, "queue-2");
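        Message received;
        // Poll each local queue in turn with a short (5 ms) timeout so that
        // an idle queue never blocks delivery from the other one.
        while (true) {
            if (q1.get(received, TIME_MSEC*5)) std::cout << "Received on q1: " << received.getData() << std::endl;
            if (q2.get(received, TIME_MSEC*5)) std::cout << "Received on q2: " << received.getData() << std::endl;
        }

        // Not reached: the loop above only exits via an exception.
        connection.close();
        return 0;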
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}
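
For anyone who wants to see the polling pattern from the test without a broker to hand, here is a minimal sketch of the same idea in plain C++. The TimedQueue class and the pollBoth function are hypothetical stand-ins invented for this illustration; they are not part of the Qpid client API and only mimic the "get with a timeout" behaviour the test relies on. The point is that each queue is asked in turn with a short timeout, so an empty queue delays the loop by a few milliseconds at most.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a client-side queue with a timed get().
// This is NOT qpid::client::LocalQueue, just an in-memory imitation.
class TimedQueue {
public:
    void put(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(msg));
        }
        ready_.notify_one();
    }

    // Wait up to `timeout` for a message; return false if none arrived.
    bool get(std::string& out, std::chrono::milliseconds timeout) {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(mutex_);
        if (!ready_.wait_for(lock, timeout, [this] { return !items_.empty(); }))
            return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::string> items_;
};

// Poll both queues in turn for a fixed number of rounds (the test above
// loops forever instead), tagging each message with the queue it came from.
std::vector<std::string> pollBoth(TimedQueue& q1, TimedQueue& q2, int rounds) {
    std::vector<std::string> seen;
    std::string msg;
    const std::chrono::milliseconds timeout(5);
    for (int i = 0; i < rounds; ++i) {
        if (q1.get(msg, timeout)) seen.push_back("q1: " + msg);
        if (q2.get(msg, timeout)) seen.push_back("q2: " + msg);
    }
    return seen;
}
```

If one queue holds two messages and the other holds one, three rounds of pollBoth should drain both, with the messages interleaved in the order the loop visits the queues.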



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]
