What happens if you run the same message stream with only DC1 subscribed? I
suspect that you'll see the same behavior, and that it stems not from
consumer cross-talk but from our improper handling of messages that don't
match the subscription's selector.

Tim

On Nov 6, 2017 10:42 PM, "jasons" <jason.sm...@au.abb.com> wrote:

> Hello everyone
>
> We are running AMQ 5.14 in embedded mode under JBoss 7 with Java 7, 100%
> persistent messaging supported by JournalledJDBCPersistence to an Oracle 12
> DB.  Durable topic subscription processing is our focus.
>
> Our activemq-broker.xml looks like this:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>         xmlns:amq="http://activemq.apache.org/schema/core"
>         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>         xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://activemq.apache.org/schema/core
>         http://activemq.apache.org/schema/core/activemq-core-5.10.0.xsd">
>
>         <amq:broker id="broker" brokerName="broker" persistent="true"
> schedulerSupport="true" useJmx="true">
>                 <amq:destinationPolicy>
>                         <amq:policyMap>
>                                 <amq:policyEntries>
>                                         <amq:policyEntry topic="EllipseServices"
>                                                 allConsumersExclusiveByDefault="true" />
>                                 </amq:policyEntries>
>                         </amq:policyMap>
>                 </amq:destinationPolicy>
>                 <amq:destinations>
>                         <amq:topic physicalName="EllipseServices" />
>                 </amq:destinations>
>                 <amq:managementContext>
>                         <amq:managementContext createConnector="false" />
>                 </amq:managementContext>
>                 <amq:persistenceAdapter>
>                     <amq:journalPersistenceAdapter>
>                          <amq:persistenceAdapter>
>                               <amq:jdbcPersistenceAdapter
> dataSource="#dataSource" transactionIsolation="2" useDatabaseLock="false"
> cleanupPeriod="60000" />
>                          </amq:persistenceAdapter>
>                          <amq:taskRunnerFactory>
>                               <bean
> class="org.apache.activemq.thread.TaskRunnerFactory"/>
>                          </amq:taskRunnerFactory>
>                          <amq:journal>
>                             <bean
> class="org.apache.activeio.journal.active.JournalImpl">
>                                   <constructor-arg index="0">
>                                       <bean class="java.io.File">
>                                            <constructor-arg index="0">
>
> <value>./.activemq_data</value>
>                                            </constructor-arg>
>                                       </bean>
>                                    </constructor-arg>
>                                    <constructor-arg index="1">
>                                        <value>10</value>
>                                    </constructor-arg>
>                                    <constructor-arg index="2">
>                                        <value>104857600</value>
>                                    </constructor-arg>
>                             </bean>
>                         </amq:journal>
>                     </amq:journalPersistenceAdapter>
>                 </amq:persistenceAdapter>
>
>
>                 <amq:plugins>
>                         <amq:redeliveryPlugin>
>                                 <amq:redeliveryPolicyMap>
>                                         <amq:redeliveryPolicyMap>
>                                                 <amq:redeliveryPolicyEntries>
>                                                         <amq:redeliveryPolicy topic="EllipseServices"
>                                                                 maximumRedeliveries="0" />
>                                                 </amq:redeliveryPolicyEntries>
>                                                 <amq:defaultEntry>
>                                                         <amq:redeliveryPolicy maximumRedeliveries="2"
>                                                                 initialRedeliveryDelay="5000"
>                                                                 useExponentialBackOff="true" />
>                                                 </amq:defaultEntry>
>                                         </amq:redeliveryPolicyMap>
>                                 </amq:redeliveryPolicyMap>
>                         </amq:redeliveryPlugin>
>                         <amq:loggingBrokerPlugin logAll="true" />
>                 </amq:plugins>
>                 <amq:transportConnectors>
>                         <amq:transportConnector uri="tcp://0.0.0.0:61616"
> />
>                         <amq:transportConnector uri="vm://broker" />
>                 </amq:transportConnectors>
>         </amq:broker>
>         <bean id="jmsConnectionFactory"
> class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
>                 <property name="brokerURL" value="vm://broker" />
>         </bean>
>         <bean id="jmsTemplate"
> class="org.springframework.jms.core.JmsTemplate">
>                 <property name="connectionFactory"
> ref="jmsConnectionFactory" />
>         </bean>
>
> </beans>
>
> We think our scenario is pretty basic and so we're pretty sure we're doing
> something wrong or misunderstanding something here.  Can someone help
> explain how/why we're seeing what we're seeing.  We seem to be losing
> messages which of course is NOT good.
>
> Scenario:
> Two durable consumers, DC1 and DC2, consume from the same topic (T1) with
> different selectors: DC1 uses "table='TEST1'" and DC2 uses "table='TEST2'".
> We first send 100 msgs matching DC1 (which is stopped) and then 100 msgs
> matching DC2 (which is receiving).  The ACTIVEMQ_ACKS.LAST_ACKED_ID of each
> consumer ultimately rises to 200, and within an hour all the messages are
> deleted.  The 100 messages sent to DC1 are recoverable only as long as the
> broker does not restart.
>
> Interestingly, it actually happens like this:
>
> After DC1 has been sent 100 messages (with selection property
> “table=’TEST1’”):
>
> select * from activemq_acks where container = 'topic://EllipseServices' and
> sub_name='TestListenerSubscriber':
>
> CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
> topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  0              0
> topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  100            0
>
> Out of JMX we see:
> - DC1 (stopped) and reporting 100 pending messages as we'd expect.
> - DC2 (still listening/receiving) shows 0 pending, 100 enqueued and 100
> dequeued and 100 dispatched as we’d expect.
>
> After DC2 has been sent 100 messages (with selection property
> “table=’TEST2’”):
>
> select * from activemq_acks where container = 'topic://EllipseServices' and
> sub_name='TestListenerSubscriber':
>
> CONTAINER                SUB_DEST                 CLIENT_ID            SUB_NAME                SELECTOR       LAST_ACKED_ID  PRIORITY
> topic://EllipseServices  topic://EllipseServices  TestListenerClient   TestListenerSubscriber  table='TEST1'  200            0
> topic://EllipseServices  topic://EllipseServices  TestListenerClient2  TestListenerSubscriber  table='TEST2'  200            0
>
> Out of JMX we see:
> - DC1 (stopped) shows 100 pending as expected.
> - DC2 (still listening/receiving) shows 0 pending, 100 enqueued, 100
> dequeued and 100 dispatched as we’d expect.
>
>
> Then, after up to 50 mins (5min checkpoint intervals x 10 priorities), the
> cleanup task kicks in issuing this statement (apparently to clean up
> consumed durable topic messages):
>
> 14:25:39,009 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.009 [ActiveMQ
> Task-1]
> DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleaning up old messages.
> 14:25:39,011 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.011 [ActiveMQ
> Task-1]
> DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Executing SQL: DELETE FROM
> ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <=      ( SELECT
> min(ACTIVEMQ_ACKS.LAST_ACKED_ID)       FROM ACTIVEMQ_ACKS WHERE
> ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER        AND
> ACTIVEMQ_ACKS.PRIORITY=?)   )
> 14:25:39,013 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.012 [ActiveMQ
> Task-1]
> DEBUG o.a.a.s.j.adapter.DefaultJDBCAdapter - Deleted 0 old message(s) at
> priority: 0
> 14:25:39,013 INFO  [stdout] (ActiveMQ Task-1) 14:25:39.013 [ActiveMQ
> Task-1]
> DEBUG o.a.a.s.jdbc.JDBCPersistenceAdapter - Cleanup done.
>
> All records in the DB are now deleted (we agree the statement would delete
> the rows based on the data in the DB and the predicates of the query, but it
> doesn’t seem to be what one would want).  Interestingly the cleanup message
> usually says “Deleted 0 old messages” (but sometimes it correctly reports
> the 100 rows deleted).  Regardless, they’re definitely all gone from the DB
> after this log entry appears (the one with PRIORITY=0 predicate).
> Interestingly, if DC1 starts consuming again it will get all the pending
> messages (presumably from the in-memory structures being managed by the
> broker).  However, if the broker is restarted any time before the consumer
> receives all its outstanding messages, then all the messages are lost
> permanently…..OOPS!
>
>
> Some observations:
> 1. Consumers on the same topic but with different selectors seem to be able
> to increase the LAST_ACKED_ID for consumers with a completely different
> selector?  How or why might that be happening?
> 2. The system behaves as if the selector has no real bearing on the
> updating of the LAST_ACKED_ID row in the ACTIVEMQ_ACKS table?
> 3. If DC1 is allowed to consume all its pending messages, the LAST_ACKED_ID
> for DC1 in the ACTIVEMQ_ACKS table is correctly set back to 100
>
> Can someone help point to what we’re doing wrong that might be causing
> this behaviour?
>
> Thanks in advance.
>
>
> Jason
>
>
>
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-
> f2341805.html
>
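
The effect of the cleanup statement quoted in the log above can be checked
in isolation.  What follows is only a minimal sketch, not the broker's code:
it replays that DELETE against an in-memory SQLite database instead of
Oracle.  The two tables are cut down to the columns the statement touches,
and the message IDs are assumed to run from 1 to 200; both are assumptions
made for illustration, as is the helper name rows_deleted_by_cleanup.

```python
import sqlite3

# The DELETE statement as it appears in the DefaultJDBCAdapter debug log.
CLEANUP_SQL = """
DELETE FROM ACTIVEMQ_MSGS WHERE (PRIORITY=? AND ID <=
    (SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID) FROM ACTIVEMQ_ACKS
     WHERE ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER
       AND ACTIVEMQ_ACKS.PRIORITY=?))
"""

TOPIC = "topic://EllipseServices"


def rows_deleted_by_cleanup(dc1_last_acked, dc2_last_acked, priority=0):
    """Return how many of 200 stored messages the cleanup DELETE removes.

    Hypothetical helper: the schema is simplified and IDs 1..200 are assumed.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ACTIVEMQ_MSGS "
        "(ID INTEGER PRIMARY KEY, CONTAINER TEXT, PRIORITY INTEGER)"
    )
    conn.execute(
        "CREATE TABLE ACTIVEMQ_ACKS "
        "(CONTAINER TEXT, CLIENT_ID TEXT, SUB_NAME TEXT, SELECTOR TEXT, "
        "LAST_ACKED_ID INTEGER, PRIORITY INTEGER)"
    )
    # 200 messages on the topic: 1..100 for DC1, 101..200 for DC2 (assumed).
    conn.executemany(
        "INSERT INTO ACTIVEMQ_MSGS VALUES (?, ?, ?)",
        [(msg_id, TOPIC, priority) for msg_id in range(1, 201)],
    )
    # One ack row per durable subscription, as in the ACTIVEMQ_ACKS listings.
    conn.executemany(
        "INSERT INTO ACTIVEMQ_ACKS VALUES (?, ?, ?, ?, ?, ?)",
        [
            (TOPIC, "TestListenerClient", "TestListenerSubscriber",
             "table='TEST1'", dc1_last_acked, priority),
            (TOPIC, "TestListenerClient2", "TestListenerSubscriber",
             "table='TEST2'", dc2_last_acked, priority),
        ],
    )
    before = conn.execute("SELECT COUNT(*) FROM ACTIVEMQ_MSGS").fetchone()[0]
    conn.execute(CLEANUP_SQL, (priority, priority))
    after = conn.execute("SELECT COUNT(*) FROM ACTIVEMQ_MSGS").fetchone()[0]
    conn.close()
    return before - after


if __name__ == "__main__":
    # DC1 still at 0: the minimum protects every message.
    print(rows_deleted_by_cleanup(0, 100))    # 0
    # Both rows at 200: every message, including DC1's 100, is removed.
    print(rows_deleted_by_cleanup(200, 200))  # 200
```

Under these assumptions the statement deletes nothing while DC1's row is at
0 and deletes all 200 rows once both rows read 200.  That fits the reading
that the cleanup is only acting on the LAST_ACKED_ID values it is given, so
the thing to explain is why DC1's LAST_ACKED_ID reaches 200 while DC1 is
stopped.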
