What happens if you run the same message stream with only DC1 subscribed? I suspect that you'll see the same behavior, and that it stems not from consumer cross-talk but from our improper handling of messages that don't match the subscription's selector.
Tim

On Nov 6, 2017 10:42 PM, "jasons" <jason.sm...@au.abb.com> wrote:

> Hello everyone
>
> We are running AMQ 5.14 embedded mode under JBoss 7 with Java 7, 100%
> persistent messaging supported by JournalledJDCBPersistence to an Oracle 12
> DB. Durable topic subscription processing is our focus.
>
> Our activmq-broker.xml looks like this:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:amq="http://activemq.apache.org/schema/core"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xsi:schemaLocation="
>          http://www.springframework.org/schema/beans
>          http://www.springframework.org/schema/beans/spring-beans.xsd
>          http://activemq.apache.org/schema/core
>          http://activemq.apache.org/schema/core/activemq-core-5.10.0.xsd">
>
>   <amq:broker id="broker" brokerName="broker" persistent="true"
>               schedulerSupport="true" useJmx="true">
>     <amq:destinationPolicy>
>       <amq:policyMap>
>         <amq:policyEntries>
>           <amq:policyEntry topic="EllipseServices"
>                            allConsumersExclusiveByDefault="true" />
>         </amq:policyEntries>
>       </amq:policyMap>
>     </amq:destinationPolicy>
>     <amq:destinations>
>       <amq:topic physicalName="EllipseServices" />
>     </amq:destinations>
>     <amq:managementContext>
>       <amq:managementContext createConnector="false" />
>     </amq:managementContext>
>     <amq:persistenceAdapter>
>       <amq:journalPersistenceAdapter>
>         <amq:persistenceAdapter>
>           <amq:jdbcPersistenceAdapter dataSource="#dataSource"
>                                       transactionIsolation="2"
>                                       useDatabaseLock="false"
>                                       cleanupPeriod="60000" />
>         </amq:persistenceAdapter>
>         <amq:taskRunnerFactory>
>           <bean class="org.apache.activemq.thread.TaskRunnerFactory"/>
>         </amq:taskRunnerFactory>
>         <amq:journal>
>           <bean class="org.apache.activeio.journal.active.JournalImpl">
>             <constructor-arg index="0">
>               <bean class="java.io.File">
>                 <constructor-arg index="0">
>                   <value>./.activemq_data</value>
>                 </constructor-arg>
>               </bean>
>             </constructor-arg>
>             <constructor-arg index="1">
>               <value>10</value>
>             </constructor-arg>
>             <constructor-arg index="2">
>               <value>104857600</value>
>             </constructor-arg>
>           </bean>
>         </amq:journal>
>       </amq:journalPersistenceAdapter>
>     </amq:persistenceAdapter>
>
>     <amq:plugins>
>       <amq:redeliveryPlugin>
>         <amq:redeliveryPolicyMap>
>           <amq:redeliveryPolicyMap>
>             <amq:redeliveryPolicyEntries>
>               <amq:redeliveryPolicy topic="EllipseServices"
>                                     maximumRedeliveries="0" />
>             </amq:redeliveryPolicyEntries>
>             <amq:defaultEntry>
>               <amq:redeliveryPolicy maximumRedeliveries="2"
>                                     initialRedeliveryDelay="5000"
>                                     useExponentialBackOff="true" />
>             </amq:defaultEntry>
>           </amq:redeliveryPolicyMap>
>         </amq:redeliveryPolicyMap>
>       </amq:redeliveryPlugin>
>       <amq:loggingBrokerPlugin logAll="true" />
>     </amq:plugins>
>     <amq:transportConnectors>
>       <amq:transportConnector uri="tcp://0.0.0.0:61616" />
>       <amq:transportConnector uri="vm://broker" />
>     </amq:transportConnectors>
>   </amq:broker>
>   <bean id="jmsConnectionFactory"
>         class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
>     <property name="brokerURL" value="vm://broker" />
>   </bean>
>   <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
>     <property name="connectionFactory" ref="jmsConnectionFactory" />
>   </bean>
>
> </beans>
>
> We think our scenario is pretty basic and so we're pretty sure we're doing
> something wrong or misunderstanding something here. Can someone help
> explain how/why we're seeing what we're seeing. We seem to be losing
> messages which of course is NOT good.
>
> Scenario:
> When two durable consumers, DC1 and DC2, consuming from the same topic (T1)
> with different selectors, DC1 selector is "table='TEST1'" and DC2 selector
> "table='TEST2'", are sent messages, first 100 msgs to DC1 (stopped status)
> and then 100 msgs to DC2 (receiving status), we see the
> ACTIVEMQ_ACKS.LAST_ACKED_ID of each consumer ultimately rise to 200 and
> within an hour all the messages are deleted.
> The 100 messages sent to DC1 are recoverable only as long as the broker
> does not restart.
>
> Interestingly, it actually happens like this:
>
> After DC1 has been sent 100 messages (with selection property
> "table='TEST1'"):
>
> select * from activemq_acks where container = 'topic://EllipseServices' and
> sub_name='TestListenerSubscriber':
>
> CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
> topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  0              0
> topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  100            0
>
> Out of JMX we see:
> - DC1 (stopped) and reporting 100 pending messages as we'd expect.
> - DC2 (still listening/receiving) shows 0 pending, 100 enqueued and 100
>   dequeued and 100 dispatched as we'd expect.
>
> After DC2 has been sent 100 messages (with selection property
> "table='TEST2'"):
>
> select * from activemq_acks where container = 'topic://EllipseServices' and
> sub_name='TestListenerSubscriber':
>
> CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
> topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  200            0
> topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  200            0
>
> Out of JMX we see:
> - DC1 (stopped) shows 100 pending as expected.
> - DC2 (still listening/receiving) shows 0 pending, 100 enqueued, 100
>   dequeued and 100 dispatched as we'd expect.
>
> Then, after up to 50 mins (5min checkpoint intervals x 10 priorities), the
> cleanup task kicks in issuing this statement (apparently to clean up
> consumed durable topic messages):
>
> 14:25:39,009 INFO [stdout] (ActiveMQ Task-1) 14:25:39.009 [ActiveMQ Task-1]
>   DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleaning up old messages.
> 14:25:39,011 INFO [stdout] (ActiveMQ Task-1) 14:25:39.011 [ActiveMQ Task-1]
>   DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Executing SQL: DELETE FROM
>   ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <= ( SELECT
>   min(ACTIVEMQ_ACKS.LAST_ACKED_ID) FROM ACTIVEMQ_ACKS WHERE
>   ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER AND
>   ACTIVEMQ_ACKS.PRIORITY=?) )
> 14:25:39,013 INFO [stdout] (ActiveMQ Task-1) 14:25:39.012 [ActiveMQ Task-1]
>   DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Deleted 0 old message(s) at
>   priority: 0
> 14:25:39,013 INFO [stdout] (ActiveMQ Task-1) 14:25:39.013 [ActiveMQ Task-1]
>   DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleanup done.
>
> All records in the DB are now deleted (we agree the statement would delete
> the rows based on the data in the DB and the predicates of the query, but it
> doesn't seem to be what one would want). Interestingly, the cleanup message
> usually says "Deleted 0 old messages" (but sometimes it correctly reports
> the 100 rows deleted). Regardless, they're definitely all gone from the DB
> after this log entry appears (the one with the PRIORITY=0 predicate).
> Interestingly, if DC1 starts consuming again it will get all the pending
> messages (presumably from the in-memory structures being managed by the
> broker). However, if the broker is restarted any time before the consumer
> receives all its outstanding messages, then all the messages are lost
> permanently... OOPS!
>
> Some observations:
> 1. Consumers on the same topic but with different selectors seem to be able
>    to increase the LAST_ACKED_ID for consumers with a completely different
>    selector. How or why might that be happening?
> 2. The system behaves as if the selector has no real bearing on the
>    updating of the LAST_ACKED_ID row in the ACTIVEMQ_ACKS table.
> 3. If DC1 is allowed to consume all its pending messages, the LAST_ACKED_ID
>    for DC1 in the ACTIVEMQ_ACKS table is correctly set back to 100.
>
> Can someone help point to what we're doing wrong that might be causing
> this behaviour?
>
> Thanks in advance.
>
> Jason
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
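
The effect of the cleanup statement quoted from the log can be checked outside the broker. Below is a minimal Python sketch, not ActiveMQ code: it assumes a reduced, hypothetical version of the two tables (only the columns the DELETE touches), an in-memory SQLite database instead of Oracle, and message IDs numbered sequentially from 1. The names `run_cleanup`, `after_dc1_batch` and `after_dc2_batch` are made up for illustration. It runs the DELETE from the log against the two ACTIVEMQ_ACKS states reported in the message.

```python
import sqlite3

TOPIC = "topic://EllipseServices"

# The cleanup statement as it appears in the quoted DEBUG log.
CLEANUP_SQL = """
DELETE FROM ACTIVEMQ_MSGS
WHERE (PRIORITY = ? AND ID <= (
    SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID)
    FROM ACTIVEMQ_ACKS
    WHERE ACTIVEMQ_ACKS.CONTAINER = ACTIVEMQ_MSGS.CONTAINER
      AND ACTIVEMQ_ACKS.PRIORITY = ?))
"""


def run_cleanup(acks, message_count):
    """Run the cleanup DELETE once at priority 0.

    acks: list of (client_id, selector, last_acked_id) rows.
    message_count: number of stored messages, with assumed IDs 1..message_count.
    Returns (rows_deleted, rows_remaining).
    """
    conn = sqlite3.connect(":memory:")
    # Assumption: simplified schemas, not the real ActiveMQ table definitions.
    conn.execute(
        "CREATE TABLE ACTIVEMQ_MSGS "
        "(ID INTEGER PRIMARY KEY, CONTAINER TEXT, PRIORITY INTEGER)"
    )
    conn.execute(
        "CREATE TABLE ACTIVEMQ_ACKS "
        "(CONTAINER TEXT, CLIENT_ID TEXT, SELECTOR TEXT, "
        "LAST_ACKED_ID INTEGER, PRIORITY INTEGER)"
    )
    conn.executemany(
        "INSERT INTO ACTIVEMQ_MSGS (ID, CONTAINER, PRIORITY) VALUES (?, ?, 0)",
        [(i, TOPIC) for i in range(1, message_count + 1)],
    )
    conn.executemany(
        "INSERT INTO ACTIVEMQ_ACKS VALUES (?, ?, ?, ?, 0)",
        [(TOPIC, client, selector, acked) for client, selector, acked in acks],
    )
    deleted = conn.execute(CLEANUP_SQL, (0, 0)).rowcount
    remaining = conn.execute("SELECT count(*) FROM ACTIVEMQ_MSGS").fetchone()[0]
    conn.close()
    return deleted, remaining


# ACTIVEMQ_ACKS as reported after the first 100 messages (for DC1).
after_dc1_batch = [
    ("TestListenerClient", "table='TEST1'", 0),
    ("TestListenerClient2", "table='TEST2'", 100),
]
# ACTIVEMQ_ACKS as reported after the second 100 messages (for DC2).
after_dc2_batch = [
    ("TestListenerClient", "table='TEST1'", 200),
    ("TestListenerClient2", "table='TEST2'", 200),
]

print(run_cleanup(after_dc1_batch, 100))  # (0, 100)
print(run_cleanup(after_dc2_batch, 200))  # (200, 0)
```

Under these assumptions the statement deletes nothing while DC1's LAST_ACKED_ID is 0, and deletes every row once both subscriptions report 200, including the 100 messages DC1 has not yet consumed. That suggests the cleanup is doing what its predicate says, and that the open question is why DC1's LAST_ACKED_ID advances while DC1 is stopped.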