ARTEMIS-1333 SendACK listener fix

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fabc0701
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fabc0701
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fabc0701

Branch: refs/heads/master
Commit: fabc0701a38628ffa8f8d9959cc5ec64c6c3cb10
Parents: 96c6268
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Tue Aug 8 22:48:25 2017 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Aug 9 15:18:54 2017 -0400

----------------------------------------------------------------------
 .../protocol/core/ServerSessionPacketHandler.java | 15 ++++++++-------
 .../protocol/core/impl/ActiveMQPacketHandler.java |  5 ++++-
 .../protocol/core/impl/CoreSessionCallback.java   | 18 ++++++++++++++++++
 .../core/server/impl/ServerSessionImpl.java       |  5 +++++
 .../spi/core/protocol/SessionCallback.java        |  4 ++++
 .../integration/jms/client/ReceiveNoWaitTest.java |  2 +-
 6 files changed, 40 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 88a6c2c..87ac615 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -95,7 +94,7 @@ import 
org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.SimpleFuture;
 import org.apache.activemq.artemis.utils.SimpleFutureImpl;
 import org.apache.activemq.artemis.utils.actors.Actor;
-import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.jboss.logging.Logger;
 
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
@@ -150,7 +149,7 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
 
    private final Actor<Packet> packetActor;
 
-   private final Executor callExecutor;
+   private final ArtemisExecutor callExecutor;
 
    private final CoreProtocolManager manager;
 
@@ -214,19 +213,20 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
    public void connectionFailed(final ActiveMQException exception, boolean 
failedOver) {
       ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
 
+      flushExecutor();
+
       try {
          session.close(true);
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.errorClosingSession(e);
       }
-      flushExecutor();
 
       ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
    }
 
-   private void flushExecutor() {
+   public void flushExecutor() {
       packetActor.flush();
-      OrderedExecutorFactory.flushExecutor(callExecutor);
+      callExecutor.flush();
    }
 
    public void close() {
@@ -247,7 +247,6 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
 
    @Override
    public void handlePacket(final Packet packet) {
-      channel.confirm(packet);
 
       // This method will call onMessagePacket through an actor
       packetActor.act(packet);
@@ -838,6 +837,8 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                                      final boolean flush,
                                      final boolean closeChannel) {
       if (confirmPacket != null) {
+         channel.confirm(confirmPacket);
+
          if (flush) {
             channel.flushConfirmations();
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index c9cc926..cefd10c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -167,10 +167,13 @@ public class ActiveMQPacketHandler implements 
ChannelHandler {
             routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, 
RoutingType.MULTICAST);
          }
 
-         ServerSession session = server.createSession(request.getName(), 
activeMQPrincipal == null ? request.getUsername() : 
activeMQPrincipal.getUserName(), activeMQPrincipal == null ? 
request.getPassword() : activeMQPrincipal.getPassword(), 
request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), 
request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), 
request.getDefaultAddress(), new CoreSessionCallback(request.getName(), 
protocolManager, channel, connection), true, sessionOperationContext, 
routingTypeMap);
+         CoreSessionCallback sessionCallback = new 
CoreSessionCallback(request.getName(), protocolManager, channel, connection);
+
+         ServerSession session = server.createSession(request.getName(), 
activeMQPrincipal == null ? request.getUsername() : 
activeMQPrincipal.getUserName(), activeMQPrincipal == null ? 
request.getPassword() : activeMQPrincipal.getPassword(), 
request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), 
request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), 
request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, 
routingTypeMap);
 
          ServerSessionPacketHandler handler = new 
ServerSessionPacketHandler(server, protocolManager, session, 
server.getStorageManager(), channel);
          channel.setHandler(handler);
+         sessionCallback.setSessionHandler(handler);
 
          // TODO - where is this removed?
          protocolManager.addSessionHandler(request.getName(), handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 542d726..866130b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import 
org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
@@ -44,6 +45,8 @@ public final class CoreSessionCallback implements 
SessionCallback {
 
    private String name;
 
+   private ServerSessionPacketHandler handler;
+
    public CoreSessionCallback(String name,
                               ProtocolManager protocolManager,
                               Channel channel,
@@ -54,6 +57,21 @@ public final class CoreSessionCallback implements 
SessionCallback {
       this.connection = connection;
    }
 
+   public CoreSessionCallback setSessionHandler(ServerSessionPacketHandler 
handler) {
+      this.handler = handler;
+      return this;
+   }
+
+   @Override
+   public void close(boolean failed) {
+      ServerSessionPacketHandler localHandler = handler;
+      if (failed && localHandler != null) {
+         // We wait any pending tasks before we make this as closed
+         localHandler.flushExecutor();
+      }
+      this.handler = null;
+   }
+
    @Override
    public boolean isWritable(ReadyListener callback, Object protocolContext) {
       return connection.isWritable(callback);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index f3617c1..1661ab2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -345,6 +345,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    }
 
    protected void doClose(final boolean failed) throws Exception {
+      callback.close(failed);
       synchronized (this) {
          if (!closed) {
             server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> 
plugin.beforeCloseSession(this, failed) : null);
@@ -1238,6 +1239,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    public void close(final boolean failed) {
       if (closed)
          return;
+
+      if (failed) {
+
+      }
       context.executeOnCompletion(new IOCallback() {
          @Override
          public void onError(int errorCode, String errorMessage) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index edfb5dc..ae1612f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -89,4 +89,8 @@ public interface SessionCallback {
     * Some protocols (Openwire) needs a special message with the browser is 
finished.
     */
    void browserFinished(ServerConsumer consumer);
+
+   default void close(boolean failed) {
+
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fabc0701/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
index a426948..6114f49 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ReceiveNoWaitTest.java
@@ -52,7 +52,7 @@ public class ReceiveNoWaitTest extends JMSTestBase {
    public void testReceiveNoWait() throws Exception {
       assertNotNull(queue);
 
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < 1000; i++) {
          Connection connection = cf.createConnection();
 
          Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);

Reply via email to