Repository: activemq-artemis
Updated Branches:
  refs/heads/master b0c83073e -> 07eb1d25b


ARTEMIS-1416 Queue is not autocreated if address already exists

- Fix on core and amqp.
- Add test to verify amqp's current large message behavior.
- Add test to openwire also just to verify.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f3ace6af
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f3ace6af
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f3ace6af

Branch: refs/heads/master
Commit: f3ace6afd726dc8e3c1c58c76e3fad3d5cfa357d
Parents: b0c8307
Author: Howard Gao <howard....@gmail.com>
Authored: Mon Oct 30 12:47:23 2017 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Fri Nov 3 18:25:23 2017 -0400

----------------------------------------------------------------------
 .../jms/client/ActiveMQMessageProducer.java     |  11 +-
 .../amqp/broker/AMQPSessionCallback.java        |  23 +--
 .../integration/amqp/QueueAutoCreationTest.java | 161 +++++++++++++++++++
 .../LargeMessageQueueAutoCreationTest.java      | 158 ++++++++++++++++++
 4 files changed, 343 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 3121a88..9f86e49 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -420,7 +420,16 @@ public class ActiveMQMessageProducer implements 
MessageProducer, QueueSender, To
                      throw new InvalidDestinationException("Destination " + 
address + " does not exist");
                   }
                } else {
-                  connection.addKnownDestination(address);
+                  ClientSession.QueueQuery queueQuery = 
clientSession.queueQuery(address);
+                  if (queueQuery.isExists()) {
+                     connection.addKnownDestination(address);
+                  } else if (destination.isQueue() && 
query.isAutoCreateQueues()) {
+                     if (destination.isTemporary()) {
+                        clientSession.createTemporaryQueue(address, 
RoutingType.ANYCAST, address);
+                     } else {
+                        clientSession.createQueue(address, 
RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), 
query.isDefaultPurgeOnNoConsumers());
+                     }
+                  }
                }
             } catch (ActiveMQQueueExistsException e) {
                // The queue was created by another client/admin between the 
query check and send create queue packet

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 19f6351..7a7a41e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -284,11 +284,14 @@ public class AMQPSessionCallback implements 
SessionCallback {
             // The address may have been created by another thread in the mean 
time.  Catch and do nothing.
          }
          bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
-      } else if (routingType == RoutingType.ANYCAST && 
!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
-         try {
-            serverSession.createQueue(simpleAddress, simpleAddress, 
routingType, null, false, true, true);
-         } catch (ActiveMQQueueExistsException e) {
-            // The queue may have been created by another thread in the mean 
time.  Catch and do nothing.
+      } else if (routingType == RoutingType.ANYCAST && 
bindingQueryResult.isAutoCreateQueues()) {
+         QueueQueryResult queueBinding = 
serverSession.executeQueueQuery(simpleAddress);
+         if (!queueBinding.isExists()) {
+            try {
+               serverSession.createQueue(simpleAddress, simpleAddress, 
routingType, null, false, true, true);
+            } catch (ActiveMQQueueExistsException e) {
+               // The queue may have been created by another thread in the 
mean time.  Catch and do nothing.
+            }
          }
          bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
       }
@@ -394,14 +397,16 @@ public class AMQPSessionCallback implements 
SessionCallback {
          message.setAddress(new SimpleString(address));
       } else {
          // Anonymous relay must set a To value
-         if (message.getAddress() == null) {
+         address = message.getAddress();
+         if (address == null) {
             rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' 
field for message sent to an anonymous producer");
             return;
          }
+      }
 
-         if (!bindingQuery(message.getAddress().toString(), 
RoutingType.ANYCAST)) {
-            throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
-         }
+      //here check queue-autocreation
+      if (!bindingQuery(address, RoutingType.ANYCAST)) {
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
       }
 
       OperationContext oldcontext = recoverContext();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java
new file mode 100644
index 0000000..f6c8b22
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/QueueAutoCreationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Random;
+
+//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
+public class QueueAutoCreationTest extends JMSClientTestSupport {
+
+   Queue queue1;
+   Random random = new Random();
+   ActiveMQConnection testConn;
+   ClientSession clientSession;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      String randomSuffix = new BigInteger(130, random).toString(32);
+      testConn = (ActiveMQConnection)createCoreConnection();
+      clientSession = testConn.getSessionFactory().createSession();
+      queue1 = createQueue("queue1_" + randomSuffix);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      testConn.close();
+      super.tearDown();
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE";
+   }
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.setJournalType(JournalType.NIO);
+      Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
+      if (map.size() == 0) {
+         AddressSettings as = new AddressSettings();
+         map.put("#", as);
+      }
+      Map.Entry<String, AddressSettings> entry = 
map.entrySet().iterator().next();
+      AddressSettings settings = entry.getValue();
+      settings.setAutoCreateQueues(true);
+      System.out.println("server cofg, isauto? " + 
entry.getValue().isAutoCreateQueues());
+   }
+
+
+   protected Queue createQueue(final String queueName) throws Exception {
+      SimpleString address = SimpleString.toSimpleString(queueName);
+      clientSession.createAddress(address, RoutingType.ANYCAST, false);
+      return new ActiveMQQueue(queueName);
+   }
+
+   @Test(timeout = 30000)
+   public void testSmallString() throws Exception {
+      sendStringOfSize(1024, false);
+   }
+
+   @Test(timeout = 30000)
+   public void testHugeString() throws Exception {
+      //amqp doesn't support large message receive.
+      //using core to receive, it can verify
+      //that the large message is indeed stored in core
+      //via amqp send.
+      sendStringOfSize(1024 * 1024, true);
+   }
+
+   private void sendStringOfSize(int msgSize, boolean useCoreReceive) throws 
JMSException {
+
+      Connection conn = this.createConnection();
+
+      try {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = session.createProducer(queue1);
+
+         TextMessage m = session.createTextMessage();
+
+         m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < msgSize) {
+            buffer.append(UUIDGenerator.getInstance().generateStringUUID());
+         }
+
+         final String originalString = buffer.toString();
+
+         m.setText(originalString);
+
+         prod.send(m);
+
+         conn.close();
+
+         if (useCoreReceive) {
+            conn = createCoreConnection();
+         } else {
+            conn = createConnection();
+         }
+
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         conn.start();
+
+         TextMessage rm = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(rm);
+
+         String str = rm.getText();
+         Assert.assertEquals(originalString, str);
+      } finally {
+         if (conn != null) {
+            conn.close();
+         }
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3ace6af/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java
new file mode 100644
index 0000000..b3f224d
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/LargeMessageQueueAutoCreationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Random;
+
+//adapted from https://issues.apache.org/jira/browse/ARTEMIS-1416
+@RunWith(Parameterized.class)
+public class LargeMessageQueueAutoCreationTest extends BasicOpenWireTest {
+
+   Queue queue1;
+   Random random = new Random();
+   ActiveMQConnection testConn;
+   ClientSession clientSession;
+
+   @Parameterized.Parameter
+   public boolean usingCore;
+
+   @Parameterized.Parameters(name = "isCore={0}")
+   public static Collection<Object[]> params() {
+      return Arrays.asList(new Object[][]{{true}, {false}});
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      String randomSuffix = new BigInteger(130, random).toString(32);
+      testConn = (ActiveMQConnection)coreCf.createConnection();
+      clientSession = testConn.getSessionFactory().createSession();
+      queue1 = createCoreQueue("queue1_" + randomSuffix);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      testConn.close();
+      super.tearDown();
+   }
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      serverConfig.setJournalType(JournalType.NIO);
+      Map<String, AddressSettings> map = serverConfig.getAddressesSettings();
+      Map.Entry<String, AddressSettings> entry = 
map.entrySet().iterator().next();
+      AddressSettings settings = entry.getValue();
+      settings.setAutoCreateQueues(true);
+      System.out.println("server cofg, isauto? " + 
entry.getValue().isAutoCreateQueues());
+   }
+
+
+   protected Queue createCoreQueue(final String queueName) throws Exception {
+      SimpleString address = SimpleString.toSimpleString(queueName);
+      clientSession.createAddress(address, RoutingType.ANYCAST, false);
+      return new ActiveMQQueue(queueName);
+   }
+
+   @Test(timeout = 30000)
+   public void testSmallString() throws Exception {
+      sendStringOfSize(1024);
+   }
+
+   @Test(timeout = 30000)
+   public void testHugeString() throws Exception {
+      sendStringOfSize(1024 * 1024);
+   }
+
+   private void sendStringOfSize(int msgSize) throws JMSException {
+
+      ConnectionFactory factoryToUse = usingCore ? coreCf : factory;
+
+      Connection conn = factoryToUse.createConnection();
+
+      try {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = session.createProducer(queue1);
+
+         TextMessage m = session.createTextMessage();
+
+         m.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < msgSize) {
+            buffer.append(UUIDGenerator.getInstance().generateStringUUID());
+         }
+
+         final String originalString = buffer.toString();
+
+         m.setText(originalString);
+
+         prod.send(m);
+
+         conn.close();
+
+         conn = factoryToUse.createConnection();
+
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         conn.start();
+
+         TextMessage rm = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(rm);
+
+         String str = rm.getText();
+         Assert.assertEquals(originalString, str);
+      } finally {
+         if (conn != null) {
+            conn.close();
+         }
+      }
+   }
+}
\ No newline at end of file

Reply via email to