Hi Tim,

We managed to attach a debugger as requested, and the path you identified
does seem to be the one taken by the incorrect update to the ACTIVEMQ_ACKS
table for DC1.  We took several stack dumps (text below) at the debug
points you requested, in case they help.

Note that the code passed through the OracleJDBCAdapter.doSetLastAck method
twice: the first time updating LAST_ACKED_ID for DC1, and the second time,
as you had more or less expected, updating LAST_ACKED_ID for DC2 (which is
correct, since that consumer was consuming and the selector matched).  We
also used the debugger to inspect the bind parameters on the UPDATE
statement each time, to confirm which of DC1 and DC2 was being updated and
in which order.  We further used SQL Developer to identify when the change
had occurred.

Based on your earlier comment, I guess the next step is to create a JIRA,
which I'll do now.  I've never done this before, so I assume I just copy
the history of this forum topic into the JIRA as background.  Please let me
know if there is anything else we need to do on our end.
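
In case it helps when reading the traces, here is a toy Java model of the
sequence we observed.  It is only a sketch: none of these classes or methods
are ActiveMQ code, the names and the message id are made up, and it assumes
that the unmatched path ends up recording an ack for DC1 that the journal
checkpoint later writes out, which is what the order of the two updates
suggests.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are ActiveMQ classes.
class AckSequenceSketch {

    // Hypothetical stand-in for one pending "set last ack" entry.
    static final class PendingAck {
        final String subscription;
        final long lastAckedId;

        PendingAck(String subscription, long lastAckedId) {
            this.subscription = subscription;
            this.lastAckedId = lastAckedId;
        }
    }

    // Pending acks waiting for the next checkpoint, in arrival order.
    private final List<PendingAck> journal = new ArrayList<>();
    // Stand-in for ACTIVEMQ_ACKS: subscription name -> LAST_ACKED_ID.
    private final Map<String, Long> acksTable = new LinkedHashMap<>();
    // Order in which the stand-in for doSetLastAck was applied.
    private final List<String> updateOrder = new ArrayList<>();

    // 1st BP (assumption): the selector did not match, and an ack is
    // recorded on behalf of that subscription.
    void unmatched(String subscription, long messageId) {
        journal.add(new PendingAck(subscription, messageId));
    }

    // 2nd BP: the consuming subscription acknowledges the message.
    void acknowledge(String subscription, long messageId) {
        journal.add(new PendingAck(subscription, messageId));
    }

    // 3rd BP: the checkpoint replays pending acks in order, one update each.
    void checkpoint() {
        for (PendingAck ack : journal) {
            acksTable.put(ack.subscription, ack.lastAckedId);
            updateOrder.add(ack.subscription);
        }
        journal.clear();
    }

    List<String> updateOrder() {
        return new ArrayList<>(updateOrder);
    }

    Long lastAckedId(String subscription) {
        return acksTable.get(subscription);
    }

    public static void main(String[] args) {
        AckSequenceSketch broker = new AckSequenceSketch();
        broker.unmatched("DC1", 42L);    // message id 42 is made up
        broker.acknowledge("DC2", 42L);
        broker.checkpoint();
        System.out.println(broker.updateOrder());  // prints [DC1, DC2]
    }
}
```

The point of the model is just that a single checkpoint produces two
updates, DC1 first and DC2 second, matching the bind parameters we saw.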

Cheers

Jason.


Here are the stack traces, in the order the breakpoints fired, captured from
the debug session for the thread that hit each breakpoint:

1st BP:

Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35880@61616] (Suspended (entry into method unmatched in DurableTopicSubscription))
        owns: Topic  (id=72)
        DurableTopicSubscription.unmatched(MessageReference) line: 98
        SimpleDispatchPolicy.dispatch(MessageReference, MessageEvaluationContext, List<Subscription>) line: 44
        Topic.dispatch(ConnectionContext, Message) line: 769
        Topic.doMessageSend(ProducerBrokerExchange, Message) line: 554
        Topic.send(ProducerBrokerExchange, Message) line: 482
        ManagedTopicRegion(AbstractRegion).send(ProducerBrokerExchange, Message) line: 503
        ManagedRegionBroker(RegionBroker).send(ProducerBrokerExchange, Message) line: 468
        ManagedRegionBroker.send(ProducerBrokerExchange, Message) line: 293
        SchedulerBroker(BrokerFilter).send(ProducerBrokerExchange, Message) line: 153
        SchedulerBroker.send(ProducerBrokerExchange, Message) line: 195
        AdvisoryBroker(BrokerFilter).send(ProducerBrokerExchange, Message) line: 153
        CompositeDestinationBroker.send(ProducerBrokerExchange, Message) line: 96
        TransactionBroker.send(ProducerBrokerExchange, Message) line: 293
        RedeliveryPlugin(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
        LoggingBrokerPlugin(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
        LoggingBrokerPlugin.send(ProducerBrokerExchange, Message) line: 275
        BrokerService$6(MutableBrokerFilter).send(ProducerBrokerExchange, Message) line: 158
        ManagedTransportConnection(TransportConnection).processMessage(Message) line: 581
        ActiveMQTextMessage(ActiveMQMessage).visit(CommandVisitor) line: 768
        ManagedTransportConnection(TransportConnection).service(Command) line: 336
        TransportConnection$1.onCommand(Object) line: 200
        MutexTransport.onCommand(Object) line: 50
        WireFormatNegotiator.onCommand(Object) line: 125
        InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line: 301
        TcpTransport(TransportSupport).doConsume(Object) line: 83
        TcpTransport.doRun() line: 233
        TcpTransport.run() line: 215
        Thread.run() line: 745

2nd BP:

Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35875@61616] (Suspended (breakpoint at line 325 in DurableTopicSubscription))
        owns: Object  (id=130)
        DurableTopicSubscription.acknowledge(ConnectionContext, MessageAck, MessageReference) line: 325
        DurableTopicSubscription(PrefetchSubscription).acknowledge(ConnectionContext, MessageAck) line: 233
        ManagedTopicRegion(AbstractRegion).acknowledge(ConsumerBrokerExchange, MessageAck) line: 526
        ManagedRegionBroker(RegionBroker).acknowledge(ConsumerBrokerExchange, MessageAck) line: 484
        SchedulerBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
        AdvisoryBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
        CompositeDestinationBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 88
        TransactionBroker.acknowledge(ConsumerBrokerExchange, MessageAck) line: 276
        RedeliveryPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
        LoggingBrokerPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
        LoggingBrokerPlugin.acknowledge(ConsumerBrokerExchange, MessageAck) line: 162
        BrokerService$6(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange, MessageAck) line: 98
        ManagedTransportConnection(TransportConnection).processMessageAck(MessageAck) line: 590
        MessageAck.visit(CommandVisitor) line: 245
        ManagedTransportConnection(TransportConnection).service(Command) line: 336
        TransportConnection$1.onCommand(Object) line: 200
        MutexTransport.onCommand(Object) line: 50
        WireFormatNegotiator.onCommand(Object) line: 125
        InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line: 301
        TcpTransport(TransportSupport).doConsume(Object) line: 83
        TcpTransport.doRun() line: 233
        TcpTransport.run() line: 215
        Thread.run() line: 745


3rd BP:

Daemon Thread [Journal checkpoint worker] (Suspended)
        OracleJDBCAdapter(DefaultJDBCAdapter).doSetLastAck(TransactionContext, ActiveMQDestination, XATransactionId, String, String, long, long) line: 526
        JDBCTopicMessageStore.acknowledge(ConnectionContext, String, String, MessageId, MessageAck) line: 86
        JournalTopicMessageStore$2.execute() line: 191
        JournalMessageStore$3.execute() line: 306
        TransactionTemplate.run(Callback) line: 44
        JournalTopicMessageStore(JournalMessageStore).checkpoint(Callback) line: 262
        JournalTopicMessageStore.checkpoint() line: 182
        JournalPersistenceAdapter$5.call() line: 455
        JournalPersistenceAdapter$5.call() line: 452
        FutureTask<V>.run() line: 262
        ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1145
        ThreadPoolExecutor$Worker.run() line: 615
        Thread.run() line: 745

--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
