yes, you're right. you can assign larger prefetch for fast consumers so that they can buffer more messages to process than slow consumers.
At 2013-02-15 12:01:01,"AMARNATH, Balachandar" <[email protected]> wrote: >Thanks for the reply, > >My intention is not transferring files at the moment. I was just checking how >the consumers pull the messages from the queue. When I ran the code, it looks >like, the messages are equally shared between the consumers that make the >application need to wait for the slow consumers to finish even though fast >consumers are free. Looks like the messages are buffered in consumers, and >hence no messages are available for fast consumers when they finished >processing. Not sure though ! > >-Bala > >-----Original Message----- >From: SuoNayi [mailto:[email protected]] >Sent: 14 February 2013 17:30 >To: [email protected] >Subject: Re:New bie question - Running producer and two instance of consumer > >Hi,AMQ is not designed to transfer files, especially large files although it >has provided two approaches(BlobMessage or Jms Stream). >In order to provide better performance, AMQ broker dispatches batches of >messages to consumers instead of pulling messages every time. > >So tune prefetch of consumers will help your fast consumers to get more >chances to pull messages from brokers. >Take a look at: >http://activemq.apache.org/what-is-the-prefetch-limit-for.html > > > > > > >At 2013-02-14 19:41:49,"AMARNATH, Balachandar" ><[email protected]> wrote: >>Hi, >> >>I am new to activeMQ (for any other messaging system). I want to use this >>messaging system for distributed processing of large number of files. I wish >>to split my application in to two pieces namely producer and consumer, so >>that one instance of producer generates messages that several instance of >>similar consumers can process it. The consumers are supposed to be >>distributed machines. I wrote a simple producer and two consumers (as >>attached). The producer generates 100 messages (simple text messages) and >>consumers take 'different' processing times to process every message. Hence, >>I expect, the consumer that take less time to finish processing would process >>more messages while the other one process less number of messages. However, >>when I ran them, I saw the number of messages is evenly splitted and >>processed. Can someone give me a hint here, how this can be achieved? And >>where I am going wrong? >> >>Here are my classes:- >> >>public class HelloWorldConsumer1 { >> public static void main(String[] args ){ >> try { >> String url = ActiveMQConnection.DEFAULT_BROKER_URL; >> ActiveMQConnectionFactory connectionFactory = new >> ActiveMQConnectionFactory(url); >> // Create a Connection >> Connection connection = connectionFactory.createConnection(); >> connection.start(); >> // Create a Session >> Session session = connection.createSession(false, >> Session.AUTO_ACKNOWLEDGE); >> // Create the destination (Topic or Queue) >> Destination destination = session.createQueue("TEST.FOO"); >> // Create a MessageConsumer from the Session to the Topic or >> Queue >> MessageConsumer consumer = >> session.createConsumer(destination); >> ArrayList<String> contents = new ArrayList<String>(); >> boolean flag = true; >> while (flag){ >> Message message = consumer.receive(1000); >> if (message instanceof TextMessage) { >> TextMessage textMessage = (TextMessage) message; >> String text = textMessage.getText(); >> System.out.println("Received : " + text + " and >> processing"); >> Thread.sleep(5000); >> if (text.equalsIgnoreCase("END")){ >> System.out.println("The recvd text is "+text); >> flag = false; >> } >> contents.add(text); >> } else { >> System.out.println("Received : "+message); >> } >> } >> System.out.println("Exiting.."); >> System.out.println("The contents collected in consumer.java >> is "+contents.size()); >> connection.close(); >> } catch (Exception e) { >> System.out.println("Caught: " + e); >> e.printStackTrace(); >> } >> } >>} >> >> >> >>public class HelloWorldProducer { >> public static void main(String[] args) throws Exception { >> String url = ActiveMQConnection.DEFAULT_BROKER_URL; >> ActiveMQConnectionFactory connectionFactory = new >> ActiveMQConnectionFactory(url); >> // Create a Connection >> Connection connection = connectionFactory.createConnection(); >> connection.start(); >> // Create a Session >> Session session = >> connection.createSession(false,Session.AUTO_ACKNOWLEDGE); >> // Create the destination (Topic or Queue) >> Destination destination = session.createQueue("TEST.FOO"); >> // Create a MessageProducer from the Session to the Topic or >> Queue >> MessageProducer producer = >> session.createProducer(destination); >> producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); >> int counter = 0; >> boolean flag = true; >> while (counter < 50){ >> counter++; >> String text = "Message "+counter;// + >> Thread.currentThread().getName() + " : " + this.hashCode(); >> TextMessage message = session.createTextMessage(text); >> // Tell the producer to send the message >> producer.send(message); >> System.out.println("Sent message '" + message.getText() + >> "'"); >> Thread.sleep(1000); >> } >> Thread.sleep(2000); >> producer.send(session.createTextMessage("END")); >> Thread.sleep(4000); >> producer.send(session.createTextMessage("END")); >> // Clean up >> connection.close(); >> } >> } >> >> >> >> >> >>With thanks and regards >>Balachandar >> >> >> >> >>The information in this e-mail is confidential. The contents may not be >>disclosed or used by anyone other than the addressee. Access to this e-mail >>by anyone else is unauthorised. >>If you are not the intended recipient, please notify Airbus immediately and >>delete this e-mail. >>Airbus cannot accept any responsibility for the accuracy or completeness of >>this e-mail as it has been sent over public networks. If you have any >>concerns over the content of this message or its Accuracy or Integrity, >>please contact Airbus immediately. >>All outgoing e-mails from Airbus are checked using regularly updated virus >>scanning software but you should take whatever measures you deem to be >>appropriate to ensure that this message and any attachments are virus free. >> > >This mail has originated outside your organization, either from an external >partner or the Global Internet. >Keep this in mind if you answer this message. > > > >The information in this e-mail is confidential. The contents may not be >disclosed or used by anyone other than the addressee. Access to this e-mail by >anyone else is unauthorised. >If you are not the intended recipient, please notify Airbus immediately and >delete this e-mail. >Airbus cannot accept any responsibility for the accuracy or completeness of >this e-mail as it has been sent over public networks. If you have any concerns >over the content of this message or its Accuracy or Integrity, please contact >Airbus immediately. >All outgoing e-mails from Airbus are checked using regularly updated virus >scanning software but you should take whatever measures you deem to be >appropriate to ensure that this message and any attachments are virus free. >
