The behavior you described is consistent with your networkTTL=1 setting
plus the following sequence of events:

1. Message produced to broker A.
2. Consumer connected to broker B, so A forwards the message to B.
3. Before the consumer can consume the message, it disconnects and
reconnects to A.
4. B has no local consumer for the message and can't forward it to A
because of the TTL setting, so the message stays, unconsumed, on B.

That's not a guarantee that what I described is what actually happened, but
it's definitely possible/plausible. So do consider carefully whether to
raise that value above the default.

Tim

On Mon, Jan 27, 2020, 2:10 AM Jérôme Barotin <j...@s4e.fr> wrote:

> Hi Tim,
>
> Yes, I'm aware of this section in the documentation. The
> |replayWhenNoConsumers="true" is positionned|since several month, but
> this doesn't change anything.
>
> I'll consider your advice of changing the networkTTL setting, I've got a
> default value of 1, an higher value will avoid fail back mistake.
>
> Anyway, since last Thursday, I have set the
> decreaseNetworkConsumerPriority="true" in the NetworkConnector, and it
> works very well. I haven't observed message stuck since this change. If
> it still the case Friday this week, I'll update the "message stuck" part
> in the documentation.
>
> We're staying in touch,
>
> Jérôme
>
> Le 25/01/2020 à 14:29, Tim Bain a écrit :
>
> > Have you read the Stuck Messages section of
> > https://activemq.apache.org/networks-of-brokers and applied one of the
> > solutions it describes (I recommend the replayWhenNoConsumers=true
> solution
> > because it works no matter how many consumers you have). You'll also need
> > to ensure that your TTL setting for the Network of Brokers (which is not
> > the same as the TTL on an individual message) is high enough to allow
> > messages to hop between the brokers several times in case the consumer
> > fails back and forth several times. I'd think that a value of 10 would be
> > more than sufficient.
> >
> > Tim
> >
> > On Thu, Jan 23, 2020, 7:29 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> >
> >> My question about DLQ is if you see some messages moved in DLQ (because
> >> they expired for instance).
> >>
> >> Anyway, I would try first:
> >> 1. decrease network priority to consume locally and avoid use of the
> bridge
> >> 2. increase the prefetch on the bridge (the network connector)
> >>
> >> Regards
> >> JB
> >>
> >> On 23/01/2020 14:52, Jérôme Barotin wrote:
> >>> Hi Jean Baptiste and Tim ,
> >>>
> >>> My response to your very interesting questions, below :
> >>>
> >>> @Jean Baptiste
> >>>> You are using a network of brokers.
> >>> Yes exactly.
> >>>
> >>>> Did you try decreaseNetworkPriority on the network connector ?
> >>> Nope I'll try that immediatly, we'll see in few days if it .
> >>>
> >>>> By default, messages will be forwarded to another broker only if
> >>> there's at least an active consumer.
> >>>
> >>>> Don't you have slow consumers identified ?
> >>> I don't know, the only way to check that is to use JMX ?
> >>>
> >>>> By the way, what's the broker URL in the connection factory ?
> failover ?
> >>> Yep, I'm using failover url with url of both brocker.  (
> >>> failover:(tcp://broker1:61616,tcp://broker2:61616) )
> >>>
> >>>> Are you using rebalance client ?
> >>> Nope, my topology is static, I think I don't need to do that.
> >>>
> >>>> No DLQ ?
> >>> What do you mean by this question, I have a DLQ, it's not the purpose
> >>> here, I think ?.
> >>>
> >>>> What's the prefetch on client side configuration ?
> >>> For the 200 000 messages a day queue, I set a prefetch size of 10
> >>> For the 10 000 messages a day queue, I set a prefetch size of 1
> >>>
> >>> @Tim
> >>>
> >>>> Do messages get stuck in both queues, or only one of them?
> >>> Message stuck only in one queue, the other one works very well.
> >>>
> >>> To complete, in the second brocker, there's no networkConnectors part,
> >>> cause, of the duplex=true.
> >>>
> >>>
> >>>
> >>> Le 23/01/2020 à 13:42, Jean-Baptiste Onofré a écrit :
> >>>
> >>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 23/01/2020 11:19, Jérôme Barotin wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I've got two AMQ brokers installed in cluster mode with KahaDB, in
> >> order
> >>>>> to have an HA configuration. I have configured 2 queues (no topic) :
> >>>>>
> >>>>>    * one that exchange 200 000 messages by day, message group is
> >> enabled,
> >>>>>      time to process a message goes between 10 to 60 seconds, it's
> done
> >>>>>      by 4 job with 15 threads each.
> >>>>>    * one others deal with 10 000 messages by day and there no group,
> >> it's
> >>>>>      consumed by 4 job with 5 thread each.
> >>>>>
> >>>>> Consumers are multithreaded with PooledConnectionFactory.
> >>>>>
> >>>>> All is working well but some time there messages that stuck. My
> >>>>> consumers are paused and are doing nothing. I can see that, on the
> >>>>> ActiveMq monitoring GUI :
> >>>>>
> >>>>>    * Number of pending message is higher than zero
> >>>>>    * All consumer, Dispatched queue is setted to zero
> >>>>>
> >>>>> Message can stuck 1 or 2 hours and get unblocked by theirself without
> >>>>> explicit reason, if I restart brockers : it unblocks messages.
> >>>>>
> >>>>> I checked AMQ documentation and lot of parameters and haven't found a
> >>>>> solution. That's why I'm posting here a description of my issue. Is
> >>>>> anybody have an idea of a something to change or something to
> monitor,
> >>>>> in order to help me ?
> >>>>>
> >>>>> The AMQ configuration is the following :
> >>>>>
> >>>>> <beans
> >>>>>     xmlns="http://www.springframework.org/schema/beans";
> >>>>>     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
> >>>>>     xsi:schemaLocation="http://www.springframework.org/schema/beans
> >>>>> http://www.springframework.org/schema/beans/spring-beans.xsd
> >>>>>     http://activemq.apache.org/schema/core
> >>>>> http://activemq.apache.org/schema/core/activemq-core.xsd";>
> >>>>>
> >>>>>       <!-- Allows us to use system properties as variables in this
> >>>>> configuration file -->
> >>>>>       <bean
> >>>>>
> >>
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
> >>>>>
> >>>>>           <property name="locations">
> >>>>> <value>file:${activemq.conf}/credentials.properties</value>
> >>>>>           </property>
> >>>>>       </bean>
> >>>>>
> >>>>>      <!-- Allows accessing the server log -->
> >>>>>       <bean id="logQuery"
> >>>>> class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> >>>>>             lazy-init="false" scope="singleton"
> >>>>>             init-method="start" destroy-method="stop">
> >>>>>       </bean>
> >>>>>
> >>>>>       <!--
> >>>>>           The <broker> element is used to configure the ActiveMQ
> broker.
> >>>>>       -->
> >>>>>       <broker xmlns="http://activemq.apache.org/schema/core";
> >>>>> brokerName="energysoft" dataDirectory="${activemq.data}"
> useJmx="true">
> >>>>>
> >>>>>           <networkConnectors>
> >>>>>             <networkConnector
> uri="static:(tcp://myOtherBrokerIP:61616)"
> >>>>>                               userName="mylogin"
> >>>>>                               password="mypassword"
> >>>>>                               prefetchSize="100"
> >>>>>                               duplex="true"
> >>>>>                               />
> >>>>>           </networkConnectors>
> >>>>>
> >>>>>           <destinationPolicy>
> >>>>>               <policyMap>
> >>>>>                 <policyEntries>
> >>>>>                       <policyEntry queue=">" maxPageSize="10000"
> >>>>> maxBrowsePageSize="0" enableAudit="false" >
> >>>>>                     <networkBridgeFilterFactory>
> >>>>>                       <conditionalNetworkBridgeFilterFactory
> >>>>> replayWhenNoConsumers="true"/>
> >>>>>                     </networkBridgeFilterFactory>
> >>>>>                     <messageGroupMapFactory>
> >>>>>                       <simpleMessageGroupMapFactory/>
> >>>>>                     </messageGroupMapFactory>
> >>>>>                   </policyEntry>
> >>>>>                   <policyEntry topic=">" >
> >>>>>                     <pendingMessageLimitStrategy>
> >>>>>                       <constantPendingMessageLimitStrategy
> >> limit="1000"/>
> >>>>>                     </pendingMessageLimitStrategy>
> >>>>>                   </policyEntry>
> >>>>>                 </policyEntries>
> >>>>>               </policyMap>
> >>>>>           </destinationPolicy>
> >>>>>
> >>>>>           <plugins>
> >>>>>             <simpleAuthenticationPlugin>
> >>>>>                 <users>
> >>>>>                     <authenticationUser username="mylogin"
> >>>>> password="mypassword" groups="users,admins"/>
> >>>>>                 </users>
> >>>>>             </simpleAuthenticationPlugin>
> >>>>>           </plugins>
> >>>>>
> >>>>>           <managementContext>
> >>>>>               <managementContext createConnector="false"/>
> >>>>>           </managementContext>
> >>>>>
> >>>>>
> >>>>>           <persistenceAdapter>
> >>>>>               <kahaDB directory="${activemq.data}/kahadb"
> >>>>> journalMaxFileLength="32mb"/>
> >>>>>           </persistenceAdapter>
> >>>>>
> >>>>>             <systemUsage>
> >>>>>               <systemUsage>
> >>>>>                   <memoryUsage>
> >>>>>                       <memoryUsage percentOfJvmHeap="70" />
> >>>>>                   </memoryUsage>
> >>>>>                   <storeUsage>
> >>>>>                       <storeUsage limit="100 gb"/>
> >>>>>                   </storeUsage>
> >>>>>                   <tempUsage>
> >>>>>                       <tempUsage limit="50 gb"/>
> >>>>>                   </tempUsage>
> >>>>>               </systemUsage>
> >>>>>           </systemUsage>
> >>>>>
> >>>>>           <transportConnectors>
> >>>>>               <!-- DOS protection, limit concurrent connections to
> 1000
> >>>>> and frame size to 100MB -->
> >>>>>               <transportConnector name="openwire"
> >>>>> uri="tcp://
> >>
> 0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> >> "
> >>>>> />
> >>>>>               <transportConnector name="amqp"
> >>>>> uri="amqp://
> >>
> 0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> >> "/>
> >>>>>
> >>>>>               <transportConnector name="stomp"
> >>>>> uri="stomp://
> >>
> 0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> >> "/>
> >>>>>
> >>>>>               <transportConnector name="mqtt"
> >>>>> uri="mqtt://
> >>
> 0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> >> "/>
> >>>>>
> >>>>>               <transportConnector name="ws"
> >>>>> uri="ws://
> >>
> 0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600
> >> "/>
> >>>>>
> >>>>>               <!-- cluster port -->
> >>>>>               <!-- <transportConnector uri="tcp://localhost:62002"/>
> -->
> >>>>>           </transportConnectors>
> >>>>>
> >>>>>           <!-- destroy the spring context on shutdown to stop jetty
> -->
> >>>>>           <shutdownHooks>
> >>>>>               <bean xmlns="
> http://www.springframework.org/schema/beans";
> >>>>> class="org.apache.activemq.hooks.SpringContextHook" />
> >>>>>           </shutdownHooks>
> >>>>>       </broker>
> >>>>>
> >>>>>       <import resource="jetty.xml"/>
> >>>>>
> >>>>> </beans>
> >>>>>
> >>>>>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
>
>

Reply via email to