Author: aconway
Date: Thu Feb 22 15:23:52 2007
New Revision: 510705
URL: http://svn.apache.org/viewvc?view=rev&rev=510705
Log:
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
from Channel into separate Basic class.
* cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API.
Added:
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp (with props)
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h (with props)
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp?view=auto&rev=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp Thu Feb 22 15:23:52 2007
@@ -0,0 +1,255 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "Basic.h"
+#include "AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "framing/FieldTable.h"
+#include "Connection.h"
+
+using namespace std;
+
+namespace qpid {
+namespace client {
+
+using namespace sys;
+using namespace framing;
+
+/** Attach this Basic helper to its owning channel; no returned-message handler is set yet. */
+Basic::Basic(Channel& ch)
+    : channel(ch),
+      returnsHandler(0)
+{}
+
+/**
+ * Subscribe a listener to a queue.
+ *
+ * Sends a BasicConsumeBody to the broker. When synch is true the call waits
+ * for the BasicConsumeOkBody response and overwrites tag with the consumer
+ * tag carried by that response. The consumer is then recorded locally under
+ * tag so that run() can route deliveries to listener.
+ *
+ * Throws a CLIENT_ERROR if a consumer is already recorded under tag.
+ */
+void Basic::consume(
+    Queue& queue, std::string& tag, MessageListener* listener,
+    AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+    const bool noAck = (ackMode == NO_ACK);
+    const bool noWait = !synch;
+    // A null fields pointer means "no extra arguments": send an empty table.
+    const FieldTable arguments = fields ? *fields : FieldTable();
+
+    channel.sendAndReceiveSync<BasicConsumeOkBody>(
+        synch,
+        new BasicConsumeBody(
+            channel.version, 0, queue.getName(), tag, noLocal,
+            noAck, false, noWait, arguments));
+
+    if (synch) {
+        // Pick up the consumer tag reported back by the broker.
+        tag = boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
+            channel.responses.getResponse())->getConsumerTag();
+    }
+
+    // FIXME aconway 2007-02-20: Race condition!
+    // We could receive the first message for the consumer
+    // before we create the consumer below.
+    // Move consumer creation to handler for BasicConsumeOkBody
+    Mutex::ScopedLock guard(lock);
+    // insert() leaves an existing entry untouched and reports the clash.
+    std::pair<ConsumerMap::iterator, bool> added =
+        consumers.insert(ConsumerMap::value_type(tag, Consumer()));
+    if (!added.second)
+        THROW_QPID_ERROR(CLIENT_ERROR,
+                         "Consumer already exists with tag="+tag);
+    Consumer& entry = added.first->second;
+    entry.listener = listener;
+    entry.ackMode = ackMode;
+    entry.lastDeliveryTag = 0;
+}
+
+
+/**
+ * Cancel the consumer recorded under tag.
+ *
+ * An unknown tag is silently ignored. Otherwise the consumer is removed from
+ * the local table, any pending LAZY_ACK acknowledgement is flushed with a
+ * cumulative ack, and a BasicCancelBody is sent (waiting for the
+ * BasicCancelOkBody response when synch is true).
+ */
+void Basic::cancel(const std::string& tag, bool synch) {
+    Consumer removed;
+    {
+        // Hold the lock only while touching the consumer table.
+        Mutex::ScopedLock guard(lock);
+        ConsumerMap::iterator found = consumers.find(tag);
+        if (found == consumers.end())
+            return;
+        removed = found->second;
+        consumers.erase(found);
+    }
+
+    const bool ackPending =
+        removed.ackMode == LAZY_ACK && removed.lastDeliveryTag > 0;
+    if (ackPending) {
+        channel.send(
+            new BasicAckBody(channel.version, removed.lastDeliveryTag, true));
+    }
+
+    const bool noWait = !synch;
+    channel.sendAndReceiveSync<BasicCancelOkBody>(
+        synch, new BasicCancelBody(channel.version, tag, noWait));
+}
+
+/**
+ * Drop every locally recorded consumer.
+ *
+ * The consumer table is emptied under the lock; afterwards a cumulative ack
+ * is sent for each LAZY_ACK or AUTO_ACK consumer that still has a non-zero
+ * lastDeliveryTag. No BasicCancelBody is sent from here.
+ */
+void Basic::cancelAll(){
+    ConsumerMap drained;
+    {
+        Mutex::ScopedLock guard(lock);
+        // Take the whole table in one step, leaving `consumers` empty.
+        drained.swap(consumers);
+    }
+
+    for (ConsumerMap::iterator it = drained.begin(); it != drained.end(); ++it) {
+        const Consumer& c = it->second;
+        const bool acksAutomatically =
+            (c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK);
+        if (!acksAutomatically || c.lastDeliveryTag == 0)
+            continue;
+        channel.send(
+            new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+    }
+}
+
+
+
+/**
+ * Synchronously pull one message from a queue.
+ *
+ * Arms the incoming-message collector for a get, sends a BasicGetBody and
+ * blocks in incoming.waitGet() until the reply has been assembled.
+ *
+ * @param msg filled in by incoming.waitGet().
+ * @param queue the queue to pull from.
+ * @param ackMode acknowledgement mode for the pulled message.
+ * @return the result of incoming.waitGet(msg).
+ */
+bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) {
+    // Expect a message starting with a BasicGetOk
+    incoming.startGet();
+    // The last field of basic.get is the no-ack flag, so translate the mode
+    // the same way consume() does. Passing the enum straight through would
+    // turn NO_ACK (0) into "acks expected" and every other mode into "no ack".
+    // NOTE(review): assumes BasicGetBody takes (version, ticket, queue, noAck)
+    // as laid out in the AMQP spec - confirm against the generated class.
+    const bool noAck = (ackMode == NO_ACK);
+    channel.send(
+        new BasicGetBody(channel.version, 0, queue.getName(), noAck));
+    return incoming.waitGet(msg);
+}
+
+
+/**
+ * Publish a message to an exchange.
+ *
+ * Sends a BasicPublishBody, then the message header, then the message data
+ * split into AMQContentBody fragments of at most (max frame size - 8) bytes.
+ * A message with no data results in no content bodies being sent.
+ *
+ * @param msg the message whose header and data are sent.
+ * @param exchange the exchange named in the publish method.
+ * @param routingKey the routing key named in the publish method.
+ * @param mandatory passed through to BasicPublishBody.
+ * @param immediate passed through to BasicPublishBody.
+ */
+void Basic::publish(
+    const Message& msg, const Exchange& exchange,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    const string e = exchange.getName();
+    string key = routingKey;
+    channel.send(
+        new BasicPublishBody(
+            channel.version, 0, e, key, mandatory, immediate));
+
+    //break msg up into header frame and content frame(s) and send these
+    channel.send(msg.getHeader());
+
+    const string& data = msg.getData();
+    const u_int64_t data_length = data.length();
+    //frame itself uses 8 bytes
+    // NOTE(review): assumes getMaxFrameSize() > 8; a fragment size of zero
+    // would never make progress (this was also true before this change).
+    const u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
+
+    // Offsets are 64-bit like data_length so that very large bodies are not
+    // truncated. substr() clamps the final fragment to the bytes that remain,
+    // so one loop covers both the single-frame and the multi-frame case.
+    for (u_int64_t offset = 0; offset < data_length; offset += frag_size) {
+        string frag(data.substr(offset, frag_size));
+        channel.send(new AMQContentBody(frag));
+    }
+}
+
+/**
+ * Accept an incoming Basic-class method.
+ *
+ * Deliver, Return, GetOk and GetEmpty are queued on `incoming`; any other
+ * method id raises Channel::UnknownMethod.
+ */
+void Basic::handle(boost::shared_ptr<AMQMethodBody> method) {
+    assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
+    switch(method->amqpMethodId()) {
+      case BasicDeliverBody::METHOD_ID:
+      case BasicReturnBody::METHOD_ID:
+      case BasicGetOkBody::METHOD_ID:
+      case BasicGetEmptyBody::METHOD_ID:
+        break;                  // recognised: queue it below
+      default:
+        throw Channel::UnknownMethod();
+    }
+    incoming.add(method);
+}
+
+/**
+ * Hand one message to a consumer's listener and acknowledge it according to
+ * the consumer's AckMode.
+ *
+ * @param consumer the consumer state; lastDeliveryTag and count are updated.
+ * @param msg the message to pass to consumer.listener->received().
+ *
+ * Acknowledgements are only sent while the channel is open:
+ *  - AUTO_ACK: ack this message alone.
+ *  - LAZY_ACK: count deliveries and, once the count reaches the channel
+ *    prefetch, send one cumulative ack and start counting again.
+ *  - NO_ACK / CLIENT_ACK: nothing is sent from here.
+ */
+void Basic::deliver(Consumer& consumer, Message& msg){
+    //record delivery tag:
+    consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+    //allow registered listener to handle the message
+    consumer.listener->received(msg);
+
+    if(channel.isOpen()){
+        switch(consumer.ackMode){
+          case LAZY_ACK:
+            if(++(consumer.count) < channel.getPrefetch())
+                break;
+            // Batch complete: restart the count so that the next ack is
+            // again a full batch away instead of following every message.
+            consumer.count = 0;
+            consumer.lastDeliveryTag = 0;
+            channel.send(
+                new BasicAckBody(
+                    channel.version, msg.getDeliveryTag(), true));
+            break;
+          case AUTO_ACK:
+            consumer.lastDeliveryTag = 0;
+            channel.send(
+                new BasicAckBody(
+                    channel.version, msg.getDeliveryTag(), false));
+            break;
+          case NO_ACK:          // Nothing to do
+          case CLIENT_ACK:      // User code must ack.
+            break;
+            // TODO aconway 2007-02-22: Provide a way for user
+            // to ack!
+        }
+    }
+
+    //as it stands, transactionality is entirely orthogonal to ack
+    //mode, though the acks will not be processed by the broker under
+    //a transaction until it commits.
+}
+
+
+/**
+ * Dispatch loop: runs while the channel is open.
+ *
+ * Each message taken from `incoming` is either a returned message, which is
+ * passed to the registered ReturnedMessageHandler (or printed to stdout when
+ * none is set), or a delivery, which is routed to the consumer recorded
+ * under its consumer tag. A delivery for an unknown tag raises a protocol
+ * error. ShutdownException is swallowed; any other qpid Exception is printed
+ * and the loop condition is re-evaluated.
+ */
+void Basic::run() {
+    while(channel.isOpen()) {
+        try {
+            Message msg = incoming.waitDispatch();
+            if(msg.getMethod()->isA<BasicReturnBody>()) {
+                ReturnedMessageHandler* handler=0;
+                {
+                    Mutex::ScopedLock l(lock);
+                    handler=returnsHandler;
+                }
+                if(handler == 0) {
+                    // TODO aconway 2007-02-20: proper logging.
+                    cout << "Message returned: " << msg.getData() << endl;
+                }
+                else
+                    handler->returned(msg);
+            }
+            else {
+                BasicDeliverBody::shared_ptr deliverBody =
+                    boost::shared_polymorphic_downcast<BasicDeliverBody>(
+                        msg.getMethod());
+                std::string tag = deliverBody->getConsumerTag();
+                // Work on a copy so the lock is not held while the
+                // listener callback runs.
+                Consumer consumer;
+                {
+                    Mutex::ScopedLock l(lock);
+                    ConsumerMap::iterator i = consumers.find(tag);
+                    if(i == consumers.end())
+                        THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+                                         "Unknown consumer tag=" + tag);
+                    consumer = i->second;
+                }
+                deliver(consumer, msg);
+                {
+                    // deliver() updated the copy; store the ack bookkeeping
+                    // back so the LAZY_ACK count accumulates and cancel() /
+                    // cancelAll() can see a pending delivery tag. The
+                    // consumer may have been cancelled in the meantime.
+                    Mutex::ScopedLock l(lock);
+                    ConsumerMap::iterator i = consumers.find(tag);
+                    if(i != consumers.end()) {
+                        i->second.count = consumer.count;
+                        i->second.lastDeliveryTag = consumer.lastDeliveryTag;
+                    }
+                }
+            }
+        }
+        catch (const ShutdownException&) {
+            /* Orderly shutdown */
+        }
+        catch (const Exception& e) {
+            // FIXME aconway 2007-02-20: Report exception to user.
+            cout << "client::Basic::run() terminated by: " << e.toString()
+                 << "(" << typeid(e).name() << ")" << endl;
+        }
+    }
+}
+
+/** Install (or, with a null pointer, remove) the handler that run() calls for returned messages. */
+void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler)
+{
+    // run() reads returnsHandler under the same lock.
+    Mutex::ScopedLock guard(lock);
+    returnsHandler = handler;
+}
+
+/**
+ * Send a BasicQosBody carrying the channel's prefetch value and wait for the
+ * BasicQosOkBody reply; on a transactional channel, follow it with a
+ * TxSelectBody and wait for TxSelectOkBody.
+ */
+void Basic::setQos(){
+    channel.sendAndReceive<BasicQosOkBody>(
+        new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+    if (!channel.isTransactional())
+        return;
+    channel.sendAndReceive<TxSelectOkBody>(
+        new TxSelectBody(channel.version));
+}
+
+
+// TODO aconway 2007-02-22: NOTES:
+// Move incoming to BasicChannel - check for uses.
+
+}} // namespace qpid::client
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h?view=auto&rev=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h Thu Feb 22 15:23:52 2007
@@ -0,0 +1,195 @@
+#ifndef _client_Basic_h
+#define _client_Basic_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "IncomingMessage.h"
+#include "sys/Runnable.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class FieldTable;
+}
+
+namespace client {
+
+class Channel;
+class Message;
+class Queue;
+class Exchange;
+class MessageListener;
+class ReturnedMessageHandler;
+
+/**
+ * The available acknowledgement modes for a consumer or a get.
+ *
+ * \ingroup clientapi
+ */
+enum AckMode {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as the listener has
+ returned from handling it. **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message: one cumulative acknowledgement
+ is sent once the number of deliveries counted for the
+ consumer reaches the channel prefetch. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages; Basic sends none itself. **/
+ CLIENT_ACK = 3
+};
+
+
+/**
+ * Represents the AMQP Basic class for sending and receiving messages.
+ *
+ * A Basic object belongs to one Channel and is Runnable: run() is the
+ * loop that hands incoming messages to listeners.
+ */
+class Basic : public sys::Runnable
+{
+ public:
+ Basic(Channel& parent); // parent: the channel that all methods are sent on
+
+ /**
+ * Creates a 'consumer' for a queue. Messages in (or arriving
+ * at) that queue will be delivered to consumers
+ * asynchronously.
+ *
+ * @param queue a Queue instance representing the queue to
+ * consume from
+ *
+ * @param tag an identifier to associate with the consumer
+ * that can be used to cancel its subscription (if empty, this
+ * will be assigned by the broker). When synch is true it is
+ * overwritten with the tag reported in the broker's response.
+ *
+ * @param listener a pointer to an instance of an
+ * implementation of the MessageListener interface. Messages
+ * received from this queue for this consumer will result in
+ * invocation of the received() method on the listener, with
+ * the message itself passed in.
+ *
+ * @param ackMode the mode of acknowledgement that the broker
+ * should assume for this consumer. @see AckMode
+ *
+ * @param noLocal if true, this consumer will not be sent any
+ * message published by this connection
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ *
+ * @param fields optional table of arguments sent with the
+ * consume request; if null an empty table is sent
+ */
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
+
+ /**
+ * Cancels a subscription previously set up through a call to consume().
+ * A tag that does not name a known consumer is ignored.
+ *
+ * @param tag the identifier used (or assigned) in the consume
+ * request that set up the subscription to be cancelled.
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void cancel(const std::string& tag, bool synch = true);
+ /**
+ * Synchronous pull of a message from a queue.
+ *
+ * @param msg a message object that will contain the message
+ * headers and content if the call completes.
+ *
+ * @param queue the queue to consume from
+ *
+ * @param ackMode the acknowledgement mode to use (@see
+ * AckMode)
+ *
+ * @return true if a message was successfully dequeued from
+ * the queue, false if the queue was empty.
+ */
+ bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+ /**
+ * Publishes a message (i.e. sends it to the broker).
+ *
+ * @param msg the message to publish
+ *
+ * @param exchange the exchange to publish the message to
+ *
+ * @param routingKey the routing key to publish with
+ *
+ * @param mandatory if true and the exchange to which this
+ * publish is directed has no matching bindings, the message
+ * will be returned (see setReturnedMessageHandler()).
+ *
+ * @param immediate if true and there is no consumer to
+ * receive this message on publication, the message will be
+ * returned (see setReturnedMessageHandler()).
+ */
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ /**
+ * Set a handler for this channel that will process any
+ * returned messages. With no handler set, returned messages
+ * are printed to standard output by run().
+ *
+ * @see publish()
+ */
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ /**
+ * Deliver messages from the broker to the appropriate MessageListener.
+ * Loops for as long as the owning channel is open.
+ */
+ void run();
+
+
+ private:
+
+ struct Consumer{
+ MessageListener* listener; // receives each message delivered for this consumer
+ AckMode ackMode; // how deliveries to this consumer are acknowledged
+ int count; // deliveries counted towards the next LAZY_ACK acknowledgement
+ u_int64_t lastDeliveryTag; // tag of the latest delivery not yet acked automatically; 0 if none
+ };
+
+ typedef std::map<std::string, Consumer> ConsumerMap; // keyed by consumer tag
+
+ void handle(boost::shared_ptr<framing::AMQMethodBody>);
+ void setQos();
+ void cancelAll();
+ void deliver(Consumer& consumer, Message& msg);
+
+ sys::Mutex lock; // guards consumers and returnsHandler
+ Channel& channel;
+ IncomingMessage incoming;
+ ConsumerMap consumers;
+ ReturnedMessageHandler* returnsHandler;
+
+ // FIXME aconway 2007-02-22: Remove friendship.
+ friend class Channel;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_Basic_h*/
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Thu Feb 22 15:23:52 2007
@@ -37,6 +37,7 @@
using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
+ basic(*this),
connection(0),
prefetch(_prefetch),
transactional(_transactional)
@@ -113,19 +114,16 @@
bool Channel::isOpen() const { return connection; }
+void Channel::setQos() {
+ basic.setQos();
+ // FIXME aconway 2007-02-22: message
+}
+
void Channel::setPrefetch(u_int16_t _prefetch){
prefetch = _prefetch;
setQos();
}
-void Channel::setQos(){
- sendAndReceive<BasicQosOkBody>(
- new BasicQosBody(version, 0, prefetch, false));
- if(transactional){
- sendAndReceive<TxSelectOkBody>(new TxSelectBody(version));
- }
-}
-
void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
@@ -177,114 +175,6 @@
new QueueBindBody(version, 0, q, e, key,!synch, args));
}
-void Channel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
- sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- new BasicConsumeBody(
- version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if (synch) {
- BasicConsumeOkBody::shared_ptr response =
- shared_polymorphic_downcast<BasicConsumeOkBody>(
- responses.getResponse());
- tag = response->getConsumerTag();
- }
- // FIXME aconway 2007-02-20: Race condition!
- // We could receive the first message for the consumer
- // before we create the consumer below.
- // Move consumer creation to handler for BasicConsumeOkBody
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- THROW_QPID_ERROR(CLIENT_ERROR,
- "Consumer already exists with tag="+tag);
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
- }
-}
-
-void Channel::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
- sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(version, tag, !synch));
-}
-
-void Channel::cancelAll(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
- {
- Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
- {
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
- }
- }
-}
-
-bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
- // Expect a message starting with a BasicGetOk
- incoming.startGet();
- send(new BasicGetBody(version, 0, queue.getName(), ackMode));
- return incoming.waitGet(msg);
-}
-
-
-void Channel::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- // FIXME aconway 2007-01-30: Rework for 0-9 message class.
- const string e = exchange.getName();
- string key = routingKey;
-
- send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
- //break msg up into header frame and content frame(s) and send these
- send(msg.header);
- string data = msg.getData();
- u_int64_t data_length = data.length();
- if(data_length > 0){
- u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself
uses 8 bytes
- if(data_length < frag_size){
- send(new AMQContentBody(data));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size :
remaining;
- string frag(data.substr(offset, length));
- send(new AMQContentBody(frag));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
void Channel::commit(){
sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
}
@@ -294,46 +184,53 @@
}
void Channel::handleMethodInContext(
- AMQMethodBody::shared_ptr body, const MethodContext&)
+ AMQMethodBody::shared_ptr method, const MethodContext&)
{
- //channel.flow, channel.close, basic.deliver, basic.return or a
- //response to a synchronous request
if(responses.isWaiting()) {
- responses.signalResponse(body);
+ responses.signalResponse(method);
return;
}
-
- if(body->isA<BasicDeliverBody>()
- || body->isA<BasicReturnBody>()
- || body->isA<BasicGetOkBody>()
- || body->isA<BasicGetEmptyBody>())
-
- {
- incoming.add(body);
- return;
+ try {
+ switch (method->amqpClassId()) {
+ case BasicDeliverBody::CLASS_ID: basic.handle(method); break;
+ case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+ case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+ default: throw UnknownMethod();
+ }
}
- else if(body->isA<ChannelCloseBody>()) {
- peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
+ catch (const UnknownMethod&) {
+ connection->close(
+ 504, "Unknown method",
+ method->amqpClassId(), method->amqpMethodId());
}
- else if(body->isA<ChannelFlowBody>()){
- // TODO aconway 2007-01-24: not implemented yet.
+}
+
+void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+ switch (method->amqpMethodId()) {
+ case ChannelCloseBody::METHOD_ID:
+ peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
+ return;
+ case ChannelFlowBody::METHOD_ID:
+ // FIXME aconway 2007-02-22: Not yet implemented.
+ return;
}
- else if(body->isA<ConnectionCloseBody>()){
+ throw UnknownMethod();
+}
+
+void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
+ if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
connection->close();
- }
- else {
- connection->close(
- 504, "Unrecognised method",
- body->amqpClassId(), body->amqpMethodId());
- }
+ return;
+ }
+ throw UnknownMethod();
}
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- incoming.add(body);
+ basic.incoming.add(body);
}
void Channel::handleContent(AMQContentBody::shared_ptr body){
- incoming.add(body);
+ basic.incoming.add(body);
}
void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -341,82 +238,7 @@
}
void Channel::start(){
- dispatcher = Thread(this);
-}
-
-void Channel::deliver(Consumer& consumer, Message& msg){
- //record delivery tag:
- consumer.lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer.listener->received(msg);
-
- if(isOpen()){
- bool multiple(false);
- switch(consumer.ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer.count) < prefetch) break;
- //else drop-through
- case AUTO_ACK:
- send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
- consumer.lastDeliveryTag = 0;
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-void Channel::run() {
- while(isOpen()) {
- try {
- Message msg = incoming.waitDispatch();
- if(msg.getMethod()->isA<BasicReturnBody>()) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler == 0) {
- // TODO aconway 2007-02-20: proper logging.
- cout << "Message returned: " << msg.getData() << endl;
- }
- else
- handler->returned(msg);
- }
- else {
- BasicDeliverBody::shared_ptr deliverBody =
- boost::shared_polymorphic_downcast<BasicDeliverBody>(
- msg.getMethod());
- std::string tag = deliverBody->getConsumerTag();
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" + tag);
- consumer = i->second;
- }
- deliver(consumer, msg);
- }
- }
- catch (const ShutdownException&) {
- /* Orderly shutdown */
- }
- catch (const Exception& e) {
- // FIXME aconway 2007-02-20: Report exception to user.
- cout << "client::Channel::run() terminated by: " << e.toString()
- << "(" << typeid(e).name() << ")" << endl;
- }
- }
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- Mutex::ScopedLock l(lock);
- returnsHandler = handler;
+ basicDispatcher = Thread(basic);
}
// Close called by local application.
@@ -452,13 +274,13 @@
void Channel::closeInternal() {
// Tear down only while the channel is still open. The guard must not end
// in ';' - that would make it a no-op and run the block unconditionally.
if (isOpen())
{
- cancelAll();
- incoming.shutdown();
+ basic.cancelAll();
+ basic.incoming.shutdown();
connection = 0;
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
}
// The dispatch thread is joined whether or not the block above ran.
- dispatcher.join();
+ basicDispatcher.join();
}
void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Thu Feb 22 15:23:52 2007
@@ -21,23 +21,15 @@
* under the License.
*
*/
-#include <map>
-#include <string>
-#include <queue>
-#include <boost/scoped_ptr.hpp>
#include "sys/types.h"
-
#include <framing/amqp_framing.h>
#include <ClientExchange.h>
-#include <IncomingMessage.h>
#include <ClientMessage.h>
-#include <MessageListener.h>
#include <ClientQueue.h>
#include <ResponseHandler.h>
-#include <ReturnedMessageHandler.h>
-#include "Runnable.h"
#include "ChannelAdapter.h"
#include "Thread.h"
+#include "Basic.h"
namespace qpid {
@@ -50,27 +42,6 @@
class Connection;
-/**
- * The available acknowledgements modes
- *
- * \ingroup clientapi
- */
-enum ack_modes {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
-};
/**
* Represents an AMQP channel, i.e. loosely a session of work. It
@@ -79,41 +50,34 @@
*
* \ingroup clientapi
*/
-class Channel : public framing::ChannelAdapter,
- public sys::Runnable
+class Channel : public framing::ChannelAdapter
{
- struct Consumer{
- MessageListener* listener;
- int ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<std::string, Consumer> ConsumerMap;
+ private:
+ // TODO aconway 2007-02-22: Remove friendship.
+ friend class Basic;
+ // FIXME aconway 2007-02-22: friend class Message;
+ struct UnknownMethod {};
+
sys::Mutex lock;
+ Basic basic;
Connection* connection;
- sys::Thread dispatcher;
- IncomingMessage incoming;
+ sys::Thread basicDispatcher;
ResponseHandler responses;
- ConsumerMap consumers;
- ReturnedMessageHandler* returnsHandler;
u_int16_t prefetch;
const bool transactional;
framing::ProtocolVersion version;
- void retrieve(Message& msg);
- void deliver(Consumer& consumer, Message& msg);
-
void handleHeader(framing::AMQHeaderBody::shared_ptr body);
void handleContent(framing::AMQContentBody::shared_ptr body);
void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
-
void handleMethodInContext(
- framing::AMQMethodBody::shared_ptr,
- const framing::MethodContext& method);
+ framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
+ void handleChannel(framing::AMQMethodBody::shared_ptr method);
+ void handleConnection(framing::AMQMethodBody::shared_ptr method);
+
void setQos();
- void cancelAll();
void protocolInit(
const std::string& uid, const std::string& pwd,
@@ -148,18 +112,18 @@
public:
/**
- * Creates a channel object.
- *
- * @param transactional if true, the publishing and acknowledgement
- * of messages will be transactional and can be committed or
- * aborted in atomic units (@see commit(), @see rollback())
- *
- * @param prefetch specifies the number of unacknowledged
- * messages the channel is willing to have sent to it
- * asynchronously
+ * Creates a channel object.
+ *
+ * @param transactional if true, the publishing and acknowledgement
+ * of messages will be transactional and can be committed or
+ * aborted in atomic units (@see commit(), @see rollback())
+ *
+ * @param prefetch specifies the number of unacknowledged
+ * messages the channel is willing to have sent to it
+ * asynchronously
*/
- Channel(bool transactional = false, u_int16_t prefetch = 500);
- ~Channel();
+ Channel(bool transactional = false, u_int16_t prefetch = 500);
+ ~Channel();
/**
* Declares an exchange.
@@ -221,85 +185,16 @@
* @param synch if true this call will block until a response
* is received from the broker
*/
- void bind(const Exchange& exchange, const Queue& queue, const std::string&
key,
- const framing::FieldTable& args, bool synch = true);
- /**
- * Creates a 'consumer' for a queue. Messages in (or arriving
- * at) that queue will be delivered to consumers
- * asynchronously.
- *
- * @param queue a Queue instance representing the queue to
- * consume from
- *
- * @param tag an identifier to associate with the consumer
- * that can be used to cancel its subscription (if empty, this
- * will be assigned by the broker)
- *
- * @param listener a pointer to an instance of an
- * implementation of the MessageListener interface. Messages
- * received from this queue for this consumer will result in
- * invocation of the received() method on the listener, with
- * the message itself passed in.
- *
- * @param ackMode the mode of acknowledgement that the broker
- * should assume for this consumer. @see ack_modes
- *
- * @param noLocal if true, this consumer will not be sent any
- * message published by this connection
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const framing::FieldTable* fields = 0);
-
- /**
- * Cancels a subscription previously set up through a call to consume().
- *
- * @param tag the identifier used (or assigned) in the consume
- * request that set up the subscription to be cancelled.
- *
- * @param synch if true this call will block until a response
- * is received from the broker
- */
- void cancel(const std::string& tag, bool synch = true);
- /**
- * Synchronous pull of a message from a queue.
- *
- * @param msg a message object that will contain the message
- * headers and content if the call completes.
- *
- * @param queue the queue to consume from
- *
- * @param ackMode the acknowledgement mode to use (@see
- * ack_modes)
- *
- * @return true if a message was succcessfully dequeued from
- * the queue, false if the queue was empty.
- */
- bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+ void bind(const Exchange& exchange, const Queue& queue,
+ const std::string& key, const framing::FieldTable& args,
+ bool synch = true);
+
/**
- * Publishes (i.e. sends a message to the broker).
- *
- * @param msg the message to publish
- *
- * @param exchange the exchange to publish the message to
- *
- * @param routingKey the routing key to publish with
- *
- * @param mandatory if true and the exchange to which this
- * publish is directed has no matching bindings, the message
- * will be returned (see setReturnedMessageHandler()).
- *
- * @param immediate if true and there is no consumer to
- * receive this message on publication, the message will be
- * returned (see setReturnedMessageHandler()).
+ * Get a Basic object which provides functions to send and
+ * receive messages using the AMQP 0-8 Basic class methods.
+ * @see Basic
*/
- void publish(const Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
+ Basic& getBasic() { return basic; }
/**
* For a transactional channel this will commit all
@@ -314,6 +209,7 @@
* object is created (@see Channel()).
*/
void commit();
+
/**
* For a transactional channel, this will rollback any
* publications or acknowledgements. It will be as if the
@@ -327,33 +223,25 @@
*/
void setPrefetch(u_int16_t prefetch);
+ u_int16_t getPrefetch() { return prefetch; }
+
/**
* Start message dispatching on a new thread
*/
void start();
- // TODO aconway 2007-01-26: Can it be private?
- /**
- * Dispatch messages on this channel in the calling thread.
- */
- void run();
-
/**
* Close the channel with optional error information.
* Closing a channel that is not open has no effect.
*/
void close(
framing::ReplyCode = 200, const std::string& ="OK",
- framing::ClassId = 0, framing::MethodId = 0);
-
- /**
- * Set a handler for this channel that will process any
- * returned messages
- *
- * @see publish()
- */
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+ framing::ClassId = 0, framing::MethodId = 0);
+ /** True if the channel is transactional */
+ bool isTransactional() { return transactional; }
+
+ /** True if the channel is open */
bool isOpen() const;
};
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h Thu Feb 22 15:23:52 2007
@@ -112,7 +112,8 @@
{ return method; }
void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; }
- boost::shared_ptr<framing::AMQHeaderBody> getHeader();
+ boost::shared_ptr<framing::AMQHeaderBody> getHeader() const
+ { return header; }
// TODO aconway 2007-02-15: remove friendships.
friend class IncomingMessage;
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Thu Feb 22 15:23:52 2007
@@ -31,6 +31,7 @@
#include <sys/ShutdownHandler.h>
#include <sys/TimeoutHandler.h>
+
#include "framing/amqp_types.h"
#include <framing/amqp_framing.h>
#include <ClientExchange.h>
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am Thu Feb 22 15:23:52 2007
@@ -13,6 +13,7 @@
ClientExchange.cpp \
ClientMessage.cpp \
ClientQueue.cpp \
+ Basic.cpp \
Connection.cpp \
Connector.cpp \
IncomingMessage.cpp \
@@ -24,6 +25,7 @@
ClientExchange.h \
ClientMessage.h \
ClientQueue.h \
+ Basic.h \
Connection.h \
Connector.h \
IncomingMessage.h \
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h Thu Feb 22 15:23:52 2007
@@ -33,7 +33,6 @@
class ThreadSafeQueue
{
public:
- struct QueueStoppedException : public Exception {};
ThreadSafeQueue() {}
@@ -47,7 +46,7 @@
}
/** Pop a value from the front of the queue. Waits till value is available.
- *@throw QueueStoppedException if queue is stopped while waiting.
+ *@throw ShutdownException if queue is stopped while waiting.
*/
T pop() {
ProducerConsumer::ConsumerLock consumer(pc);
@@ -57,7 +56,7 @@
container.pop_front();
return value;
}
- throw QueueStoppedException();
+ throw ShutdownException();
}
/**
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp Thu Feb 22 15:23:52 2007
@@ -76,21 +76,21 @@
void testPublishGet() {
Message pubMsg(data);
pubMsg.getHeaders().setString("hello", "world");
- channel.publish(pubMsg, exchange, qname);
+ channel.getBasic().publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
- CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
+ CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue
}
void testGetNoContent() {
Message pubMsg;
pubMsg.getHeaders().setString("hello", "world");
- channel.publish(pubMsg, exchange, qname);
+ channel.getBasic().publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
CPPUNIT_ASSERT(getMsg.getData().empty());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
@@ -98,10 +98,10 @@
void testConsumeCancel() {
string tag; // Broker assigned
- channel.consume(queue, tag, &listener);
+ channel.getBasic().consume(queue, tag, &listener);
channel.start();
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel.publish(Message("a"), exchange, qname);
+ channel.getBasic().publish(Message("a"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
Time deadline(now() + 1*TIME_SEC);
@@ -112,8 +112,8 @@
CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
- channel.publish(Message("b"), exchange, qname);
- channel.publish(Message("c"), exchange, qname);
+ channel.getBasic().publish(Message("b"), exchange, qname);
+ channel.getBasic().publish(Message("c"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
while (listener.messages.size() != 3) {
@@ -124,15 +124,15 @@
CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
- channel.cancel(tag);
- channel.publish(Message("d"), exchange, qname);
+ channel.getBasic().cancel(tag);
+ channel.getBasic().publish(Message("d"), exchange, qname);
CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
{
Mutex::ScopedLock l(listener.monitor);
CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
}
Message msg;
- CPPUNIT_ASSERT(channel.get(msg, queue));
+ CPPUNIT_ASSERT(channel.getBasic().get(msg, queue));
CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
}
@@ -140,9 +140,9 @@
void testConsumePublished() {
Message pubMsg("x");
pubMsg.getHeaders().setString("y", "z");
- channel.publish(pubMsg, exchange, qname);
+ channel.getBasic().publish(pubMsg, exchange, qname);
string tag;
- channel.consume(queue, tag, &listener);
+ channel.getBasic().consume(queue, tag, &listener);
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
channel.start();
{
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp Thu Feb 22 15:23:52 2007
@@ -101,7 +101,7 @@
Monitor monitor;
SimpleListener listener(&monitor);
string tag("MyTag");
- channel.consume(queue, tag, &listener);
+ channel.getBasic().consume(queue, tag, &listener);
if (verbose) std::cout << "Registered consumer." << std::endl;
//we need to enable the message dispatching for this channel
@@ -114,7 +114,7 @@
Message msg;
string data("MyMessage");
msg.setData(data);
- channel.publish(msg, exchange, "MyTopic");
+ channel.getBasic().publish(msg, exchange, "MyTopic");
if (verbose) std::cout << "Published message: " << data << std::endl;
{
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp Thu Feb 22 15:23:52 2007
@@ -116,7 +116,7 @@
//Consume from the response queue, logging all echoed message to console:
LoggingListener listener;
std::string tag;
- channel.consume(response, tag, &listener);
+ channel.getBasic().consume(response, tag, &listener);
//Process incoming requests on a new thread
channel.start();
@@ -129,7 +129,7 @@
Message msg;
msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
msg.setData(text);
- channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
+ channel.getBasic().publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
std::cout << "Enter text to send:" << std::endl;
}
@@ -158,10 +158,10 @@
//Consume from the request queue, echoing back all messages received to the client that sent them
EchoServer server(&channel);
std::string tag = "server_tag";
- channel.consume(request, tag, &server);
+ channel.getBasic().consume(request, tag, &server);
//Process incoming requests on the main thread
- channel.run();
+ channel.getBasic().run();
connection.close();
} catch(qpid::QpidError error) {
@@ -184,7 +184,7 @@
std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
//'echo' the message back:
- channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
+ channel->getBasic().publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp Thu Feb 22 15:23:52 2007
@@ -71,7 +71,7 @@
class Args{
string host;
int port;
- int ackMode;
+ AckMode ackMode;
bool transactional;
int prefetch;
bool trace;
@@ -81,13 +81,13 @@
void parse(int argc, char** argv);
void usage();
- inline const string& getHost() const { return host;}
- inline int getPort() const { return port; }
- inline int getAckMode(){ return ackMode; }
- inline bool getTransactional() const { return transactional; }
- inline int getPrefetch(){ return prefetch; }
- inline bool getTrace() const { return trace; }
- inline bool getHelp() const { return help; }
+ const string& getHost() const { return host;}
+ int getPort() const { return port; }
+ AckMode getAckMode(){ return ackMode; }
+ bool getTransactional() const { return transactional; }
+ int getPrefetch(){ return prefetch; }
+ bool getTrace() const { return trace; }
+ bool getHelp() const { return help; }
};
/**
@@ -119,9 +119,9 @@
//set up listener
Listener listener(&channel, response.getName(), args.getTransactional());
string tag;
- channel.consume(control, tag, &listener, args.getAckMode());
+ channel.getBasic().consume(control, tag, &listener, args.getAckMode());
cout << "topic_listener: Consuming." << endl;
- channel.run();
+ channel.getBasic().run();
connection.close();
cout << "topic_listener: normal exit" << endl;
return 0;
@@ -166,7 +166,7 @@
<< time/TIME_MSEC << " ms.";
Message msg(reportstr.str());
msg.getHeaders().setString("TYPE", "REPORT");
- channel->publish(msg, string(), responseQueue);
+ channel->getBasic().publish(msg, string(), responseQueue);
if(transactional){
channel->commit();
}
@@ -184,7 +184,7 @@
}else if("-port" == name){
port = atoi(argv[++i]);
}else if("-ack_mode" == name){
- ackMode = atoi(argv[++i]);
+ ackMode = AckMode(atoi(argv[++i]));
}else if("-transactional" == name){
transactional = true;
}else if("-prefetch" == name){
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp Thu Feb 22 15:23:52 2007
@@ -80,7 +80,7 @@
int port;
int messages;
int subscribers;
- int ackMode;
+ AckMode ackMode;
bool transactional;
int prefetch;
int batches;
@@ -96,18 +96,18 @@
void parse(int argc, char** argv);
void usage();
- inline const string& getHost() const { return host;}
- inline int getPort() const { return port; }
- inline int getMessages() const { return messages; }
- inline int getSubscribers() const { return subscribers; }
- inline int getAckMode(){ return ackMode; }
- inline bool getTransactional() const { return transactional; }
- inline int getPrefetch(){ return prefetch; }
- inline int getBatches(){ return batches; }
- inline int getDelay(){ return delay; }
- inline int getSize(){ return size; }
- inline bool getTrace() const { return trace; }
- inline bool getHelp() const { return help; }
+ const string& getHost() const { return host;}
+ int getPort() const { return port; }
+ int getMessages() const { return messages; }
+ int getSubscribers() const { return subscribers; }
+ AckMode getAckMode(){ return ackMode; }
+ bool getTransactional() const { return transactional; }
+ int getPrefetch(){ return prefetch; }
+ int getBatches(){ return batches; }
+ int getDelay(){ return delay; }
+ int getSize(){ return size; }
+ bool getTrace() const { return trace; }
+ bool getHelp() const { return help; }
};
int main(int argc, char** argv) {
@@ -129,7 +129,7 @@
//set up listener
Publisher publisher(&channel, "topic_control", args.getTransactional());
std::string tag("mytag");
- channel.consume(response, tag, &publisher, args.getAckMode());
+ channel.getBasic().consume(response, tag, &publisher, args.getAckMode());
channel.start();
int batchSize(args.getBatches());
@@ -187,12 +187,12 @@
{
Monitor::ScopedLock l(monitor);
for(int i = 0; i < msgs; i++){
- channel->publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->getBasic().publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
}
//send report request
Message reportRequest;
reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->getBasic().publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
if(transactional){
channel->commit();
}
@@ -216,7 +216,7 @@
//send termination request
Message terminationRequest;
terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->getBasic().publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
if(transactional){
channel->commit();
}
@@ -237,7 +237,7 @@
}else if("-subscribers" == name){
subscribers = atoi(argv[++i]);
}else if("-ack_mode" == name){
- ackMode = atoi(argv[++i]);
+ ackMode = AckMode(atoi(argv[++i]));
}else if("-transactional" == name){
transactional = true;
}else if("-prefetch" == name){