Hello everyone,

We are running ActiveMQ 5.14 in embedded mode under JBoss 7 on Java 7, with 100% persistent messaging backed by journalled JDBC persistence (a journal persistence adapter wrapping a JDBC persistence adapter) against an Oracle 12 database. Durable topic subscription processing is our focus.
Our activemq-broker.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.0.xsd">

    <amq:broker id="broker" brokerName="broker" persistent="true" schedulerSupport="true" useJmx="true">

        <amq:destinationPolicy>
            <amq:policyMap>
                <amq:policyEntries>
                    <amq:policyEntry topic="EllipseServices" allConsumersExclusiveByDefault="true" />
                </amq:policyEntries>
            </amq:policyMap>
        </amq:destinationPolicy>

        <amq:destinations>
            <amq:topic physicalName="EllipseServices" />
        </amq:destinations>

        <amq:managementContext>
            <amq:managementContext createConnector="false" />
        </amq:managementContext>

        <amq:persistenceAdapter>
            <amq:journalPersistenceAdapter>
                <amq:persistenceAdapter>
                    <amq:jdbcPersistenceAdapter dataSource="#dataSource" transactionIsolation="2"
                                                useDatabaseLock="false" cleanupPeriod="60000" />
                </amq:persistenceAdapter>
                <amq:taskRunnerFactory>
                    <bean class="org.apache.activemq.thread.TaskRunnerFactory"/>
                </amq:taskRunnerFactory>
                <amq:journal>
                    <bean class="org.apache.activeio.journal.active.JournalImpl">
                        <constructor-arg index="0">
                            <bean class="java.io.File">
                                <constructor-arg index="0">
                                    <value>./.activemq_data</value>
                                </constructor-arg>
                            </bean>
                        </constructor-arg>
                        <constructor-arg index="1">
                            <value>10</value>
                        </constructor-arg>
                        <constructor-arg index="2">
                            <value>104857600</value>
                        </constructor-arg>
                    </bean>
                </amq:journal>
            </amq:journalPersistenceAdapter>
        </amq:persistenceAdapter>

        <amq:plugins>
            <amq:redeliveryPlugin>
                <amq:redeliveryPolicyMap>
                    <amq:redeliveryPolicyMap>
                        <amq:redeliveryPolicyEntries>
                            <amq:redeliveryPolicy topic="EllipseServices" maximumRedeliveries="0" />
                        </amq:redeliveryPolicyEntries>
                        <amq:defaultEntry>
                            <amq:redeliveryPolicy maximumRedeliveries="2" initialRedeliveryDelay="5000" useExponentialBackOff="true" />
                        </amq:defaultEntry>
                    </amq:redeliveryPolicyMap>
                </amq:redeliveryPolicyMap>
            </amq:redeliveryPlugin>
            <amq:loggingBrokerPlugin logAll="true" />
        </amq:plugins>

        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://0.0.0.0:61616" />
            <amq:transportConnector uri="vm://broker" />
        </amq:transportConnectors>

    </amq:broker>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
        <property name="brokerURL" value="vm://broker" />
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

</beans>

We think our scenario is pretty basic, so we're fairly sure we're doing something wrong or misunderstanding something here. Can someone help explain how/why we're seeing what we're seeing? We seem to be losing messages, which of course is NOT good.

Scenario: Two durable consumers, DC1 and DC2, consume from the same topic (T1) with different selectors: DC1's selector is "table='TEST1'" and DC2's is "table='TEST2'". We first send 100 messages for DC1 (while it is stopped) and then 100 messages for DC2 (while it is actively receiving). We then see the ACTIVEMQ_ACKS.LAST_ACKED_ID of each consumer ultimately rise to 200, and within an hour all the messages are deleted. The 100 messages sent to DC1 are recoverable only as long as the broker does not restart.
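For completeness, the subscribers and the producer are set up roughly along these lines. This is a simplified stand-alone sketch rather than our actual JBoss/Spring wiring; the broker URL, topic, client IDs, subscription name and the "table" property are the ones from the scenario above, everything else is illustrative:

    import javax.jms.*;
    import org.apache.activemq.ActiveMQConnectionFactory;

    // Simplified sketch of the scenario above (not our real JBoss wiring).
    public class DurableSelectorScenario {

        static final String BROKER_URL = "vm://broker";
        static final String TOPIC = "EllipseServices";

        // Creates a durable subscriber on the topic with the given client id and selector,
        // e.g. ("TestListenerClient",  "table='TEST1'") for DC1 and
        //      ("TestListenerClient2", "table='TEST2'") for DC2.
        static TopicSubscriber createDurableConsumer(String clientId, String selector) throws JMSException {
            Connection connection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
            connection.setClientID(clientId);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Both clients use the same durable subscription name but different selectors.
            return session.createDurableSubscriber(session.createTopic(TOPIC), "TestListenerSubscriber", selector, false);
        }

        // Publishes 'count' persistent messages carrying the "table" property the selectors match on,
        // e.g. publish("TEST1", 100) for DC1's messages and then publish("TEST2", 100) for DC2's.
        static void publish(String tableValue, int count) throws JMSException {
            Connection connection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic(TOPIC));
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < count; i++) {
                TextMessage msg = session.createTextMessage("payload " + i);
                msg.setStringProperty("table", tableValue);
                producer.send(msg);
            }
            connection.close();
        }
    }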
Interestingly, it actually happens like this:

After DC1 has been sent 100 messages (with selection property "table='TEST1'"):

  select * from activemq_acks
   where container = 'topic://EllipseServices' and sub_name = 'TestListenerSubscriber';

  CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
  topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  0              0
  topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  100            0

Out of JMX we see:
- DC1 (stopped) reports 100 pending messages, as we'd expect.
- DC2 (still listening/receiving) shows 0 pending, 100 enqueued, 100 dequeued and 100 dispatched, as we'd expect.

After DC2 has been sent 100 messages (with selection property "table='TEST2'"):

  select * from activemq_acks
   where container = 'topic://EllipseServices' and sub_name = 'TestListenerSubscriber';

  CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
  topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  200            0
  topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  200            0

Out of JMX we see:
- DC1 (stopped) shows 100 pending, as expected.
- DC2 (still listening/receiving) shows 0 pending, 100 enqueued, 100 dequeued and 100 dispatched, as we'd expect.

Then, after up to 50 minutes (5-minute checkpoint intervals x 10 priorities), the cleanup task kicks in, issuing this statement (apparently to clean up consumed durable topic messages):

  14:25:39,009 INFO [stdout] (ActiveMQ Task-1) 14:25:39.009 [ActiveMQ Task-1] DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleaning up old messages.
  14:25:39,011 INFO [stdout] (ActiveMQ Task-1) 14:25:39.011 [ActiveMQ Task-1] DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Executing SQL: DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <= ( SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID) FROM ACTIVEMQ_ACKS WHERE ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER AND ACTIVEMQ_ACKS.PRIORITY=?) )
  14:25:39,013 INFO [stdout] (ActiveMQ Task-1) 14:25:39.012 [ActiveMQ Task-1] DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Deleted 0 old message(s) at priority: 0
  14:25:39,013 INFO [stdout] (ActiveMQ Task-1) 14:25:39.013 [ActiveMQ Task-1] DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleanup done.

All records in the DB are now deleted. We agree the statement would delete those rows given the data in the DB and the predicates of the query, but it doesn't seem like what one would want. Interestingly, the cleanup message usually says "Deleted 0 old message(s)" (although it sometimes correctly reports the 100 rows deleted); regardless, the messages are definitely all gone from the DB after this log entry appears (the one with the PRIORITY=0 predicate).

Interestingly, if DC1 starts consuming again it will still receive all its pending messages (presumably from the in-memory structures the broker is managing). However, if the broker is restarted at any time before the consumer has received all of its outstanding messages, those messages are lost permanently..... OOPS!

Some observations:
1. Consumers on the same topic but with different selectors seem to be able to increase the LAST_ACKED_ID of consumers with a completely different selector. How or why might that be happening?
2. The system behaves as if the selector has no real bearing on the updating of a consumer's LAST_ACKED_ID row in the ACTIVEMQ_ACKS table.
3. If DC1 is allowed to consume all its pending messages, the LAST_ACKED_ID for DC1 in the ACTIVEMQ_ACKS table is correctly set back to 100.
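For what it's worth, a quick way to see which rows that cleanup statement will hit is to run its predicate as a SELECT. Here is a rough JDBC sketch of that check; the connection URL and credentials are placeholders, and the table and column names are the standard ActiveMQ JDBC schema ones visible in the DEBUG log above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Previews the broker's cleanup DELETE by running the same predicate as a SELECT.
    public class CleanupPreview {

        public static void main(String[] args) throws SQLException {
            // Same predicate as the DELETE logged by DefaultJDBCAdapter, listing rows instead of removing them.
            String sql = "SELECT ID, CONTAINER, PRIORITY FROM ACTIVEMQ_MSGS "
                       + "WHERE (PRIORITY = ? AND ID <= ("
                       + "  SELECT MIN(ACTIVEMQ_ACKS.LAST_ACKED_ID) FROM ACTIVEMQ_ACKS"
                       + "  WHERE ACTIVEMQ_ACKS.CONTAINER = ACTIVEMQ_MSGS.CONTAINER"
                       + "  AND ACTIVEMQ_ACKS.PRIORITY = ?))";

            // Placeholder Oracle connection details.
            try (Connection con = DriverManager.getConnection("jdbc:oracle:thin:@//dbhost:1521/ORCL", "amq", "secret");
                 PreparedStatement ps = con.prepareStatement(sql)) {
                // Priorities 0..9, matching the "10 priorities" the checkpoint iterates over.
                for (int priority = 0; priority <= 9; priority++) {
                    ps.setInt(1, priority);
                    ps.setInt(2, priority);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            System.out.println("would delete ID=" + rs.getLong("ID")
                                    + " container=" + rs.getString("CONTAINER")
                                    + " priority=" + rs.getInt("PRIORITY"));
                        }
                    }
                }
            }
        }
    }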
Can someone point out what we're doing wrong that might be causing this behaviour?

Thanks in advance.

Jason

--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html