Richard, didn't you just send this same email to the ActiveMQ user list
(although with a slightly different subject)? Just want to clarify if they
are the same or different issues.


Justin

On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
richard.cann...@edfenergy.com> wrote:

> Hi,
>
>
>
> We are trying to join a network-of-brokers arrangement with a third-party
> broker hosted externally to us. We wish to receive messages from a topic
> (called *sourceTopic*) and forward them to internal queues on our local
> broker for load balancing and throughput purposes. To do this we use
> virtual destinations: a compositeTopic that forwards to two queues on our
> local broker.
>
>
>
> In order to avoid implementing ingress firewall rules for the third party
> onto our estate, we have created a duplex network connector so that we
> initiate the outbound tcp connection. We have excluded our local broker
> queues from propagating to the other broker using the
> <excludedDestinations> configuration element.
>
>
>
> Our configuration below works fine, as long as our local broker is up, and
> we have an active consumer. In this case, messages published to the third
> party broker *sourceTopic* are successfully received on our composite
> topic forwarded queues, test and test2. To avoid any confusion, the
> source message producer publishing to *sourceTopic* is explicitly using
> PersistentDelivery (even though I believe that is the default anyway).
>
>
>
> If we restart our broker, any messages that were published to the
> *sourceTopic* while our broker was down are not received on the local
> forwarded queues test and test2.
>
>
>
> It would appear that the network connector configuration and/or
> compositeTopic do not result in a durable topic subscription to the
> *sourceTopic* on the other broker.
>
>
>
> I can also successfully recreate this behaviour with two local instances
> of ActiveMQ on my development machine running side by side with broker B
> duplex connected to broker A, so it doesn’t appear to be an issue with the
> third party broker either.
>
>
>
> Our configuration (sensitive data removed) is as below:
>
>
>
> Any ideas on how we can resolve this?
>
>
>
> Many thanks
>
>
> Richard
>
>
>
> <beans
>
>   xmlns="http://www.springframework.org/schema/beans"
>
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>
>   xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
>
>   http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>
>
>     <!-- Allows us to use system properties as variables in this
> configuration file -->
>
>     <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>
>         <property name="locations">
>
>             <value>file:${activemq.conf}/credentials.properties</value>
>
>         </property>
>
>     </bean>
>
>
>
>    <!-- Allows accessing the server log -->
>
>     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>
>           lazy-init="false" scope="singleton"
>
>           init-method="start" destroy-method="stop">
>
>     </bean>
>
>
>
>     <!--
>
>         The <broker> element is used to configure the ActiveMQ broker.
>
>     -->
>
>     <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="localhost" dataDirectory="${activemq.data}"
> useVirtualDestSubs="true">
>
>
>
>         <destinationPolicy>
>
>             <policyMap>
>
>               <policyEntries>
>
>                 <policyEntry topic=">" >
>
>                     <!-- The constantPendingMessageLimitStrategy is used to
>                          prevent slow topic consumers from blocking producers
>                          and affecting other consumers by limiting the number
>                          of messages that are retained.
>
>                          For more information, see:
>
>                          http://activemq.apache.org/slow-consumer-handling.html
>                     -->
>
>                   <pendingMessageLimitStrategy>
>
>                     <constantPendingMessageLimitStrategy limit="1000"/>
>
>                   </pendingMessageLimitStrategy>
>
>                 </policyEntry>
>
>
>                 <policyEntry queue="test" enableAudit="false">
>                   <networkBridgeFilterFactory>
>                     <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>                   </networkBridgeFilterFactory>
>                 </policyEntry>
>
>                 <policyEntry topic="*sourceTopic*" enableAudit="false">
>                   <networkBridgeFilterFactory>
>                     <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>                   </networkBridgeFilterFactory>
>                 </policyEntry>
>
>               </policyEntries>
>
>             </policyMap>
>
>         </destinationPolicy>
>
>
>
>
>
>
>
>         <destinationInterceptors>
>             <virtualDestinationInterceptor>
>                 <virtualDestinations>
>                     <compositeTopic forwardOnly="true" name="*sourceTopic*">
>                         <forwardTo>
>                             <queue physicalName="test" />
>                             <queue physicalName="test2" />
>                         </forwardTo>
>                     </compositeTopic>
>                 </virtualDestinations>
>             </virtualDestinationInterceptor>
>         </destinationInterceptors>
>
>
>
>         <!--
>
>             The managementContext is used to configure how ActiveMQ is
> exposed in
>
>             JMX. By default, ActiveMQ uses the MBean server that is
> started by
>
>             the JVM. For more information, see:
>
>
>
>            http://activemq.apache.org/jmx.html
>
>         -->
>
>         <managementContext>
>
>             <managementContext createConnector="false"/>
>
>         </managementContext>
>
>
>
>         <!--
>
>             Configure message persistence for the broker. The default
> persistence
>
>             mechanism is the KahaDB store (identified by the kahaDB tag).
>
>             For more information, see:
>
>
>
>             http://activemq.apache.org/persistence.html
>
>         -->
>
>         <persistenceAdapter>
>
>             <kahaDB directory="${activemq.data}/kahadb"/>
>
>         </persistenceAdapter>
>
>
>
>
>
>           <!--
>
>             The systemUsage controls the maximum amount of space the
> broker will
>
>             use before disabling caching and/or slowing down producers.
> For more information, see:
>
>             http://activemq.apache.org/producer-flow-control.html
>
>           -->
>
>           <systemUsage>
>
>             <systemUsage>
>
>                 <memoryUsage>
>
>                     <memoryUsage percentOfJvmHeap="70" />
>
>                 </memoryUsage>
>
>                 <storeUsage>
>
>                     <storeUsage limit="100 gb"/>
>
>                 </storeUsage>
>
>                 <tempUsage>
>
>                     <tempUsage limit="50 gb"/>
>
>                 </tempUsage>
>
>             </systemUsage>
>
>         </systemUsage>
>
>
>
>         <!--
>
>             The transport connectors expose ActiveMQ over a given protocol
> to
>
>             clients and other brokers. For more information, see:
>
>
>
>             http://activemq.apache.org/configuring-transports.html
>
>         -->
>
>         <transportConnectors>
>
>             <!-- DOS protection, limit concurrent connections to 1000 and
> frame size to 100MB -->
>
>             <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <!--
>
>             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>
>             -->
>
>         </transportConnectors>
>
>
>
>         <!-- destroy the spring context on shutdown to stop jetty -->
>
>         <shutdownHooks>
>
>             <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
>
>         </shutdownHooks>
>
>
>
>         <networkConnectors>
>
>             <networkConnector name="edf-to-thirdpartybroker-network-connector"
>                               useVirtualDestSubs="true"
>                               duplex="true"
>                               uri="static:(tcp://127.0.0.1:61616)" userName="omitted" password="omitted">
>
>                 <staticallyIncludedDestinations>
>                     <topic physicalName="*sourceTopic*"/>
>                 </staticallyIncludedDestinations>
>
>                 <excludedDestinations>
>                     <queue physicalName=">"/>
>                 </excludedDestinations>
>
>             </networkConnector>
>
>         </networkConnectors>
>
>
>
>         <plugins>
>
>             <simpleAuthenticationPlugin>
>                 <users>
>                     <authenticationUser username="system" password="omitted" groups="users,admins"/>
>                     <authenticationUser username="queueAUser" password="omitted" groups="queueAusers"/>
>                     <authenticationUser username="queueBUser" password="omitted" groups="queueBusers"/>
>                 </users>
>             </simpleAuthenticationPlugin>
>
>       <authorizationPlugin>
>
>         <map>
>
>           <authorizationMap>
>
>             <authorizationEntries>
>
>               <authorizationEntry queue=">" read="admins" write="admins"
> admin="admins" />
>
>               <authorizationEntry queue="test" read="queueAusers"
> write="queueAusers" admin="queueAusers" />
>
>               <authorizationEntry queue="test2" read="queueBusers"
> write="queueBusers" admin="queueBusers" />
>
>
>
>               <authorizationEntry topic=">" read="admins" write="admins"
> admin="admins" />
>
>
>
>
>
>               <authorizationEntry topic="ActiveMQ.Advisory.>"
> read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers"
> admin="admins,queueAusers,queueBusers" />
>
>             </authorizationEntries>
>
>
>
>             <!-- let's assign roles to temporary destinations. comment
> this entry if we don't want any roles assigned to temp destinations  -->
>
>
>
>           </authorizationMap>
>
>         </map>
>
>       </authorizationPlugin>
>
>
>         </plugins>
>
>     </broker>
>
>
>
>     <!--
>
>         Enable web consoles, REST and Ajax APIs and demos
>
>         The web console requires login by default; you can disable this
> in the jetty.xml file
>
>
>
>         Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>
>     -->
>
>     <import resource="jetty.xml"/>
>
>
>
> </beans>
>
> <!-- END SNIPPET: example -->
>
>
>
> *Richard Cannock*
>
> *Solution Architect*
>
> Business Change and IT
>
> Customers
>
>
>
> *T:* +44 (0) 2081862465
>
> *M:* +44 (0)7729 320505
>
>
>
> Barnwood
>
> Barnett Way
>
> Gloucester, Gloucestershire, GL4 3RS
>
>
>
>
>
> edfenergy.com <http://www.edfenergy.com/>
>
>
>
