Hi,

I was trying to create a test case for this issue but I am not being able to
replicate it in an automatic tests, I was debugging the servers that use for
reproduce the issue and print all the binding information from one of them.

= Start Server 1

*Start* LocalQueueBinding [address=Notifications,
queue=QueueImpl[name=Notifications, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@282308c3, filter=null, name=Notifications,
clusterName=Notificationsfac9cdae-1909-11e9-846c-c4b301c8ad71] *End*
*Start* DivertBinding [id=7, address=*.Provider.*.Agent.*.Status,
divert=DivertImpl [routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71,
uniqueName=notifications-divert, forwardAddress=Notifications,
exclusive=true, filter=null, transformer=null], filter=null,
uniqueName=notifications-divert,
routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71, exclusive=true] *End*

= Start Server 2

*Start* LocalQueueBinding
[address=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa, filter=null,
name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
clusterName=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71fac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*
*Start* LocalQueueBinding [address=activemq.notifications,
queue=QueueImpl[name=notif.1080e27a-190a-11e9-aa5a-c4b301c8ad71.ActiveMQServerImpl_serverUUID=101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=true]@3782fffb, filter=FilterImpl [sfilterString=_AMQ_Binding_Type<>2
AND _AMQ_NotifType IN
('BINDING_ADDED','BINDING_REMOVED','CONSUMER_CREATED','CONSUMER_CLOSED','PROPOSAL','PROPOSAL_RESPONSE','UNPROPOSAL')
AND _AMQ_Distance<1 AND (((_AMQ_Address NOT LIKE '$.artemis.internal.sf.%')
AND (_AMQ_Address NOT LIKE 'activemq.management%') AND (_AMQ_Address NOT
LIKE 'activemq.notifications%')))],
name=notif.1080e27a-190a-11e9-aa5a-c4b301c8ad71.ActiveMQServerImpl_serverUUID=101f3a70-190a-11e9-aa5a-c4b301c8ad71,
clusterName=notif.1080e27a-190a-11e9-aa5a-c4b301c8ad71.ActiveMQServerImpl_serverUUID=101f3a70-190a-11e9-aa5a-c4b301c8ad71fac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*
2019-01-15 18:11:02,947 INFO  [org.apache.activemq.artemis.core.server]
AMQ221027: Bridge ClusterConnectionBridge@1b3c29db
[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa targetConnector=ServerLocatorImpl
(identity=(Cluster-connection-bridge::ClusterConnectionBridge@1b3c29db
[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa targetConnector=ServerLocatorImpl
[initialConnectors=[TransportConfiguration(name=netty-connector,
factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)
?port=61617&host=localhost],
discoveryGroupConfiguration=null]]::ClusterConnectionImpl@486759395[nodeUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71,
connector=TransportConfiguration(name=netty-connector,
factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)
?port=61616&host=localhost, address=,
server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71]))
[initialConnectors=[TransportConfiguration(name=netty-connector,
factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)
?port=61617&host=localhost], discoveryGroupConfiguration=null]] is connected
*Start* RemoteQueueBindingImpl(connected)[address=Notifications,
consumerCount=0, distance=1, filters=[], id=23,
idsHeaderName=_AMQ_ROUTE_TO$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queueFilter=null, remoteQueueID=4, routingName=Notifications,
storeAndForwardQueue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa,
uniqueName=Notifications101f3a70-190a-11e9-aa5a-c4b301c8ad71] *End*

= Connect Consumer

*Start* LocalQueueBinding [address=Notifications,
queue=QueueImpl[name=Notifications, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@282308c3, filter=null, name=Notifications,
clusterName=Notificationsfac9cdae-1909-11e9-846c-c4b301c8ad71] *End*
*Start* RemoteQueueBindingImpl(connected)[address=Notifications,
consumerCount=0, distance=1, filters=[], id=23,
idsHeaderName=_AMQ_ROUTE_TO$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queueFilter=null, remoteQueueID=4, routingName=Notifications,
storeAndForwardQueue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa,
uniqueName=Notifications101f3a70-190a-11e9-aa5a-c4b301c8ad71] *End*

= Produce Message to 1

*Start* LocalQueueBinding [address=x.Provider.y.Agent.z.Status,
queue=QueueImpl[name=x.Provider.y.Agent.z.Status, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@467cd269, filter=null, name=x.Provider.y.Agent.z.Status,
clusterName=x.Provider.y.Agent.z.Statusfac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*
*Start* DivertBinding [id=7, address=*.Provider.*.Agent.*.Status,
divert=DivertImpl [routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71,
uniqueName=notifications-divert, forwardAddress=Notifications,
exclusive=true, filter=null, transformer=null], filter=null,
uniqueName=notifications-divert,
routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71, exclusive=true] *End*
*Start* LocalQueueBinding [address=x.Provider.y.Agent.z.Status,
queue=QueueImpl[name=x.Provider.y.Agent.z.Status, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@467cd269, filter=null, name=x.Provider.y.Agent.z.Status,
clusterName=x.Provider.y.Agent.z.Statusfac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*

= Produce Message to 2

*Start*
RemoteQueueBindingImpl(connected)[address=x.Provider.y.Agent.z.Status,
consumerCount=0, distance=1, filters=[], id=141,
idsHeaderName=_AMQ_ROUTE_TO$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queueFilter=null, remoteQueueID=40, routingName=x.Provider.y.Agent.z.Status,
storeAndForwardQueue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa,
uniqueName=x.Provider.y.Agent.z.Status101f3a70-190a-11e9-aa5a-c4b301c8ad71]
*End*
*Start* DivertBinding [id=7, address=*.Provider.*.Agent.*.Status,
divert=DivertImpl [routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71,
uniqueName=notifications-divert, forwardAddress=Notifications,
exclusive=true, filter=null, transformer=null], filter=null,
uniqueName=notifications-divert,
routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71, exclusive=true] *End*

= Produce Message to 1

*Start* LocalQueueBinding [address=x.Provider.y.Agent.z.Status,
queue=QueueImpl[name=x.Provider.y.Agent.z.Status, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@467cd269, filter=null, name=x.Provider.y.Agent.z.Status,
clusterName=x.Provider.y.Agent.z.Statusfac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*
*Start*
RemoteQueueBindingImpl(connected)[address=x.Provider.y.Agent.z.Status,
consumerCount=0, distance=1, filters=[], id=141,
idsHeaderName=_AMQ_ROUTE_TO$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queueFilter=null, remoteQueueID=40, routingName=x.Provider.y.Agent.z.Status,
storeAndForwardQueue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa,
uniqueName=x.Provider.y.Agent.z.Status101f3a70-190a-11e9-aa5a-c4b301c8ad71]
*End*
*Start* LocalQueueBinding [address=x.Provider.y.Agent.z.Status,
queue=QueueImpl[name=x.Provider.y.Agent.z.Status, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@467cd269, filter=null, name=x.Provider.y.Agent.z.Status,
clusterName=x.Provider.y.Agent.z.Statusfac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*
*Start*
RemoteQueueBindingImpl(connected)[address=x.Provider.y.Agent.z.Status,
consumerCount=0, distance=1, filters=[], id=141,
idsHeaderName=_AMQ_ROUTE_TO$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
queueFilter=null, remoteQueueID=40, routingName=x.Provider.y.Agent.z.Status,
storeAndForwardQueue=QueueImpl[name=$.artemis.internal.sf.my-cluster.101f3a70-190a-11e9-aa5a-c4b301c8ad71,
postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@35241daa,
uniqueName=x.Provider.y.Agent.z.Status101f3a70-190a-11e9-aa5a-c4b301c8ad71]
*End*

However, during my test I am not getting binding for specific address when
producing a message.

= Produce Message to 1

*Start* LocalQueueBinding [address=x.Provider.y.Agent.z.Status,
queue=QueueImpl[name=x.Provider.y.Agent.z.Status, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@467cd269, filter=null, name=x.Provider.y.Agent.z.Status,
clusterName=x.Provider.y.Agent.z.Statusfac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*
*Start* DivertBinding [id=7, address=*.Provider.*.Agent.*.Status,
divert=DivertImpl [routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71,
uniqueName=notifications-divert, forwardAddress=Notifications,
exclusive=true, filter=null, transformer=null], filter=null,
uniqueName=notifications-divert,
routingName=fab02b2d-1909-11e9-846c-c4b301c8ad71, exclusive=true] *End*
*Start* LocalQueueBinding [address=x.Provider.y.Agent.z.Status,
queue=QueueImpl[name=x.Provider.y.Agent.z.Status, postOffice=PostOfficeImpl
[server=ActiveMQServerImpl::serverUUID=fac9cdae-1909-11e9-846c-c4b301c8ad71],
temp=false]@467cd269, filter=null, name=x.Provider.y.Agent.z.Status,
clusterName=x.Provider.y.Agent.z.Statusfac9cdae-1909-11e9-846c-c4b301c8ad71]
*End*

Could someone provide me some lights to figure out the issue? 

My Current tests:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import
org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import
org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.junit.Before;
import org.junit.Test;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

public class DuplicateMessageTest extends ClusterTestBase {

    private static final IntegrationTestLogger log =
IntegrationTestLogger.LOGGER;

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();

        start();
    }

    private void start() throws Exception {
        setupServers();
    }

    @Override
    protected ActiveMQServer createServer(final boolean realFiles,
                                          final Configuration configuration,
                                          final long pageSize,
                                          final long maxAddressSize,
                                          final Map<String, AddressSettings>
settings) {
        DivertConfiguration divertConf = new
DivertConfiguration().setName("notifications-divert").setAddress("*.Provider.*.Agent.*.Status").setForwardingAddress("Notifications").setExclusive(true);

        configuration.addDivertConfiguration(divertConf);

        ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));

        if (settings != null) {
            for (Map.Entry<String, AddressSettings> setting :
settings.entrySet()) {
               
server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
            }
        }

        AddressSettings defaultSetting = new
AddressSettings().setPageSizeBytes(pageSize).setRedeliveryDelay(0).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true).setAutoCreateJmsQueues(true).setAutoCreateJmsTopics(true);

        server.getAddressSettingsRepository().addMatch("#", defaultSetting);

        Set<Role> roles = new HashSet<>();
        Role role = new Role("amq", true, true, true, true, true, true,
true, true, true, true);

        roles.add(role);

        server.getSecurityRepository().addMatch("#", roles);

        ActiveMQJAASSecurityManager securityManager =
(ActiveMQJAASSecurityManager) server.getSecurityManager();

        securityManager.getConfiguration().addUser("admin", "admin");
        securityManager.getConfiguration().addRole("admin", "amq");

        server.getConfiguration().setSecurityEnabled(true);

        return server;
    }

    protected boolean isNetty() {
        return true;
    }


    @Override
    protected void setSessionFactoryCreateLocator(int node, boolean ha,
TransportConfiguration serverTotc) {
        super.setSessionFactoryCreateLocator(node, ha, serverTotc);

        locators[node].setConsumerWindowSize(0);

    }

    @Test
    public void testDuplicateMessages() throws Exception {
        setupCluster(MessageLoadBalancingType.ON_DEMAND);

        startServers(0, 1);

        setupSessionFactory(0, isNetty(), false, "admin", "admin");
        setupSessionFactory(1, isNetty(), false, "admin", "admin");

        createQueue(0, "Notifications", "Notifications", null, false,
"admin" , "admin", RoutingType.ANYCAST);
        createQueue(1, "Notifications", "Notifications", null, false,
"admin" , "admin", RoutingType.ANYCAST);

        addConsumer(0, 0, "Notifications", null, true, "admin", "admin");

        waitForBindings(0, "Notifications", 1, 1, true);
        waitForBindings(0, "Notifications", 1, 0, false);

        waitForBindings(1, "Notifications", 1, 0, true);
        waitForBindings(1, "Notifications", 1, 1, false);

        send(0, "x.Provider.y.Agent.z.Status", 1, false, null, "admin",
"admin");

        verifyReceiveAll(1, 0);

        send(1, "x.Provider.y.Agent.z.Status", 1, false, null, "admin",
"admin");

        verifyReceiveAll(1, 0);

        send(0, "x.Provider.y.Agent.z.Status", 1, false, null, "admin",
"admin");

        verifyReceiveAll(1, 0);

    }

    protected void setupCluster(final MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
        setupClusterConnection("cluster0", "", messageLoadBalancingType, 1,
isNetty(), 0, 1);

        setupClusterConnection("cluster1", "", messageLoadBalancingType, 1,
isNetty(), 1, 0);
    }

    protected void setupServers() throws Exception {
        setupServer(0, isFileStorage(), isNetty());
        setupServer(1, isFileStorage(), isNetty());
    }

    protected void stopServers() throws Exception {
        closeAllConsumers();

        closeAllSessionFactories();

        closeAllServerLocatorsFactories();

        stopServers(0, 1);

        clearServer(0, 1);
    }
}

Any help is appreciated



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Reply via email to