Hi,

I am new to ActiveMQ (and to messaging systems in general). I want to use it 
for distributed processing of a large number of files. I wish to split my 
application into two pieces, a producer and a consumer, so that one producer 
instance generates messages that several instances of the same consumer can 
process. The consumers are supposed to run on different machines. I wrote a 
simple producer and two consumers (as attached). The producer generates 100 
messages (simple text messages) and the consumers take different processing 
times for each message. Hence, I expect the consumer that takes less time per 
message to process more messages, while the other one processes fewer. 
However, when I ran them, I saw that the messages were split evenly between 
the two consumers. Can someone give me a hint as to how this can be achieved, 
and where I am going wrong?
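
To make the expectation concrete, here is a minimal broker-free sketch. It 
does not use ActiveMQ at all: a java.util.concurrent.BlockingQueue stands in 
for the queue and two threads stand in for the consumers, each pulling one 
message at a time. The class name CompetingConsumersSketch, the run helper and 
the sleep times are made up for illustration only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: an in-memory queue and two threads, no ActiveMQ involved.
public class CompetingConsumersSketch {

    // Hypothetical helper: fills a queue with messageCount messages, lets a
    // fast and a slow worker drain it, and returns {fastCount, slowCount}.
    static int[] run(int messageCount, long fastMillis, long slowMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= messageCount; i++) {
            queue.add("Message " + i);
        }
        AtomicInteger fastCount = new AtomicInteger();
        AtomicInteger slowCount = new AtomicInteger();
        Thread fast = new Thread(worker(queue, fastMillis, fastCount));
        Thread slow = new Thread(worker(queue, slowMillis, slowCount));
        fast.start();
        slow.start();
        try {
            fast.join();
            slow.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return new int[] { fastCount.get(), slowCount.get() };
    }

    // Each worker takes ONE message at a time and only asks for the next one
    // after it has finished "processing" (sleeping for) the current one.
    private static Runnable worker(BlockingQueue<String> queue, long millis,
                                   AtomicInteger counter) {
        return () -> {
            try {
                // poll() returns null once the pre-filled queue is empty
                while (queue.poll() != null) {
                    Thread.sleep(millis);
                    counter.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        int[] counts = run(40, 5, 50);
        System.out.println("fast consumer: " + counts[0]
                + ", slow consumer: " + counts[1]);
    }
}
```

With these numbers the fast worker should end up with most of the messages, 
which is the behaviour I was hoping to see from the two real consumers.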

Here are my classes:

import java.util.ArrayList;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldConsumer1 {
    public static void main(String[] args) {
        try {
            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            // Create a Connection
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue("TEST.FOO");
            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);
            ArrayList<String> contents = new ArrayList<String>();
            boolean flag = true;
            while (flag) {
                // receive() returns null if no message arrives within 1 second
                Message message = consumer.receive(1000);
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received : " + text + " and processing");
                    // Simulate the processing time of this consumer
                    Thread.sleep(5000);
                    if (text.equalsIgnoreCase("END")) {
                        System.out.println("The recvd text is " + text);
                        flag = false;
                    }
                    contents.add(text);
                } else {
                    System.out.println("Received : " + message);
                }
            }
            System.out.println("Exiting..");
            System.out.println("The contents collected in consumer.java is " + contents.size());
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}



import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWorldProducer {
    public static void main(String[] args) throws Exception {
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");
        // Create a MessageProducer from the Session to the Topic or Queue
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        int counter = 0;
        while (counter < 50) {
            counter++;
            String text = "Message " + counter;
            TextMessage message = session.createTextMessage(text);
            // Tell the producer to send the message
            producer.send(message);
            System.out.println("Sent message '" + message.getText() + "'");
            Thread.sleep(1000);
        }
        // Send two END markers, one for each of the two consumers
        Thread.sleep(2000);
        producer.send(session.createTextMessage("END"));
        Thread.sleep(4000);
        producer.send(session.createTextMessage("END"));
        // Clean up
        connection.close();
    }
}





With thanks and regards
Balachandar



