This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2992daa  ARTEMIS-2514 dupl cache leak w/clustered temp q
     new 49c96c6  This closes #2858
2992daa is described below

commit 2992daaeb13f5fe33ad5db28f7210692c8283c01
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Mon Oct 7 11:09:42 2019 -0500

    ARTEMIS-2514 dupl cache leak w/clustered temp q
---
 .../server/cluster/impl/ClusterConnectionImpl.java |   2 +-
 .../distribution/TemporaryJMSQueueClusterTest.java | 136 +++++++++++++++++++++
 2 files changed, 137 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index d6f34c9..7680780 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -1292,7 +1292,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
             return;
          }
 
-         postOffice.removeBinding(binding.getUniqueName(), null, false);
+         postOffice.removeBinding(binding.getUniqueName(), null, true);
       }
 
       private synchronized void resetBinding(final SimpleString clusterName) 
throws Exception {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TemporaryJMSQueueClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TemporaryJMSQueueClusterTest.java
new file mode 100644
index 0000000..a69018c
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TemporaryJMSQueueClusterTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.junit.Test;
+
+public class TemporaryJMSQueueClusterTest extends ClusterTestBase {
+
+   @Test
+   public void testDuplicateCacheCleanupForTempQueues() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      
servers[0].getConfiguration().getClusterConfigurations().get(0).setDuplicateDetection(true);
+      servers[0].getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setRedistributionDelay(0));
+
+      setupClusterConnection("cluster1", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+      
servers[1].getConfiguration().getClusterConfigurations().get(0).setDuplicateDetection(true);
+      servers[1].getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setRedistributionDelay(0));
+
+      startServers(0, 1);
+
+      final Map<String, TextMessage> requestMap = new HashMap<>();
+      ConnectionFactory cf = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
+
+      for (int j = 0; j < 10; j++) {
+         try (Connection connection = cf.createConnection()) {
+            SimpleMessageListener server = new SimpleMessageListener().start();
+            Queue requestQueue = ActiveMQJMSClient.createQueue("exampleQueue");
+            connection.start();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(requestQueue);
+            TemporaryQueue replyQueue = session.createTemporaryQueue();
+            MessageConsumer replyConsumer = session.createConsumer(replyQueue);
+
+            int numMessages = 10;
+            for (int i = 0; i < numMessages; i++) {
+
+               TextMessage requestMsg = session.createTextMessage("A request 
message");
+               requestMsg.setJMSReplyTo(replyQueue);
+               producer.send(requestMsg);
+               requestMap.put(requestMsg.getJMSMessageID(), requestMsg);
+            }
+
+            for (int i = 0; i < numMessages; i++) {
+               TextMessage replyMessageReceived = (TextMessage) 
replyConsumer.receive();
+               
assertNotNull(requestMap.get(replyMessageReceived.getJMSCorrelationID()));
+            }
+
+            replyConsumer.close();
+            replyQueue.delete();
+            server.shutdown();
+         }
+
+      }
+
+      assertTrue(((PostOfficeImpl) 
servers[0].getPostOffice()).getDuplicateIDCaches().size() <= 1);
+      assertTrue(((PostOfficeImpl) 
servers[1].getPostOffice()).getDuplicateIDCaches().size() <= 1);
+
+   }
+
+   public boolean isNetty() {
+      return true;
+   }
+}
+
+class SimpleMessageListener implements MessageListener {
+
+   private Session session;
+   MessageProducer replyProducer;
+   MessageConsumer requestConsumer;
+   Connection connection = null;
+
+   public SimpleMessageListener start() throws Exception {
+      Queue requestQueue = ActiveMQJMSClient.createQueue("exampleQueue");
+      ConnectionFactory cf = new 
ActiveMQConnectionFactory("tcp://localhost:61617");
+      connection = cf.createConnection("guest", "guest");
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+      replyProducer = session.createProducer(null);
+      requestConsumer = session.createConsumer(requestQueue);
+      requestConsumer.setMessageListener(this);
+      return this;
+   }
+
+   @Override
+   public void onMessage(final javax.jms.Message request) {
+      try {
+         Destination replyDestination = request.getJMSReplyTo();
+         TextMessage replyMessage = session.createTextMessage("A reply 
message");
+         replyMessage.setJMSCorrelationID(request.getJMSMessageID());
+         replyProducer.send(replyDestination, replyMessage);
+      } catch (JMSException e) {
+         e.printStackTrace();
+      }
+   }
+
+   public void shutdown() throws JMSException {
+      connection.close();
+   }
+}

Reply via email to