Yes, you're right. You can assign a larger prefetch to fast consumers so that 
they buffer more messages to process than slow consumers do.
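
For illustration, here is a minimal sketch of where the prefetch value can be 
set. It assumes a plain tcp://localhost:61616 broker URL and the TEST.FOO queue 
from the code quoted below. PrefetchOptions and its methods are hypothetical 
helpers, not part of ActiveMQ. The option names jms.prefetchPolicy.queuePrefetch 
(connection URI) and consumer.prefetchSize (destination option) are the ones I 
know from the ActiveMQ prefetch documentation, so please check them against 
your broker version.

```java
// Hypothetical helper: builds the strings that carry the prefetch setting.
// Only the JDK is needed to run this. The resulting strings would be passed
// to new ActiveMQConnectionFactory(url) and session.createQueue(name).
class PrefetchOptions {

    // Connection-wide setting: applies to every queue consumer on the connection.
    static String brokerUrlWithQueuePrefetch(String brokerUrl, int prefetch) {
        return append(brokerUrl, "jms.prefetchPolicy.queuePrefetch=" + prefetch);
    }

    // Per-destination setting: applies to consumers created on this destination.
    static String queueNameWithPrefetch(String queueName, int prefetch) {
        return append(queueName, "consumer.prefetchSize=" + prefetch);
    }

    // Uses '?' for the first option and '&' for any later one.
    private static String append(String base, String option) {
        return base + (base.contains("?") ? "&" : "?") + option;
    }

    public static void main(String[] args) {
        System.out.println(brokerUrlWithQueuePrefetch("tcp://localhost:61616", 1));
        System.out.println(queueNameWithPrefetch("TEST.FOO", 1));
    }
}
```

A slow consumer would use a small value such as 1 here, and a fast consumer a 
larger one.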

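To see why the split comes out even, here is a rough simulation that uses only 
the JDK. It is not the broker's actual code. It assumes that all messages are 
already on the queue, that the broker hands them out round-robin to any 
consumer holding fewer than "prefetch" unacknowledged messages, and that a 
consumer acknowledges a message when it finishes processing it. 
PrefetchSimulation and simulate are made-up names.

```java
import java.util.Arrays;

// Simplified model of prefetch-based dispatch (an assumption-laden sketch).
class PrefetchSimulation {

    // Returns how many messages each consumer ends up processing.
    // processingTime[c] is the time consumer c needs per message.
    static int[] simulate(int messages, int prefetch, int[] processingTime) {
        int n = processingTime.length;
        int[] unacked = new int[n];    // dispatched but not yet finished
        int[] processed = new int[n];
        long[] finishAt = new long[n]; // when the current message completes
        int remaining = messages;
        int next = 0;
        long now = 0;
        while (true) {
            // Broker side: dispatch round-robin while any consumer has room.
            int skipped = 0;
            while (remaining > 0 && skipped < n) {
                if (unacked[next] < prefetch) {
                    if (unacked[next] == 0) {
                        // Consumer was idle, so it starts working right away.
                        finishAt[next] = now + processingTime[next];
                    }
                    unacked[next]++;
                    remaining--;
                    skipped = 0;
                } else {
                    skipped++;
                }
                next = (next + 1) % n;
            }
            // Consumer side: jump to the next completion time.
            int soonest = -1;
            for (int c = 0; c < n; c++) {
                if (unacked[c] > 0 && (soonest < 0 || finishAt[c] < finishAt[soonest])) {
                    soonest = c;
                }
            }
            if (soonest < 0) {
                break; // nothing buffered anywhere: all work is done
            }
            now = finishAt[soonest];
            unacked[soonest]--;
            processed[soonest]++;
            if (unacked[soonest] > 0) {
                finishAt[soonest] = now + processingTime[soonest];
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] times = {1, 5}; // consumer 0 is fast, consumer 1 is slow
        System.out.println(Arrays.toString(simulate(50, 1000, times)));
        System.out.println(Arrays.toString(simulate(50, 1, times)));
    }
}
```

With 50 messages and a prefetch larger than the queue depth, both consumers 
end up with 25 messages regardless of their speed. With a prefetch of 1, the 
faster consumer processes most of them.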

At 2013-02-15 12:01:01,"AMARNATH, Balachandar" 
<[email protected]> wrote:
>Thanks for the reply,
>
>My intention is not to transfer files at the moment. I was just checking how 
>the consumers pull messages from the queue. When I ran the code, it looked as 
>though the messages were shared equally between the consumers, so the 
>application has to wait for the slow consumer to finish even though the fast 
>consumer is idle. It looks like the messages are buffered in the consumers, so 
>no messages are available to the fast consumer when it finishes processing. 
>I am not sure, though.
>
>-Bala
>
>-----Original Message-----
>From: SuoNayi [mailto:[email protected]] 
>Sent: 14 February 2013 17:30
>To: [email protected]
>Subject: Re:New bie question - Running producer and two instance of consumer
>
>Hi, AMQ is not designed to transfer files, especially large files, although it 
>provides two approaches (BlobMessage or JMS streams).
>To improve performance, the AMQ broker dispatches messages to consumers in 
>batches instead of having consumers pull one message at a time.
>
>So tuning the consumers' prefetch will give your fast consumers more chances 
>to get messages from the broker.
>Take a look at:
>http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>
>At 2013-02-14 19:41:49,"AMARNATH, Balachandar" 
><[email protected]> wrote:
>>Hi,
>>
>>I am new to ActiveMQ (and to messaging systems in general). I want to use it 
>>for distributed processing of a large number of files. I wish to split my 
>>application into two pieces, a producer and a consumer, so that one producer 
>>instance generates messages that several instances of the same consumer can 
>>process. The consumers are supposed to run on distributed machines. I wrote a 
>>simple producer and two consumers (as 
>>attached). The producer generates 50 messages (simple text messages) and 
>>the consumers take different amounts of time to process each message. Hence, 
>>I expected the consumer that takes less time per message to process more 
>>messages and the other one to process fewer. However, when I ran them, the 
>>messages were split evenly between the two consumers. Can someone give me a 
>>hint on how this can be achieved, and where I am going wrong?
>>
>>Here are my classes:-
>>
>>import java.util.ArrayList;
>>import java.util.List;
>>
>>import javax.jms.Connection;
>>import javax.jms.Destination;
>>import javax.jms.Message;
>>import javax.jms.MessageConsumer;
>>import javax.jms.Session;
>>import javax.jms.TextMessage;
>>
>>import org.apache.activemq.ActiveMQConnection;
>>import org.apache.activemq.ActiveMQConnectionFactory;
>>
>>public class HelloWorldConsumer1 {
>>    public static void main(String[] args) {
>>        try {
>>            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>>            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>>            // Create and start a Connection
>>            Connection connection = connectionFactory.createConnection();
>>            connection.start();
>>            // Create a non-transacted Session with automatic acknowledgement
>>            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>            // Create the destination (a Queue)
>>            Destination destination = session.createQueue("TEST.FOO");
>>            // Create a MessageConsumer from the Session to the Queue
>>            MessageConsumer consumer = session.createConsumer(destination);
>>            List<String> contents = new ArrayList<String>();
>>            boolean flag = true;
>>            while (flag) {
>>                // Wait up to one second; receive() returns null on timeout
>>                Message message = consumer.receive(1000);
>>                if (message instanceof TextMessage) {
>>                    TextMessage textMessage = (TextMessage) message;
>>                    String text = textMessage.getText();
>>                    System.out.println("Received : " + text + " and processing");
>>                    // Simulate five seconds of work per message
>>                    Thread.sleep(5000);
>>                    if (text.equalsIgnoreCase("END")) {
>>                        System.out.println("The recvd text is " + text);
>>                        flag = false;
>>                    }
>>                    contents.add(text);
>>                } else {
>>                    System.out.println("Received : " + message);
>>                }
>>            }
>>            System.out.println("Exiting..");
>>            System.out.println("The contents collected in consumer.java is " + contents.size());
>>            connection.close();
>>        } catch (Exception e) {
>>            System.out.println("Caught: " + e);
>>            e.printStackTrace();
>>        }
>>    }
>>}
>>
>>
>>
>>import javax.jms.Connection;
>>import javax.jms.DeliveryMode;
>>import javax.jms.Destination;
>>import javax.jms.MessageProducer;
>>import javax.jms.Session;
>>import javax.jms.TextMessage;
>>
>>import org.apache.activemq.ActiveMQConnection;
>>import org.apache.activemq.ActiveMQConnectionFactory;
>>
>>public class HelloWorldProducer {
>>    public static void main(String[] args) throws Exception {
>>        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
>>        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>>        // Create and start a Connection
>>        Connection connection = connectionFactory.createConnection();
>>        connection.start();
>>        // Create a non-transacted Session with automatic acknowledgement
>>        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>        // Create the destination (a Queue)
>>        Destination destination = session.createQueue("TEST.FOO");
>>        // Create a MessageProducer from the Session to the Queue
>>        MessageProducer producer = session.createProducer(destination);
>>        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>>        int counter = 0;
>>        while (counter < 50) {
>>            counter++;
>>            String text = "Message " + counter;
>>            TextMessage message = session.createTextMessage(text);
>>            // Tell the producer to send the message
>>            producer.send(message);
>>            System.out.println("Sent message '" + message.getText() + "'");
>>            Thread.sleep(1000);
>>        }
>>        // Send two "END" markers, presumably one for each consumer
>>        Thread.sleep(2000);
>>        producer.send(session.createTextMessage("END"));
>>        Thread.sleep(4000);
>>        producer.send(session.createTextMessage("END"));
>>        // Clean up
>>        connection.close();
>>    }
>>}
>>
>>
>>
>>
>>
>>With thanks and regards
>>Balachandar
>>
>>
>>
>>
>>The information in this e-mail is confidential. The contents may not be 
>>disclosed or used by anyone other than the addressee. Access to this e-mail 
>>by anyone else is unauthorised.
>>If you are not the intended recipient, please notify Airbus immediately and 
>>delete this e-mail.
>>Airbus cannot accept any responsibility for the accuracy or completeness of 
>>this e-mail as it has been sent over public networks. If you have any 
>>concerns over the content of this message or its Accuracy or Integrity, 
>>please contact Airbus immediately.
>>All outgoing e-mails from Airbus are checked using regularly updated virus 
>>scanning software but you should take whatever measures you deem to be 
>>appropriate to ensure that this message and any attachments are virus free.
>>
>
>
