On 01/11/2010 04:20 AM, denny86 wrote:
Gordon Sim wrote:
The simplest way (in my view) is to pass in an instance of
SubscriptionSettings to each SubscriptionManager::subscribe() call.
However if the default settings work for you (and it sounds like they
might?) then you don't even need to do that. Just call subscribe with
your listener and the queue name.
The default setting is working fine.
But the problem is that when I set ACCEPT_MODE_EXPLICIT for multiple queues,
the program crashes whether auto acquire is off or on.
The default accept mode is ACCEPT_MODE_EXPLICIT. When you say above that
the default settings work fine does that mean they work fine even for
multiple queues?
If so what is the change that causes the multiple queues case not to work?
If not can you try the attached test program? It will listen for a
configurable number of messages on queues a, b and c (all bound to
amq.fanout for convenience). I can consume 10 million messages from each
queue without seeing any issue (neither crashes nor memory growth) on linux.
Now I am trying with ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED, but
the program crashes after some time. One advantage of using
ACQUIRE_MODE_PRE_ACQUIRED is that the queue is correctly dequeued after
accepting the message, and the queue depth is reported as 0 by the queue-stats
tool. If I had set ACQUIRE_MODE_NOT_ACQUIRED, the queue depth would be
greater, and the program crashes in that case as well. The only mode that works
with multiple queues, in my experience, is ACCEPT_MODE_NONE. I will confirm
this with more research on my side.
Thanks & Regards,
Denny
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/client/Connection.h>
#include <qpid/client/AsyncSession.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include <cstdlib>
#include <iostream>
using namespace qpid::client;
using namespace qpid::sys;
/*
 * Message listener that counts the messages delivered to it and cancels
 * its own subscription once the expected number has been reached.
 *
 * The counters use plain `unsigned int` rather than the non-standard
 * `uint` typedef, which is only provided by some platform headers.
 */
struct MyListener : MessageListener
{
    /* Number of deliveries after which the subscription is cancelled. */
    const unsigned int expected;
    /* Number of deliveries seen so far. */
    unsigned int count;
    /* Subscription that received() cancels; must be assigned by the
     * owner of the listener (the constructor leaves it default-constructed). */
    Subscription subscription;

    MyListener(unsigned int expected);
    void received(Message&);
};

/* Start with no messages counted; `e` is the number of messages to wait for. */
MyListener::MyListener(unsigned int e) : expected(e), count(0) {}

/* Invoked for each delivered message; the message itself is ignored. */
void MyListener::received(Message&)
{
    // Count the delivery and cancel the subscription once the target
    // number of messages has been reached.
    if (++count >= expected) subscription.cancel();
}
int main(int argc, char** argv) {
int count = argc>1 ? atoi(argv[1]) : 1000000;
ConnectionSettings settings;
if (argc>2) settings.host = argv[2];
if (argc>3) settings.port = atoi(argv[3]);
Connection connection;
try {
connection.open(settings);
Session session = connection.newSession();
session.queueDeclare(arg::queue="a");
session.exchangeBind(arg::exchange="amq.fanout", arg::queue="a");
session.queueDeclare(arg::queue="b");
session.exchangeBind(arg::exchange="amq.fanout", arg::queue="b");
session.queueDeclare(arg::queue="c");
session.exchangeBind(arg::exchange="amq.fanout", arg::queue="c");
SubscriptionManager subscriptions(session);
SubscriptionSettings settings;
settings.autoAck = 100;
settings.acquireMode = ACQUIRE_MODE_PRE_ACQUIRED;
settings.acceptMode = ACCEPT_MODE_EXPLICIT;
MyListener a(count);
a.subscription = subscriptions.subscribe(a, "a", settings);
MyListener b(count);
b.subscription = subscriptions.subscribe(b, "b", settings);
MyListener c(count);
c.subscription = subscriptions.subscribe(c, "c", settings);
AbsTime start = now();
subscriptions.run();
AbsTime end = now();
Duration d(start, end);
double time = d / TIME_MSEC;
std::cout << "Received " << count << " messages on all three queues in " << time << "ms" << std::endl;
connection.close();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
return 1;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/client/Connection.h>
#include <qpid/client/AsyncSession.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/sys/Time.h>
#include <cstdlib>
#include <iostream>
using namespace qpid::client;
using namespace qpid::sys;
using namespace qpid::framing;
using std::string;
int main(int argc, char** argv) {
int count = argc>1 ? atoi(argv[1]) : 100000;
ConnectionSettings settings;
if (argc>2) settings.host = argv[2];
if (argc>3) settings.port = atoi(argv[3]);
Connection connection;
try {
connection.open(settings);
Session session = connection.newSession();
session.queueDeclare(arg::queue="a");
session.exchangeBind(arg::exchange="amq.fanout", arg::queue="a");
session.queueDeclare(arg::queue="b");
session.exchangeBind(arg::exchange="amq.fanout", arg::queue="b");
session.queueDeclare(arg::queue="c");
session.exchangeBind(arg::exchange="amq.fanout", arg::queue="c");
Message message;
AbsTime start = now();
for (int i = 0; i < count; ++i) {
async(session).messageTransfer(arg::destination="amq.fanout", arg::content=message);
}
session.sync();
AbsTime end = now();
Duration d(start, end);
double time = d / TIME_MSEC;
std::cout << "Sent " << count << " messages to all three queues in " << time << "ms" << std::endl;
connection.close();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
return 1;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]