The default prefetch value (1000) for queue consumers may keep you from
observing the expected behavior in your example.
Try 200 for the fast consumer and 10 (or smaller) for the slow consumer to
see if it works.

At 2013-02-15 14:01:14, SuoNayi <[email protected]> wrote:
>Yes, you're right. You can assign a larger prefetch to fast consumers so that
>they can buffer more messages to process than slow consumers.
>
>
>At 2013-02-15 12:01:01, "AMARNATH, Balachandar"
><[email protected]> wrote:
>>Thanks for the reply,
>>
>>My intention is not to transfer files at the moment. I was just checking how
>>the consumers pull messages from the queue. When I ran the code, it looked
>>like the messages were shared equally between the consumers, which makes the
>>application wait for the slow consumer to finish even though the fast
>>consumer is free. It looks like the messages are buffered in the consumers,
>>and hence no messages are available for the fast consumer when it has
>>finished processing. Not sure though!
>>
>>-Bala
>>
>>-----Original Message-----
>>From: SuoNayi [mailto:[email protected]]
>>Sent: 14 February 2013 17:30
>>To: [email protected]
>>Subject: Re:New bie question - Running producer and two instance of consumer
>>
>>Hi, AMQ is not designed to transfer files, especially large files, although
>>it provides two approaches (BlobMessage or JMS streams).
>>To provide better performance, the AMQ broker dispatches batches of
>>messages to consumers instead of having them pull messages one at a time.
>>
>>So tuning the prefetch of the consumers will give your fast consumers more
>>chances to pull messages from the broker.
>>Take a look at:
>>http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>>
>>
>>At 2013-02-14 19:41:49, "AMARNATH, Balachandar"
>><[email protected]> wrote:
>>>Hi,
>>>
>>>I am new to ActiveMQ (or any other messaging system). I want to use this
>>>messaging system for distributed processing of a large number of files. I
>>>wish to split my application into two pieces, namely producer and consumer,
>>>so that one instance of the producer generates messages that several
>>>instances of similar consumers can process. The consumers are supposed to
>>>run on distributed machines. I wrote a simple producer and two consumers (as
>>>attached). The producer generates 100 messages (simple text messages) and
>>>the consumers take 'different' processing times to process every message.
>>>Hence, I expect that the consumer that takes less time to finish processing
>>>would process more messages while the other one processes fewer messages.
>>>However, when I ran them, I saw that the messages were split evenly between
>>>the two. Can someone give me a hint here on how this can be achieved?
>>>And where am I going wrong?
>>>
>>>Here are my classes:
>>>
>>>import java.util.ArrayList;
>>>import javax.jms.*;
>>>import org.apache.activemq.ActiveMQConnection;
>>>import org.apache.activemq.ActiveMQConnectionFactory;
>>>
>>>public class HelloWorldConsumer1 {
>>>    public static void main(String[] args) {
>>>        try {
>>>            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>>>            ActiveMQConnectionFactory connectionFactory =
>>>                    new ActiveMQConnectionFactory(url);
>>>            // Create a Connection
>>>            Connection connection = connectionFactory.createConnection();
>>>            connection.start();
>>>            // Create a Session
>>>            Session session = connection.createSession(false,
>>>                    Session.AUTO_ACKNOWLEDGE);
>>>            // Create the destination (Topic or Queue)
>>>            Destination destination = session.createQueue("TEST.FOO");
>>>            // Create a MessageConsumer from the Session to the Topic or Queue
>>>            MessageConsumer consumer = session.createConsumer(destination);
>>>            ArrayList<String> contents = new ArrayList<String>();
>>>            boolean flag = true;
>>>            while (flag) {
>>>                // Wait up to one second; receive() returns null on timeout
>>>                Message message = consumer.receive(1000);
>>>                if (message instanceof TextMessage) {
>>>                    TextMessage textMessage = (TextMessage) message;
>>>                    String text = textMessage.getText();
>>>                    System.out.println("Received : " + text
>>>                            + " and processing");
>>>                    // Simulated processing time of this consumer
>>>                    Thread.sleep(5000);
>>>                    if (text.equalsIgnoreCase("END")) {
>>>                        System.out.println("The recvd text is " + text);
>>>                        flag = false;
>>>                    }
>>>                    contents.add(text);
>>>                } else {
>>>                    System.out.println("Received : " + message);
>>>                }
>>>            }
>>>            System.out.println("Exiting..");
>>>            System.out.println("The contents collected in consumer.java is "
>>>                    + contents.size());
>>>            connection.close();
>>>        } catch (Exception e) {
>>>            System.out.println("Caught: " + e);
>>>            e.printStackTrace();
>>>        }
>>>    }
>>>}
>>>
>>>
>>>import javax.jms.*;
>>>import org.apache.activemq.ActiveMQConnection;
>>>import org.apache.activemq.ActiveMQConnectionFactory;
>>>
>>>public class HelloWorldProducer {
>>>    public static void main(String[] args) throws Exception {
>>>        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>>>        ActiveMQConnectionFactory connectionFactory =
>>>                new ActiveMQConnectionFactory(url);
>>>        // Create a Connection
>>>        Connection connection = connectionFactory.createConnection();
>>>        connection.start();
>>>        // Create a Session
>>>        Session session =
>>>                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>>        // Create the destination (Topic or Queue)
>>>        Destination destination = session.createQueue("TEST.FOO");
>>>        // Create a MessageProducer from the Session to the Topic or Queue
>>>        MessageProducer producer = session.createProducer(destination);
>>>        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>>>        int counter = 0;
>>>        boolean flag = true;
>>>        while (counter < 50) {
>>>            counter++;
>>>            // + Thread.currentThread().getName() + " : " + this.hashCode();
>>>            String text = "Message " + counter;
>>>            TextMessage message = session.createTextMessage(text);
>>>            // Tell the producer to send the message
>>>            producer.send(message);
>>>            System.out.println("Sent message '" + message.getText() + "'");
>>>            Thread.sleep(1000);
>>>        }
>>>        // Send one END marker per consumer so that both of them exit
>>>        Thread.sleep(2000);
>>>        producer.send(session.createTextMessage("END"));
>>>        Thread.sleep(4000);
>>>        producer.send(session.createTextMessage("END"));
>>>        // Clean up
>>>        connection.close();
>>>    }
>>>}
>>>
>>>
>>>With thanks and regards
>>>Balachandar
>>>
>>>
>>>The information in this e-mail is confidential. The contents may not be
>>>disclosed or used by anyone other than the addressee. Access to this e-mail
>>>by anyone else is unauthorised.
>>>If you are not the intended recipient, please notify Airbus immediately and
>>>delete this e-mail.
>>>Airbus cannot accept any responsibility for the accuracy or completeness of
>>>this e-mail as it has been sent over public networks. If you have any
>>>concerns over the content of this message or its Accuracy or Integrity,
>>>please contact Airbus immediately.
>>>All outgoing e-mails from Airbus are checked using regularly updated virus
>>>scanning software but you should take whatever measures you deem to be
>>>appropriate to ensure that this message and any attachments are virus free.
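
To make the prefetch effect discussed at the top of this thread concrete, here is a rough, self-contained simulation. It is only a toy model and not ActiveMQ code: the class `PrefetchSimulation`, the method `simulate`, the abstract time ticks, the assumption that all messages are already in the queue, and the plain round-robin dispatch are all simplifications made up for this sketch. It shows how a large prefetch lets the slow consumer's buffer absorb half of the messages, while a small prefetch leaves more of them for the fast consumer.

```java
import java.util.Arrays;

// Toy model of broker-side dispatch with a per-consumer prefetch limit.
// Hypothetical names; the real ActiveMQ dispatch logic is more involved.
final class PrefetchSimulation {

    /**
     * Simulates consumers draining a queue and returns how many messages
     * each consumer processed.
     *
     * @param totalMessages  messages waiting in the queue at time 0 (assumption)
     * @param prefetch       prefetch limit of each consumer
     * @param processingTime ticks each consumer needs per message
     */
    static int[] simulate(int totalMessages, int[] prefetch, int[] processingTime) {
        int n = prefetch.length;
        int[] buffered = new int[n];   // dispatched to a consumer, not yet finished
        int[] processed = new int[n];  // messages fully processed per consumer
        long[] finishAt = new long[n]; // tick at which the current message completes
        Arrays.fill(finishAt, -1);     // -1 means the consumer is idle
        int remaining = totalMessages; // messages still held by the broker
        int done = 0;
        int cursor = 0;                // round-robin position

        for (long now = 0; done < totalMessages; now++) {
            // 1. Consumers finish their current message, freeing a prefetch slot.
            for (int i = 0; i < n; i++) {
                if (finishAt[i] == now) {
                    processed[i]++;
                    buffered[i]--;
                    done++;
                    finishAt[i] = -1;
                }
            }
            // 2. The broker hands out messages round-robin to every consumer
            //    whose buffer is below its prefetch limit.
            int skipped = 0;
            while (remaining > 0 && skipped < n) {
                if (buffered[cursor] < prefetch[cursor]) {
                    buffered[cursor]++;
                    remaining--;
                    skipped = 0;
                } else {
                    skipped++;
                }
                cursor = (cursor + 1) % n;
            }
            // 3. Idle consumers start on the next message in their own buffer.
            for (int i = 0; i < n; i++) {
                if (finishAt[i] < 0 && buffered[i] > 0) {
                    finishAt[i] = now + processingTime[i];
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] speeds = {1, 5}; // consumer 0 is fast, consumer 1 is slow
        // Default-like prefetch: both buffers swallow half of the queue.
        System.out.println(Arrays.toString(simulate(50, new int[] {1000, 1000}, speeds))); // [25, 25]
        // Values suggested in the thread: 200 for fast, 10 for slow.
        System.out.println(Arrays.toString(simulate(50, new int[] {200, 10}, speeds)));    // [40, 10]
        // Prefetch of 1 for both: work follows actual processing speed.
        System.out.println(Arrays.toString(simulate(50, new int[] {1, 1}, speeds)));       // [41, 9]
    }
}
```

In real client code the prefetch is set through configuration rather than simulated, for example with the destination option `TEST.FOO?consumer.prefetchSize=10` passed to `session.createQueue(...)`, or with the connection URI option `jms.prefetchPolicy.queuePrefetch=10`. Both are described on the prefetch page linked in the thread; please check them against the ActiveMQ version you are running.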
