Is this in a single broker or a network of brokers? If the latter, does it
work for a single broker without advisory messages enabled?

Also, your activemq.xml didn't make it.

Tim

On Sun, Sep 16, 2018, 2:06 AM dpatel <dhpa...@factset.com> wrote:

> ActiveMQCPP version: 3.7.1
> ActiveMQ Broker version: 5.10.0
>
> Here is a simple example. The code includes both the consumer and the producer.
>
>
> // START SNIPPET: demo
>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <decaf/lang/Integer.h>
> #include <decaf/lang/Long.h>
> #include <decaf/lang/System.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Config.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> #include <stdio.h>
> #include <iostream>
> #include <memory>
>
> #include <decaf/util/Random.h>
>
> using namespace activemq::core;
> using namespace decaf::util::concurrent;
> using namespace decaf::util;
> using namespace decaf::lang;
> using namespace cms;
> using namespace std;
>
> #define  QUEUE_NAME    "eventQueue"
> #define NAME_BYTE_LEN        16
>
> // Request/reply demo producer.
> //
> // run() connects to brokerURI, sends numMessages text messages to QUEUE_NAME
> // (queue or topic, depending on useTopic) and advertises a temporary queue as
> // the CMSReplyTo destination.  Replies arriving on that temporary queue are
> // delivered to onMessage(), which prints them and counts down doneLatch.
> class HelloWorldProducer : public ExceptionListener,
>         public MessageListener,
>         public Runnable {
> private:
>         CountDownLatch latch;               // released once run() has finished its setup
>         CountDownLatch doneLatch;           // one count per expected reply
>         Connection* connection;
>         Session* session;
>         Destination* destination;           // request destination (QUEUE_NAME)
>         Destination* replyDestination;      // temporary queue advertised via CMSReplyTo
>         MessageProducer* producer;
>         MessageConsumer* responseConsumer;  // listens on replyDestination
>         int numMessages;
>         int receivedCount;                  // replies seen by this instance
>         bool useTopic;
>         bool sessionTransacted;
>         std::string brokerURI;
>         long waitMillis;
>
> private:
>
>         // Not copyable: the object owns raw CMS resources.
>         HelloWorldProducer(const HelloWorldProducer&);
>         HelloWorldProducer& operator=(const HelloWorldProducer&);
>
> public:
>
>         // brokerURI         - URI handed to the CMS connection factory.
>         // numMessages       - number of requests to send (and replies to expect).
>         // useTopic          - send to a topic named QUEUE_NAME instead of a queue.
>         // sessionTransacted - use a SESSION_TRANSACTED session instead of AUTO_ACKNOWLEDGE.
>         // waitMillis        - timeout passed to doneLatch.await() after each send.
>         HelloWorldProducer(const std::string& brokerURI, int numMessages,
>                 bool useTopic = false, bool sessionTransacted = false,
>                 long waitMillis = 3000) :
>                 latch(1),
>                 doneLatch(numMessages),
>                 connection(NULL),
>                 session(NULL),
>                 destination(NULL),
>                 replyDestination(NULL),
>                 producer(NULL),
>                 responseConsumer(NULL),
>                 numMessages(numMessages),
>                 receivedCount(0),
>                 useTopic(useTopic),
>                 sessionTransacted(sessionTransacted),
>                 brokerURI(brokerURI),
>                 waitMillis(waitMillis)
>         { }
>
>         virtual ~HelloWorldProducer() {
>                 cleanup();
>         }
>
>         // Closes the connection and frees all CMS resources; safe to call twice
>         // because cleanup() resets every pointer to NULL.
>         void close() {
>                 this->cleanup();
>         }
>
>         // Blocks until run() has finished its setup (or failed with a CMSException).
>         void waitUntilReady() {
>                 latch.await();
>         }
>
>         virtual void run() {
>
>                 try {
>
>                         // Create a ConnectionFactory
>                         auto_ptr<ConnectionFactory> connectionFactory(
>                                 ConnectionFactory::createCMSConnectionFactory(brokerURI));
>
>                         // Create a Connection
>                         connection = connectionFactory->createConnection();
>                         connection->start();
>
>                         // Create exactly one Session.  A second, unconditional
>                         // createSession() call used to follow this if/else; it leaked
>                         // the first session and ignored sessionTransacted.
>                         if (this->sessionTransacted) {
>                                 session = connection->createSession(Session::SESSION_TRANSACTED);
>                         }
>                         else {
>                                 session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
>                         }
>
>                         // Create the destination (Topic or Queue)
>                         if (useTopic) {
>                                 destination = session->createTopic(QUEUE_NAME);
>                         }
>                         else {
>                                 destination = session->createQueue(QUEUE_NAME);
>                         }
>
>                         // Create a MessageProducer from the Session to the Topic or Queue
>                         producer = session->createProducer(destination);
>                         producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
>
>                         // One temporary reply queue and one listener for the whole run.
>                         // Both are members so they stay alive until cleanup() instead of
>                         // being created (and leaked / deleted too early) per message.
>                         replyDestination = session->createTemporaryQueue();
>                         responseConsumer = session->createConsumer(replyDestination);
>                         responseConsumer->setMessageListener(this);
>
>                         // Create the Thread Id String
>                         string threadIdStr = Long::toString(Thread::currentThread()->getId());
>
>                         // Text shared by every request
>                         string text = string("Hello world! from thread ") + threadIdStr;
>
>                         // Setup is complete: release waitUntilReady() before the send
>                         // loop so it is released even when numMessages is 0.
>                         latch.countDown();
>
>                         Random random;
>                         for (int ix = 0; ix < numMessages; ++ix) {
>                                 std::auto_ptr<TextMessage> message(session->createTextMessage(text));
>
>                                 message->setCMSReplyTo(replyDestination);
>                                 message->setCMSCorrelationID(createCorrelationId(random));
>                                 message->setIntProperty("Integer", ix);
>
>                                 printf("Producer Sent message #%d from thread %s\n", ix + 1,
>                                         threadIdStr.c_str());
>                                 producer->send(message.get());
>
>                                 // The session really is transacted now when requested,
>                                 // so the send must be committed.
>                                 if (this->sessionTransacted) {
>                                         session->commit();
>                                 }
>
>                                 // Wait while asynchronous replies come in.  doneLatch only
>                                 // reaches zero after numMessages replies, so for every
>                                 // message but the last this waits the full waitMillis.
>                                 doneLatch.await(waitMillis);
>                         }
>                 }
>                 catch (CMSException& e) {
>                         printf("Producer run() CMSException \n");
>                         // Release waitUntilReady() even though setup failed.
>                         latch.countDown();
>                         e.printStackTrace();
>                 }
>         }
>
>         // Called for every reply received on replyDestination, since this object
>         // is registered as the MessageListener of responseConsumer.
>         virtual void onMessage(const Message* message) {
>
>                 try {
>                         // Per-instance counter: the former function-static counter
>                         // was shared by every producer object.
>                         ++receivedCount;
>
>                         const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
>
>                         string text = "";
>                         if (textMessage != NULL) {
>                                 text = textMessage->getText();
>                         }
>                         else {
>                                 text = "NOT A TEXTMESSAGE!";
>                         }
>
>                         printf("Producer Message #%d Received: %s\n", receivedCount, text.c_str());
>                 }
>                 catch (CMSException& e) {
>                         printf("Producer onMessage() CMSException \n");
>                         e.printStackTrace();
>                 }
>
>                 // Commit all messages.
>                 commitIfTransacted();
>
>                 // No matter what, tag the count down latch until done.
>                 doneLatch.countDown();
>         }
>
>         // ExceptionListener callback: prints the exception and terminates the process.
>         // NOTE(review): run() does not call connection->setExceptionListener(this)
>         // for the producer, so this only fires if the object is registered elsewhere.
>         virtual void onException(const CMSException& ex) {
>                 printf("Producer onException() CMS Exception occurred.  Shutting down client. \n");
>                 ex.printStackTrace();
>                 exit(1);
>         }
>
> private:
>
>         // Returns NAME_BYTE_LEN random bytes rendered as upper-case hex
>         // (two characters per byte), used as the CMS correlation id.
>         static std::string createCorrelationId(Random& random) {
>                 static const char HEX_DIGITS[] = "0123456789ABCDEF";
>
>                 unsigned char raw[NAME_BYTE_LEN] = { 0 };
>                 random.nextBytes(raw, NAME_BYTE_LEN);
>
>                 std::string correlationId;
>                 correlationId.reserve(NAME_BYTE_LEN * 2);
>                 for (int i = 0; i < NAME_BYTE_LEN; ++i) {
>                         correlationId += HEX_DIGITS[raw[i] >> 4];
>                         correlationId += HEX_DIGITS[raw[i] & 0x0F];
>                 }
>                 return correlationId;
>         }
>
>         // Commits the session when it is transacted.  A failing commit is logged
>         // instead of propagating, so the caller can still count down doneLatch.
>         void commitIfTransacted() {
>                 if (!this->sessionTransacted) {
>                         return;
>                 }
>                 try {
>                         session->commit();
>                 }
>                 catch (CMSException& e) {
>                         e.printStackTrace();
>                 }
>         }
>
>         // Closes the connection, then deletes every CMS object owned by this
>         // instance and resets the pointers.
>         void cleanup() {
>
>                 if (connection != NULL) {
>                         try {
>                                 connection->close();
>                         }
>                         catch (cms::CMSException& ex) {
>                                 ex.printStackTrace();
>                         }
>                 }
>
>                 // Destroy resources.
>                 try {
>                         delete destination;
>                         destination = NULL;
>                         delete replyDestination;
>                         replyDestination = NULL;
>                         delete responseConsumer;
>                         responseConsumer = NULL;
>                         delete producer;
>                         producer = NULL;
>                         delete session;
>                         session = NULL;
>                         delete connection;
>                         connection = NULL;
>                 }
>                 catch (CMSException& e) {
>                         e.printStackTrace();
>                 }
>         }
> };
>
> // Request/reply demo consumer.
> //
> // run() connects to brokerURI and listens on QUEUE_NAME (queue or topic,
> // depending on useTopic).  For every TextMessage received, onMessage() sends a
> // text reply carrying the same correlation id to the request's CMSReplyTo
> // destination, using a producer that has no fixed destination.
> class HelloWorldConsumer : public ExceptionListener,
>         public MessageListener,
>         public Runnable {
>
> private:
>
>         CountDownLatch latch;       // released once the listener is installed
>         CountDownLatch doneLatch;   // run() waits on it; nothing in this class counts it down
>         Connection* connection;
>         Session* session;
>         Destination* destination;
>         MessageConsumer* consumer;
>         MessageProducer *producer;  // destination-less producer used for replies
>         long waitMillis;
>         bool useTopic;
>         bool sessionTransacted;
>         std::string brokerURI;
>         int receivedCount;          // messages seen by this instance
>         bool threadIdPrinted;       // the listener thread id is printed only once
>
> private:
>
>         // Not copyable: the object owns raw CMS resources.
>         HelloWorldConsumer(const HelloWorldConsumer&);
>         HelloWorldConsumer& operator=(const HelloWorldConsumer&);
>
> public:
>
>         // brokerURI         - URI handed to the CMS connection factory.
>         // numMessages       - initial count of doneLatch.
>         // useTopic          - listen on a topic named QUEUE_NAME instead of a queue.
>         // sessionTransacted - use a SESSION_TRANSACTED session instead of AUTO_ACKNOWLEDGE.
>         // waitMillis        - stored but not used by the code in this class.
>         HelloWorldConsumer(const std::string& brokerURI, int numMessages,
>                 bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :
>                 latch(1),
>                 doneLatch(numMessages),
>                 connection(NULL),
>                 session(NULL),
>                 destination(NULL),
>                 consumer(NULL),
>                 producer(NULL),
>                 waitMillis(waitMillis),
>                 useTopic(useTopic),
>                 sessionTransacted(sessionTransacted),
>                 brokerURI(brokerURI),
>                 receivedCount(0),
>                 threadIdPrinted(false) {
>         }
>
>         virtual ~HelloWorldConsumer() {
>                 cleanup();
>         }
>
>         // Closes the connection and frees all CMS resources; safe to call twice
>         // because cleanup() resets every pointer to NULL.
>         void close() {
>                 this->cleanup();
>         }
>
>         // Blocks until run() has installed the listener (or failed with a CMSException).
>         void waitUntilReady() {
>                 latch.await();
>         }
>
>         virtual void run() {
>
>                 try {
>
>                         // Create a ConnectionFactory
>                         auto_ptr<ConnectionFactory> connectionFactory(
>                                 ConnectionFactory::createCMSConnectionFactory(brokerURI));
>
>                         // Create a Connection
>                         connection = connectionFactory->createConnection();
>                         connection->start();
>                         connection->setExceptionListener(this);
>
>                         // Create a Session
>                         if (this->sessionTransacted) {
>                                 session = connection->createSession(Session::SESSION_TRANSACTED);
>                         }
>                         else {
>                                 session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
>                         }
>
>                         // Create the destination (Topic or Queue)
>                         if (useTopic) {
>                                 destination = session->createTopic(QUEUE_NAME);
>                         }
>                         else {
>                                 destination = session->createQueue(QUEUE_NAME);
>                         }
>
>                         // Producer without a fixed destination: every reply names its
>                         // own target (the request's CMSReplyTo) in send().
>                         producer = session->createProducer();
>                         producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
>
>                         // Create a MessageConsumer from the Session to the Topic or Queue
>                         consumer = session->createConsumer(destination);
>                         consumer->setMessageListener(this);
>
>                         std::cout.flush();
>                         std::cerr.flush();
>
>                         // Indicate we are ready for messages.
>                         latch.countDown();
>
>                         // Wait while asynchronous messages come in.  onMessage() does
>                         // not count doneLatch down, so with numMessages > 0 this
>                         // keeps the consumer serving until the process ends.
>                         doneLatch.await();
>
>                 }
>                 catch (CMSException& e) {
>                         // Previously printed the onException() text, which
>                         // misreported where the failure happened.
>                         printf("Consumer run() CMSException \n");
>                         // Release waitUntilReady() even though setup failed.
>                         latch.countDown();
>                         e.printStackTrace();
>                 }
>         }
>
>         // Called for every message received on QUEUE_NAME, since this object is
>         // registered as the MessageListener of consumer.
>         virtual void onMessage(const Message* message) {
>
>                 try {
>                         ++receivedCount;
>
>                         // Create the Thread Id String
>                         string threadIdStr = Long::toString(Thread::currentThread()->getId());
>
>                         if (!threadIdPrinted) {
>                                 threadIdPrinted = true;
>                                 printf("consumer Message threadid: %s\n", threadIdStr.c_str());
>                         }
>
>                         const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
>
>                         if (NULL == textMessage) {
>                                 // Not a text message: report its type and send no reply.
>                                 // Control still reaches the commit below; an early
>                                 // return here used to skip it.
>                                 printf("NULL==textMessage %s\n", message->getCMSType().c_str());
>                         }
>                         else {
>                                 string strReply = "consumer return  xxx,ThreadID=" + threadIdStr;
>
>                                 std::auto_ptr<TextMessage> responsemessage(
>                                         session->createTextMessage(strReply));
>                                 responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());
>
>                                 string text = textMessage->getText();
>                                 int nProPerty = textMessage->getIntProperty("Integer");
>                                 printf("consumer Message #%d Received: %s,nProPerty[%d]\n",
>                                         receivedCount, text.c_str(), nProPerty);
>
>                                 // Reply only when the request names a reply destination.
>                                 const cms::Destination* destSend = textMessage->getCMSReplyTo();
>                                 if (destSend) {
>                                         this->producer->send(destSend, responsemessage.get());
>
>                                         printf("consumer Message #%d send: %s\n",
>                                                 receivedCount, strReply.c_str());
>                                 }
>                         }
>                 }
>                 catch (CMSException& e) {
>                         printf("Consumer onMessage() CMS Exception occurred.  Shutting down client. \n");
>                         e.printStackTrace();
>                 }
>
>                 // Commit all messages, whether or not the handling above failed.
>                 // A failing commit is logged rather than thrown out of the callback.
>                 if (this->sessionTransacted) {
>                         try {
>                                 session->commit();
>                         }
>                         catch (CMSException& e) {
>                                 e.printStackTrace();
>                         }
>                 }
>
>                 // doneLatch is deliberately not counted down here (the call was
>                 // already disabled in the original code).
>                 //doneLatch.countDown();
>         }
>
>         // If something bad happens you see it here, as this object is registered
>         // as the connection's ExceptionListener in run().  Terminates the process.
>         virtual void onException(const CMSException& ex) {
>                 printf("Consumer onException() CMS Exception occurred.  Shutting down client. \n");
>                 ex.printStackTrace();
>                 exit(1);
>         }
>
> private:
>
>         // Closes the connection, then deletes every CMS object owned by this
>         // instance (including the reply producer) and resets the pointers.
>         void cleanup() {
>                 if (connection != NULL) {
>                         try {
>                                 connection->close();
>                         }
>                         catch (cms::CMSException& ex) {
>                                 ex.printStackTrace();
>                         }
>                 }
>
>                 // Destroy resources.
>                 try {
>                         delete destination;
>                         destination = NULL;
>                         delete consumer;
>                         consumer = NULL;
>                         delete producer;
>                         producer = NULL;
>                         delete session;
>                         session = NULL;
>                         delete connection;
>                         connection = NULL;
>                 }
>                 catch (CMSException& e) {
>                         e.printStackTrace();
>                 }
>         }
> };
>
> // Entry point of the demo.
> //
> // Which role runs is chosen at compile time: the producer code is built only
> // when USE_PRODUCER is defined and the consumer code only when USE_COMSUMER
> // is defined.  Neither macro is defined in this file, so they have to be
> // supplied from outside (for example on the compiler command line).
> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
>
>         activemq::library::ActiveMQCPP::initializeLibrary();
>         {
>                 std::cout << "=====================================================\n";
>                 std::cout << "Starting the example:" << std::endl;
>                 std::cout << "-----------------------------------------------------\n";
>
>
>                 // Set the URI to point to the IP Address of your broker.
>                 // Add any optional params to the url to enable things like
>                 // tightMarshalling or tcp logging etc.  See the CMS web site for
>                 // a full list of configuration options.
>                 //
>                 //  http://activemq.apache.org/cms/
>                 //
>                 // Wire Format Options:
>                 // =========================
>                 // Use either stomp or openwire, the default ports are different for each
>                 //
>                 // Examples:
>                 //    tcp://127.0.0.1:61616                      default to openwire
>                 //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
>                 //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
>                 //
>                 // SSL:
>                 // =========================
>                 // To use SSL you need to specify the location of the trusted Root CA or the
>                 // certificate for the broker you want to connect to.  Using the Root CA allows
>                 // you to use failover with multiple servers all using certificates signed by
>                 // the trusted root.  If using client authentication you also need to specify
>                 // the location of the client Certificate.
>                 //
>                 //     System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
>                 //     System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
>                 //     System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
>                 //
>                 // Then you just specify the ssl transport in the URI, for example:
>                 //
>                 //     ssl://localhost:61617
>                 //
>                 std::string brokerURI = "tcp://127.0.0.1:61616?jms.watchTopicAdvisories=false";
>
>                 //============================================================
>                 // Set useTopics to true to use topics instead of queues.
>                 // Note in the code above that this causes createTopic or
>                 // createQueue to be used in both consumer and producer.
>                 //============================================================
>                 bool useTopics = false;
>                 bool sessionTransacted = true;  // only passed to the consumer
>                 int numMessages = 1;            // only passed to the consumer
>
>                 long long startTime = System::currentTimeMillis();
>
> #ifdef USE_PRODUCER
>                 printf("?? USE_PRODUCER \r\n");
>
>                 int numProducerMessages = 30;
>                 int nThreadNumber = 10;
>                 vector<HelloWorldProducer *> vHelloWorldProducer;
>                 for (int i = 0; i < nThreadNumber; ++i)
>                 {
>                         HelloWorldProducer * producerTemp = new HelloWorldProducer(brokerURI,
>                                 numProducerMessages, useTopics);
>                         vHelloWorldProducer.push_back(producerTemp);
>                 }
>
> #endif
>
> #ifdef USE_COMSUMER
>                 printf("?? USE_COMSUMER \r\n");
>                 HelloWorldConsumer consumer(brokerURI, numMessages, useTopics,
>                         sessionTransacted);
>                 // Start the consumer thread.
>                 Thread consumerThread(&consumer);
>                 consumerThread.start();
>
>                 // Wait for the consumer to indicate that its ready to go.
>                 consumer.waitUntilReady();
>
> #endif
>
> #ifdef USE_PRODUCER
>                 // Start one thread per producer, waiting for each producer to
>                 // report ready before starting the next one.
>                 vector<Thread *> vThread;
>                 for (int i = 0; i < nThreadNumber; ++i)
>                 {
>                         HelloWorldProducer & ProducerTemp = *vHelloWorldProducer[i];
>                         Thread * threadTemp = new Thread(&ProducerTemp);
>                         vThread.push_back(threadTemp);
>                         threadTemp->start();
>                         ProducerTemp.waitUntilReady();
>                 }
>
>                 // Wait for the producer threads to complete and release them.
>                 // This replaces an endless sleep loop that leaked the Thread
>                 // objects and made all cleanup below unreachable.
>                 for (size_t i = 0; i < vThread.size(); ++i)
>                 {
>                         vThread[i]->join();
>                         delete vThread[i];
>                         vThread[i] = NULL;
>                 }
> #endif
>
> #ifdef USE_COMSUMER
>                 consumerThread.join();
> #endif
>
>                 long long endTime = System::currentTimeMillis();
>                 double totalTime = static_cast<double>(endTime - startTime) / 1000.0;
>
> #ifdef USE_PRODUCER
>                 // Close and destroy every producer.  The pointers all come from
>                 // `new` above, so no null check is needed.
>                 for (size_t i = 0; i < vHelloWorldProducer.size(); ++i)
>                 {
>                         HelloWorldProducer * ProducerTemp = vHelloWorldProducer[i];
>                         ProducerTemp->close();
>                         delete ProducerTemp;
>                         vHelloWorldProducer[i] = NULL;
>                 }
>
> #endif
> #ifdef USE_COMSUMER
>                 consumer.close();
> #endif
>
>                 std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
>                 std::cout << "-----------------------------------------------------\n";
>                 std::cout << "Finished with the example." << std::endl;
>                 std::cout << "=====================================================\n";
>
>         }
>         activemq::library::ActiveMQCPP::shutdownLibrary();
>
>
>         return 0;
> }
>
> When I run the producer and consumer from the above example, the following
> happens:
>
> 1. The producer is able to put the message on the queue.
> 2. The consumer is able to retrieve the message from the queue.
> 3. When the consumer tries to send the response back using the replyTo
> destination, the send fails with the error message listed above.
>
> On the broker and on the consumer/producer I have advisorySupport turned OFF.
> When I turn it on, this works fine.
>
> What I would like to know:
> 1. How can I make the error go away while still keeping advisorySupport off?
>
> Thanks a lot for your help.
>
> P.S-- Attaching my activemq.xml.
>
>
>
> --
> Sent from:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
>

Reply via email to