This is my Java application:
...
// Create connection. Create session from connection; false means
// session is not transacted. Create requestor and text message. Send
// messages, wait for answer and finally close session and connection.
// NOTE(review): no close() of the session or connection is visible in this
// fragment - confirm that it happens in the code that follows.
try
{
    queueConnection = queueConnectionFactory.createQueueConnection();

    // A JMS connection is created in stopped mode: no message is delivered
    // to any of its consumers until start() is called. Without this call the
    // reply consumer inside QueueRequestor never receives the answer and
    // request() below blocks forever.
    queueConnection.start();

    queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

    // The queue is created through the session here because looking up the
    // ActiveMQ queue via JNDI did not work (open question from the author).
    stationInfoQueue = queueSession.createQueue("stationInfoQueue");

    textMessage = queueSession.createTextMessage(myXMLRequestAsString);

    // javax.jms.QueueRequestor creates a TemporaryQueue for the responses
    // and provides a request method that sends the request message
    // and waits for its reply. This is a basic request/reply abstraction that
    // should be sufficient for most uses.
    // JMS providers and clients are free to create more sophisticated
    // versions.
    queueRequestor = new QueueRequestor(queueSession, stationInfoQueue);

    // Sends the message and waits until the response is received.
    TextMessage answer = (TextMessage) queueRequestor.request(textMessage);

    System.out.println("CLIENT: Response message received: ");
    System.out.println(answer.getText());
}
catch (JMSException e)
{
    System.out.println("JMSExceptionn occurred:" + e);
}
...
And this is my C++ application (a modified version of the Apache
example SimpleAsyncConsumer.cpp):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
bool useTopic;
bool clientAck;
std::string brokerURI;
std::string destURI;
public:
SimpleAsyncConsumer( const std::string& brokerURI,
const std::string& destURI,
bool useTopic = false,
bool clientAck = false ) {
connection = NULL;
session = NULL;
destination = NULL;
consumer = NULL;
this->useTopic = useTopic;
this->brokerURI = brokerURI;
this->destURI = destURI;
this->clientAck = clientAck;
}
virtual ~SimpleAsyncConsumer(){
cleanup();
}
void runConsumer() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory( brokerURI );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
connection->start();
connection->setExceptionListener(this);
// Create a Session
if( clientAck ) {
session = connection->createSession(
Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageConsumer from the Session to the Topic or
Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered
MessageListener.
virtual void onMessage( const Message* message ){
static int count = 0;
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL )
{
text = textMessage->getText();
}
else
{
text = "NOT A TEXTMESSAGE!";
}
if( clientAck )
{
message->acknowledge();
}
printf( "Message #%d Received: %s\n", count, text.c_str() );
//sendReply(textMessage);
}
catch (CMSException& e)
{
e.printStackTrace();
}
sendReply(message);
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
printf("CMS Exception occurred. Shutting down client.\n");
exit(1);
}
void sendReply(const Message* msg)
{
printf( "started sendrReply()");
try
{
// Create a ConnectionFactory
auto_ptr<ActiveMQConnectionFactory> connectionFactory(
new ActiveMQConnectionFactory( brokerURI ) );
try
{
connection = connectionFactory->createConnection();
connection->start();
}
catch( CMSException& e )
{
e.printStackTrace();
}
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
// Create the destination (Topic or Queue)
destination = const_cast<Destination*>(msg->getCMSReplyTo());
// Create a MessageProducer from the Session to the Topic or
Queue
MessageProducer* producer = session->createProducer( destination
);
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
// Create a messages
string text = (string)"Hello World back";
TextMessage* message = session->createTextMessage( text );
// Tell the producer to send the message
producer->send( message );
delete message;
}
catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
//*************************************************
// Always close destination, consumers and producers before
// you destroy their sessions and connection.
//*************************************************
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) { e.printStackTrace(); }
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) { e.printStackTrace(); }
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) { e.printStackTrace(); }
// Now Destroy them
try{
if( session != NULL ) delete session;
}catch (CMSException& e) { e.printStackTrace(); }
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) { e.printStackTrace(); }
connection = NULL;
}
};
////////////////////////////////////////////////////////////////////////////////
/**
 * Entry point: initializes the ActiveMQ-CPP library, runs a
 * SimpleAsyncConsumer on the "stationInfoQueue" queue until the user types
 * 'q' (or standard input ends) and shuts the library down again.
 */
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc. See the CMS web site for
    // a full list of configuration options.
    //
    // http://activemq.apache.org/cms/
    //
    // Wire Format Options:
    // =====================
    // Use either stomp or openwire, the default ports are different for each
    //
    // Examples:
    // tcp://127.0.0.1:61616 default to openwire
    // tcp://127.0.0.1:61616?wireFormat=openwire same as above
    // tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead
    //
    std::string brokerURI =
        "failover:(tcp://127.0.0.1:61616"
        // "?wireFormat=openwire"
        // "&connection.useAsyncSend=true"
        // "&transport.commandTracingEnabled=true"
        // "&transport.tcpTracingEnabled=true"
        // "&wireFormat.tightEncodingEnabled=true"
        ")";

    //============================================================
    // This is the Destination Name and URI options. Use this to
    // customize where the consumer listens, to have the consumer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    //std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
    std::string destURI = "stationInfoQueue"; //?consumer.prefetchSize=1";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the consumer.
    //============================================================
    bool useTopics = false;

    //============================================================
    // set to true if you want the consumer to use client ack mode
    // instead of the default auto ack mode.
    //============================================================
    bool clientAck = false;

    // The consumer lives in its own scope so that its destructor, which
    // closes and deletes the CMS resources, runs BEFORE shutdownLibrary().
    {
        // Create the consumer
        SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );

        // Start it up and it will listen forever.
        consumer.runConsumer();

        // Wait to exit. The stream check ends the loop when standard input
        // reaches end-of-file instead of spinning forever.
        std::cout << "Press 'q' to quit" << std::endl;
        while( std::cin.good() && std::cin.get() != 'q' ) {}
    }

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    activemq::library::ActiveMQCPP::shutdownLibrary();

    return 0;
}
...What I also don't understand is why "sendReply(message);" is called AFTER
I pressed a key?
... And then I get this error on the console:
-----------------------------------------------------
Finished with the example.
=====================================================
simple_async_consumer(1639,0xa0738720) malloc: *** error for object
0x106a98: Non-aligned pointer being freed
*** set a breakpoint in malloc_error_break to debug
--
View this message in context:
http://www.nabble.com/ActiveMQ-c%2B%2B-and-Java-tp25190274p25204452.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.