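Here is my version of the code, which runs to completion without segfaults. Compared with the version quoted below, the constructor takes both destination URIs and initializes every pointer member (including destination2 and producer2) to NULL, so cleanup() never deletes an uninitialized pointer.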
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <memory>
#include <string>

using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
/**
 * Sends the same text message to two destinations through two
 * MessageProducers that share a single Session and Connection.
 *
 * All CMS resources are created in run() and released in cleanup(), which
 * the destructor calls.
 */
class SimpleProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination1;
    Destination* destination2;
    MessageProducer* producer1;
    MessageProducer* producer2;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI1;
    std::string destURI2;
    unsigned int connectRetries;

    // Not copyable: the object deletes its raw pointers in cleanup(), so a
    // copy would delete the same resources twice.  Declared, never defined.
    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator=( const SimpleProducer& );

public:

    /**
     * @param brokerURI       URI handed to the ActiveMQConnectionFactory.
     * @param numMessages     Number of messages run() sends to each destination.
     * @param destURI1        Name of the first destination.
     * @param destURI2        Name of the second destination.
     * @param useTopic        true to create Topics, false to create Queues.
     * @param clientAck       true for CLIENT_ACKNOWLEDGE sessions, false for
     *                        AUTO_ACKNOWLEDGE.
     * @param connectRetries  Extra connection attempts made after a failed one.
     */
    SimpleProducer( const std::string& brokerURI,
                    unsigned int numMessages,
                    const std::string& destURI1,
                    const std::string& destURI2,
                    bool useTopic = false,
                    bool clientAck = false,
                    unsigned int connectRetries = 0 )
      : connection( NULL ),
        session( NULL ),
        destination1( NULL ),
        destination2( NULL ),
        producer1( NULL ),
        producer2( NULL ),
        useTopic( useTopic ),
        clientAck( clientAck ),
        numMessages( numMessages ),
        brokerURI( brokerURI ),
        destURI1( destURI1 ),
        destURI2( destURI2 ),
        // Honour the argument; it used to be silently replaced by 0.
        connectRetries( connectRetries ) {
    }

    virtual ~SimpleProducer(){
        cleanup();
    }

    /** Sets how many extra connection attempts run() makes after a failure. */
    void setConnectRetries( unsigned int retries ) {
        this->connectRetries = retries;
    }

    /** @return the number of extra connection attempts run() will make. */
    unsigned int getConnectRetries() const {
        return this->connectRetries;
    }

    /**
     * Connects to the broker, creates one session with two producers and
     * sends numMessages text messages through each producer.
     *
     * CMSExceptions are printed and end the run; they are not propagated.
     */
    virtual void run() {

        // Release anything left over from an earlier run() on this object so
        // the pointers assigned below never overwrite live resources.
        cleanup();

        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory connectionFactory( brokerURI );

            // Create and start a Connection, retrying as configured
            if( !connect( connectionFactory ) ) {
                return;
            }

            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destinations (Topic or Queue)
            if( useTopic ) {
                destination1 = session->createTopic( destURI1 );
                destination2 = session->createTopic( destURI2 );
            } else {
                destination1 = session->createQueue( destURI1 );
                destination2 = session->createQueue( destURI2 );
            }

            // Create a MessageProducer from the Session to each Topic or Queue
            producer1 = session->createProducer( destination1 );
            producer2 = session->createProducer( destination2 );
            producer1->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
            producer2->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            // Create the Thread Id String
            std::string threadIdStr = Integer::toString( Thread::getId() );

            // Create the message text
            std::string text = "Hello world! from thread " + threadIdStr;

            for( unsigned int ix = 0; ix < numMessages; ++ix ) {
                TextMessage* message = session->createTextMessage( text );

                try {
                    message->setIntProperty( "Integer", ix );

                    // Tell both producers to send the same message object
                    producer1->send( message );
                    producer2->send( message );

                    // Report only after both sends have returned.
                    printf( "Sent message #%u from thread %s\n",
                            ix + 1, threadIdStr.c_str() );
                } catch( ... ) {
                    // Do not leak the message when a send fails.
                    delete message;
                    throw;
                }

                delete message;
            }

        } catch( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:

    /**
     * Creates and starts the connection, making up to connectRetries extra
     * attempts when an attempt throws a CMSException.
     *
     * @return true once a connection has been started, false when every
     *         attempt failed (the connection member is then NULL).
     */
    bool connect( ActiveMQConnectionFactory& factory ) {

        unsigned int retries = this->connectRetries;

        for( ;; ) {
            try {
                connection = factory.createConnection();
                connection->start();
                // Stop at the first success; looping on would open (and
                // leak) one extra connection per remaining retry.
                return true;
            } catch( CMSException& e ) {
                e.printStackTrace();

                // Drop a connection that was created but failed to start.
                destroy( connection );

                if( retries == 0 ) {
                    return false;
                }
                --retries;
            }
        }
    }

    /**
     * Deletes one resource and resets the pointer to NULL.  A CMSException
     * raised while deleting is printed and swallowed, matching the catch
     * blocks this example has always wrapped around its deletes.
     */
    template< typename T >
    static void destroy( T*& resource ) {
        try {
            delete resource;    // deleting NULL is a no-op
        } catch( CMSException& e ) {
            e.printStackTrace();
        }
        resource = NULL;
    }

    /**
     * Releases every CMS resource held by this object.  Each step is guarded
     * on its own so that one failure cannot skip the remaining steps.
     */
    void cleanup(){

        // Destroy resources.
        destroy( destination1 );
        destroy( destination2 );
        destroy( producer1 );
        destroy( producer2 );

        // Close open resources.
        try {
            if( session != NULL ) session->close();
        } catch( CMSException& e ) { e.printStackTrace(); }

        try {
            if( connection != NULL ) connection->close();
        } catch( CMSException& e ) { e.printStackTrace(); }

        destroy( session );
        destroy( connection );
    }
};

////////////////////////////////////////////////////////////////////////////////
/**
 * Runs one SimpleProducer against a local broker.
 *
 * An optional first command-line argument gives the number of connection
 * retries; anything that is not a positive integer is treated as 0.
 */
int main( int argc, char* argv[] ) {

    std::cout << "===================================================== \n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    // Wire Format Options:
    // =====================
    // Use either stomp or openwire, the default ports are different for each
    //
    // Examples:
    //    tcp://127.0.0.1:61616                      default to openwire
    //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
    //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
    //
    std::string brokerURI =
        "tcp://127.0.0.1:61616"
        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"
        ;

    //============================================================
    // Total number of messages for this producer to send.
    //============================================================
    unsigned int numMessages = 2000;

    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the Producer produces, to have the producer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI1 = "TEST.FOO";
    std::string destURI2 = "TEST.BAR";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the producer.
    //============================================================
    bool useTopics = false;

    // Pass an integer value to the producer for retry
    unsigned int connectRetries = 0;

    if( argc > 1 ) {
        try {
            int requested = decaf::lang::Integer::parseInt( argv[1] );
            // A negative count would wrap to a huge unsigned value, so only
            // positive values are accepted.
            if( requested > 0 ) {
                connectRetries = static_cast<unsigned int>( requested );
            }
        } catch( decaf::lang::exceptions::NumberFormatException& ) {
            connectRetries = 0;
        }
    }

    // Create the producer and run it.
    SimpleProducer producer( brokerURI, numMessages, destURI1, destURI2, useTopics );
    producer.setConnectRetries( connectRetries );
    producer.run();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "===================================================== \n";

    return 0;
}

On Tue, 2009-03-17 at 08:24 -0700, manua wrote: > Hi Tim, > > I have pasted the main content of my file here. > > Actually, I wanted to have a single session and multiple message producers > in that session.Each producer will be sending a different message o its > respective topic/queue. > > In the single program, if I create different connection and different > sessions, then the program is working fine, but giving issues with multiple > message producers in the single session. > > > class SimpleProducer : public Runnable { > private: > > Connection* connection; > Session* session; > Destination* destination; > Destination* destination2; > MessageProducer* producer; > MessageProducer* producer2; > bool useTopic; > bool clientAck; > unsigned int numMessages; > std::string brokerURI; > std::string destURI; > std::string destURI2; > unsigned int connectRetries; > > public: > > SimpleProducer( const std::string& brokerURI, > unsigned int numMessages, > const std::string& destURI, > bool useTopic = false, > bool clientAck = false, > unsigned int connectRetries = 0 ){ > connection = NULL; > session = NULL; > destination = NULL; > producer = NULL; > this->numMessages = numMessages; > this->useTopic = useTopic; > this->brokerURI = brokerURI; > this->destURI = destURI; > this->clientAck = clientAck; > this->connectRetries = 0; > } > > virtual ~SimpleProducer(){ > cleanup(); > } > > void setConnectRetries( unsigned int retries ) { > this->connectRetries = retries; > } > unsigned int getConnectRetries() const { > return this->connectRetries; > } > > virtual void run() { > try { > // Create a ConnectionFactory > 
auto_ptr<ActiveMQConnectionFactory> connectionFactory( > new ActiveMQConnectionFactory( brokerURI ) ); > > unsigned int retries = this->connectRetries; > do{ > // Create a Connection > try{ > connection = connectionFactory->createConnection(); > connection->start(); > } catch( CMSException& e ) { > e.printStackTrace(); > > if( retries == 0 ) { > return; > } > } > } while( retries-- != 0 ); > > // Create a Session > if( clientAck ) { > session = connection->createSession( > Session::CLIENT_ACKNOWLEDGE ); > } else { > session = connection->createSession( > Session::AUTO_ACKNOWLEDGE ); > } > > // Create the destination (Topic or Queue) > if( useTopic ) { > destination = session->createTopic( destURI ); > destination2 = session->createTopic( destURI2 ); > } else { > destination = session->createQueue( destURI ); > destination2 = session->createQueue( destURI2 ); > } > > // Create a MessageProducer from the Session to the Topic or > Queue > producer = session->createProducer( destination ); > producer2 = session->createProducer( destination2 ); > > producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); > > producer2->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); > > // Create the Thread Id String > string threadIdStr = Integer::toString( Thread::getId() ); > > // Create a messages > string text = (string)"Hello world! 
from thread " + threadIdStr; > > string text2 = (string)"Message for second producer "; > > for( unsigned int ix=0; ix<numMessages; ++ix ){ > TextMessage* message = session->createTextMessage( text ); > > TextMessage* message2 = session->createTextMessage( text2 ); > > message->setIntProperty( "Integer", ix ); > > // Tell the producer to send the message > printf( "Sent message #%d from thread %s\n", ix+1, > threadIdStr.c_str() ); > producer->send( message ); > producer2->send( message2 ); > > delete message; > delete message2; > } > > }catch ( CMSException& e ) { > e.printStackTrace(); > } > } > > private: > > void cleanup(){ > > // Destroy resources. > try{ > if( destination != NULL ) delete destination; > delete destination2; > }catch ( CMSException& e ) { e.printStackTrace(); } > destination = NULL; > destination2 = NULL; > > try{ > if( producer != NULL ) delete producer; > delete producer2; > }catch ( CMSException& e ) { e.printStackTrace(); } > producer = NULL; producer2 = NULL; > > // Close open resources. > try{ > if( session != NULL ) session->close(); > if( connection != NULL ) connection->close(); > }catch ( CMSException& e ) { e.printStackTrace(); } > > try{ > if( session != NULL ) delete session; > }catch ( CMSException& e ) { e.printStackTrace(); } > session = NULL; > > try{ > if( connection != NULL ) delete connection; > }catch ( CMSException& e ) { e.printStackTrace(); } > connection = NULL; > } > }; > > //////////////////////////////////////////////////////////////////////////////// > int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { > > std::cout << "=====================================================\n"; > std::cout << "Starting the example:" << std::endl; > std::cout << "-----------------------------------------------------\n"; > > // Set the URI to point to the IPAddress of your broker. > // add any optional params to the url to enable things like > // tightMarshalling or tcp logging etc. 
See the CMS web site for > // a full list of configuration options. > // > // http://activemq.apache.org/cms/ > // > // Wire Format Options: > // ===================== > // Use either stomp or openwire, the default ports are different for > each > // > // Examples: > // tcp://127.0.0.1:61616 default to openwire > // tcp://127.0.0.1:61616?wireFormat=openwire same as above > // tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead > // > std::string brokerURI = > "tcp://localhost:61616" > "?wireFormat=openwire" > // "&connection.useAsyncSend=true" > // "&transport.commandTracingEnabled=true" > // "&transport.tcpTracingEnabled=true" > // "&wireFormat.tightEncodingEnabled=true" > ; > > //============================================================ > // Total number of messages for this producer to send. > //============================================================ > unsigned int numMessages = 20; > > //============================================================ > // This is the Destination Name and URI options. Use this to > // customize where the Producer produces, to have the producer > // use a topic or queue set the 'useTopics' flag. > //============================================================ > std::string destURI = "TEST.FOO"; > > std::string destURI2 = "TEST1.FOO"; > //============================================================ > // set to true to use topics instead of queues > // Note in the code above that this causes createTopic or > // createQueue to be used in the producer. > //============================================================ > bool useTopics = true; > > // Pass an integer value to the producer for retry > unsigned int connectRetries = 0; > > if( argc > 1 ) { > try { > connectRetries = decaf::lang::Integer::parseInt( argv[1] ); > } catch( decaf::lang::exceptions::NumberFormatException& ex ) { > connectRetries = 0; > } > } > > // Create the producer and run it. 
> SimpleProducer producer( brokerURI, numMessages, destURI, useTopics ); > producer.setConnectRetries( connectRetries ); > producer.run(); > > > SimpleProducer producer2( brokerURI, numMessages, destURI2, > useTopics ); producer2.setConnectRetries( connectRetries ); > producer2.run(); > > std::cout << "-----------------------------------------------------\n"; > std::cout << "Finished with the example." << std::endl; > std::cout << "=====================================================\n"; > } > -- Tim Bish http://fusesource.com http://timbish.blogspot.com/