Hello everyone

We are running AMQ 5.14 in embedded mode under JBoss 7 with Java 7, with 100%
persistent messaging supported by journalled JDBC persistence
(journalPersistenceAdapter over jdbcPersistenceAdapter) to an Oracle 12 DB.
Durable topic subscription processing is our focus.

Our activemq-broker.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.10.0.xsd">

        <amq:broker id="broker" brokerName="broker" persistent="true"
                schedulerSupport="true" useJmx="true">
                <amq:destinationPolicy>
                        <amq:policyMap>
                                <amq:policyEntries>
                                        <amq:policyEntry topic="EllipseServices"
                                                allConsumersExclusiveByDefault="true" />
                                </amq:policyEntries>
                        </amq:policyMap>
                </amq:destinationPolicy>
                <amq:destinations>
                        <amq:topic physicalName="EllipseServices" />
                </amq:destinations>
                <amq:managementContext>
                        <amq:managementContext createConnector="false" />
                </amq:managementContext>
                <amq:persistenceAdapter>
                        <amq:journalPersistenceAdapter>
                                <amq:persistenceAdapter>
                                        <amq:jdbcPersistenceAdapter dataSource="#dataSource"
                                                transactionIsolation="2" useDatabaseLock="false"
                                                cleanupPeriod="60000" />
                                </amq:persistenceAdapter>
                                <amq:taskRunnerFactory>
                                        <bean class="org.apache.activemq.thread.TaskRunnerFactory" />
                                </amq:taskRunnerFactory>
                                <amq:journal>
                                        <bean class="org.apache.activeio.journal.active.JournalImpl">
                                                <constructor-arg index="0">
                                                        <bean class="java.io.File">
                                                                <constructor-arg index="0">
                                                                        <value>./.activemq_data</value>
                                                                </constructor-arg>
                                                        </bean>
                                                </constructor-arg>
                                                <constructor-arg index="1">
                                                        <value>10</value>
                                                </constructor-arg>
                                                <constructor-arg index="2">
                                                        <value>104857600</value>
                                                </constructor-arg>
                                        </bean>
                                </amq:journal>
                        </amq:journalPersistenceAdapter>
                </amq:persistenceAdapter>

                
                <amq:plugins>
                        <amq:redeliveryPlugin>
                                <amq:redeliveryPolicyMap>
                                        <amq:redeliveryPolicyMap>
                                                <amq:redeliveryPolicyEntries>
                                                        <amq:redeliveryPolicy topic="EllipseServices"
                                                                maximumRedeliveries="0" />
                                                </amq:redeliveryPolicyEntries>
                                                <amq:defaultEntry>
                                                        <amq:redeliveryPolicy maximumRedeliveries="2"
                                                                initialRedeliveryDelay="5000"
                                                                useExponentialBackOff="true" />
                                                </amq:defaultEntry>
                                        </amq:redeliveryPolicyMap>
                                </amq:redeliveryPolicyMap>
                        </amq:redeliveryPlugin>
                        <amq:loggingBrokerPlugin logAll="true" />
                </amq:plugins>
                <amq:transportConnectors>
                        <amq:transportConnector uri="tcp://0.0.0.0:61616" />
                        <amq:transportConnector uri="vm://broker" />
                </amq:transportConnectors>
        </amq:broker>
        <bean id="jmsConnectionFactory"
                class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
                <property name="brokerURL" value="vm://broker" />
        </bean>
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                <property name="connectionFactory" ref="jmsConnectionFactory" />
        </bean>

</beans>
 
We think our scenario is pretty basic, so we're fairly sure we're doing
something wrong or misunderstanding something here.  Can someone help
explain how/why we're seeing what we're seeing?  We seem to be losing
messages, which of course is NOT good.

Scenario:
Two durable consumers, DC1 and DC2, subscribe to the same topic (T1) with
different selectors: DC1 uses "table='TEST1'" and DC2 uses "table='TEST2'".
We first send 100 msgs matching DC1's selector while DC1 is stopped, then
100 msgs matching DC2's selector while DC2 is receiving.  We see the
ACTIVEMQ_ACKS.LAST_ACKED_ID of each consumer ultimately rise to 200, and
within an hour all the messages are deleted from the DB.  The 100 messages
sent to DC1 are recoverable only as long as the broker does not restart.

Interestingly, it actually happens like this:

After DC1 has been sent 100 messages (with selection property
"table='TEST1'"):

select * from activemq_acks where container = 'topic://EllipseServices' and
sub_name='TestListenerSubscriber':

CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  0              0
topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  100            0

Out of JMX we see:
- DC1 (stopped) and reporting 100 pending messages as we'd expect. 
- DC2 (still listening/receiving) shows 0 pending, 100 enqueued and 100
dequeued and 100 dispatched as we’d expect.

After DC2 has been sent 100 messages (with selection property
"table='TEST2'"):

select * from activemq_acks where container = 'topic://EllipseServices' and
sub_name='TestListenerSubscriber':

CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  200            0
topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  200            0

Out of JMX we see:
- DC1 (stopped) shows 100 pending as expected.
- DC2 (still listening/receiving) shows 0 pending, 100 enqueued, 100
dequeued and 100 dispatched as we'd expect.


Then, after up to 50 mins (5min checkpoint intervals x 10 priorities), the
cleanup task kicks in issuing this statement (apparently to clean up
consumed durable topic messages):

14:25:39,009 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.009 [ActiveMQ Task-1] DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleaning up old messages.
14:25:39,011 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.011 [ActiveMQ Task-1] DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Executing SQL: DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <= ( SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID) FROM ACTIVEMQ_ACKS WHERE ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER AND ACTIVEMQ_ACKS.PRIORITY=?) )
14:25:39,013 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.012 [ActiveMQ Task-1] DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Deleted 0 old message(s) at priority: 0
14:25:39,013 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.013 [ActiveMQ Task-1] DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleanup done.

All records in the DB are now deleted (we agree the statement would delete
the rows given the data in the DB and the predicates of the query, but it
doesn't seem to be what one would want).  Interestingly, the cleanup message
usually says "Deleted 0 old message(s)" (but sometimes it correctly reports
the 100 rows deleted).  Regardless, they're definitely all gone from the DB
after this log entry appears (the one with the PRIORITY=0 predicate).
If DC1 starts consuming again it will still get all the pending
messages (presumably from the in-memory structures managed by the
broker).  However, if the broker is restarted any time before the consumer
receives all its outstanding messages, then all the messages are lost
permanently... OOPS!
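
For what it's worth, the effect of that DELETE can be reproduced outside the
broker.  The following is only a minimal Python sketch, not ActiveMQ code: it
runs the statement from the log against an in-memory SQLite database with a
cut-down, assumed schema (just the columns the statement touches), assumes the
message IDs are simply 1..N, and the names run_cleanup and CLEANUP_SQL are
hypothetical.

```python
import sqlite3

# The DELETE statement as it appears in the broker log.
CLEANUP_SQL = """
DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <=
    (SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID) FROM ACTIVEMQ_ACKS
     WHERE ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER
       AND ACTIVEMQ_ACKS.PRIORITY=?))
"""


def run_cleanup(acked_ids, message_ids, priority=0):
    """Return how many ACTIVEMQ_MSGS rows the cleanup DELETE removes.

    acked_ids:   one LAST_ACKED_ID per durable subscription (one ACKS row each).
    message_ids: IDs of the rows present in ACTIVEMQ_MSGS.
    The schema is an assumption: only the columns used by the DELETE exist.
    """
    container = "topic://EllipseServices"
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE ACTIVEMQ_MSGS (ID INTEGER, CONTAINER TEXT, PRIORITY INTEGER)"
    )
    db.execute(
        "CREATE TABLE ACTIVEMQ_ACKS "
        "(CONTAINER TEXT, CLIENT_ID TEXT, LAST_ACKED_ID INTEGER, PRIORITY INTEGER)"
    )
    db.executemany(
        "INSERT INTO ACTIVEMQ_MSGS VALUES (?, ?, ?)",
        [(msg_id, container, priority) for msg_id in message_ids],
    )
    db.executemany(
        "INSERT INTO ACTIVEMQ_ACKS VALUES (?, ?, ?, ?)",
        [(container, f"client{n}", acked, priority)
         for n, acked in enumerate(acked_ids)],
    )
    # Same bind parameters as the broker: the priority, twice.
    deleted = db.execute(CLEANUP_SQL, (priority, priority)).rowcount
    db.close()
    return deleted
```

With both LAST_ACKED_ID values at 200, run_cleanup([200, 200], range(1, 201))
removes all 200 rows, including the 100 that DC1 never received.  With the
values from the first snapshot, run_cleanup([0, 100], range(1, 101)) removes
nothing, because the minimum LAST_ACKED_ID is still 0.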


Some observations:
1. Consumers on the same topic but with different selectors seem to be able
to increase the LAST_ACKED_ID for consumers with a completely different
selector.  How or why might that be happening?
2. The system behaves as if the selector has no real bearing on the
updating of the LAST_ACKED_ID column in the ACTIVEMQ_ACKS table.
3. If DC1 is allowed to consume all its pending messages, the LAST_ACKED_ID
for DC1 in the ACTIVEMQ_ACKS table is correctly set back to 100.
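
One way to spot the mismatch described in these observations is to compare the
pending count from JMX with what the database could still hand back.  The
sketch below is an assumption-laden illustration, not ActiveMQ code: it assumes
LAST_ACKED_ID acts as a per-subscription high-water mark (which is what the
cleanup statement suggests), uses a cut-down SQLite schema, deliberately
ignores selectors, and the name persisted_backlog is hypothetical.

```python
import sqlite3


def persisted_backlog(acks, message_ids, container="topic://EllipseServices"):
    """Count stored messages above each subscription's LAST_ACKED_ID.

    acks:        {client_id: last_acked_id}, one entry per ACTIVEMQ_ACKS row.
    message_ids: IDs of the rows present in ACTIVEMQ_MSGS.
    Assumed, simplified schema; selectors are not evaluated here.
    """
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE ACTIVEMQ_MSGS (ID INTEGER, CONTAINER TEXT)")
    db.execute(
        "CREATE TABLE ACTIVEMQ_ACKS "
        "(CONTAINER TEXT, CLIENT_ID TEXT, LAST_ACKED_ID INTEGER)"
    )
    db.executemany(
        "INSERT INTO ACTIVEMQ_MSGS VALUES (?, ?)",
        [(msg_id, container) for msg_id in message_ids],
    )
    db.executemany(
        "INSERT INTO ACTIVEMQ_ACKS VALUES (?, ?, ?)",
        [(container, client, acked) for client, acked in acks.items()],
    )
    # LEFT JOIN so that subscriptions with nothing left still appear with 0.
    rows = db.execute(
        """
        SELECT A.CLIENT_ID, COUNT(M.ID)
        FROM ACTIVEMQ_ACKS A
        LEFT JOIN ACTIVEMQ_MSGS M
          ON M.CONTAINER = A.CONTAINER AND M.ID > A.LAST_ACKED_ID
        GROUP BY A.CLIENT_ID
        """
    ).fetchall()
    db.close()
    return dict(rows)
```

For the second snapshot, persisted_backlog({"TestListenerClient": 200,
"TestListenerClient2": 200}, range(1, 201)) gives 0 for both clients, even
though JMX still reports 100 pending for DC1.  Under the stated assumption,
that gap is the set of messages that exists only in broker memory.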

Can someone help point to what we're doing wrong that might be causing
this behaviour?

Thanks in advance.


Jason





