ARTEMIS-1333 Fix SendACK

(Fix copied manually from master; it was not possible to cherry-pick.)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e5ef406e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e5ef406e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e5ef406e

Branch: refs/heads/1.x
Commit: e5ef406e4e4fbb03b2ce1afdf19d7e9aff8367f4
Parents: 9724571
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Thu Aug 10 22:24:21 2017 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Aug 10 22:27:57 2017 -0400

----------------------------------------------------------------------
 .../core/ServerSessionPacketHandler.java        | 621 ++++++++++---------
 .../core/impl/ActiveMQPacketHandler.java        |   7 +-
 .../protocol/core/impl/CoreSessionCallback.java |  18 +
 .../core/server/impl/ServerSessionImpl.java     |   1 +
 .../spi/core/protocol/SessionCallback.java      |   6 +-
 .../ActiveMQServerControlUsingCoreTest.java     |   7 +
 6 files changed, 354 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 5dbb4f1..c3fc01d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -20,6 +20,7 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -133,6 +134,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final boolean direct;
 
+   private static final ThreadLocal<AtomicBoolean> inHandler = 
ThreadLocal.withInitial(AtomicBoolean::new);
+
    private final Executor callExecutor;
 
    public ServerSessionPacketHandler(final Executor callExecutor,
@@ -170,18 +173,21 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    public void connectionFailed(final ActiveMQException exception, boolean 
failedOver) {
       ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
 
+      flushExecutor();
+
       try {
          session.close(true);
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.errorClosingSession(e);
       }
-      flushExecutor();
 
       ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
    }
 
-   private void flushExecutor() {
-      OrderedExecutorFactory.flushExecutor(callExecutor);
+   public void flushExecutor() {
+      if (!inHandler.get().get()) {
+         OrderedExecutorFactory.flushExecutor(callExecutor);
+      }
    }
 
    public void close() {
@@ -202,345 +208,350 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    @Override
    public void handlePacket(final Packet packet) {
-      channel.confirm(packet);
       callExecutor.execute(() -> internalHandlePacket(packet));
    }
 
    private void internalHandlePacket(final Packet packet) {
-      byte type = packet.getType();
 
-      storageManager.setContext(session.getSessionContext());
+      inHandler.get().set(true);
+      try {
+         byte type = packet.getType();
 
-      Packet response = null;
-      boolean flush = false;
-      boolean closeChannel = false;
-      boolean requiresResponse = false;
+         storageManager.setContext(session.getSessionContext());
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
-      }
+         Packet response = null;
+         boolean flush = false;
+         boolean closeChannel = false;
+         boolean requiresResponse = false;
+
+         if (logger.isTraceEnabled()) {
+            logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
+         }
 
-      try {
          try {
-            switch (type) {
-               case SESS_CREATECONSUMER: {
-                  SessionCreateConsumerMessage request = 
(SessionCreateConsumerMessage) packet;
-                  requiresResponse = request.isRequiresResponse();
-                  session.createConsumer(request.getID(), 
request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
-                  if (requiresResponse) {
-                     // We send back queue information on the queue as a 
response- this allows the queue to
-                     // be automatically recreated on failover
-                     QueueQueryResult queueQueryResult = 
session.executeQueueQuery(request.getQueueName());
+            try {
+               switch (type) {
+                  case SESS_CREATECONSUMER: {
+                     SessionCreateConsumerMessage request = 
(SessionCreateConsumerMessage) packet;
+                     requiresResponse = request.isRequiresResponse();
+                     session.createConsumer(request.getID(), 
request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
+                     if (requiresResponse) {
+                        // We send back queue information on the queue as a 
response- this allows the queue to
+                        // be automatically recreated on failover
+                        QueueQueryResult queueQueryResult = 
session.executeQueueQuery(request.getQueueName());
+                        if 
(channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
+                           response = new 
SessionQueueQueryResponseMessage_V2(queueQueryResult);
+                        } else {
+                           response = new 
SessionQueueQueryResponseMessage(queueQueryResult);
+                        }
+                     }
+
+                     break;
+                  }
+                  case CREATE_QUEUE: {
+                     CreateQueueMessage request = (CreateQueueMessage) packet;
+                     requiresResponse = request.isRequiresResponse();
+                     session.createQueue(request.getAddress(), 
request.getQueueName(), request.getFilterString(), request.isTemporary(), 
request.isDurable());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case CREATE_SHARED_QUEUE: {
+                     CreateSharedQueueMessage request = 
(CreateSharedQueueMessage) packet;
+                     requiresResponse = request.isRequiresResponse();
+                     session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.isDurable(), request.getFilterString());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case DELETE_QUEUE: {
+                     requiresResponse = true;
+                     SessionDeleteQueueMessage request = 
(SessionDeleteQueueMessage) packet;
+                     session.deleteQueue(request.getQueueName());
+                     response = new NullResponseMessage();
+                     break;
+                  }
+                  case SESS_QUEUEQUERY: {
+                     requiresResponse = true;
+                     SessionQueueQueryMessage request = 
(SessionQueueQueryMessage) packet;
+                     QueueQueryResult result = 
session.executeQueueQuery(request.getQueueName());
                      if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) 
{
-                        response = new 
SessionQueueQueryResponseMessage_V2(queueQueryResult);
+                        response = new 
SessionQueueQueryResponseMessage_V2(result);
                      } else {
-                        response = new 
SessionQueueQueryResponseMessage(queueQueryResult);
+                        response = new 
SessionQueueQueryResponseMessage(result);
                      }
+                     break;
                   }
-
-                  break;
-               }
-               case CREATE_QUEUE: {
-                  CreateQueueMessage request = (CreateQueueMessage) packet;
-                  requiresResponse = request.isRequiresResponse();
-                  session.createQueue(request.getAddress(), 
request.getQueueName(), request.getFilterString(), request.isTemporary(), 
request.isDurable());
-                  if (requiresResponse) {
+                  case SESS_BINDINGQUERY: {
+                     requiresResponse = true;
+                     SessionBindingQueryMessage request = 
(SessionBindingQueryMessage) packet;
+                     BindingQueryResult result = 
session.executeBindingQuery(request.getAddress());
+                     if 
(channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
+                        response = new 
SessionBindingQueryResponseMessage_V3(result.isExists(), 
result.getQueueNames(), result.isAutoCreateJmsQueues(), 
result.isAutoCreateJmsTopics());
+                     } else if 
(channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
+                        response = new 
SessionBindingQueryResponseMessage_V2(result.isExists(), 
result.getQueueNames(), result.isAutoCreateJmsQueues());
+                     } else {
+                        response = new 
SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
+                     }
+                     break;
+                  }
+                  case SESS_ACKNOWLEDGE: {
+                     SessionAcknowledgeMessage message = 
(SessionAcknowledgeMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.acknowledge(message.getConsumerID(), 
message.getMessageID());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_EXPIRED: {
+                     SessionExpireMessage message = (SessionExpireMessage) 
packet;
+                     session.expire(message.getConsumerID(), 
message.getMessageID());
+                     break;
+                  }
+                  case SESS_COMMIT: {
+                     requiresResponse = true;
+                     session.commit();
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case CREATE_SHARED_QUEUE: {
-                  CreateSharedQueueMessage request = 
(CreateSharedQueueMessage) packet;
-                  requiresResponse = request.isRequiresResponse();
-                  session.createSharedQueue(request.getAddress(), 
request.getQueueName(), request.isDurable(), request.getFilterString());
-                  if (requiresResponse) {
+                  case SESS_ROLLBACK: {
+                     requiresResponse = true;
+                     session.rollback(((RollbackMessage) 
packet).isConsiderLastMessageAsDelivered());
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case DELETE_QUEUE: {
-                  requiresResponse = true;
-                  SessionDeleteQueueMessage request = 
(SessionDeleteQueueMessage) packet;
-                  session.deleteQueue(request.getQueueName());
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_QUEUEQUERY: {
-                  requiresResponse = true;
-                  SessionQueueQueryMessage request = 
(SessionQueueQueryMessage) packet;
-                  QueueQueryResult result = 
session.executeQueueQuery(request.getQueueName());
-                  if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
-                     response = new 
SessionQueueQueryResponseMessage_V2(result);
-                  } else {
-                     response = new SessionQueueQueryResponseMessage(result);
+                  case SESS_XA_COMMIT: {
+                     requiresResponse = true;
+                     SessionXACommitMessage message = (SessionXACommitMessage) 
packet;
+                     session.xaCommit(message.getXid(), message.isOnePhase());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
                   }
-                  break;
-               }
-               case SESS_BINDINGQUERY: {
-                  requiresResponse = true;
-                  SessionBindingQueryMessage request = 
(SessionBindingQueryMessage) packet;
-                  BindingQueryResult result = 
session.executeBindingQuery(request.getAddress());
-                  if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
-                     response = new 
SessionBindingQueryResponseMessage_V3(result.isExists(), 
result.getQueueNames(), result.isAutoCreateJmsQueues(), 
result.isAutoCreateJmsTopics());
-                  } else if 
(channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
-                     response = new 
SessionBindingQueryResponseMessage_V2(result.isExists(), 
result.getQueueNames(), result.isAutoCreateJmsQueues());
-                  } else {
-                     response = new 
SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
+                  case SESS_XA_END: {
+                     requiresResponse = true;
+                     SessionXAEndMessage message = (SessionXAEndMessage) 
packet;
+                     session.xaEnd(message.getXid());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
                   }
-                  break;
-               }
-               case SESS_ACKNOWLEDGE: {
-                  SessionAcknowledgeMessage message = 
(SessionAcknowledgeMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.acknowledge(message.getConsumerID(), 
message.getMessageID());
-                  if (requiresResponse) {
+                  case SESS_XA_FORGET: {
+                     requiresResponse = true;
+                     SessionXAForgetMessage message = (SessionXAForgetMessage) 
packet;
+                     session.xaForget(message.getXid());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_JOIN: {
+                     requiresResponse = true;
+                     SessionXAJoinMessage message = (SessionXAJoinMessage) 
packet;
+                     session.xaJoin(message.getXid());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_RESUME: {
+                     requiresResponse = true;
+                     SessionXAResumeMessage message = (SessionXAResumeMessage) 
packet;
+                     session.xaResume(message.getXid());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_ROLLBACK: {
+                     requiresResponse = true;
+                     SessionXARollbackMessage message = 
(SessionXARollbackMessage) packet;
+                     session.xaRollback(message.getXid());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_START: {
+                     requiresResponse = true;
+                     SessionXAStartMessage message = (SessionXAStartMessage) 
packet;
+                     session.xaStart(message.getXid());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_FAILED: {
+                     requiresResponse = true;
+                     SessionXAAfterFailedMessage message = 
(SessionXAAfterFailedMessage) packet;
+                     session.xaFailed(message.getXid());
+                     // no response on this case
+                     break;
+                  }
+                  case SESS_XA_SUSPEND: {
+                     requiresResponse = true;
+                     session.xaSuspend();
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_PREPARE: {
+                     requiresResponse = true;
+                     SessionXAPrepareMessage message = 
(SessionXAPrepareMessage) packet;
+                     session.xaPrepare(message.getXid());
+                     response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
+                     break;
+                  }
+                  case SESS_XA_INDOUBT_XIDS: {
+                     requiresResponse = true;
+                     List<Xid> xids = session.xaGetInDoubtXids();
+                     response = new 
SessionXAGetInDoubtXidsResponseMessage(xids);
+                     break;
+                  }
+                  case SESS_XA_GET_TIMEOUT: {
+                     requiresResponse = true;
+                     int timeout = session.xaGetTimeout();
+                     response = new 
SessionXAGetTimeoutResponseMessage(timeout);
+                     break;
+                  }
+                  case SESS_XA_SET_TIMEOUT: {
+                     requiresResponse = true;
+                     SessionXASetTimeoutMessage message = 
(SessionXASetTimeoutMessage) packet;
+                     session.xaSetTimeout(message.getTimeoutSeconds());
+                     response = new SessionXASetTimeoutResponseMessage(true);
+                     break;
+                  }
+                  case SESS_START: {
+                     session.start();
+                     break;
+                  }
+                  case SESS_STOP: {
+                     requiresResponse = true;
+                     session.stop();
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case SESS_EXPIRED: {
-                  SessionExpireMessage message = (SessionExpireMessage) packet;
-                  session.expire(message.getConsumerID(), 
message.getMessageID());
-                  break;
-               }
-               case SESS_COMMIT: {
-                  requiresResponse = true;
-                  session.commit();
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_ROLLBACK: {
-                  requiresResponse = true;
-                  session.rollback(((RollbackMessage) 
packet).isConsiderLastMessageAsDelivered());
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_XA_COMMIT: {
-                  requiresResponse = true;
-                  SessionXACommitMessage message = (SessionXACommitMessage) 
packet;
-                  session.xaCommit(message.getXid(), message.isOnePhase());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_END: {
-                  requiresResponse = true;
-                  SessionXAEndMessage message = (SessionXAEndMessage) packet;
-                  session.xaEnd(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_FORGET: {
-                  requiresResponse = true;
-                  SessionXAForgetMessage message = (SessionXAForgetMessage) 
packet;
-                  session.xaForget(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_JOIN: {
-                  requiresResponse = true;
-                  SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
-                  session.xaJoin(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_RESUME: {
-                  requiresResponse = true;
-                  SessionXAResumeMessage message = (SessionXAResumeMessage) 
packet;
-                  session.xaResume(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_ROLLBACK: {
-                  requiresResponse = true;
-                  SessionXARollbackMessage message = 
(SessionXARollbackMessage) packet;
-                  session.xaRollback(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_START: {
-                  requiresResponse = true;
-                  SessionXAStartMessage message = (SessionXAStartMessage) 
packet;
-                  session.xaStart(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_FAILED: {
-                  requiresResponse = true;
-                  SessionXAAfterFailedMessage message = 
(SessionXAAfterFailedMessage) packet;
-                  session.xaFailed(message.getXid());
-                  // no response on this case
-                  break;
-               }
-               case SESS_XA_SUSPEND: {
-                  requiresResponse = true;
-                  session.xaSuspend();
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_PREPARE: {
-                  requiresResponse = true;
-                  SessionXAPrepareMessage message = (SessionXAPrepareMessage) 
packet;
-                  session.xaPrepare(message.getXid());
-                  response = new SessionXAResponseMessage(false, 
XAResource.XA_OK, null);
-                  break;
-               }
-               case SESS_XA_INDOUBT_XIDS: {
-                  requiresResponse = true;
-                  List<Xid> xids = session.xaGetInDoubtXids();
-                  response = new SessionXAGetInDoubtXidsResponseMessage(xids);
-                  break;
-               }
-               case SESS_XA_GET_TIMEOUT: {
-                  requiresResponse = true;
-                  int timeout = session.xaGetTimeout();
-                  response = new SessionXAGetTimeoutResponseMessage(timeout);
-                  break;
-               }
-               case SESS_XA_SET_TIMEOUT: {
-                  requiresResponse = true;
-                  SessionXASetTimeoutMessage message = 
(SessionXASetTimeoutMessage) packet;
-                  session.xaSetTimeout(message.getTimeoutSeconds());
-                  response = new SessionXASetTimeoutResponseMessage(true);
-                  break;
-               }
-               case SESS_START: {
-                  session.start();
-                  break;
-               }
-               case SESS_STOP: {
-                  requiresResponse = true;
-                  session.stop();
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_CLOSE: {
-                  requiresResponse = true;
-                  session.close(false);
-                  // removeConnectionListeners();
-                  response = new NullResponseMessage();
-                  flush = true;
-                  closeChannel = true;
-                  break;
-               }
-               case SESS_INDIVIDUAL_ACKNOWLEDGE: {
-                  SessionIndividualAcknowledgeMessage message = 
(SessionIndividualAcknowledgeMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.individualAcknowledge(message.getConsumerID(), 
message.getMessageID());
-                  if (requiresResponse) {
+                  case SESS_CLOSE: {
+                     requiresResponse = true;
+                     session.close(false);
+                     // removeConnectionListeners();
                      response = new NullResponseMessage();
+                     flush = true;
+                     closeChannel = true;
+                     break;
                   }
-                  break;
-               }
-               case SESS_CONSUMER_CLOSE: {
-                  requiresResponse = true;
-                  SessionConsumerCloseMessage message = 
(SessionConsumerCloseMessage) packet;
-                  session.closeConsumer(message.getConsumerID());
-                  response = new NullResponseMessage();
-                  break;
-               }
-               case SESS_FLOWTOKEN: {
-                  SessionConsumerFlowCreditMessage message = 
(SessionConsumerFlowCreditMessage) packet;
-                  session.receiveConsumerCredits(message.getConsumerID(), 
message.getCredits());
-                  break;
-               }
-               case SESS_SEND: {
-                  SessionSendMessage message = (SessionSendMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.send((ServerMessage) message.getMessage(), direct);
-                  if (requiresResponse) {
+                  case SESS_INDIVIDUAL_ACKNOWLEDGE: {
+                     SessionIndividualAcknowledgeMessage message = 
(SessionIndividualAcknowledgeMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.individualAcknowledge(message.getConsumerID(), 
message.getMessageID());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_CONSUMER_CLOSE: {
+                     requiresResponse = true;
+                     SessionConsumerCloseMessage message = 
(SessionConsumerCloseMessage) packet;
+                     session.closeConsumer(message.getConsumerID());
                      response = new NullResponseMessage();
+                     break;
                   }
-                  break;
-               }
-               case SESS_SEND_LARGE: {
-                  SessionSendLargeMessage message = (SessionSendLargeMessage) 
packet;
-                  session.sendLarge(message.getLargeMessage());
-                  break;
-               }
-               case SESS_SEND_CONTINUATION: {
-                  SessionSendContinuationMessage message = 
(SessionSendContinuationMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.sendContinuations(message.getPacketSize(), 
message.getMessageBodySize(), message.getBody(), message.isContinues());
-                  if (requiresResponse) {
+                  case SESS_FLOWTOKEN: {
+                     SessionConsumerFlowCreditMessage message = 
(SessionConsumerFlowCreditMessage) packet;
+                     session.receiveConsumerCredits(message.getConsumerID(), 
message.getCredits());
+                     break;
+                  }
+                  case SESS_SEND: {
+                     SessionSendMessage message = (SessionSendMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.send((ServerMessage) message.getMessage(), 
direct);
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_SEND_LARGE: {
+                     SessionSendLargeMessage message = 
(SessionSendLargeMessage) packet;
+                     session.sendLarge(message.getLargeMessage());
+                     break;
+                  }
+                  case SESS_SEND_CONTINUATION: {
+                     SessionSendContinuationMessage message = 
(SessionSendContinuationMessage) packet;
+                     requiresResponse = message.isRequiresResponse();
+                     session.sendContinuations(message.getPacketSize(), 
message.getMessageBodySize(), message.getBody(), message.isContinues());
+                     if (requiresResponse) {
+                        response = new NullResponseMessage();
+                     }
+                     break;
+                  }
+                  case SESS_FORCE_CONSUMER_DELIVERY: {
+                     SessionForceConsumerDelivery message = 
(SessionForceConsumerDelivery) packet;
+                     session.forceConsumerDelivery(message.getConsumerID(), 
message.getSequence());
+                     break;
+                  }
+                  case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: {
+                     SessionRequestProducerCreditsMessage message = 
(SessionRequestProducerCreditsMessage) packet;
+                     session.requestProducerCredits(message.getAddress(), 
message.getCredits());
+                     break;
+                  }
+                  case PacketImpl.SESS_ADD_METADATA: {
                      response = new NullResponseMessage();
+                     SessionAddMetaDataMessage message = 
(SessionAddMetaDataMessage) packet;
+                     session.addMetaData(message.getKey(), message.getData());
+                     break;
+                  }
+                  case PacketImpl.SESS_ADD_METADATA2: {
+                     SessionAddMetaDataMessageV2 message = 
(SessionAddMetaDataMessageV2) packet;
+                     if (message.isRequiresConfirmations()) {
+                        response = new NullResponseMessage();
+                     }
+                     session.addMetaData(message.getKey(), message.getData());
+                     break;
+                  }
+                  case PacketImpl.SESS_UNIQUE_ADD_METADATA: {
+                     SessionUniqueAddMetaDataMessage message = 
(SessionUniqueAddMetaDataMessage) packet;
+                     if (session.addUniqueMetaData(message.getKey(), 
message.getData())) {
+                        response = new NullResponseMessage();
+                     } else {
+                        response = new 
ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(),
 message.getData()));
+                     }
+                     break;
                   }
-                  break;
-               }
-               case SESS_FORCE_CONSUMER_DELIVERY: {
-                  SessionForceConsumerDelivery message = 
(SessionForceConsumerDelivery) packet;
-                  session.forceConsumerDelivery(message.getConsumerID(), 
message.getSequence());
-                  break;
-               }
-               case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: {
-                  SessionRequestProducerCreditsMessage message = 
(SessionRequestProducerCreditsMessage) packet;
-                  session.requestProducerCredits(message.getAddress(), 
message.getCredits());
-                  break;
                }
-               case PacketImpl.SESS_ADD_METADATA: {
-                  response = new NullResponseMessage();
-                  SessionAddMetaDataMessage message = 
(SessionAddMetaDataMessage) packet;
-                  session.addMetaData(message.getKey(), message.getData());
-                  break;
+            } catch (ActiveMQIOErrorException e) {
+               getSession().markTXFailed(e);
+               if (requiresResponse) {
+                  logger.debug("Sending exception to client", e);
+                  response = new ActiveMQExceptionMessage(e);
+               } else {
+                  ActiveMQServerLogger.LOGGER.caughtException(e);
                }
-               case PacketImpl.SESS_ADD_METADATA2: {
-                  SessionAddMetaDataMessageV2 message = 
(SessionAddMetaDataMessageV2) packet;
-                  if (message.isRequiresConfirmations()) {
-                     response = new NullResponseMessage();
-                  }
-                  session.addMetaData(message.getKey(), message.getData());
-                  break;
+            } catch (ActiveMQXAException e) {
+               if (requiresResponse) {
+                  logger.debug("Sending exception to client", e);
+                  response = new SessionXAResponseMessage(true, e.errorCode, 
e.getMessage());
+               } else {
+                  ActiveMQServerLogger.LOGGER.caughtXaException(e);
                }
-               case PacketImpl.SESS_UNIQUE_ADD_METADATA: {
-                  SessionUniqueAddMetaDataMessage message = 
(SessionUniqueAddMetaDataMessage) packet;
-                  if (session.addUniqueMetaData(message.getKey(), 
message.getData())) {
-                     response = new NullResponseMessage();
+            } catch (ActiveMQException e) {
+               if (requiresResponse) {
+                  logger.debug("Sending exception to client", e);
+                  response = new ActiveMQExceptionMessage(e);
+               } else {
+                  if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
+                     logger.debug("Caught exception", e);
                   } else {
-                     response = new 
ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(),
 message.getData()));
+                     ActiveMQServerLogger.LOGGER.caughtException(e);
                   }
-                  break;
                }
-            }
-         } catch (ActiveMQIOErrorException e) {
-            getSession().markTXFailed(e);
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new ActiveMQExceptionMessage(e);
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtException(e);
-            }
-         } catch (ActiveMQXAException e) {
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new SessionXAResponseMessage(true, e.errorCode, 
e.getMessage());
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtXaException(e);
-            }
-         } catch (ActiveMQException e) {
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new ActiveMQExceptionMessage(e);
-            } else {
-               if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
-                  logger.debug("Caught exception", e);
+            } catch (Throwable t) {
+               getSession().markTXFailed(t);
+               if (requiresResponse) {
+                  ActiveMQServerLogger.LOGGER.warn("Sending unexpected 
exception to the client", t);
+                  ActiveMQException activeMQInternalErrorException = new 
ActiveMQInternalErrorException();
+                  activeMQInternalErrorException.initCause(t);
+                  response = new 
ActiveMQExceptionMessage(activeMQInternalErrorException);
                } else {
-                  ActiveMQServerLogger.LOGGER.caughtException(e);
+                  ActiveMQServerLogger.LOGGER.caughtException(t);
                }
             }
-         } catch (Throwable t) {
-            getSession().markTXFailed(t);
-            if (requiresResponse) {
-               ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception 
to the client", t);
-               ActiveMQException activeMQInternalErrorException = new 
ActiveMQInternalErrorException();
-               activeMQInternalErrorException.initCause(t);
-               response = new 
ActiveMQExceptionMessage(activeMQInternalErrorException);
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtException(t);
-            }
-         }
 
-         sendResponse(packet, response, flush, closeChannel);
+            sendResponse(packet, response, flush, closeChannel);
+         } finally {
+            storageManager.clearContext();
+         }
       } finally {
-         storageManager.clearContext();
+         inHandler.get().set(false);
       }
    }
 
@@ -583,6 +594,8 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
                                      final boolean flush,
                                      final boolean closeChannel) {
       if (confirmPacket != null) {
+         channel.confirm(confirmPacket);
+
          if (flush) {
             channel.flushConfirmations();
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index ad114e0..71e877f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -153,10 +153,15 @@ public class ActiveMQPacketHandler implements 
ChannelHandler {
 
          OperationContext sessionOperationContext = 
server.newOperationContext();
 
-         ServerSession session = server.createSession(request.getName(), 
activeMQPrincipal == null ? request.getUsername() : 
activeMQPrincipal.getUserName(), activeMQPrincipal == null ? 
request.getPassword() : activeMQPrincipal.getPassword(), 
request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), 
request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), 
request.getDefaultAddress(), new CoreSessionCallback(request.getName(), 
protocolManager, channel, connection), true, sessionOperationContext);
+         CoreSessionCallback sessionCallback = new 
CoreSessionCallback(request.getName(), protocolManager, channel, connection);
+
+         ServerSession session = server.createSession(request.getName(), 
activeMQPrincipal == null ? request.getUsername() : 
activeMQPrincipal.getUserName(), activeMQPrincipal == null ? 
request.getPassword() : activeMQPrincipal.getPassword(), 
request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), 
request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), 
request.getDefaultAddress(), sessionCallback, true, sessionOperationContext);
 
          ServerSessionPacketHandler handler = new 
ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), session, 
server.getStorageManager(), channel);
          channel.setHandler(handler);
+         sessionCallback.setSessionHandler(handler);
+
+         channel.setHandler(handler);
 
          // TODO - where is this removed?
          protocolManager.addSessionHandler(request.getName(), handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index e35771e..75a98b2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
+import 
org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
@@ -44,6 +45,8 @@ public final class CoreSessionCallback implements 
SessionCallback {
 
    private String name;
 
+   private ServerSessionPacketHandler handler;
+
    public CoreSessionCallback(String name,
                               ProtocolManager protocolManager,
                               Channel channel,
@@ -54,6 +57,21 @@ public final class CoreSessionCallback implements 
SessionCallback {
       this.connection = connection;
    }
 
+   public CoreSessionCallback setSessionHandler(ServerSessionPacketHandler 
handler) {
+      this.handler = handler;
+      return this;
+   }
+
+   @Override
+   public void close(boolean failed) {
+      ServerSessionPacketHandler localHandler = handler;
+      if (localHandler != null) {
+         // Wait for any pending tasks to finish before marking this as closed
+         localHandler.flushExecutor();
+      }
+      this.handler = null;
+   }
+
    @Override
    public boolean isWritable(ReadyListener callback) {
       return connection.isWritable(callback);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 627b201..8b102bd 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -335,6 +335,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    }
 
    protected void doClose(final boolean failed) throws Exception {
+      callback.close(failed);
       synchronized (this) {
          this.setStarted(false);
          if (closed)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 891f1ad..bb18986 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -78,4 +78,8 @@ public interface SessionCallback {
     * Some protocols (Openwire) needs a special message with the browser is 
finished.
     */
    void browserFinished(ServerConsumer consumer);
-}
+
+   default void close(boolean failed) {
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 60187f0..797ac60 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -38,6 +38,13 @@ public class ActiveMQServerControlUsingCoreTest extends 
ActiveMQServerControlTes
       return names;
    }
 
+   // Forcing a failover does not make sense when driven through the core protocol:
+   // the pool would be shut down while a connection is still being used,
+   // so this test is intentionally overridden as a no-op.
+   @Override
+   public void testForceFailover() throws Exception {
+   }
+
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------

Reply via email to