We retry our testcase with the following lines in "activemq.xml" file to disable "flow control":
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb"> <dispatchPolicy> <strictOrderDispatchPolicy/> </dispatchPolicy> <subscriptionRecoveryPolicy> <lastImageSubscriptionRecoveryPolicy/> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> but it doesn't seem to solve the problem: ActiveMQ still freezes after some messages... Olivier Bigard wrote: > > Ok, we will try to disable this flow control. > > Thank you > > > rajdavies wrote: >> >> It looks like you've hit flow control - you might want to disable it - >> see here: http://activemq.apache.org/producer-flow-control.html >> cheers, >> >> Rob >> >> On Jan 18, 2008, at 6:59 AM, Olivier Bigard wrote: >> >>> >>> Hello, >>> >>> We had the same problem with ActiveMQ 5.0.0. >>> Yesterday we downloaded the last 5.1 SNAPSHOT and the message >>> disappeared, >>> but ActiveMQ is still frozen after some messages... >>> I put a message in the forum with our Java code. >>> We are running ActiveMQ on Solaris platform with JDK 1.5.0_07. >>> >>> Can anyone help us? >>> Thank you >>> >>> >>> >>> Juergen.Schumacher wrote: >>>> >>>> Hello, >>>> >>>> ok, attaching the source code file does apparently not work here. >>>> So here are the relevant code parts from the producer and processor >>>> classes, >>>> which create the connections to ActiveMQ and the messages. >>>> >>>> Sorry for the trouble... >>>> >>>> Regards, >>>> Jürgen Schumacher >>>> >>>> ---------------- >>>> Procucer.java >>>> >>>> private void initializeBrokerConnection() throws JMSException { >>>> LOG.info("Connecting to " + brokerUrl); >>>> ActiveMQConnectionFactory connectionFactory = >>>> new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, >>>> ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); >>>> connection = connectionFactory.createQueueConnection(); >>>> connection.start(); >>>> session = connection.createQueueSession(true, >>>> Session.CLIENT_ACKNOWLEDGE); >>>> destination = session.createQueue("queue1"); >>>> producer = session.createProducer(destination); >>>> producer.setDeliveryMode(DeliveryMode.PERSISTENT); >>>> } >>>> >>>> private void startIteration() throws Exception { >>>> for (int i = 0; i < numberOfMessages; i++) { >>>> LOG.info("Creating message " + i); >>>> TextMessage message = session.createTextMessage("Message #" + >>>> i); >>>> LOG.info("Sending message " + i); >>>> producer.send(message); >>>> session.commit(); >>>> producedCount++; >>>> LOG.info("Produced " + producedCount + " messages."); >>>> } >>>> } >>>> >>>> --------------- >>>> Processor.java >>>> >>>> private void initializeBrokerConnection() throws JMSException { >>>> LOG.info("Connecting to " + brokerUrl); >>>> ActiveMQConnectionFactory connectionFactory = >>>> new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, >>>> ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); >>>> sourceConnection = connectionFactory.createQueueConnection(); >>>> destinationConnection = connectionFactory.createQueueConnection(); >>>> sourceConnection.start(); >>>> destinationConnection.start(); >>>> sourceSession = sourceConnection.createQueueSession(true, >>>> Session.CLIENT_ACKNOWLEDGE); >>>> destinationSession = >>>> destinationConnection.createQueueSession(true, >>>> Session.CLIENT_ACKNOWLEDGE); >>>> source = sourceSession.createQueue("queue1"); >>>> consumer = sourceSession.createConsumer(source); >>>> destination = destinationSession.createQueue("queue2"); >>>> producer = destinationSession.createProducer(destination); >>>> producer.setDeliveryMode(DeliveryMode.PERSISTENT); >>>> } >>>> >>>> public void onMessage(Message message) { >>>> if (message instanceof TextMessage) { >>>> try { >>>> String id = ((TextMessage) message).getText(); >>>> LOG.info("Processing id = " + id); >>>> MapMessage map = destinationSession.createMapMessage(); >>>> map.setString("id", id); >>>> map.setString("text", messageText); >>>> LOG.info("Send message"); >>>> producer.send(map); >>>> LOG.info("Acknowledge message"); >>>> message.acknowledge(); >>>> LOG.info("Commit source"); >>>> sourceSession.commit(); >>>> LOG.info("Commit destination"); >>>> destinationSession.commit(); >>>> producedCount++; >>>> LOG.info("Produced " + producedCount + " messages."); >>>> } catch (Exception ex) { >>>> LOG.error("Error during processing", ex); >>>> } >>>> >>>> } >>>> } >>>> >>>> >>>> >>>> >>>> >>> >>> -- >>> View this message in context: >>> http://www.nabble.com/High-message-frequency-causes-ActiveMQ-to-freeze-tp14919292s2354p14947393.html >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >>> >> >> >> > > -- View this message in context: http://www.nabble.com/High-message-frequency-causes-ActiveMQ-to-freeze-tp14919292s2354p14949383.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.