Gary,

I made the recommended changes. On the embedded broker side everything looks
fine now: I restarted Jetty (with the embedded broker) many times and it worked.

Interestingly, I have started to see exceptions in my other application,
which listens on the same ActiveMQ server. I think the diagram below will
help to explain my topology:

client ---> topic://OUTGOING (embedded broker with JMS connector) -->
topic://OUT.subtopic (ActiveMQ server) --> Mule ESB
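
To illustrate why the Mule receiver, which subscribes to the wildcard topic
OUT.* (see its log output), picks up messages bridged to OUT.subtopic, here
is a minimal sketch of dot-separated wildcard matching. It is a simplified
illustration only, not ActiveMQ's or Mule's actual code; the class and method
names are made up for this example, and only the "*" wildcard (exactly one
name element) is handled.

```java
/** Simplified sketch of "*" wildcard matching on dot-separated topic names. */
final class DestinationWildcards {

    private DestinationWildcards() {
    }

    /**
     * Returns true if the destination name matches the pattern.
     * A "*" element in the pattern matches exactly one element of the name;
     * any other element must be equal to the corresponding name element.
     */
    static boolean matches(String pattern, String destination) {
        String[] patternParts = pattern.split("\\.");
        String[] nameParts = destination.split("\\.");
        // "*" stands for one element, so both sides need the same depth.
        if (patternParts.length != nameParts.length) {
            return false;
        }
        for (int i = 0; i < patternParts.length; i++) {
            boolean wildcard = patternParts[i].equals("*");
            if (!wildcard && !patternParts[i].equals(nameParts[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] names = {"OUT.subtopic", "OUT", "OUT.a.b", "IN.ltbl"};
        for (String name : names) {
            System.out.println(name + " -> " + matches("OUT.*", name));
        }
    }
}
```

Under these assumptions only OUT.subtopic matches OUT.*; the other three
names do not.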

When a message is sent from the client to Mule ESB, since implementing your
changes I see the following messages:

19761 [ActiveMQ Session Task] DEBUG
org.mule.transport.jms.MultiConsumerJmsMessageReceiver  - Message Received
from: jms://topic:OUT.*
19761 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  - There is
no session id on the request using key: ID. Generating new session id:
575fc82b-1a0f-11de-8ce7-c7a955038e40
19762 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
dispatching event to service: service.que-outgoing, event is: MuleEvent:
575fc82c-1a0f-11de-8ce7-c7a955038e40, sync=false, stop processing=false,
DefaultInboundEndpoint{endpointUri=jms://topic:OUT.*,
connector=ActiveMQJmsConnector{this=ad97f5, started=true, initialised=true,
name='jmsConnector', disposed=false,
numberOfConcurrentTransactedReceivers=4,
createMultipleTransactedReceivers=true, connected=true,
supportedProtocols=[jms], serviceOverrides=null},
transformer=[JMSMessageToObject{this=170b819, name='JMSMessageToObject',
ignoreBadInput=false, returnClass=class java.lang.Object,
sourceTypes=[interface javax.jms.Message, interface javax.jms.TextMessage,
interface javax.jms.ObjectMessage, interface javax.jms.BytesMessage,
interface javax.jms.MapMessage, interface javax.jms.StreamMessage]},
XML2UMMTransformer{this=146ad8b,
name='xml2umm', ignoreBadInput=false, returnClass=class java.lang.Object,
sourceTypes=[]}, ExtractHeadersTransformer{this=12b9f14,
name='extract-headers', ignoreBadInput=false, returnClass=class
java.lang.Object, sourceTypes=[]}], name='endpoint.jms.OUT',
properties={durableName=cozaciota},
transactionConfig=Transaction{factory=null, action=NEVER, timeout=0},
filter=null, deleteUnacceptedMessages=false,
securityFilter=null, synchronous=false, initialState=started,
remoteSync=false, remoteSyncTimeout=10000, endpointEncoding=UTF-8}
10459 [ActiveMQ Connection Worker: vm://localhost#2] DEBUG
org.apache.activemq.ActiveMQConnection  - Async exception with no exception
listener: java.lang.NullPointerException
java.lang.NullPointerException
        at org.apache.activemq.kaha.impl.index.VMIndexLinkedList.remove(VMIndexLinkedList.java:265)
        at org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:314)
        at org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:695)
        at org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(KahaTopicReferenceStore.java:148)
        at org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:163)
        at org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:91)
        at org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:447)
        at org.apache.activemq.broker.region.DurableTopicSubscription.acknowledge(DurableTopicSubscription.java:223)
        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:238)
        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

The application is working correctly; the only problem is this exception in
the logs. For testing purposes I'm running both the client and Mule ESB in the
same JVM.

Jarek


On 26 March 2009 at 14:26, Gary Tully <gary.tu...@gmail.com> wrote:

> Hi Jarek,
> Some similar behaviour is described in issue
> AMQ-2149 <https://issues.apache.org/activemq/browse/AMQ-2149>.
>
> One aspect of the problem is related to the persistent index used by the
> Kaha store. You could try using a VM (in-memory) index to see if it helps in
> your case.
>
> To configure this, set the persistentIndex attribute to false as follows:
>
>        broker = ....
>        AMQPersistenceAdapterFactory persistenceFactory =
>                new AMQPersistenceAdapterFactory();
>        persistenceFactory.setDataDirectory(dataDirFile);
>        persistenceFactory.setPersistentIndex(false);
>        broker.setPersistenceFactory(persistenceFactory);
>
> hope this helps,
> Gary.
>
> 2009/3/26 Jarosław Pałka <jpa...@gmail.com>
>
> > Sorry, I forgot to copy the embedded broker configuration:
> >
> > public abstract class UMMClientConfiguration extends ConfigurationSupport {
> >
> >    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> >    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
> >
> >    @Bean
> >    public BrokerService brokerService() throws Exception {
> >        XBeanBrokerService broker = new XBeanBrokerService();
> >        broker.setUseJmx(false);
> >        broker.setPersistent(true);
> >        //broker.
> >        broker.addJmsConnector(incomingTopicConnector());
> >        broker.addJmsConnector(outgoingTopicConnector());
> >        broker.addConnector("vm://localhost");
> >        broker.start();
> >        return broker;
> >    }
> >
> >    @Bean
> >    public JmsTopicConnector outgoingTopicConnector()
> >            throws UnknownHostException, URISyntaxException {
> >        JmsTopicConnector topicConnector = new JmsTopicConnector();
> >        topicConnector
> >                .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> >        topicConnector.setLocalClientId("local");
> >        topicConnector
> >                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> >                        outboundTopicBridge() });
> >        return topicConnector;
> >    }
> >
> >    @Bean
> >    public TopicConnectionFactory outboundTopicConnectionFactory()
> >            throws UnknownHostException, URISyntaxException {
> >        ActiveMQConnectionFactory activeMQConnectionFactory =
> >                new ActiveMQConnectionFactory(getAMQBrokerURL());
> >        activeMQConnectionFactory.setClientID(getClientID());
> >        return activeMQConnectionFactory;
> >    }
> >
> >    @Bean
> >    public OutboundTopicBridge outboundTopicBridge()
> >            throws URISyntaxException {
> >        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
> >        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> >        topicBridge.setOutboundTopicName(getOutboundTopicName());
> >        topicBridge.setConsumerName(getConsumerName());
> >        return topicBridge;
> >    }
> >
> >    @Bean
> >    public JmsTemplate jmsTemplate() throws UnknownHostException,
> >            URISyntaxException {
> >        JmsTemplate jmsTemplate = new JmsTemplate(
> >                localOutgoingTopicConnectionFactory());
> >        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
> >        jmsTemplate.setDeliveryPersistent(true);
> >        jmsTemplate.setPubSubDomain(true);
> >        return jmsTemplate;
> >    }
> >
> >    @Bean(dependsOn = "brokerService")
> >    public TopicConnectionFactory localOutgoingTopicConnectionFactory()
> >            throws UnknownHostException, URISyntaxException {
> >        ActiveMQConnectionFactory activeMQConnectionFactory =
> >                new ActiveMQConnectionFactory("vm://localhost");
> >        activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
> >        return activeMQConnectionFactory;
> >    }
> > }
> >
> > Regards,
> > Jarek
> >
> > On 26 March 2009 at 10:01, Jarosław Pałka <jpa...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I am seeing strange behavior with an embedded ActiveMQ 5.2 broker.
> > >
> > > I have a web application running under Jetty 6.1.14 (with Spring 2.5.6).
> > > It has an embedded broker that is connected to an ActiveMQ server through
> > > a JMS connector. I have two topics, INCOMING and OUTGOING, that receive
> > > messages from and forward messages to the ActiveMQ server.
> > > I use XBeanBrokerService to configure the embedded broker. The
> > > configuration of the embedded broker is in the attachment.
> > >
> > > Everything runs great until Jetty is restarted. Once restarted, the web
> > > application is able to receive messages but it doesn't send them. I get
> > > the following exception logged on the ActiveMQ server:
> > >
> > > ERROR RecoveryListenerAdapter        - Message id
> > > ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from the
> > > data store - already dispatched
> > > ERROR Service                        - Async error occurred:
> > > javax.jms.JMSException: Could not correlate acknowledgment with dispatched
> > > message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
> > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > > javax.jms.JMSException: Could not correlate acknowledgment with dispatched
> > > message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
> > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > >         at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > >         at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > >         at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > >         at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > >         at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > >         at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > >         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > >         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > >         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > >         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > >         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > >         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > >         at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > >         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > >         at java.lang.Thread.run(Thread.java:619)
> > >
> > > and a similar message on the web application side during shutdown:
> > >
> > > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> > > selectchannelconnec...@0.0.0.0:8981
> > > 140421 [ActiveMQ ShutdownHook] INFO
> > > org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker
> > > (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > > 140421 [ActiveMQ ShutdownHook] DEBUG
> > > org.apache.activemq.broker.BrokerService  - Caught exception, must be
> > > shutting down: java.lang.IllegalStateException: Shutdown in progress
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.webappcont...@173831b
> > > {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.webappcont...@1abab88
> > > {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.handler.contexthand...@b1b4c3
> > > {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.webappcont...@5e5f92
> > > {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.webappcont...@1d4ab0e
> > > {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.webappcont...@12a585c
> > > {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.webappcont...@10f3801
> > > {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.webappcont...@2606b8
> > > {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > > 140444 [ActiveMQ ShutdownHook] INFO
> > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:0
> > > Stopped
> > > 140447 [ActiveMQ ShutdownHook] INFO
> > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:1
> > > Stopped
> > > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> > > org.apache.activemq.broker.TransportConnection  - Stopping connection:
> > > vm://localhost#0
> > > 140452 [VMTransport] DEBUG org.apache.activemq.store.amq.AMQMessageStore  -
> > > flush starting ...
> > > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
> > > org.apache.activemq.ActiveMQConnection  - Async exception with no exception
> > > listener: javax.jms.JMSException: Could not correlate acknowledgment with
> > > dispatched message: MessageAck {commandId = 8, responseRequired = false,
> > > ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> > > firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > > javax.jms.JMSException: Could not correlate acknowledgment with dispatched
> > > message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
> > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > >         at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > >         at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > >         at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > >         at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > >         at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > >         at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > >         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > >         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > >         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > >         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > >         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > >         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > >         at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > >         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > >         at java.lang.Thread.run(Thread.java:619)
> > > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection  - Async
> > > exception with no exception listener:
> > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > (vm://localhost#1) disposed.
> > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > (vm://localhost#1) disposed.
> > >         at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> > >         at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > >         at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > >         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > >         at java.lang.Thread.run(Thread.java:619)
> > >
> > >
> > > The only workaround I have found is to remove the activemq-data directory
> > > on the web application side before starting Jetty again.
> > >
> > > Can you tell me what I'm doing wrong? I did not see this behavior with
> > > ActiveMQ 5.1.
> > >
> > > Regards,
> > > Jarek
> > >
> > >
> > >
> >
>
>
>
> --
> http://blog.garytully.com
>
> Open Source SOA
> http://FUSESource.com
>
