Repository: activemq-artemis
Updated Branches:
  refs/heads/master f08f24645 -> 8d343ade1


ARTEMIS-1695 - Improve STOMP compatiblity with 5.x clients

Also make sure on authentication error in version 1.0 a client will
disconnect


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2ea0dce6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2ea0dce6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2ea0dce6

Branch: refs/heads/master
Commit: 2ea0dce6c7103e87676b8baed022a2b681a0484e
Parents: f08f246
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Thu Feb 22 10:28:57 2018 -0500
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Thu Feb 22 16:22:40 2018 -0500

----------------------------------------------------------------------
 .../artemis/core/protocol/stomp/Stomp.java      |  15 +++
 .../stomp/VersionedStompFrameHandler.java       |  11 +-
 .../stomp/v10/StompFrameHandlerV10.java         |  56 ++++++----
 .../stomp/v11/StompFrameHandlerV11.java         |  11 +-
 .../integration/plugin/StompPluginTest.java     | 106 ++++++++++++++-----
 .../tests/integration/stomp/StompTest.java      |  61 +++++++++++
 .../tests/integration/stomp/StompTestBase.java  |  40 ++++++-
 7 files changed, 243 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ea0dce6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index f1000cc..fa02ee3 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -152,10 +152,20 @@ public interface Stomp {
 
          String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
 
+         /**
+          * Backwards compatibility for STOMP clients that were using 5.x
+          */
+         String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = 
"activemq.subscriptionName";
+
          String SUBSCRIPTION_TYPE = "subscription-type";
 
          String NO_LOCAL = "no-local";
 
+         /**
+          * Backwards compatibility for STOMP clients that were using 5.x
+          */
+         String ACTIVEMQ_NO_LOCAL = "activemq.noLocal";
+
          public interface AckModeValues {
 
             String AUTO = "auto";
@@ -176,6 +186,11 @@ public interface Stomp {
          String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
 
          String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+
+         /**
+          * Backwards compatibility for STOMP clients that were using 5.x
+          */
+         String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = 
"activemq.subscriptionName";
       }
 
       interface Connect {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ea0dce6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 2259a17..3cb5ab8 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp;
 
+import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
+
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -30,10 +31,9 @@ import 
org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
 import 
org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
 import 
org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
 import 
org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
-import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
-
 public abstract class VersionedStompFrameHandler {
 
    protected StompConnection connection;
@@ -266,10 +266,15 @@ public abstract class VersionedStompFrameHandler {
       if (durableSubscriptionName == null) {
          durableSubscriptionName = 
frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
       }
+      if (durableSubscriptionName == null) {
+         durableSubscriptionName = 
frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
+      }
       RoutingType routingType = 
getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), 
frame.getHeader(Headers.Subscribe.DESTINATION));
       boolean noLocal = false;
       if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
          noLocal = 
Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+      } else if (frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)) {
+         noLocal = 
Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
       }
       return connection.subscribe(destination, selector, ack, id, 
durableSubscriptionName, noLocal, routingType);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ea0dce6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
index e632c2b..a6785b7 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp.v10;
 
+import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
+
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -30,8 +32,6 @@ import 
org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
-import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
-
 public class StompFrameHandlerV10 extends VersionedStompFrameHandler 
implements FrameEventListener {
 
    public StompFrameHandlerV10(StompConnection connection,
@@ -52,27 +52,36 @@ public class StompFrameHandlerV10 extends 
VersionedStompFrameHandler implements
       String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
 
-      connection.setClientID(clientID);
-      if (connection.validateUser(login, passcode, connection)) {
-         connection.setValid(true);
-
-         response = new StompFrameV10(Stomp.Responses.CONNECTED);
-
-         if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
-            response.addHeader(Stomp.Headers.Connected.VERSION, 
StompVersions.V1_0.toString());
-         }
-
-         response.addHeader(Stomp.Headers.Connected.SESSION, 
connection.getID().toString());
-
-         if (requestID != null) {
-            response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      try {
+         connection.setClientID(clientID);
+         if (connection.validateUser(login, passcode, connection)) {
+            connection.setValid(true);
+
+            // Create session after validating user - this will cache the 
session in the
+            // protocol manager
+            connection.getSession();
+
+            response = new StompFrameV10(Stomp.Responses.CONNECTED);
+
+            if (frame.hasHeader(Stomp.Headers.ACCEPT_VERSION)) {
+               response.addHeader(Stomp.Headers.Connected.VERSION, 
StompVersions.V1_0.toString());
+            }
+
+            response.addHeader(Stomp.Headers.Connected.SESSION, 
connection.getID().toString());
+
+            if (requestID != null) {
+               response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, 
requestID);
+            }
+         } else {
+            // not valid
+            response = new StompFrameV10(Stomp.Responses.ERROR);
+            String responseText = "Security Error occurred: User name [" + 
login + "] or password is invalid";
+            response.setBody(responseText);
+            response.setNeedsDisconnect(true);
+            response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
          }
-      } else {
-         //not valid
-         response = new StompFrameV10(Stomp.Responses.ERROR);
-         String responseText = "Security Error occurred: User name [" + login 
+ "] or password is invalid";
-         response.setBody(responseText);
-         response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
+      } catch (ActiveMQStompException e) {
+         response = e.getFrame();
       }
       return response;
    }
@@ -91,6 +100,9 @@ public class StompFrameHandlerV10 extends 
VersionedStompFrameHandler implements
       if (durableSubscriptionName == null) {
          durableSubscriptionName = 
request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
       }
+      if (durableSubscriptionName == null) {
+         durableSubscriptionName = 
request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
+      }
 
       String subscriptionID = null;
       if (id != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ea0dce6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 8991898..67fb34b 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp.v11;
 
+import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
+
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,8 +39,6 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 
-import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
-
 public class StompFrameHandlerV11 extends VersionedStompFrameHandler 
implements FrameEventListener {
 
    protected static final char ESC_CHAR = '\\';
@@ -72,6 +72,10 @@ public class StompFrameHandlerV11 extends 
VersionedStompFrameHandler implements
          if (connection.validateUser(login, passcode, connection)) {
             connection.setValid(true);
 
+            // Create session after validating user - this will cache the 
session in the
+            // protocol manager
+            connection.getSession();
+
             response = this.createStompFrame(Stomp.Responses.CONNECTED);
 
             // version
@@ -154,6 +158,9 @@ public class StompFrameHandlerV11 extends 
VersionedStompFrameHandler implements
       if (durableSubscriptionName == null) {
          durableSubscriptionName = 
request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
       }
+      if (durableSubscriptionName == null) {
+         durableSubscriptionName = 
request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
+      }
 
       String subscriptionID = null;
       if (id != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ea0dce6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
index f527e79..e426e3a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
@@ -49,12 +49,24 @@ import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
 import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
 
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
 import 
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -65,6 +77,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runners.Parameterized;
 
 public class StompPluginTest extends StompTestBase {
 
@@ -73,6 +86,11 @@ public class StompPluginTest extends StompTestBase {
 
    private StompClientConnectionV12 conn;
 
+   @Parameterized.Parameters(name = "{0}")
+   public static Collection<Object[]> data() {
+      return Arrays.asList(new Object[][]{{"ws+v12.stomp"}, 
{"tcp+v12.stomp"}});
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -96,57 +114,71 @@ public class StompPluginTest extends StompTestBase {
 
    private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
    private final MethodCalledVerifier verifier = new 
MethodCalledVerifier(methodCalls);
+   private final AtomicBoolean stompBeforeCreateSession = new AtomicBoolean();
+   private final AtomicBoolean stompBeforeRemoveSession = new AtomicBoolean();
 
    @Override
    protected JMSServerManager createServer() throws Exception {
       JMSServerManager server = super.createServer();
       server.getActiveMQServer().registerBrokerPlugin(verifier);
+      server.getActiveMQServer().registerBrokerPlugin(new 
ActiveMQServerPlugin() {
+
+         @Override
+         public void beforeCreateSession(String name, String username, int 
minLargeMessageSize,
+               RemotingConnection connection, boolean autoCommitSends, boolean 
autoCommitAcks, boolean preAcknowledge,
+               boolean xa, String defaultAddress, SessionCallback callback, 
boolean autoCreateQueues,
+               OperationContext context, Map<SimpleString, RoutingType> 
prefixes) throws ActiveMQException {
+
+            if (connection instanceof StompConnection) {
+               stompBeforeCreateSession.set(true);
+            }
+         }
+
+         @Override
+         public void beforeCloseSession(ServerSession session, boolean failed) 
throws ActiveMQException {
+            if (session.getRemotingConnection() instanceof StompConnection) {
+               stompBeforeRemoveSession.set(true);
+            }
+         }
+      });
       return server;
    }
 
    @Test
    public void testSendAndReceive() throws Exception {
 
-      // subscribe
-      //StompClientConnection newConn = 
StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
-      try {
-         URI uri = new URI("ws+v12.stomp://localhost:61613");
-         StompClientConnection newConn = 
StompClientConnectionFactory.createClientConnection(uri);
-         newConn.connect(defUser, defPass);
-         subscribe(newConn, "a-sub");
-
-         send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello 
World 1!");
-         ClientStompFrame frame = newConn.receiveFrame();
-
-         System.out.println("received " + frame);
-         Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      URI uri = new URI(scheme + "://localhost:61613");
+      StompClientConnection newConn = 
StompClientConnectionFactory.createClientConnection(uri);
+      newConn.connect(defUser, defPass);
+      subscribe(newConn, "a-sub");
 
-         verifier.validatePluginMethodsAtLeast(1, MESSAGE_ACKED, BEFORE_SEND, 
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
-                                               AFTER_DELIVER);
+      send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello 
World 1!");
+      ClientStompFrame frame = newConn.receiveFrame();
 
+      System.out.println("received " + frame);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
-         // unsub
-         unsubscribe(newConn, "a-sub");
+      verifier.validatePluginMethodsAtLeast(1, MESSAGE_ACKED, BEFORE_SEND, 
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+                                            AFTER_DELIVER);
 
-         newConn.disconnect();
 
-         verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, 
BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, BEFORE_REMOVE_BINDING, 
AFTER_REMOVE_BINDING);
-         verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
-                                               AFTER_CREATE_SESSION, 
BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
-                                               AFTER_CREATE_CONSUMER, 
BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, 
AFTER_CREATE_QUEUE,
-                                               MESSAGE_ACKED, BEFORE_SEND, 
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
-                                               AFTER_DELIVER, 
BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_ADD_BINDING, AFTER_ADD_BINDING);
+      // unsub
+      unsubscribe(newConn, "a-sub");
 
-      } catch (Throwable e) {
-         fail(e.getMessage());
-      }
+      newConn.disconnect();
 
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, 
BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, BEFORE_REMOVE_BINDING, 
AFTER_REMOVE_BINDING);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+                                            AFTER_CREATE_SESSION, 
BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
+                                            AFTER_CREATE_CONSUMER, 
BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, 
AFTER_CREATE_QUEUE,
+                                            MESSAGE_ACKED, BEFORE_SEND, 
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+                                            AFTER_DELIVER, BEFORE_ADD_ADDRESS, 
AFTER_ADD_ADDRESS, BEFORE_ADD_BINDING, AFTER_ADD_BINDING);
    }
 
    @Test
    public void testStompAutoCreateAddress() throws Exception {
 
-      URI uri = new URI("ws+v12.stomp://localhost:61613");
+      URI uri = new URI(scheme + "://localhost:61613");
       StompClientConnection newConn = 
StompClientConnectionFactory.createClientConnection(uri);
       newConn.connect(defUser, defPass);
 
@@ -161,4 +193,22 @@ public class StompPluginTest extends StompTestBase {
 
    }
 
+   @Test
+   public void testConnect() throws Exception {
+
+      URI uri = new URI(scheme + "://localhost:61613");
+      StompClientConnection newConn = 
StompClientConnectionFactory.createClientConnection(uri);
+      newConn.connect(defUser, defPass);
+
+      //Make sure session is created on connect
+      assertTrue(stompBeforeCreateSession.get());
+
+      newConn.disconnect();
+
+      Thread.sleep(500);
+
+      //Make sure session is removed on disconnect
+      assertTrue(stompBeforeRemoveSession.get());
+
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ea0dce6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 3d36d3f..e6be06d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1216,6 +1216,35 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testDurableSubscriberWithReconnectionLegacy() throws Exception {
+      conn.connect(defUser, defPass, "myclientid");
+      subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
+
+      conn.disconnect();
+
+      Thread.sleep(500);
+
+      // send the message when the durable subscriber is disconnected
+      sendJmsMessage(getName(), topic);
+
+      conn.destroy();
+      conn = StompClientConnectionFactory.createClientConnection(uri);
+      conn.connect(defUser, defPass, "myclientid");
+
+      subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
+
+      ClientStompFrame frame = conn.receiveFrame(3000);
+      assertNotNull("Should have received a message from the durable 
subscription", frame);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getTopicPrefix() + getTopicName(), 
frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      unsubscribeLegacyActiveMQ(conn, null, getTopicPrefix() + getTopicName(), 
true, true);
+
+      conn.disconnect();
+   }
+
+   @Test
    public void testDurableSubscriber() throws Exception {
       conn.connect(defUser, defPass, "myclientid");
       subscribeTopic(conn, null, null, getName(), true);
@@ -1228,6 +1257,18 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testDurableSubscriberLegacySubscriptionHeader() throws 
Exception {
+      conn.connect(defUser, defPass, "myclientid");
+      subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
+      ClientStompFrame response = subscribeTopicLegacyActiveMQ(conn, null, 
null, getName(), true, false);
+
+      // creating a subscriber with the same durable-subscriber-name must fail
+      Assert.assertEquals(Stomp.Responses.ERROR, response.getCommand());
+
+      conn.disconnect();
+   }
+
+   @Test
    public void testDurableUnSubscribe() throws Exception {
       conn.connect(defUser, defPass, "myclientid");
       subscribeTopic(conn, null, null, getName(), true);
@@ -1248,6 +1289,26 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testDurableUnSubscribeLegacySubscriptionHeader() throws 
Exception {
+      conn.connect(defUser, defPass, "myclientid");
+      subscribeTopicLegacyActiveMQ(conn, null, null, getName(), true, false);
+      conn.disconnect();
+      Thread.sleep(500);
+
+      
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid."
 + getName())));
+
+      conn.destroy();
+      conn = StompClientConnectionFactory.createClientConnection(uri);
+
+      conn.connect(defUser, defPass, "myclientid");
+      unsubscribeLegacyActiveMQ(conn, getName(), getTopicPrefix() + 
getTopicName(), false, true);
+      conn.disconnect();
+      Thread.sleep(500);
+
+      
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid."
 + getName())));
+   }
+
+   @Test
    public void testSubscribeToTopicWithNoLocal() throws Exception {
       conn.connect(defUser, defPass);
       subscribeTopic(conn, null, null, null, true, true);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ea0dce6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index ec166dd..2b1302f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -442,9 +442,28 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
    }
 
    public static ClientStompFrame subscribeTopic(StompClientConnection conn,
+         String subscriptionId,
+         String ack,
+         String durableId,
+         boolean receipt,
+         boolean noLocal) throws IOException, InterruptedException {
+      return subscribeTopic(conn, subscriptionId, ack, durableId, 
Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
+   }
+
+   public static ClientStompFrame 
subscribeTopicLegacyActiveMQ(StompClientConnection conn,
+         String subscriptionId,
+         String ack,
+         String durableId,
+         boolean receipt,
+         boolean noLocal) throws IOException, InterruptedException {
+      return subscribeTopic(conn, subscriptionId, ack, durableId, 
Stomp.Headers.Subscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, receipt, noLocal);
+   }
+
+   public static ClientStompFrame subscribeTopic(StompClientConnection conn,
                                           String subscriptionId,
                                           String ack,
                                           String durableId,
+                                          String durableIdHeader,
                                           boolean receipt,
                                           boolean noLocal) throws IOException, 
InterruptedException {
       ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
@@ -457,7 +476,7 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
          frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack);
       }
       if (durableId != null) {
-         frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, 
durableId);
+         frame.addHeader(durableIdHeader, durableId);
       }
       String uuid = UUID.randomUUID().toString();
       if (receipt) {
@@ -492,13 +511,30 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
    }
 
    public static ClientStompFrame unsubscribe(StompClientConnection conn,
+         String subscriptionId,
+         String destination,
+         boolean receipt,
+         boolean durable) throws IOException, InterruptedException {
+      return unsubscribe(conn, subscriptionId, 
Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, destination, receipt, 
durable);
+   }
+
+   public static ClientStompFrame 
unsubscribeLegacyActiveMQ(StompClientConnection conn,
+         String subscriptionId,
+         String destination,
+         boolean receipt,
+         boolean durable) throws IOException, InterruptedException {
+      return unsubscribe(conn, subscriptionId, 
Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME, destination, 
receipt, durable);
+   }
+
+   public static ClientStompFrame unsubscribe(StompClientConnection conn,
                                        String subscriptionId,
+                                       String subscriptionIdHeader,
                                        String destination,
                                        boolean receipt,
                                        boolean durable) throws IOException, 
InterruptedException {
       ClientStompFrame frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
       if (durable && subscriptionId != null) {
-         frame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, 
subscriptionId);
+         frame.addHeader(subscriptionIdHeader, subscriptionId);
       } else if (!durable && subscriptionId != null) {
          frame.addHeader(Stomp.Headers.Unsubscribe.ID, subscriptionId);
       }

Reply via email to