I have attached the source code.

My environment is: 
- Windows XP Home SP2
- JDK1.5.0_08-b03
- for C++ MS VisualStudio 2005

Thanks,
Grzegorz
 
---- Timothy Bish <[EMAIL PROTECTED]> schrieb:
> 
> Without seeing source code its hard to say.  One thing to check is to ensure
> that you are letting the C++ consumer start and connect before the Java
> producer sends its messages.
> 
> 
> > -----Original Message-----
> > From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED]
> > Sent: Saturday, March 03, 2007 8:59 AM
> > To: users@activemq.apache.org
> > Cc: Timothy Bish
> > Subject: RE: Java Producer and C++ Consumer
> > Importance: High
> > 
> > Hi,
> > 
> > my C++ client is connected to tcp://localhost:61613.
> > Of course I have tried the Java Producer with the URL
> > tcp://localhost:61616 and the messages were created.
> > My problem is that the C++ Consumer did not received anything (but is
> > successfully connected to the queue TEST.FOO).
> > If I start a Java Consumer all is fine.
> > 
> > Grzegorz
> > 
> > ---- Timothy Bish <[EMAIL PROTECTED]> schrieb:
> > > The Java Producer should probably be using tcp://localhost:61616 as its
> > url,
> > > as that is the default for the openwire transport connector in the
> > broker.
> > > The C++ client would connect to tcp://localhost:61613 assuming that you
> > have
> > > the default stomp settings the same in your broker's configuration.
> > >
> > > Once you do that you should be all set.
> > >
> > > The openwire support in the C++ client is close, I'm working on some
> > issues
> > > with tight marshalling now, but loose marshalling seems to be working
> > just
> > > fine.  We are working on new integration tests and unit tests to
> > validate
> > > all the new functionality.  Its only a matter of time now.  I've got a
> > busy
> > > week next week so it might get delayed a bit until I get a chance to
> > work on
> > > it some more.
> > >
> > > > -----Original Message-----
> > > > From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED]
> > > > Sent: Saturday, March 03, 2007 6:53 AM
> > > > To: users@activemq.apache.org
> > > > Subject: Java Producer and C++ Consumer
> > > > Importance: High
> > > >
> > > > I have ActiveMQ 4.1.0 and ActiveMQ-CPP 1.1 installed.
> > > > The language specific examples (Java producer/Java consumers
> > respectively
> > > > C++ producer/C++ consumers) work fine.
> > > >
> > > > Is it now possible with the Stomp protocol to have ordinary JMS
> > producers
> > > > written in Java and queue consumers written in C++? I tried it, but
> > > > without success. I got always following exception trying to start Java
> > > > producer using this URL tcp://localhost:61613
> > > >
> > > > Caught: javax.jms.JMSException: Wire format negociation timeout: peer
> > did
> > > > not send his wire format.
> > > >
> > > > Will be possible to have the interconnection (Java and C++) with the
> > > > OpenWire protocol?
> > > > When do you expect to release the OpenWire C++ implementation?
> > > >
> > > > Thanks,
> > > > Grzegorz
> > >
> 
#include <activemq/core/ActiveMQConsumer.h>
#include <activemq/core/ActiveMQConnectionFactory.h>

#include <iostream>
#include <memory>
#include <string>

using namespace activemq;
using namespace activemq::core;
using namespace cms;
using namespace std;

