Is this in a single broker or a network of brokers? If the latter, does it work for a single broker without advisory messages enabled?
Also, your activemq.xml didn't make it. Tim On Sun, Sep 16, 2018, 2:06 AM dpatel <dhpa...@factset.com> wrote: > ActiveMQCPP version: 3.7.1 > AcitveMQBroker version: 5.10.0 > > Here is a simple example. The code includes both consumer and producer > > > // START SNIPPET: demo > > #include <activemq/library/ActiveMQCPP.h> > #include <decaf/lang/Thread.h> > #include <decaf/lang/Runnable.h> > #include <decaf/util/concurrent/CountDownLatch.h> > #include <decaf/lang/Integer.h> > #include <decaf/lang/Long.h> > #include <decaf/lang/System.h> > #include <activemq/core/ActiveMQConnectionFactory.h> > #include <activemq/util/Config.h> > #include <cms/Connection.h> > #include <cms/Session.h> > #include <cms/TextMessage.h> > #include <cms/BytesMessage.h> > #include <cms/MapMessage.h> > #include <cms/ExceptionListener.h> > #include <cms/MessageListener.h> > #include <stdlib.h> > #include <stdio.h> > #include <iostream> > #include <memory> > > #include <decaf/util/Random.h> > > using namespace activemq::core; > using namespace decaf::util::concurrent; > using namespace decaf::util; > using namespace decaf::lang; > using namespace cms; > using namespace std; > > #define QUEUE_NAME "eventQueue" > #define NAME_BYTE_LEN 16 > > class HelloWorldProducer : public ExceptionListener, > public MessageListener, > public Runnable { > private: > CountDownLatch latch; > CountDownLatch doneLatch; > Connection* connection; > Session* session; > Destination* destination; > MessageProducer* producer; > int numMessages; > bool useTopic; > bool sessionTransacted; > std::string brokerURI; > bool bReciveMessage; > long waitMillis; > > private: > > HelloWorldProducer(const HelloWorldProducer&); > HelloWorldProducer& operator=(const HelloWorldProducer&); > > public: > > HelloWorldProducer(const std::string& brokerURI, int numMessages, > bool > useTopic = false, bool sessionTransacted = false, > long waitMillis = 3000) : > latch(1), > doneLatch(numMessages), > connection(NULL), > session(NULL), > 
destination(NULL), > producer(NULL), > numMessages(numMessages), > useTopic(useTopic), > sessionTransacted(sessionTransacted), > brokerURI(brokerURI), > bReciveMessage(false), > waitMillis(waitMillis) > { } > > virtual ~HelloWorldProducer() { > cleanup(); > } > > void close() { > this->cleanup(); > } > > void waitUntilReady() { > latch.await(); > } > > virtual void run() { > > try { > > // Create a ConnectionFactory > auto_ptr<ConnectionFactory> connectionFactory( > > ConnectionFactory::createCMSConnectionFactory(brokerURI)); > > // Create a Connection > connection = > connectionFactory->createConnection(); > connection->start(); > > // Create a Session > if (this->sessionTransacted) { > session = > connection->createSession(Session::SESSION_TRANSACTED); > } > else { > session = > connection->createSession(Session::AUTO_ACKNOWLEDGE); > } > > session = connection->createSession(); > // Create the destination (Topic or Queue) > if (useTopic) { > destination = > session->createTopic(QUEUE_NAME); > } > else { > destination = > session->createQueue(QUEUE_NAME); > } > > // Create a MessageProducer from the Session to the > Topic or Queue > producer = session->createProducer(destination); > > producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); > > // Create the Thread Id String > string threadIdStr = > Long::toString(Thread::currentThread()->getId()); > > // Create a messages > string text = (string) "Hello world! from thread " > + > threadIdStr; > > for (int ix = 0; ix < numMessages; ++ix) { > std::auto_ptr<TextMessage> > message(session->createTextMessage(text)); > > //????... > std::auto_ptr<Destination> > tempDest(session->createTemporaryQueue()); > > //cms::Destination > tempDest=session->createTemporaryTopic() ; > MessageConsumer * responseConsumer = > session->createConsumer(tempDest.get()); > > responseConsumer->setMessageListener(this);//??... 
> > > message->setCMSReplyTo(tempDest.get()); > Random random; > char buffer[NAME_BYTE_LEN] = { 0 }; > random.nextBytes((unsigned char *)buffer, > NAME_BYTE_LEN); > string correlationId = ""; > for (int i = 0; i < NAME_BYTE_LEN; ++i) > { > char ch[NAME_BYTE_LEN * 2] = { 0 > }; > sprintf(ch, "%02X", (unsigned > char)buffer[i]); > string str(ch); > > correlationId += str; > } > > > message->setCMSCorrelationID(correlationId); > > message->setIntProperty("Integer", ix); > printf("Producer Sent message #%d from > thread %s\n", ix + 1, > threadIdStr.c_str()); > producer->send(message.get()); > > // Indicate we are ready for messages. > latch.countDown(); > > // Wait while asynchronous messages come > in. > doneLatch.await(waitMillis); > > } > } > catch (CMSException& e) { > printf("Producer run() CMSException \n"); > // Indicate we are ready for messages. > latch.countDown(); > e.printStackTrace(); > } > > > } > > > // Called from the Producer since this class is a registered > MessageListener. > virtual void onMessage(const Message* message) { > > static int count = 0; > > try { > count++; > const TextMessage* textMessage = dynamic_cast<const > TextMessage*> > (message); > //ActiveMQMessageTransformation > //std::auto_ptr<TextMessage> > responsemessage(session->createTextMessage()); > > //responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); > > //responsemessage->getCMSReplyTo() > > string text = ""; > > if (textMessage != NULL) { > text = textMessage->getText(); > } > else { > text = "NOT A TEXTMESSAGE!"; > } > > printf("Producer Message #%d Received: %s\n", > count, > text.c_str()); > > > //producer.send > > } > catch (CMSException& e) { > printf("Producer onMessage() CMSException \n"); > e.printStackTrace(); > } > > // Commit all messages. > if (this->sessionTransacted) { > session->commit(); > } > > // No matter what, tag the count down latch until done. 
> doneLatch.countDown(); > } > > // If something bad happens you see it here as this class is also > been > // registered as an ExceptionListener with the connection. > virtual void onException(const CMSException& ex AMQCPP_UNUSED) { > printf("Producer onException() CMS Exception occurred. > Shutting down > client. \n"); > ex.printStackTrace(); > exit(1); > } > > > private: > > void cleanup() { > > if (connection != NULL) { > try { > connection->close(); > } > catch (cms::CMSException& ex) { > ex.printStackTrace(); > } > } > > // Destroy resources. > try { > delete destination; > destination = NULL; > delete producer; > producer = NULL; > delete session; > session = NULL; > delete connection; > connection = NULL; > } > catch (CMSException& e) { > e.printStackTrace(); > } > } > }; > > class HelloWorldConsumer : public ExceptionListener, > public MessageListener, > public Runnable { > > private: > > CountDownLatch latch; > CountDownLatch doneLatch; > Connection* connection; > Session* session; > Destination* destination; > MessageConsumer* consumer; > MessageProducer *producer; > long waitMillis; > bool useTopic; > bool sessionTransacted; > std::string brokerURI; > > private: > > HelloWorldConsumer(const HelloWorldConsumer&); > HelloWorldConsumer& operator=(const HelloWorldConsumer&); > > public: > > HelloWorldConsumer(const std::string& brokerURI, int numMessages, > bool > useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) > : > latch(1), > doneLatch(numMessages), > connection(NULL), > session(NULL), > destination(NULL), > consumer(NULL), > producer(NULL), > waitMillis(waitMillis), > useTopic(useTopic), > sessionTransacted(sessionTransacted), > brokerURI(brokerURI) { > } > > virtual ~HelloWorldConsumer() { > cleanup(); > } > > void close() { > this->cleanup(); > } > > void waitUntilReady() { > latch.await(); > } > > virtual void run() { > > try { > > // Create a ConnectionFactory > auto_ptr<ConnectionFactory> connectionFactory( > > 
ConnectionFactory::createCMSConnectionFactory(brokerURI)); > > // Create a Connection > connection = > connectionFactory->createConnection(); > connection->start(); > connection->setExceptionListener(this); > > // Create a Session > if (this->sessionTransacted == true) { > session = > connection->createSession(Session::SESSION_TRANSACTED); > } > else { > session = > connection->createSession(Session::AUTO_ACKNOWLEDGE); > } > > // Create the destination (Topic or Queue) > if (useTopic) { > destination = > session->createTopic(QUEUE_NAME); > } > else { > destination = > session->createQueue(QUEUE_NAME); > } > > producer = session->createProducer(); > > producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); > > // Create a MessageConsumer from the Session to the > Topic or Queue > consumer = session->createConsumer(destination); > > consumer->setMessageListener(this); > > std::cout.flush(); > std::cerr.flush(); > > // Indicate we are ready for messages. > latch.countDown(); > > // Wait while asynchronous messages come in. > doneLatch.await(); > > } > catch (CMSException& e) { > printf("Consumer onException() CMS Exception > occurred. Shutting down > client. \n"); > // Indicate we are ready for messages. > latch.countDown(); > e.printStackTrace(); > } > } > > // Called from the consumer since this class is a registered > MessageListener. 
> virtual void onMessage(const Message* message) { > > static int count = 0; > > try { > count++; > > > // Create the Thread Id String > string threadIdStr = > Long::toString(Thread::currentThread()->getId()); > > static bool bPrintf = true; > if (bPrintf) > { > bPrintf = false; > printf("consumer Message threadid: %s\n", > threadIdStr.c_str()); > } > > string strReply = "consumer return xxx,ThreadID=" > + > threadIdStr; > const TextMessage* textMessage = dynamic_cast<const > TextMessage*> > (message); > > if (NULL == textMessage) > { > printf("NULL==textMessage %s", > message->getCMSType().c_str()); > > > //const cms::MapMessage* mapMsg = > dynamic_cast<const > cms::MapMessage*>(message); > //if(mapMsg) > //{ > // > // std::vector<std::string> elements = > mapMsg->getMapNames(); > // std::vector<std::string>::iterator > iter = elements.begin(); > // for(; iter != elements.end() ; > ++iter) > // { > // std::string key = *iter; > // cms::Message::ValueType > elementType = > mapMsg->getValueType(key); > // string strxxx; > // int cc=0; > // switch(elementType) { > // case cms::Message::BOOLEAN_TYPE: > // //msg->setBoolean(key, > mapMsg->getBoolean(key)); > // break; > // case cms::Message::BYTE_TYPE: > // //msg->setByte(key, > mapMsg->getByte(key)); > // break; > // case cms::Message::BYTE_ARRAY_TYPE: > // //msg->setBytes(key, > mapMsg->getBytes(key)); > // break; > // case cms::Message::CHAR_TYPE: > // //msg->setChar(key, > mapMsg->getChar(key)); > // break; > // case cms::Message::SHORT_TYPE: > // //msg->setShort(key, > mapMsg->getShort(key)); > // break; > // case cms::Message::INTEGER_TYPE: > // //msg->setInt(key, > mapMsg->getInt(key)); > // break; > // case cms::Message::LONG_TYPE: > // //msg->setLong(key, > mapMsg->getLong(key)); > // break; > // case cms::Message::FLOAT_TYPE: > // //msg->setFloat(key, > mapMsg->getFloat(key)); > // break; > // case cms::Message::DOUBLE_TYPE: > // //msg->setDouble(key, > mapMsg->getDouble(key)); > // break; > // case 
cms::Message::STRING_TYPE: > // //msg->setString(key, > mapMsg->getString(key)); > // strxxx=mapMsg->getString(key); > // cc=1; > // break; > // default: > // break; > // } > // } > > //} > > return; > } > > std::auto_ptr<TextMessage> > responsemessage(session->createTextMessage(strReply)); > > responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); > > > string text = ""; > > if (textMessage != NULL) { > text = textMessage->getText(); > } > else { > text = "NOT A TEXTMESSAGE!"; > } > > int nProPerty = > textMessage->getIntProperty("Integer"); > printf("consumer Message #%d Received: > %s,nProPerty[%d]\n", count, > text.c_str(), nProPerty); > > > const cms::Destination* destSend = > textMessage->getCMSReplyTo(); > if (destSend) > { > this->producer->send(destSend, > responsemessage.get()); > > printf("consumer Message #%d send: %s\n", > count, strReply.c_str()); > } > > > } > catch (CMSException& e) { > printf("Consumer onMessage() CMS Exception > occurred. > Shutting down > client. \n"); > e.printStackTrace(); > } > > // Commit all messages. > if (this->sessionTransacted) { > session->commit(); > } > > // No matter what, tag the count down latch until done. > //doneLatch.countDown(); > } > > // If something bad happens you see it here as this class is also > been > // registered as an ExceptionListener with the connection. > virtual void onException(const CMSException& ex AMQCPP_UNUSED) { > printf("Consumer onException() CMS Exception occurred. > Shutting down > client. \n"); > //printf("CMS Exception occurred. Shutting down > client.\n"); > ex.printStackTrace(); > exit(1); > } > > private: > > void cleanup() { > if (connection != NULL) { > try { > connection->close(); > } > catch (cms::CMSException& ex) { > ex.printStackTrace(); > } > } > > // Destroy resources. 
> try { > delete destination; > destination = NULL; > delete consumer; > consumer = NULL; > delete session; > session = NULL; > delete connection; > connection = NULL; > } > catch (CMSException& e) { > e.printStackTrace(); > } > } > }; > > int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { > > //if(argc<2) > //{ > // printf("argc<2\r\n"); > // return 0; > //} > > activemq::library::ActiveMQCPP::initializeLibrary(); > { > std::cout << > "=====================================================\n"; > std::cout << "Starting the example:" << std::endl; > std::cout << > "-----------------------------------------------------\n"; > > > // Set the URI to point to the IP Address of your broker. > // add any optional params to the url to enable things > like > // tightMarshalling or tcp logging etc. See the CMS web > site for > // a full list of configuration options. > // > // http://activemq.apache.org/cms/ > // > // Wire Format Options: > // ========================= > // Use either stomp or openwire, the default ports are > different for each > // > // Examples: > // tcp://127.0.0.1:61616 default > to > openwire > // tcp://127.0.0.1:61616?wireFormat=openwire same as > above > // tcp://127.0.0.1:61613?wireFormat=stomp use stomp > instead > // > // SSL: > // ========================= > // To use SSL you need to specify the location of the > trusted Root CA or > the > // certificate for the broker you want to connect to. > Using > the Root CA > allows > // you to use failover with multiple servers all using > certificates signed > by > // the trusted root. If using client authentication you > also need to > specify > // the location of the client Certificate. 
> // > // System::setProperty( "decaf.net.ssl.keyStore", > "<path>/client.pem" > ); > // System::setProperty( > "decaf.net.ssl.keyStorePassword", "password" > ); > // System::setProperty( "decaf.net.ssl.trustStore", > "<path>/rootCA.pem" ); > // > // The you just specify the ssl transport in the URI, for > example: > // > // ssl://localhost:61617 > // > std::string brokerURI = > "tcp://127.0.0.1:61616?jms.watchTopicAdvisories=false"; > > > //============================================================ > // set to true to use topics instead of queues > // Note in the code above that this causes createTopic or > // createQueue to be used in both consumer an producer. > > //============================================================ > bool useTopics = false; > bool sessionTransacted = true; > int numMessages = 1; > bool useConsumer = true; > bool useProducer = true; > > //int nSet=atoi(argv[1]); > //if(1==nSet) > //{ > //#define USE_COMSUMER > > > //} > //else > //{ > //#define USE_PRODUCER > > // > //} > > > > long long startTime = System::currentTimeMillis(); > > #ifdef USE_PRODUCER > printf("?? USE_PRODUCER \r\n"); > > int numProducerMessages = 30; > int nThreadNumber = 10; > vector<HelloWorldProducer *> vHelloWorldProducer; > for (int i = 0; i < nThreadNumber; ++i) > { > HelloWorldProducer * producerTemp = new > HelloWorldProducer(brokerURI, > numProducerMessages, useTopics); > vHelloWorldProducer.push_back(producerTemp); > } > > #endif > > #ifdef USE_COMSUMER > printf("?? USE_COMSUMER \r\n"); > HelloWorldConsumer consumer(brokerURI, numMessages, > useTopics, > sessionTransacted); > // Start the consumer thread. > Thread consumerThread(&consumer); > consumerThread.start(); > > // Wait for the consumer to indicate that its ready to go. > consumer.waitUntilReady(); > > #endif > > > > > #ifdef USE_PRODUCER > // Start the producer thread. 
> > vector<Thread *> vThread; > for (int i = 0; i < nThreadNumber; ++i) > { > HelloWorldProducer & ProducerTemp = > *vHelloWorldProducer[i]; > Thread * threadTemp = new Thread(&ProducerTemp); > vThread.push_back(threadTemp); > threadTemp->start(); > ProducerTemp.waitUntilReady(); > > } > > for (size_t i = 0; i < vThread.size(); ++i) > { > Thread * threadTemp = vThread[i]; > //threadTemp->join(); > } > while (1) > { > Thread::sleep(10); > } > > //Thread producerThread1(&producer1); > //producerThread1.start(); > //producer1.waitUntilReady(); > > //Thread producerThread2(&producer2); > //producerThread2.start(); > //producer2.waitUntilReady(); > > //Thread producerThread3(&producer3); > //producerThread3.start(); > //producer3.waitUntilReady(); > #endif > > > > > #ifdef USE_PRODUCER > // Wait for the threads to complete. > //producerThread1.join(); > //producerThread2.join(); > //producerThread3.join(); > #endif > > #ifdef USE_COMSUMER > consumerThread.join(); > #endif > > long long endTime = System::currentTimeMillis(); > double totalTime = (double)(endTime - startTime) / 1000.0; > > #ifdef USE_PRODUCER > //producer1.close(); > //producer2.close(); > //producer3.close(); > > for (size_t i = 0; i < vHelloWorldProducer.size(); ++i) > { > HelloWorldProducer * ProducerTemp = > vHelloWorldProducer[i]; > ProducerTemp->close(); > > if (ProducerTemp) > { > delete ProducerTemp; > ProducerTemp = NULL; > } > } > > #endif > #ifdef USE_COMSUMER > consumer.close(); > #endif > > > > > std::cout << "Time to completion = " << totalTime << " > seconds." << > std::endl; > std::cout << > "-----------------------------------------------------\n"; > std::cout << "Finished with the example." << std::endl; > std::cout << > "=====================================================\n"; > > } > activemq::library::ActiveMQCPP::shutdownLibrary(); > > > return 0; > } > > When I run the above example producer and consumer following happens: > > 1. Producer is able to put the message on the queue. 
> 2. Consumer is able to retrieve the message from the queue. > 3. When the consumer tries to send a response back using the replyTo destination, > the > send fails with the error message listed above. > > On the broker and consumer/producer I have advisorySupport turned OFF. > When > I turn it on, this works fine. > > What I would like to know: > 1. How can I make the error go away but still have advisorySupport off? > > Thanks a lot for your help. > > P.S. -- Attaching my activemq.xml. > > > > -- > Sent from: > http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html >