ARTEMIS-1333 Fix SendACK (fix copied manually from master; cherry-picking was not possible)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e5ef406e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e5ef406e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e5ef406e Branch: refs/heads/1.x Commit: e5ef406e4e4fbb03b2ce1afdf19d7e9aff8367f4 Parents: 9724571 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Thu Aug 10 22:24:21 2017 -0400 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Aug 10 22:27:57 2017 -0400 ---------------------------------------------------------------------- .../core/ServerSessionPacketHandler.java | 621 ++++++++++--------- .../core/impl/ActiveMQPacketHandler.java | 7 +- .../protocol/core/impl/CoreSessionCallback.java | 18 + .../core/server/impl/ServerSessionImpl.java | 1 + .../spi/core/protocol/SessionCallback.java | 6 +- .../ActiveMQServerControlUsingCoreTest.java | 7 + 6 files changed, 354 insertions(+), 306 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 5dbb4f1..c3fc01d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -20,6 +20,7 @@ import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.List; import java.util.concurrent.Executor; +import 
java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -133,6 +134,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final boolean direct; + private static final ThreadLocal<AtomicBoolean> inHandler = ThreadLocal.withInitial(AtomicBoolean::new); + private final Executor callExecutor; public ServerSessionPacketHandler(final Executor callExecutor, @@ -170,18 +173,21 @@ public class ServerSessionPacketHandler implements ChannelHandler { public void connectionFailed(final ActiveMQException exception, boolean failedOver) { ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName()); + flushExecutor(); + try { session.close(true); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorClosingSession(e); } - flushExecutor(); ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName()); } - private void flushExecutor() { - OrderedExecutorFactory.flushExecutor(callExecutor); + public void flushExecutor() { + if (!inHandler.get().get()) { + OrderedExecutorFactory.flushExecutor(callExecutor); + } } public void close() { @@ -202,345 +208,350 @@ public class ServerSessionPacketHandler implements ChannelHandler { @Override public void handlePacket(final Packet packet) { - channel.confirm(packet); callExecutor.execute(() -> internalHandlePacket(packet)); } private void internalHandlePacket(final Packet packet) { - byte type = packet.getType(); - storageManager.setContext(session.getSessionContext()); + inHandler.get().set(true); + try { + byte type = packet.getType(); - Packet response = null; - boolean flush = false; - boolean closeChannel = false; - boolean requiresResponse = false; + storageManager.setContext(session.getSessionContext()); - if (logger.isTraceEnabled()) { - logger.trace("ServerSessionPacketHandler::handlePacket," + packet); - } + Packet response = null; + boolean flush = false; + boolean 
closeChannel = false; + boolean requiresResponse = false; + + if (logger.isTraceEnabled()) { + logger.trace("ServerSessionPacketHandler::handlePacket," + packet); + } - try { try { - switch (type) { - case SESS_CREATECONSUMER: { - SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet; - requiresResponse = request.isRequiresResponse(); - session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly()); - if (requiresResponse) { - // We send back queue information on the queue as a response- this allows the queue to - // be automatically recreated on failover - QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName()); + try { + switch (type) { + case SESS_CREATECONSUMER: { + SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet; + requiresResponse = request.isRequiresResponse(); + session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly()); + if (requiresResponse) { + // We send back queue information on the queue as a response- this allows the queue to + // be automatically recreated on failover + QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName()); + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { + response = new SessionQueueQueryResponseMessage_V2(queueQueryResult); + } else { + response = new SessionQueueQueryResponseMessage(queueQueryResult); + } + } + + break; + } + case CREATE_QUEUE: { + CreateQueueMessage request = (CreateQueueMessage) packet; + requiresResponse = request.isRequiresResponse(); + session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } + case CREATE_SHARED_QUEUE: { + CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet; + requiresResponse = 
request.isRequiresResponse(); + session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } + case DELETE_QUEUE: { + requiresResponse = true; + SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet; + session.deleteQueue(request.getQueueName()); + response = new NullResponseMessage(); + break; + } + case SESS_QUEUEQUERY: { + requiresResponse = true; + SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet; + QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { - response = new SessionQueueQueryResponseMessage_V2(queueQueryResult); + response = new SessionQueueQueryResponseMessage_V2(result); } else { - response = new SessionQueueQueryResponseMessage(queueQueryResult); + response = new SessionQueueQueryResponseMessage(result); } + break; } - - break; - } - case CREATE_QUEUE: { - CreateQueueMessage request = (CreateQueueMessage) packet; - requiresResponse = request.isRequiresResponse(); - session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable()); - if (requiresResponse) { + case SESS_BINDINGQUERY: { + requiresResponse = true; + SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; + BindingQueryResult result = session.executeBindingQuery(request.getAddress()); + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { + response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues(), result.isAutoCreateJmsTopics()); + } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) { + response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues()); + } else { + response = new 
SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames()); + } + break; + } + case SESS_ACKNOWLEDGE: { + SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet; + requiresResponse = message.isRequiresResponse(); + session.acknowledge(message.getConsumerID(), message.getMessageID()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } + case SESS_EXPIRED: { + SessionExpireMessage message = (SessionExpireMessage) packet; + session.expire(message.getConsumerID(), message.getMessageID()); + break; + } + case SESS_COMMIT: { + requiresResponse = true; + session.commit(); response = new NullResponseMessage(); + break; } - break; - } - case CREATE_SHARED_QUEUE: { - CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet; - requiresResponse = request.isRequiresResponse(); - session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); - if (requiresResponse) { + case SESS_ROLLBACK: { + requiresResponse = true; + session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered()); response = new NullResponseMessage(); + break; } - break; - } - case DELETE_QUEUE: { - requiresResponse = true; - SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet; - session.deleteQueue(request.getQueueName()); - response = new NullResponseMessage(); - break; - } - case SESS_QUEUEQUERY: { - requiresResponse = true; - SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet; - QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); - if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { - response = new SessionQueueQueryResponseMessage_V2(result); - } else { - response = new SessionQueueQueryResponseMessage(result); + case SESS_XA_COMMIT: { + requiresResponse = true; + SessionXACommitMessage message = (SessionXACommitMessage) packet; + session.xaCommit(message.getXid(), 
message.isOnePhase()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; } - break; - } - case SESS_BINDINGQUERY: { - requiresResponse = true; - SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; - BindingQueryResult result = session.executeBindingQuery(request.getAddress()); - if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { - response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues(), result.isAutoCreateJmsTopics()); - } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) { - response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues()); - } else { - response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames()); + case SESS_XA_END: { + requiresResponse = true; + SessionXAEndMessage message = (SessionXAEndMessage) packet; + session.xaEnd(message.getXid()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; } - break; - } - case SESS_ACKNOWLEDGE: { - SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet; - requiresResponse = message.isRequiresResponse(); - session.acknowledge(message.getConsumerID(), message.getMessageID()); - if (requiresResponse) { + case SESS_XA_FORGET: { + requiresResponse = true; + SessionXAForgetMessage message = (SessionXAForgetMessage) packet; + session.xaForget(message.getXid()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; + } + case SESS_XA_JOIN: { + requiresResponse = true; + SessionXAJoinMessage message = (SessionXAJoinMessage) packet; + session.xaJoin(message.getXid()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; + } + case SESS_XA_RESUME: { + requiresResponse = true; + SessionXAResumeMessage message = (SessionXAResumeMessage) packet; + 
session.xaResume(message.getXid()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; + } + case SESS_XA_ROLLBACK: { + requiresResponse = true; + SessionXARollbackMessage message = (SessionXARollbackMessage) packet; + session.xaRollback(message.getXid()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; + } + case SESS_XA_START: { + requiresResponse = true; + SessionXAStartMessage message = (SessionXAStartMessage) packet; + session.xaStart(message.getXid()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; + } + case SESS_XA_FAILED: { + requiresResponse = true; + SessionXAAfterFailedMessage message = (SessionXAAfterFailedMessage) packet; + session.xaFailed(message.getXid()); + // no response on this case + break; + } + case SESS_XA_SUSPEND: { + requiresResponse = true; + session.xaSuspend(); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; + } + case SESS_XA_PREPARE: { + requiresResponse = true; + SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet; + session.xaPrepare(message.getXid()); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + break; + } + case SESS_XA_INDOUBT_XIDS: { + requiresResponse = true; + List<Xid> xids = session.xaGetInDoubtXids(); + response = new SessionXAGetInDoubtXidsResponseMessage(xids); + break; + } + case SESS_XA_GET_TIMEOUT: { + requiresResponse = true; + int timeout = session.xaGetTimeout(); + response = new SessionXAGetTimeoutResponseMessage(timeout); + break; + } + case SESS_XA_SET_TIMEOUT: { + requiresResponse = true; + SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet; + session.xaSetTimeout(message.getTimeoutSeconds()); + response = new SessionXASetTimeoutResponseMessage(true); + break; + } + case SESS_START: { + session.start(); + break; + } + case SESS_STOP: { + requiresResponse = true; + session.stop(); response = new 
NullResponseMessage(); + break; } - break; - } - case SESS_EXPIRED: { - SessionExpireMessage message = (SessionExpireMessage) packet; - session.expire(message.getConsumerID(), message.getMessageID()); - break; - } - case SESS_COMMIT: { - requiresResponse = true; - session.commit(); - response = new NullResponseMessage(); - break; - } - case SESS_ROLLBACK: { - requiresResponse = true; - session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered()); - response = new NullResponseMessage(); - break; - } - case SESS_XA_COMMIT: { - requiresResponse = true; - SessionXACommitMessage message = (SessionXACommitMessage) packet; - session.xaCommit(message.getXid(), message.isOnePhase()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_END: { - requiresResponse = true; - SessionXAEndMessage message = (SessionXAEndMessage) packet; - session.xaEnd(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_FORGET: { - requiresResponse = true; - SessionXAForgetMessage message = (SessionXAForgetMessage) packet; - session.xaForget(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_JOIN: { - requiresResponse = true; - SessionXAJoinMessage message = (SessionXAJoinMessage) packet; - session.xaJoin(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_RESUME: { - requiresResponse = true; - SessionXAResumeMessage message = (SessionXAResumeMessage) packet; - session.xaResume(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_ROLLBACK: { - requiresResponse = true; - SessionXARollbackMessage message = (SessionXARollbackMessage) packet; - session.xaRollback(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; 
- } - case SESS_XA_START: { - requiresResponse = true; - SessionXAStartMessage message = (SessionXAStartMessage) packet; - session.xaStart(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_FAILED: { - requiresResponse = true; - SessionXAAfterFailedMessage message = (SessionXAAfterFailedMessage) packet; - session.xaFailed(message.getXid()); - // no response on this case - break; - } - case SESS_XA_SUSPEND: { - requiresResponse = true; - session.xaSuspend(); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_PREPARE: { - requiresResponse = true; - SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet; - session.xaPrepare(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - break; - } - case SESS_XA_INDOUBT_XIDS: { - requiresResponse = true; - List<Xid> xids = session.xaGetInDoubtXids(); - response = new SessionXAGetInDoubtXidsResponseMessage(xids); - break; - } - case SESS_XA_GET_TIMEOUT: { - requiresResponse = true; - int timeout = session.xaGetTimeout(); - response = new SessionXAGetTimeoutResponseMessage(timeout); - break; - } - case SESS_XA_SET_TIMEOUT: { - requiresResponse = true; - SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet; - session.xaSetTimeout(message.getTimeoutSeconds()); - response = new SessionXASetTimeoutResponseMessage(true); - break; - } - case SESS_START: { - session.start(); - break; - } - case SESS_STOP: { - requiresResponse = true; - session.stop(); - response = new NullResponseMessage(); - break; - } - case SESS_CLOSE: { - requiresResponse = true; - session.close(false); - // removeConnectionListeners(); - response = new NullResponseMessage(); - flush = true; - closeChannel = true; - break; - } - case SESS_INDIVIDUAL_ACKNOWLEDGE: { - SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage) packet; - requiresResponse 
= message.isRequiresResponse(); - session.individualAcknowledge(message.getConsumerID(), message.getMessageID()); - if (requiresResponse) { + case SESS_CLOSE: { + requiresResponse = true; + session.close(false); + // removeConnectionListeners(); response = new NullResponseMessage(); + flush = true; + closeChannel = true; + break; } - break; - } - case SESS_CONSUMER_CLOSE: { - requiresResponse = true; - SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet; - session.closeConsumer(message.getConsumerID()); - response = new NullResponseMessage(); - break; - } - case SESS_FLOWTOKEN: { - SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet; - session.receiveConsumerCredits(message.getConsumerID(), message.getCredits()); - break; - } - case SESS_SEND: { - SessionSendMessage message = (SessionSendMessage) packet; - requiresResponse = message.isRequiresResponse(); - session.send((ServerMessage) message.getMessage(), direct); - if (requiresResponse) { + case SESS_INDIVIDUAL_ACKNOWLEDGE: { + SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage) packet; + requiresResponse = message.isRequiresResponse(); + session.individualAcknowledge(message.getConsumerID(), message.getMessageID()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } + case SESS_CONSUMER_CLOSE: { + requiresResponse = true; + SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet; + session.closeConsumer(message.getConsumerID()); response = new NullResponseMessage(); + break; } - break; - } - case SESS_SEND_LARGE: { - SessionSendLargeMessage message = (SessionSendLargeMessage) packet; - session.sendLarge(message.getLargeMessage()); - break; - } - case SESS_SEND_CONTINUATION: { - SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet; - requiresResponse = message.isRequiresResponse(); - session.sendContinuations(message.getPacketSize(), 
message.getMessageBodySize(), message.getBody(), message.isContinues()); - if (requiresResponse) { + case SESS_FLOWTOKEN: { + SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet; + session.receiveConsumerCredits(message.getConsumerID(), message.getCredits()); + break; + } + case SESS_SEND: { + SessionSendMessage message = (SessionSendMessage) packet; + requiresResponse = message.isRequiresResponse(); + session.send((ServerMessage) message.getMessage(), direct); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } + case SESS_SEND_LARGE: { + SessionSendLargeMessage message = (SessionSendLargeMessage) packet; + session.sendLarge(message.getLargeMessage()); + break; + } + case SESS_SEND_CONTINUATION: { + SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet; + requiresResponse = message.isRequiresResponse(); + session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues()); + if (requiresResponse) { + response = new NullResponseMessage(); + } + break; + } + case SESS_FORCE_CONSUMER_DELIVERY: { + SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet; + session.forceConsumerDelivery(message.getConsumerID(), message.getSequence()); + break; + } + case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: { + SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet; + session.requestProducerCredits(message.getAddress(), message.getCredits()); + break; + } + case PacketImpl.SESS_ADD_METADATA: { response = new NullResponseMessage(); + SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet; + session.addMetaData(message.getKey(), message.getData()); + break; + } + case PacketImpl.SESS_ADD_METADATA2: { + SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet; + if (message.isRequiresConfirmations()) { + response = new NullResponseMessage(); + } + 
session.addMetaData(message.getKey(), message.getData()); + break; + } + case PacketImpl.SESS_UNIQUE_ADD_METADATA: { + SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet; + if (session.addUniqueMetaData(message.getKey(), message.getData())) { + response = new NullResponseMessage(); + } else { + response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData())); + } + break; } - break; - } - case SESS_FORCE_CONSUMER_DELIVERY: { - SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet; - session.forceConsumerDelivery(message.getConsumerID(), message.getSequence()); - break; - } - case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: { - SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet; - session.requestProducerCredits(message.getAddress(), message.getCredits()); - break; } - case PacketImpl.SESS_ADD_METADATA: { - response = new NullResponseMessage(); - SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet; - session.addMetaData(message.getKey(), message.getData()); - break; + } catch (ActiveMQIOErrorException e) { + getSession().markTXFailed(e); + if (requiresResponse) { + logger.debug("Sending exception to client", e); + response = new ActiveMQExceptionMessage(e); + } else { + ActiveMQServerLogger.LOGGER.caughtException(e); } - case PacketImpl.SESS_ADD_METADATA2: { - SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet; - if (message.isRequiresConfirmations()) { - response = new NullResponseMessage(); - } - session.addMetaData(message.getKey(), message.getData()); - break; + } catch (ActiveMQXAException e) { + if (requiresResponse) { + logger.debug("Sending exception to client", e); + response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); + } else { + ActiveMQServerLogger.LOGGER.caughtXaException(e); } - case PacketImpl.SESS_UNIQUE_ADD_METADATA: { - 
SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet; - if (session.addUniqueMetaData(message.getKey(), message.getData())) { - response = new NullResponseMessage(); + } catch (ActiveMQException e) { + if (requiresResponse) { + logger.debug("Sending exception to client", e); + response = new ActiveMQExceptionMessage(e); + } else { + if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) { + logger.debug("Caught exception", e); } else { - response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData())); + ActiveMQServerLogger.LOGGER.caughtException(e); } - break; } - } - } catch (ActiveMQIOErrorException e) { - getSession().markTXFailed(e); - if (requiresResponse) { - logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); - } else { - ActiveMQServerLogger.LOGGER.caughtException(e); - } - } catch (ActiveMQXAException e) { - if (requiresResponse) { - logger.debug("Sending exception to client", e); - response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); - } else { - ActiveMQServerLogger.LOGGER.caughtXaException(e); - } - } catch (ActiveMQException e) { - if (requiresResponse) { - logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); - } else { - if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) { - logger.debug("Caught exception", e); + } catch (Throwable t) { + getSession().markTXFailed(t); + if (requiresResponse) { + ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t); + ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); + activeMQInternalErrorException.initCause(t); + response = new ActiveMQExceptionMessage(activeMQInternalErrorException); } else { - ActiveMQServerLogger.LOGGER.caughtException(e); + ActiveMQServerLogger.LOGGER.caughtException(t); } } - } catch (Throwable t) { - getSession().markTXFailed(t); 
- if (requiresResponse) { - ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t); - ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); - activeMQInternalErrorException.initCause(t); - response = new ActiveMQExceptionMessage(activeMQInternalErrorException); - } else { - ActiveMQServerLogger.LOGGER.caughtException(t); - } - } - sendResponse(packet, response, flush, closeChannel); + sendResponse(packet, response, flush, closeChannel); + } finally { + storageManager.clearContext(); + } } finally { - storageManager.clearContext(); + inHandler.get().set(false); } } @@ -583,6 +594,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { final boolean flush, final boolean closeChannel) { if (confirmPacket != null) { + channel.confirm(confirmPacket); + if (flush) { channel.flushConfirmations(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index ad114e0..71e877f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -153,10 +153,15 @@ public class ActiveMQPacketHandler implements ChannelHandler { OperationContext sessionOperationContext = server.newOperationContext(); - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? 
request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext); + CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection); + + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), session, server.getStorageManager(), channel); channel.setHandler(handler); + sessionCallback.setSessionHandler(handler); + + channel.setHandler(handler); // TODO - where is this removed? 
protocolManager.addSessionHandler(request.getName(), handler); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index e35771e..75a98b2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; @@ -44,6 +45,8 @@ public final class CoreSessionCallback implements SessionCallback { private String name; + private ServerSessionPacketHandler handler; + public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel, @@ -54,6 +57,21 @@ public final class CoreSessionCallback implements SessionCallback { this.connection = connection; } + public CoreSessionCallback setSessionHandler(ServerSessionPacketHandler handler) { + this.handler = handler; + return this; + } + + @Override + public void close(boolean failed) { + 
ServerSessionPacketHandler localHandler = handler; + if (localHandler != null) { + // We wait any pending tasks before we make this as closed + localHandler.flushExecutor(); + } + this.handler = null; + } + @Override public boolean isWritable(ReadyListener callback) { return connection.isWritable(callback); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 627b201..8b102bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -335,6 +335,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } protected void doClose(final boolean failed) throws Exception { + callback.close(failed); synchronized (this) { this.setStarted(false); if (closed) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index 891f1ad..bb18986 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -78,4 +78,8 @@ public interface SessionCallback { * Some protocols (Openwire) 
needs a special message with the browser is finished. */ void browserFinished(ServerConsumer consumer); -} + + default void close(boolean failed) { + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5ef406e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 60187f0..797ac60 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -38,6 +38,13 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return names; } + // it doesn't make sense through the core + // the pool will be shutdown while a connection is being used + // makes no sense! + @Override + public void testForceFailover() throws Exception { + } + // Constructors -------------------------------------------------- // Public --------------------------------------------------------