int main(int argc, char** argv)
{

        string uri = "tcp://127.0.0.1:61613";
        string queueName = "TEST.FOO";
        if (argc > 1) {
                uri = argv[1];

                if (argc > 2) {
                        queueName = argv[2];
                }
        }

        auto_ptr<ConnectionFactory> factory(new ActiveMQConnectionFactory(uri));
        try {
                cout << "create connection" << endl;
                auto_ptr<Connection> conn(factory->createConnection());
                cout << "create session" << endl;
                auto_ptr<Session> 
sess(conn->createSession(Session::AUTO_ACKNOWLEDGE));
                cout << "get queue" << endl;
                auto_ptr<Destination> queue(sess->createQueue(queueName));

                cout << "create consumer" << endl;
                auto_ptr<MessageConsumer> 
consumer(sess->createConsumer(queue.get()));
        
                while (true) {
                        cout << "wait for message" << endl;
                        auto_ptr<Message> msg(consumer->receive(10000));
                        
                        if (msg.get()) {
                                cout << "got message" << endl;
                        }
                        else {
                                cout << "Timeout" << endl;
                        }
                }
        } catch (...) {
                cerr << "Exception catched!" << endl;
                return -1;
        }

        return 0;
}
import java.util.Arrays;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class Producer {

	private Destination destination;
	private int messageCount = 10;
	private long sleepTime = 0L;
	private boolean verbose = true;
	private int messageSize = 255;
	private long timeToLive = 0L;
	private String user = null;
	private String password = null;
	private static String url = "tcp://127.0.0.1:61616";
	private String subject = "TEST.FOO";
	private boolean topic = false;
	private boolean transacted = false;
	private boolean persistent = false;

	public static void main(String[] args) {
		
		if (args.length > 0) {
			url = args[0];	
		}
		
		Producer producerTool = new Producer();
		producerTool.run();
	}

	public void run() {
		Connection connection=null;
		try {
			System.out.println("Connecting to URL: " + url);
			System.out.println("Publishing a Message with size " + messageSize+ " to " + (topic ? "topic" : "queue") + ": " + subject);
			System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
			System.out.println("Sleeping between publish " + sleepTime + " ms");
			if (timeToLive != 0) {
				System.out.println("Messages time to live " + timeToLive + " ms");
			}
			
			// Create the connection.
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);			
			connection = connectionFactory.createConnection();
			
			
			// Create the session
			Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
			if (topic) {
				destination = session.createTopic(subject);
			} else {
				destination = session.createQueue(subject);
			}
			
			// Create the producer.
			MessageProducer producer = session.createProducer(destination);
			if (persistent) {
				producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			} else {
				producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			}			
			if (timeToLive != 0)
				producer.setTimeToLive(timeToLive);
			
			connection.start();
						
			// Start sending messages
			sendLoop(session, producer);

			System.out.println("Done.");
			
						
		} catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		} finally {
			try { 
				connection.close();
			} catch (Throwable ignore) {
			}
		}
	}

	protected void sendLoop(Session session, MessageProducer producer)
			throws Exception {

		for (int i = 0; i < messageCount || messageCount == 0; i++) {

			TextMessage message = session
					.createTextMessage(createMessageText(i));

			if (verbose) {
				String msg = message.getText();
				if (msg.length() > 50) {
					msg = msg.substring(0, 50) + "...";
				}
				System.out.println("Sending message: " + msg);
			}

			producer.send(message);
			if (transacted) {
				session.commit();
			}

			Thread.sleep(sleepTime);

		}

	}

	private String createMessageText(int index) {
		StringBuffer buffer = new StringBuffer(messageSize);
		buffer.append("Message: " + index + " sent at: " + new Date());
		if (buffer.length() > messageSize) {
			return buffer.substring(0, messageSize);
		}
		for (int i = buffer.length(); i < messageSize; i++) {
			buffer.append(' ');
		}
		return buffer.toString();
	}


	public void setPersistent(boolean durable) {
		this.persistent = durable;
	}
	public void setMessageCount(int messageCount) {
		this.messageCount = messageCount;
	}
	public void setMessageSize(int messageSize) {
		this.messageSize = messageSize;
	}
	public void setPassword(String pwd) {
		this.password = pwd;
	}
	public void setSleepTime(long sleepTime) {
		this.sleepTime = sleepTime;
	}
	public void setSubject(String subject) {
		this.subject = subject;
	}
	public void setTimeToLive(long timeToLive) {
		this.timeToLive = timeToLive;
	}
	public void setTopic(boolean topic) {
		this.topic = topic;
	}
	public void setQueue(boolean queue) {
		this.topic = !queue;
	}	
	public void setTransacted(boolean transacted) {
		this.transacted = transacted;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public void setUser(String user) {
		this.user = user;
	}
	public void setVerbose(boolean verbose) {
		this.verbose = verbose;
	}
}

Reply via email to